diff --git a/src/app/api/search/route.ts b/src/app/api/search/route.ts index b980623..e136d54 100644 --- a/src/app/api/search/route.ts +++ b/src/app/api/search/route.ts @@ -33,6 +33,7 @@ interface ChatRequestBody { embeddingModel?: embeddingModel; query: string; history: Array<[string, string]>; + stream?: boolean; } export const POST = async (req: Request) => { @@ -48,6 +49,7 @@ export const POST = async (req: Request) => { body.history = body.history || []; body.optimizationMode = body.optimizationMode || 'balanced'; + body.stream = body.stream || false; const history: BaseMessage[] = body.history.map((msg) => { return msg[0] === 'human' @@ -125,40 +127,126 @@ export const POST = async (req: Request) => { [], ); - return new Promise( - ( - resolve: (value: Response) => void, - reject: (value: Response) => void, - ) => { - let message = ''; + if (!body.stream) { + return new Promise( + ( + resolve: (value: Response) => void, + reject: (value: Response) => void, + ) => { + let message = ''; + let sources: any[] = []; + + emitter.on('data', (data: string) => { + try { + const parsedData = JSON.parse(data); + if (parsedData.type === 'response') { + message += parsedData.data; + } else if (parsedData.type === 'sources') { + sources = parsedData.data; + } + } catch (error) { + reject( + Response.json({ message: 'Error parsing data' }, { status: 500 }), + ); + } + }); + + emitter.on('end', () => { + resolve(Response.json({ message, sources }, { status: 200 })); + }); + + emitter.on('error', (error: any) => { + reject( + Response.json({ message: 'Search error', error }, { status: 500 }), + ); + }); + }, + ); + } + + const encoder = new TextEncoder(); + + // Create an AbortController to handle cancellation + const abortController = new AbortController(); + const { signal } = abortController; + + const stream = new ReadableStream({ + start(controller) { let sources: any[] = []; - emitter.on('data', (data) => { + // Send an initial message to keep the connection alive + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'init', + data: 'Stream connected' + }) + "\n\n")); + + // Set up cleanup function for when client disconnects + signal.addEventListener('abort', () => { + // Remove all listeners from emitter to prevent memory leaks + emitter.removeAllListeners(); + + // Close the controller if it's still active + try { + controller.close(); + } catch (error) { + // Controller might already be closed + } + }); + + emitter.on('data', (data: string) => { + // Check if request has been cancelled before processing + if (signal.aborted) return; + try { const parsedData = JSON.parse(data); + if (parsedData.type === 'response') { - message += parsedData.data; + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'response', + data: parsedData.data + }) + "\n\n")); } else if (parsedData.type === 'sources') { sources = parsedData.data; + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'sources', + data: sources + }) + "\n\n")); } } catch (error) { - reject( - Response.json({ message: 'Error parsing data' }, { status: 500 }), - ); + controller.error(error); } }); emitter.on('end', () => { - resolve(Response.json({ message, sources }, { status: 200 })); + // Check if request has been cancelled before processing + if (signal.aborted) return; + + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'done' + }) + "\n\n")); + controller.close(); }); - emitter.on('error', (error) => { - reject( - Response.json({ message: 'Search error', error }, { status: 500 }), - ); + emitter.on('error', (error: any) => { + // Check if request has been cancelled before processing + if (signal.aborted) return; + + controller.error(error); }); }, - ); + + cancel() { + abortController.abort(); + } + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + 'Connection': 'keep-alive', + }, + }); } catch (err: any) { console.error(`Error in getting search results: ${err.message}`); return Response.json(