From d3b2f8983dc12ec908f787438e922835dc23c696 Mon Sep 17 00:00:00 2001 From: OTYAK <118303871+OmarElKadri@users.noreply.github.com> Date: Wed, 26 Mar 2025 11:28:05 +0100 Subject: [PATCH 1/4] feat(api): add streaming support to search route --- src/app/api/search/route.ts | 122 +++++++++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 17 deletions(-) diff --git a/src/app/api/search/route.ts b/src/app/api/search/route.ts index b980623..e136d54 100644 --- a/src/app/api/search/route.ts +++ b/src/app/api/search/route.ts @@ -33,6 +33,7 @@ interface ChatRequestBody { embeddingModel?: embeddingModel; query: string; history: Array<[string, string]>; + stream?: boolean; } export const POST = async (req: Request) => { @@ -48,6 +49,7 @@ export const POST = async (req: Request) => { body.history = body.history || []; body.optimizationMode = body.optimizationMode || 'balanced'; + body.stream = body.stream || false; const history: BaseMessage[] = body.history.map((msg) => { return msg[0] === 'human' @@ -125,40 +127,126 @@ export const POST = async (req: Request) => { [], ); - return new Promise( - ( - resolve: (value: Response) => void, - reject: (value: Response) => void, - ) => { - let message = ''; + if (!body.stream) { + return new Promise( + ( + resolve: (value: Response) => void, + reject: (value: Response) => void, + ) => { + let message = ''; + let sources: any[] = []; + + emitter.on('data', (data: string) => { + try { + const parsedData = JSON.parse(data); + if (parsedData.type === 'response') { + message += parsedData.data; + } else if (parsedData.type === 'sources') { + sources = parsedData.data; + } + } catch (error) { + reject( + Response.json({ message: 'Error parsing data' }, { status: 500 }), + ); + } + }); + + emitter.on('end', () => { + resolve(Response.json({ message, sources }, { status: 200 })); + }); + + emitter.on('error', (error: any) => { + reject( + Response.json({ message: 'Search error', error }, { status: 500 }), + ); + }); + }, + ); + } + + const encoder = new TextEncoder(); + + // Create an AbortController to handle cancellation + const abortController = new AbortController(); + const { signal } = abortController; + + const stream = new ReadableStream({ + start(controller) { let sources: any[] = []; - emitter.on('data', (data) => { + // Send an initial message to keep the connection alive + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'init', + data: 'Stream connected' + }) + "\n\n")); + + // Set up cleanup function for when client disconnects + signal.addEventListener('abort', () => { + // Remove all listeners from emitter to prevent memory leaks + emitter.removeAllListeners(); + + // Close the controller if it's still active + try { + controller.close(); + } catch (error) { + // Controller might already be closed + } + }); + + emitter.on('data', (data: string) => { + // Check if request has been cancelled before processing + if (signal.aborted) return; + try { const parsedData = JSON.parse(data); + if (parsedData.type === 'response') { - message += parsedData.data; + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'response', + data: parsedData.data + }) + "\n\n")); } else if (parsedData.type === 'sources') { sources = parsedData.data; + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'sources', + data: sources + }) + "\n\n")); } } catch (error) { - reject( - Response.json({ message: 'Error parsing data' }, { status: 500 }), - ); + controller.error(error); } }); emitter.on('end', () => { - resolve(Response.json({ message, sources }, { status: 200 })); + // Check if request has been cancelled before processing + if (signal.aborted) return; + + controller.enqueue(encoder.encode("data: " + JSON.stringify({ + type: 'done' + }) + "\n\n")); + controller.close(); }); - emitter.on('error', (error) => { - reject( - Response.json({ message: 'Search error', error }, { status: 500 }), - ); + emitter.on('error', (error: any) => { + // Check if request has been cancelled before processing + if (signal.aborted) return; + + controller.error(error); }); }, - ); + + cancel() { + abortController.abort(); + } + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + 'Connection': 'keep-alive', + }, + }); } catch (err: any) { console.error(`Error in getting search results: ${err.message}`); return Response.json( From 191d1dc25f0936d9e9c5233b07bc59525caa32d7 Mon Sep 17 00:00:00 2001 From: OTYAK <118303871+OmarElKadri@users.noreply.github.com> Date: Wed, 26 Mar 2025 11:32:46 +0100 Subject: [PATCH 2/4] refactor(api): clean up comments and improve abort handling in search route --- src/app/api/search/route.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/app/api/search/route.ts b/src/app/api/search/route.ts index e136d54..24990ad 100644 --- a/src/app/api/search/route.ts +++ b/src/app/api/search/route.ts @@ -166,7 +166,6 @@ export const POST = async (req: Request) => { const encoder = new TextEncoder(); - // Create an AbortController to handle cancellation const abortController = new AbortController(); const { signal } = abortController; @@ -174,27 +173,21 @@ export const POST = async (req: Request) => { start(controller) { let sources: any[] = []; - // Send an initial message to keep the connection alive controller.enqueue(encoder.encode("data: " + JSON.stringify({ type: 'init', data: 'Stream connected' }) + "\n\n")); - // Set up cleanup function for when client disconnects signal.addEventListener('abort', () => { - // Remove all listeners from emitter to prevent memory leaks emitter.removeAllListeners(); - // Close the controller if it's still active try { controller.close(); } catch (error) { - // Controller might already be closed } }); emitter.on('data', (data: string) => { - // Check if request has been cancelled before processing if (signal.aborted) return; try { @@ -218,7 +211,6 @@ export const POST = async (req: Request) => { }); emitter.on('end', () => { - // Check if request has been cancelled before processing if (signal.aborted) return; controller.enqueue(encoder.encode("data: " + JSON.stringify({ @@ -228,7 +220,6 @@ export const POST = async (req: Request) => { }); emitter.on('error', (error: any) => { - // Check if request has been cancelled before processing if (signal.aborted) return; controller.error(error); From 5d60ab113942c307e20bb105ca18063dc0784b99 Mon Sep 17 00:00:00 2001 From: OTYAK <118303871+OmarElKadri@users.noreply.github.com> Date: Thu, 27 Mar 2025 13:04:09 +0100 Subject: [PATCH 3/4] feat(api): Switch to newline-delimited JSON streaming instead of SSE --- docs/API/SEARCH.md | 29 +++++++++++++++++++++++++++-- src/app/api/search/route.ts | 28 ++++++++++++++++++---------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/docs/API/SEARCH.md b/docs/API/SEARCH.md index 3007901..3a28a78 100644 --- a/docs/API/SEARCH.md +++ b/docs/API/SEARCH.md @@ -32,7 +32,8 @@ The API accepts a JSON object in the request body, where you define the focus mo "history": [ ["human", "Hi, how are you?"], ["assistant", "I am doing well, how can I help you today?"] - ] + ], + "stream": false } ``` @@ -71,11 +72,13 @@ The API accepts a JSON object in the request body, where you define the focus mo ] ``` +- **`stream`** (boolean, optional): When set to `true`, enables streaming responses. Default is `false`. + ### Response The response from the API includes both the final message and the sources used to generate that message. -#### Example Response +#### Standard Response (stream: false) ```json { @@ -100,6 +103,28 @@ The response from the API includes both the final message and the sources used t } ``` +#### Streaming Response (stream: true) + +When streaming is enabled, the API returns a stream of newline-delimited JSON objects. Each line contains a complete, valid JSON object. The response has Content-Type: application/json. + +Example of streamed response objects: + +``` +{"type":"init","data":"Stream connected"} +{"type":"sources","data":[{"pageContent":"...","metadata":{"title":"...","url":"..."}},...]} +{"type":"response","data":"Perplexica is an "} +{"type":"response","data":"innovative, open-source "} +{"type":"response","data":"AI-powered search engine..."} +{"type":"done"} +``` + +Clients should process each line as a separate JSON object. The different message types include: + +- **`init`**: Initial connection message +- **`sources`**: All sources used for the response +- **`response`**: Chunks of the generated answer text +- **`done`**: Indicates the stream is complete + ### Fields in the Response - **`message`** (string): The search result, generated based on the query and focus mode. diff --git a/src/app/api/search/route.ts b/src/app/api/search/route.ts index 24990ad..b2be3f9 100644 --- a/src/app/api/search/route.ts +++ b/src/app/api/search/route.ts @@ -166,6 +166,7 @@ export const POST = async (req: Request) => { const encoder = new TextEncoder(); + // Create an AbortController to handle cancellation const abortController = new AbortController(); const { signal } = abortController; @@ -173,37 +174,43 @@ export const POST = async (req: Request) => { start(controller) { let sources: any[] = []; - controller.enqueue(encoder.encode("data: " + JSON.stringify({ + // Send an initial message to keep the connection alive + controller.enqueue(encoder.encode(JSON.stringify({ type: 'init', data: 'Stream connected' - }) + "\n\n")); + }) + '\n')); + // Set up cleanup function for when client disconnects signal.addEventListener('abort', () => { + // Remove all listeners from emitter to prevent memory leaks emitter.removeAllListeners(); + // Close the controller if it's still active try { controller.close(); } catch (error) { + // Controller might already be closed } }); emitter.on('data', (data: string) => { + // Check if request has been cancelled before processing if (signal.aborted) return; try { const parsedData = JSON.parse(data); if (parsedData.type === 'response') { - controller.enqueue(encoder.encode("data: " + JSON.stringify({ + controller.enqueue(encoder.encode(JSON.stringify({ type: 'response', data: parsedData.data - }) + "\n\n")); + }) + '\n')); } else if (parsedData.type === 'sources') { sources = parsedData.data; - controller.enqueue(encoder.encode("data: " + JSON.stringify({ + controller.enqueue(encoder.encode(JSON.stringify({ type: 'sources', data: sources - }) + "\n\n")); + }) + '\n')); } } catch (error) { controller.error(error); @@ -211,21 +218,22 @@ export const POST = async (req: Request) => { }); emitter.on('end', () => { + // Check if request has been cancelled before processing if (signal.aborted) return; - controller.enqueue(encoder.encode("data: " + JSON.stringify({ + controller.enqueue(encoder.encode(JSON.stringify({ type: 'done' - }) + "\n\n")); + }) + '\n')); controller.close(); }); emitter.on('error', (error: any) => { + // Check if request has been cancelled before processing if (signal.aborted) return; controller.error(error); }); }, - cancel() { abortController.abort(); } @@ -233,7 +241,7 @@ export const POST = async (req: Request) => { return new Response(stream, { headers: { - 'Content-Type': 'text/event-stream', + 'Content-Type': 'application/json', 'Cache-Control': 'no-cache, no-transform', 'Connection': 'keep-alive', }, From 90e303f737e9c9efe494fd141f951ed2b94d1b7d Mon Sep 17 00:00:00 2001 From: ItzCrazyKns <95534749+ItzCrazyKns@users.noreply.github.com> Date: Sun, 30 Mar 2025 21:12:04 +0530 Subject: [PATCH 4/4] feat(search): lint & beautify, update content type --- src/app/api/search/route.ts | 88 +++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/src/app/api/search/route.ts b/src/app/api/search/route.ts index b2be3f9..d3e98ca 100644 --- a/src/app/api/search/route.ts +++ b/src/app/api/search/route.ts @@ -146,7 +146,10 @@ export const POST = async (req: Request) => { } } catch (error) { reject( - Response.json({ message: 'Error parsing data' }, { status: 500 }), + Response.json( + { message: 'Error parsing data' }, + { status: 500 }, + ), ); } }); @@ -157,7 +160,10 @@ export const POST = async (req: Request) => { emitter.on('error', (error: any) => { reject( - Response.json({ message: 'Search error', error }, { status: 500 }), + Response.json( + { message: 'Search error', error }, + { status: 500 }, + ), ); }); }, @@ -165,52 +171,56 @@ export const POST = async (req: Request) => { } const encoder = new TextEncoder(); - - // Create an AbortController to handle cancellation + const abortController = new AbortController(); const { signal } = abortController; - + const stream = new ReadableStream({ start(controller) { let sources: any[] = []; - // Send an initial message to keep the connection alive - controller.enqueue(encoder.encode(JSON.stringify({ - type: 'init', - data: 'Stream connected' - }) + '\n')); + controller.enqueue( + encoder.encode( + JSON.stringify({ + type: 'init', + data: 'Stream connected', + }) + '\n', + ), + ); - // Set up cleanup function for when client disconnects signal.addEventListener('abort', () => { - // Remove all listeners from emitter to prevent memory leaks emitter.removeAllListeners(); - - // Close the controller if it's still active + try { controller.close(); - } catch (error) { - // Controller might already be closed - } + } catch (error) {} }); emitter.on('data', (data: string) => { - // Check if request has been cancelled before processing if (signal.aborted) return; - + try { const parsedData = JSON.parse(data); - + if (parsedData.type === 'response') { - controller.enqueue(encoder.encode(JSON.stringify({ - type: 'response', - data: parsedData.data - }) + '\n')); + controller.enqueue( + encoder.encode( + JSON.stringify({ + type: 'response', + data: parsedData.data, + }) + '\n', + ), + ); } else if (parsedData.type === 'sources') { sources = parsedData.data; - controller.enqueue(encoder.encode(JSON.stringify({ - type: 'sources', - data: sources - }) + '\n')); + controller.enqueue( + encoder.encode( + JSON.stringify({ + type: 'sources', + data: sources, + }) + '\n', + ), + ); } } catch (error) { controller.error(error); @@ -218,32 +228,34 @@ export const POST = async (req: Request) => { }); emitter.on('end', () => { - // Check if request has been cancelled before processing if (signal.aborted) return; - - controller.enqueue(encoder.encode(JSON.stringify({ - type: 'done' - }) + '\n')); + + controller.enqueue( + encoder.encode( + JSON.stringify({ + type: 'done', + }) + '\n', + ), + ); controller.close(); }); emitter.on('error', (error: any) => { - // Check if request has been cancelled before processing if (signal.aborted) return; - + controller.error(error); }); }, cancel() { abortController.abort(); - } + }, }); return new Response(stream, { headers: { - 'Content-Type': 'application/json', + 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache, no-transform', - 'Connection': 'keep-alive', + Connection: 'keep-alive', }, }); } catch (err: any) {