feat(api): add streaming support to search route

This commit is contained in:
OTYAK
2025-03-26 11:28:05 +01:00
parent 27286465a3
commit d3b2f8983d

View File

@ -33,6 +33,7 @@ interface ChatRequestBody {
embeddingModel?: embeddingModel;
query: string;
history: Array<[string, string]>;
stream?: boolean;
}
export const POST = async (req: Request) => {
@ -48,6 +49,7 @@ export const POST = async (req: Request) => {
body.history = body.history || [];
body.optimizationMode = body.optimizationMode || 'balanced';
body.stream = body.stream || false;
const history: BaseMessage[] = body.history.map((msg) => {
return msg[0] === 'human'
@ -125,6 +127,7 @@ export const POST = async (req: Request) => {
[],
);
if (!body.stream) {
return new Promise(
(
resolve: (value: Response) => void,
@ -133,7 +136,7 @@ export const POST = async (req: Request) => {
let message = '';
let sources: any[] = [];
emitter.on('data', (data) => {
emitter.on('data', (data: string) => {
try {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
@ -152,13 +155,98 @@ export const POST = async (req: Request) => {
resolve(Response.json({ message, sources }, { status: 200 }));
});
emitter.on('error', (error) => {
emitter.on('error', (error: any) => {
reject(
Response.json({ message: 'Search error', error }, { status: 500 }),
);
});
},
);
}
const encoder = new TextEncoder();
// Create an AbortController to handle cancellation
const abortController = new AbortController();
const { signal } = abortController;
const stream = new ReadableStream({
start(controller) {
let sources: any[] = [];
// Send an initial message to keep the connection alive
controller.enqueue(encoder.encode("data: " + JSON.stringify({
type: 'init',
data: 'Stream connected'
}) + "\n\n"));
// Set up cleanup function for when client disconnects
signal.addEventListener('abort', () => {
// Remove all listeners from emitter to prevent memory leaks
emitter.removeAllListeners();
// Close the controller if it's still active
try {
controller.close();
} catch (error) {
// Controller might already be closed
}
});
emitter.on('data', (data: string) => {
// Check if request has been cancelled before processing
if (signal.aborted) return;
try {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
controller.enqueue(encoder.encode("data: " + JSON.stringify({
type: 'response',
data: parsedData.data
}) + "\n\n"));
} else if (parsedData.type === 'sources') {
sources = parsedData.data;
controller.enqueue(encoder.encode("data: " + JSON.stringify({
type: 'sources',
data: sources
}) + "\n\n"));
}
} catch (error) {
controller.error(error);
}
});
emitter.on('end', () => {
// Check if request has been cancelled before processing
if (signal.aborted) return;
controller.enqueue(encoder.encode("data: " + JSON.stringify({
type: 'done'
}) + "\n\n"));
controller.close();
});
emitter.on('error', (error: any) => {
// Check if request has been cancelled before processing
if (signal.aborted) return;
controller.error(error);
});
},
cancel() {
abortController.abort();
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
'Connection': 'keep-alive',
},
});
} catch (err: any) {
console.error(`Error in getting search results: ${err.message}`);
return Response.json(