feat(app): handle new architecture

This commit is contained in:
ItzCrazyKns
2025-11-23 19:58:46 +05:30
parent e0ba476ca4
commit 956a768a86
14 changed files with 945 additions and 508 deletions

View File

@@ -1,14 +1,10 @@
import crypto from 'crypto';
import { AIMessage, BaseMessage, HumanMessage } from '@langchain/core/messages';
import { EventEmitter } from 'stream';
import db from '@/lib/db';
import { chats, messages as messagesSchema } from '@/lib/db/schema';
import { and, eq, gt } from 'drizzle-orm';
import { getFileDetails } from '@/lib/utils/files';
import { searchHandlers } from '@/lib/search';
import { z } from 'zod';
import ModelRegistry from '@/lib/models/registry';
import { ModelWithProvider } from '@/lib/models/types';
import SearchAgent from '@/lib/agents/search';
import SessionManager from '@/lib/session';
import { ChatTurnMessage } from '@/lib/types';
export const runtime = 'nodejs';
export const dynamic = 'force-dynamic';
@@ -20,47 +16,25 @@ const messageSchema = z.object({
});
const chatModelSchema: z.ZodType<ModelWithProvider> = z.object({
providerId: z.string({
errorMap: () => ({
message: 'Chat model provider id must be provided',
}),
}),
key: z.string({
errorMap: () => ({
message: 'Chat model key must be provided',
}),
}),
providerId: z.string({ message: 'Chat model provider id must be provided' }),
key: z.string({ message: 'Chat model key must be provided' }),
});
const embeddingModelSchema: z.ZodType<ModelWithProvider> = z.object({
providerId: z.string({
errorMap: () => ({
message: 'Embedding model provider id must be provided',
}),
}),
key: z.string({
errorMap: () => ({
message: 'Embedding model key must be provided',
}),
message: 'Embedding model provider id must be provided',
}),
key: z.string({ message: 'Embedding model key must be provided' }),
});
const bodySchema = z.object({
message: messageSchema,
optimizationMode: z.enum(['speed', 'balanced', 'quality'], {
errorMap: () => ({
message: 'Optimization mode must be one of: speed, balanced, quality',
}),
message: 'Optimization mode must be one of: speed, balanced, quality',
}),
focusMode: z.string().min(1, 'Focus mode is required'),
history: z
.array(
z.tuple([z.string(), z.string()], {
errorMap: () => ({
message: 'History items must be tuples of two strings',
}),
}),
)
.array(z.tuple([z.string(), z.string()]))
.optional()
.default([]),
files: z.array(z.string()).optional().default([]),
@@ -78,7 +52,7 @@ const safeValidateBody = (data: unknown) => {
if (!result.success) {
return {
success: false,
error: result.error.errors.map((e) => ({
error: result.error.issues.map((e: any) => ({
path: e.path.join('.'),
message: e.message,
})),
@@ -91,151 +65,12 @@ const safeValidateBody = (data: unknown) => {
};
};
const handleEmitterEvents = async (
stream: EventEmitter,
writer: WritableStreamDefaultWriter,
encoder: TextEncoder,
chatId: string,
) => {
let receivedMessage = '';
const aiMessageId = crypto.randomBytes(7).toString('hex');
stream.on('data', (data) => {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
writer.write(
encoder.encode(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: aiMessageId,
}) + '\n',
),
);
receivedMessage += parsedData.data;
} else if (parsedData.type === 'sources') {
writer.write(
encoder.encode(
JSON.stringify({
type: 'sources',
data: parsedData.data,
messageId: aiMessageId,
}) + '\n',
),
);
const sourceMessageId = crypto.randomBytes(7).toString('hex');
db.insert(messagesSchema)
.values({
chatId: chatId,
messageId: sourceMessageId,
role: 'source',
sources: parsedData.data,
createdAt: new Date().toString(),
})
.execute();
}
});
stream.on('end', () => {
writer.write(
encoder.encode(
JSON.stringify({
type: 'messageEnd',
}) + '\n',
),
);
writer.close();
db.insert(messagesSchema)
.values({
content: receivedMessage,
chatId: chatId,
messageId: aiMessageId,
role: 'assistant',
createdAt: new Date().toString(),
})
.execute();
});
stream.on('error', (data) => {
const parsedData = JSON.parse(data);
writer.write(
encoder.encode(
JSON.stringify({
type: 'error',
data: parsedData.data,
}),
),
);
writer.close();
});
};
const handleHistorySave = async (
message: Message,
humanMessageId: string,
focusMode: string,
files: string[],
) => {
const chat = await db.query.chats.findFirst({
where: eq(chats.id, message.chatId),
});
const fileData = files.map(getFileDetails);
if (!chat) {
await db
.insert(chats)
.values({
id: message.chatId,
title: message.content,
createdAt: new Date().toString(),
focusMode: focusMode,
files: fileData,
})
.execute();
} else if (JSON.stringify(chat.files ?? []) != JSON.stringify(fileData)) {
db.update(chats)
.set({
files: files.map(getFileDetails),
})
.where(eq(chats.id, message.chatId));
}
const messageExists = await db.query.messages.findFirst({
where: eq(messagesSchema.messageId, humanMessageId),
});
if (!messageExists) {
await db
.insert(messagesSchema)
.values({
content: message.content,
chatId: message.chatId,
messageId: humanMessageId,
role: 'user',
createdAt: new Date().toString(),
})
.execute();
} else {
await db
.delete(messagesSchema)
.where(
and(
gt(messagesSchema.id, messageExists.id),
eq(messagesSchema.chatId, message.chatId),
),
)
.execute();
}
};
export const POST = async (req: Request) => {
try {
const reqBody = (await req.json()) as Body;
const parseBody = safeValidateBody(reqBody);
if (!parseBody.success) {
return Response.json(
{ message: 'Invalid request body', error: parseBody.error },
@@ -265,48 +100,116 @@ export const POST = async (req: Request) => {
),
]);
const humanMessageId =
message.messageId ?? crypto.randomBytes(7).toString('hex');
const history: BaseMessage[] = body.history.map((msg) => {
const history: ChatTurnMessage[] = body.history.map((msg) => {
if (msg[0] === 'human') {
return new HumanMessage({
return {
role: 'user',
content: msg[1],
});
};
} else {
return new AIMessage({
return {
role: 'assistant',
content: msg[1],
});
};
}
});
const handler = searchHandlers[body.focusMode];
if (!handler) {
return Response.json(
{
message: 'Invalid focus mode',
},
{ status: 400 },
);
}
const stream = await handler.searchAndAnswer(
message.content,
history,
llm,
embedding,
body.optimizationMode,
body.files,
body.systemInstructions as string,
);
const agent = new SearchAgent();
const session = SessionManager.createSession();
const responseStream = new TransformStream();
const writer = responseStream.writable.getWriter();
const encoder = new TextEncoder();
handleEmitterEvents(stream, writer, encoder, message.chatId);
handleHistorySave(message, humanMessageId, body.focusMode, body.files);
let receivedMessage = '';
session.addListener('data', (data: any) => {
if (data.type === 'response') {
writer.write(
encoder.encode(
JSON.stringify({
type: 'message',
data: data.data,
}) + '\n',
),
);
receivedMessage += data.data;
} else if (data.type === 'sources') {
writer.write(
encoder.encode(
JSON.stringify({
type: 'sources',
data: data.data,
}) + '\n',
),
);
} else if (data.type === 'block') {
writer.write(
encoder.encode(
JSON.stringify({
type: 'block',
block: data.block,
}) + '\n',
),
);
} else if (data.type === 'updateBlock') {
writer.write(
encoder.encode(
JSON.stringify({
type: 'updateBlock',
blockId: data.blockId,
patch: data.patch,
}) + '\n',
),
);
} else if (data.type === 'researchComplete') {
writer.write(
encoder.encode(
JSON.stringify({
type: 'researchComplete',
}) + '\n',
),
);
}
});
session.addListener('end', () => {
writer.write(
encoder.encode(
JSON.stringify({
type: 'messageEnd',
}) + '\n',
),
);
writer.close();
session.removeAllListeners();
});
session.addListener('error', (data: any) => {
writer.write(
encoder.encode(
JSON.stringify({
type: 'error',
data: data.data,
}) + '\n',
),
);
writer.close();
session.removeAllListeners();
});
agent.searchAsync(session, {
chatHistory: history,
followUp: message.content,
config: {
llm,
embedding: embedding,
sources: ['web'],
mode: body.optimizationMode,
},
});
/* handleHistorySave(message, humanMessageId, body.focusMode, body.files); */
return new Response(responseStream.readable, {
headers: {