-
Notifications
You must be signed in to change notification settings - Fork 0
/
queueWebpageScrape.ts
91 lines (69 loc) · 2.45 KB
/
queueWebpageScrape.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import z from "zod";
import hermes from "..";
import { scrapeWebpage } from "../../app/services/webscraper";
import { summarizeWebpageContent } from "../../app/services/ai/groq.service";
import { HTMLToMarkdown } from "../../app/services/webscraper/webpageToMarkdown";
import { createUserEmbedding } from "../../app/services/user_embeddings/user_embeddings.service";
import logger from "../../utils/logger";
import { IEvent } from "@swarnim/hermes";
import { openAIEmbedText } from "../../app/services/ai/openai.embeddings.service";
/**
 * Runtime schema for messages on the "webpage-scrape-queue" event:
 * the page `url` to ingest and the numeric `user_id` it is stored under.
 */
const messagePayloadSchema = z.object({
  url: z.string(),
  user_id: z.number(),
});

/** Static payload type inferred from `messagePayloadSchema`. */
type WebpageScrapePayload = z.infer<typeof messagePayloadSchema>;

/**
 * Handle to the registered "webpage-scrape-queue" event.
 * Assigned asynchronously by `initializeQueueWebpageScrape()`; it remains
 * unassigned until that call resolves successfully.
 */
export let queueWebpageScrape: IEvent<WebpageScrapePayload>;
/**
 * Registers the "webpage-scrape-queue" event with hermes, validating payloads
 * against `messagePayloadSchema`, and stores the handle in the module-level
 * `queueWebpageScrape` binding.
 *
 * Registration errors are caught and logged, not rethrown. On failure the
 * returned promise still resolves, and `queueWebpageScrape` stays unassigned.
 */
async function initializeQueueWebpageScrape() {
  try {
    queueWebpageScrape = await hermes.registerEvent(
      "webpage-scrape-queue",
      messagePayloadSchema,
      {
        // NOTE(review): presumably the number of redeliveries hermes attempts
        // when a subscriber throws — confirm against @swarnim/hermes docs.
        maxRetries: 3,
      }
    );
  } catch (error: unknown) {
    // `${error}` alone reduces an Error to "Name: message"; keep the stack
    // so registration failures can be diagnosed from the logs.
    const detail =
      error instanceof Error ? error.stack ?? error.message : String(error);
    logger.error(`Failed to register event: ${detail}`);
  }
}
// Start event registration at import time. Registration errors are caught
// and logged inside the function, so the promise is intentionally not
// awaited; `void` marks this floating promise as deliberate.
void initializeQueueWebpageScrape();
/**
 * Subscribes the webpage-ingestion pipeline to the "webpage-scrape-queue" event.
 *
 * For each message the handler runs these steps in order:
 * 1. Scrape `data.url`.
 * 2. Convert the HTML to Markdown.
 * 3. Summarize the Markdown.
 * 4. Embed the summary.
 * 5. Store the embedding, summary and url for `data.user_id`.
 *
 * If any step returns a failed result, its error is thrown and the message
 * is not acknowledged. Redelivery is then presumably up to hermes' retry
 * policy (TODO confirm). The message is acked only after the embedding
 * write succeeds.
 *
 * @throws Error if the event handle is not available yet. Registration runs
 *   asynchronously at import time and may still be pending, or may have failed.
 */
export function registerWebpageScrapeQueueHandler() {
  // `queueWebpageScrape` is assigned asynchronously. Without this guard, an
  // early call (or a call after failed registration) crashes with an opaque
  // "Cannot read properties of undefined (reading 'subscribe')".
  if (queueWebpageScrape === undefined) {
    throw new Error(
      "registerWebpageScrapeQueueHandler: webpage-scrape-queue event is not registered (initialization pending or failed)"
    );
  }

  queueWebpageScrape.subscribe(async ({ data, msg }) => {
    logger.info(`URL queued to scrape: ${data.url}`);
    logger.info(`MSG_MAX_RETRIES: ${msg.maxRetries}`);
    logger.info(`MSG_RETRY_COUNT: ${msg.retryCount}`);

    const crawlWebpageResult = await scrapeWebpage(data.url);
    if (!crawlWebpageResult.ok) {
      throw crawlWebpageResult.error;
    }
    logger.info("CRAWL_WEBPAGE_OK");

    const HTMLToMarkdownResult = await HTMLToMarkdown(crawlWebpageResult.value);
    if (!HTMLToMarkdownResult.ok) {
      throw HTMLToMarkdownResult.error;
    }
    logger.info("HTML_TO_MD_OK");

    // NOTE(review): hard-coded model id; confirm the provider still serves it.
    const summarizedContent = await summarizeWebpageContent(
      HTMLToMarkdownResult.value,
      { model: "mixtral-8x7b-32768" }
    );
    if (!summarizedContent.ok) {
      throw summarizedContent.error;
    }
    logger.info("SUMMARIZATION_OK");

    const summaryEmbedding = await openAIEmbedText(summarizedContent.value);
    if (!summaryEmbedding.ok) {
      throw summaryEmbedding.error;
    }
    logger.info("SUMMARY_EMBEDDINGS_OK");

    const embeddingWriteResult = await createUserEmbedding({
      user_id: data.user_id,
      embedding: summaryEmbedding.value,
      content: summarizedContent.value,
      url: data.url,
    });
    if (!embeddingWriteResult.ok) {
      throw embeddingWriteResult.error;
    }

    logger.info(`===SUCCESSFULLY INGESTED WEBPAGE===`);
    // Ack only after every step has succeeded.
    await msg.ack();
  });
}