mirror of
https://github.com/ItzCrazyKns/Perplexica.git
synced 2025-12-23 20:18:15 +00:00
feat(chat-route): add history saving, disconnect on abort, use subscribe method
This commit is contained in:
@@ -5,6 +5,10 @@ import SearchAgent from '@/lib/agents/search';
|
||||
import SessionManager from '@/lib/session';
|
||||
import { ChatTurnMessage } from '@/lib/types';
|
||||
import { SearchSources } from '@/lib/agents/search/types';
|
||||
import db from '@/lib/db';
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { chats } from '@/lib/db/schema';
|
||||
import UploadManager from '@/lib/uploads/manager';
|
||||
|
||||
export const runtime = 'nodejs';
|
||||
export const dynamic = 'force-dynamic';
|
||||
@@ -64,6 +68,38 @@ const safeValidateBody = (data: unknown) => {
|
||||
};
|
||||
};
|
||||
|
||||
const ensureChatExists = async (input: {
|
||||
id: string;
|
||||
sources: SearchSources[];
|
||||
query: string;
|
||||
fileIds: string[];
|
||||
}) => {
|
||||
try {
|
||||
const exists = await db.query.chats
|
||||
.findFirst({
|
||||
where: eq(chats.id, input.id),
|
||||
})
|
||||
.execute();
|
||||
|
||||
if (!exists) {
|
||||
await db.insert(chats).values({
|
||||
id: input.id,
|
||||
createdAt: new Date().toISOString(),
|
||||
sources: input.sources,
|
||||
title: input.query,
|
||||
files: input.fileIds.map((id) => {
|
||||
return {
|
||||
fileId: id,
|
||||
name: UploadManager.getFile(id)?.name || 'Uploaded File',
|
||||
};
|
||||
}),
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to check/save chat:', err);
|
||||
}
|
||||
};
|
||||
|
||||
export const POST = async (req: Request) => {
|
||||
try {
|
||||
const reqBody = (await req.json()) as Body;
|
||||
@@ -120,86 +156,65 @@ export const POST = async (req: Request) => {
|
||||
const writer = responseStream.writable.getWriter();
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
let receivedMessage = '';
|
||||
|
||||
session.addListener('data', (data: any) => {
|
||||
if (data.type === 'response') {
|
||||
const disconnect = session.subscribe((event: string, data: any) => {
|
||||
if (event === 'data') {
|
||||
if (data.type === 'block') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'block',
|
||||
block: data.block,
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
} else if (data.type === 'updateBlock') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'updateBlock',
|
||||
blockId: data.blockId,
|
||||
patch: data.patch,
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
} else if (data.type === 'researchComplete') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'researchComplete',
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
}
|
||||
} else if (event === 'end') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'message',
|
||||
type: 'messageEnd',
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
writer.close();
|
||||
session.removeAllListeners();
|
||||
} else if (event === 'error') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'error',
|
||||
data: data.data,
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
receivedMessage += data.data;
|
||||
} else if (data.type === 'sources') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'sources',
|
||||
data: data.data,
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
} else if (data.type === 'block') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'block',
|
||||
block: data.block,
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
} else if (data.type === 'updateBlock') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'updateBlock',
|
||||
blockId: data.blockId,
|
||||
patch: data.patch,
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
} else if (data.type === 'researchComplete') {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'researchComplete',
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
writer.close();
|
||||
session.removeAllListeners();
|
||||
}
|
||||
});
|
||||
|
||||
session.addListener('end', () => {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'messageEnd',
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
writer.close();
|
||||
session.removeAllListeners();
|
||||
});
|
||||
|
||||
session.addListener('error', (data: any) => {
|
||||
writer.write(
|
||||
encoder.encode(
|
||||
JSON.stringify({
|
||||
type: 'error',
|
||||
data: data.data,
|
||||
}) + '\n',
|
||||
),
|
||||
);
|
||||
writer.close();
|
||||
session.removeAllListeners();
|
||||
});
|
||||
|
||||
agent.searchAsync(session, {
|
||||
chatHistory: history,
|
||||
followUp: message.content,
|
||||
chatId: body.message.chatId,
|
||||
messageId: body.message.messageId,
|
||||
config: {
|
||||
llm,
|
||||
embedding: embedding,
|
||||
@@ -209,7 +224,17 @@ export const POST = async (req: Request) => {
|
||||
},
|
||||
});
|
||||
|
||||
/* handleHistorySave(message, humanMessageId, body.focusMode, body.files); */
|
||||
ensureChatExists({
|
||||
id: body.message.chatId,
|
||||
sources: body.sources as SearchSources[],
|
||||
fileIds: body.files,
|
||||
query: body.message.content,
|
||||
});
|
||||
|
||||
req.signal.addEventListener('abort', () => {
|
||||
disconnect();
|
||||
writer.close();
|
||||
});
|
||||
|
||||
return new Response(responseStream.readable, {
|
||||
headers: {
|
||||
|
||||
Reference in New Issue
Block a user