feat(actions): stream results internally

This commit is contained in:
ItzCrazyKns
2025-12-08 13:10:11 +05:30
parent 8aed9518a2
commit 6016090f12
3 changed files with 179 additions and 60 deletions

View File

@@ -1,7 +1,8 @@
import z from 'zod'; import z from 'zod';
import { ResearchAction } from '../../types'; import { ResearchAction } from '../../types';
import { Chunk } from '@/lib/types'; import { Chunk, ReadingResearchBlock } from '@/lib/types';
import TurnDown from 'turndown'; import TurnDown from 'turndown';
import path from 'path';
const turndownService = new TurnDown(); const turndownService = new TurnDown();
@@ -12,12 +13,19 @@ const schema = z.object({
const scrapeURLAction: ResearchAction<typeof schema> = { const scrapeURLAction: ResearchAction<typeof schema> = {
name: 'scrape_url', name: 'scrape_url',
description: description:
'Use after __plan to scrape and extract content from the provided URLs. This is useful when you need detailed information from specific web pages or if the user asks you to summarize or analyze content from certain links. You can scrape maximum of 3 URLs.', 'Use this tool to scrape and extract content from the provided URLs. This is useful when you the user has asked you to extract or summarize information from specific web pages. You can provide up to 3 URLs at a time. NEVER CALL THIS TOOL EXPLICITLY YOURSELF UNLESS INSTRUCTED TO DO SO BY THE USER.',
schema: schema, schema: schema,
enabled: (_) => true, enabled: (_) => true,
execute: async (params, additionalConfig) => { execute: async (params, additionalConfig) => {
params.urls = params.urls.slice(0, 3); params.urls = params.urls.slice(0, 3);
// Identifier for the 'reading' sub-step. It is used both when that sub-step is
// created and when it is looked up again by id further down.
let readingBlockId = crypto.randomUUID();
// Becomes true once the 'reading' sub-step has been pushed onto the block.
let readingEmitted = false;
// Block that receives the sub-steps. Every later use first checks that it
// exists and that its type is 'research'.
const researchBlock = additionalConfig.session.getBlock(
additionalConfig.researchBlockId,
);
const results: Chunk[] = []; const results: Chunk[] = [];
await Promise.all( await Promise.all(
@@ -28,6 +36,70 @@ const scrapeURLAction: ResearchAction<typeof schema> = {
const title = const title =
text.match(/<title>(.*?)<\/title>/i)?.[1] || `Content from ${url}`; text.match(/<title>(.*?)<\/title>/i)?.[1] || `Content from ${url}`;
// Record the current `url` / `title` pair in a single 'reading' sub-step of the
// research block. The first time this runs the sub-step is created; afterwards
// new entries are appended to that same sub-step. Nothing happens when the
// block is missing or is not of type 'research'.
if (
!readingEmitted &&
researchBlock &&
researchBlock.type === 'research'
) {
// Set the flag before pushing so that later passes take the append branch.
readingEmitted = true;
researchBlock.data.subSteps.push({
id: readingBlockId,
type: 'reading',
reading: [
{
// Only metadata is recorded here; the content field is left empty.
content: '',
metadata: {
url,
title: title,
},
},
],
});
// Submit the whole subSteps array as a single 'replace' patch.
// NOTE(review): the op/path/value shape looks like JSON Patch - confirm
// against the session implementation.
additionalConfig.session.updateBlock(
additionalConfig.researchBlockId,
[
{
op: 'replace',
path: '/data/subSteps',
value: researchBlock.data.subSteps,
},
],
);
} else if (
readingEmitted &&
researchBlock &&
researchBlock.type === 'research'
) {
// Locate the sub-step created by the first branch via its id.
// NOTE(review): the findIndex result is not checked for -1. If the sub-step
// were ever absent, `subStep` would be undefined and the push below would
// throw.
const subStepIndex = researchBlock.data.subSteps.findIndex(
(step: any) => step.id === readingBlockId,
);
const subStep = researchBlock.data.subSteps[
subStepIndex
] as ReadingResearchBlock;
// Append this page's metadata-only entry to the existing sub-step.
subStep.reading.push({
content: '',
metadata: {
url,
title: title,
},
});
// Same full-array 'replace' patch as in the first branch.
additionalConfig.session.updateBlock(
additionalConfig.researchBlockId,
[
{
op: 'replace',
path: '/data/subSteps',
value: researchBlock.data.subSteps,
},
],
);
}
const markdown = turndownService.turndown(text); const markdown = turndownService.turndown(text);
results.push({ results.push({

View File

@@ -1,7 +1,7 @@
import z from 'zod'; import z from 'zod';
import { ResearchAction } from '../../types'; import { ResearchAction } from '../../types';
import { searchSearxng } from '@/lib/searxng'; import { searchSearxng } from '@/lib/searxng';
import { Chunk } from '@/lib/types'; import { Chunk, SearchResultsResearchBlock } from '@/lib/types';
const actionSchema = z.object({ const actionSchema = z.object({
type: z.literal('web_search'), type: z.literal('web_search'),
@@ -28,23 +28,90 @@ const webSearchAction: ResearchAction<typeof actionSchema> = {
schema: actionSchema, schema: actionSchema,
enabled: (config) => enabled: (config) =>
config.classification.classification.skipSearch === false, config.classification.classification.skipSearch === false,
execute: async (input, _) => { execute: async (input, additionalConfig) => {
input.queries = input.queries.slice(0, 3); input.queries = input.queries.slice(0, 3);
// Block that receives the sub-steps. It is only touched after checking that it
// exists and that its type is 'research'.
const researchBlock = additionalConfig.session.getBlock(
additionalConfig.researchBlockId,
);
// Before any search runs, push a 'searching' sub-step that lists the queries
// (already limited to the first three by the slice above).
if (researchBlock && researchBlock.type === 'research') {
researchBlock.data.subSteps.push({
id: crypto.randomUUID(),
type: 'searching',
searching: input.queries,
});
// Submit the whole subSteps array as a single 'replace' patch.
additionalConfig.session.updateBlock(additionalConfig.researchBlockId, [
{
op: 'replace',
path: '/data/subSteps',
value: researchBlock.data.subSteps,
},
]);
}
// Id of the 'search_results' sub-step created below, plus a flag recording
// whether that sub-step has been pushed yet.
const searchResultsBlockId = crypto.randomUUID();
let searchResultsEmitted = false;
let results: Chunk[] = []; let results: Chunk[] = [];
const search = async (q: string) => { const search = async (q: string) => {
const res = await searchSearxng(q); const res = await searchSearxng(q);
res.results.forEach((r) => { const resultChunks: Chunk[] = res.results.map((r) => ({
results.push({ content: r.content || r.title,
content: r.content || r.title, metadata: {
metadata: { title: r.title,
title: r.title, url: r.url,
url: r.url, },
}, }));
results.push(...resultChunks);
if (
!searchResultsEmitted &&
researchBlock &&
researchBlock.type === 'research'
) {
searchResultsEmitted = true;
researchBlock.data.subSteps.push({
id: searchResultsBlockId,
type: 'search_results',
reading: resultChunks,
}); });
});
additionalConfig.session.updateBlock(additionalConfig.researchBlockId, [
{
op: 'replace',
path: '/data/subSteps',
value: researchBlock.data.subSteps,
},
]);
} else if (
searchResultsEmitted &&
researchBlock &&
researchBlock.type === 'research'
) {
const subStepIndex = researchBlock.data.subSteps.findIndex(
(step) => step.id === searchResultsBlockId,
);
const subStep = researchBlock.data.subSteps[
subStepIndex
] as SearchResultsResearchBlock;
subStep.reading.push(...resultChunks);
additionalConfig.session.updateBlock(additionalConfig.researchBlockId, [
{
op: 'replace',
path: '/data/subSteps',
value: researchBlock.data.subSteps,
},
]);
}
}; };
await Promise.all(input.queries.map(search)); await Promise.all(input.queries.map(search));

View File

@@ -154,33 +154,11 @@ class Researcher {
tool_calls: finalToolCalls, tool_calls: finalToolCalls,
}); });
const searchCalls = finalToolCalls.filter(
(tc) =>
tc.name === 'web_search' ||
tc.name === 'academic_search' ||
tc.name === 'discussion_search',
);
if (searchCalls.length > 0 && block && block.type === 'research') {
block.data.subSteps.push({
id: crypto.randomUUID(),
type: 'searching',
searching: searchCalls.map((sc) => sc.arguments.queries).flat(),
});
session.updateBlock(researchBlockId, [
{
op: 'replace',
path: '/data/subSteps',
value: block.data.subSteps,
},
]);
}
const actionResults = await ActionRegistry.executeAll(finalToolCalls, { const actionResults = await ActionRegistry.executeAll(finalToolCalls, {
llm: input.config.llm, llm: input.config.llm,
embedding: input.config.embedding, embedding: input.config.embedding,
session: session, session: session,
researchBlockId: researchBlockId,
}); });
actionOutput.push(...actionResults); actionOutput.push(...actionResults);
@@ -193,39 +171,41 @@ class Researcher {
content: JSON.stringify(action), content: JSON.stringify(action),
}); });
}); });
const searchResults = actionResults.filter(
(a) => a.type === 'search_results',
);
if (searchResults.length > 0 && block && block.type === 'research') {
block.data.subSteps.push({
id: crypto.randomUUID(),
type: 'reading',
reading: searchResults.flatMap((a) => a.results),
});
session.updateBlock(researchBlockId, [
{
op: 'replace',
path: '/data/subSteps',
value: block.data.subSteps,
},
]);
}
} }
const searchResults = actionOutput.filter( const searchResults = actionOutput
(a) => a.type === 'search_results', .filter((a) => a.type === 'search_results')
); .flatMap((a) => a.results);
// De-duplicate `searchResults` by URL.
// `seenUrls` maps a URL to the index in `searchResults` of its first occurrence.
const seenUrls = new Map<string, number>();
const filteredSearchResults = searchResults
.map((result, index) => {
if (result.metadata.url && !seenUrls.has(result.metadata.url)) {
// First time this URL appears: remember its position and keep the result.
seenUrls.set(result.metadata.url, index);
return result;
} else if (result.metadata.url && seenUrls.has(result.metadata.url)) {
// Repeated URL: append this content to the first occurrence and drop the
// duplicate. The first occurrence is mutated in place, so the merged text is
// visible through every reference to that object.
const existingIndex = seenUrls.get(result.metadata.url)!;
const existingResult = searchResults[existingIndex];
existingResult.content += `\n\n${result.content}`;
return undefined;
}
// Results whose metadata.url is falsy are passed through unchanged.
return result;
})
// Remove the placeholders left behind by merged duplicates.
.filter((r) => r !== undefined);
session.emit('data', { session.emit('data', {
type: 'sources', type: 'sources',
data: searchResults.flatMap((a) => a.results), data: filteredSearchResults,
}); });
return { return {
findings: actionOutput, findings: actionOutput,
searchFindings: filteredSearchResults,
}; };
} }
} }