Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Code Checks

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  build-and-test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          # Pin to the active LTS line instead of 'latest' so CI runs are
          # reproducible and do not break the moment a new major Node ships.
          node-version: 'lts/*'
          cache: 'npm'

      - name: Install dependencies
        run: npm ci

      - name: Build
        run: npm run build

      - name: Lint
        run: npm run lint

      - name: Install Playwright
        # --with-deps also installs the OS-level libraries the browsers need
        # on the ubuntu runner; without it browser launch can fail in CI.
        run: npx playwright install --with-deps

      - name: Test
        run: npm run test
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ storage

# Added by Apify CLI
.venv
.aider*

# Actor run input
input.json
29 changes: 15 additions & 14 deletions src/crawlers.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { MemoryStorage } from '@crawlee/memory-storage';
import { RequestQueue } from 'apify';
import { CheerioAPI } from 'cheerio';
import type { CheerioAPI } from 'cheerio';
import {
CheerioCrawler,
CheerioCrawlerOptions,
CheerioCrawlingContext,
type CheerioCrawlerOptions,
type CheerioCrawlingContext,
log,
PlaywrightCrawler,
PlaywrightCrawlerOptions,
PlaywrightCrawlingContext,
RequestOptions,
type PlaywrightCrawlerOptions,
type PlaywrightCrawlingContext,
type RequestOptions,
} from 'crawlee';

import { ContentCrawlerTypes } from './const.js';
Expand All @@ -32,7 +32,7 @@ export function getCrawlerKey(crawlerOptions: CheerioCrawlerOptions | Playwright
*/
export async function createAndStartSearchCrawler(
searchCrawlerOptions: CheerioCrawlerOptions,
startCrawler: boolean = true,
startCrawler = true,
) {
const key = getCrawlerKey(searchCrawlerOptions);
if (crawlers.has(key)) {
Expand Down Expand Up @@ -88,7 +88,7 @@ export async function createAndStartSearchCrawler(
});
if (startCrawler) {
crawler.run().then(
() => log.warning(`Google-search-crawler has finished`),
() => log.warning('Google-search-crawler has finished'),
() => { },
);
log.info('Google-search-crawler has started 🫡');
Expand All @@ -105,7 +105,7 @@ export async function createAndStartSearchCrawler(
*/
export async function createAndStartContentCrawler(
contentCrawlerOptions: ContentCrawlerOptions,
startCrawler: boolean = true,
startCrawler = true,
) {
const { type: crawlerType, crawlerOptions } = contentCrawlerOptions;

Expand Down Expand Up @@ -139,9 +139,9 @@ async function createPlaywrightContentCrawler(
...crawlerOptions,
keepAlive: crawlerOptions.keepAlive,
requestQueue: await RequestQueue.open(key, { storageClient: client }),
requestHandler: async (context) => {
requestHandler: crawlerOptions.requestHandler ?? (async (context) => {
await requestHandlerPlaywright(context as unknown as PlaywrightCrawlingContext<ContentCrawlerUserData>);
},
}),
failedRequestHandler: ({ request }, err) => failedRequestHandler(request, err, ContentCrawlerTypes.PLAYWRIGHT),
});
}
Expand All @@ -155,9 +155,10 @@ async function createCheerioContentCrawler(
...crawlerOptions,
keepAlive: crawlerOptions.keepAlive,
requestQueue: await RequestQueue.open(key, { storageClient: client }),
requestHandler: async (context) => {
await requestHandlerCheerio(context as unknown as CheerioCrawlingContext<ContentCrawlerUserData>);
},
requestHandler: crawlerOptions.requestHandler ?? (async (context) => {
await requestHandlerCheerio(context as unknown as CheerioCrawlingContext<ContentCrawlerUserData>,
);
}),
failedRequestHandler: ({ request }, err) => failedRequestHandler(request, err, ContentCrawlerTypes.CHEERIO),
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/input.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Actor } from 'apify';
import { BrowserName, CheerioCrawlerOptions, log, ProxyConfiguration } from 'crawlee';
import { BrowserName, type CheerioCrawlerOptions, log, type ProxyConfiguration } from 'crawlee';
import { firefox } from 'playwright';

import { ContentCrawlerTypes, defaults } from './const.js';
Expand Down Expand Up @@ -41,7 +41,7 @@ export async function processInput(originalInput: Partial<Input> | Partial<Stand
*/
async function processInputInternal(
originalInput: Partial<Input> | Partial<StandbyInput>,
standbyInit: boolean = false,
standbyInit = false,
) {
if (originalInput.outputFormats && typeof originalInput.outputFormats === 'string') {
originalInput.outputFormats = originalInput.outputFormats.split(',').map((format) => format.trim()) as OutputFormats[];
Expand Down
63 changes: 12 additions & 51 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { Actor } from 'apify';
import { log } from 'crawlee';
import express, { Request, Response } from 'express';

import { Routes } from './const.js';
import { createAndStartContentCrawler, createAndStartSearchCrawler } from './crawlers.js';
import { processInput, processStandbyInput } from './input.js';
import { RagWebBrowserServer } from './mcp/server.js';
import { addTimeoutToAllResponses } from './responses.js';
import { handleSearchRequest, handleSearchNormalMode } from './search.js';
import { Input } from './types.js';
import { handleSearchNormalMode } from './search.js';
import { createServer } from './server.js';
import type { Input } from './types.js';
import { isActorStandby } from './utils.js';

await Actor.init();
Expand All @@ -18,53 +15,13 @@ Actor.on('migrating', () => {
addTimeoutToAllResponses(60);
});

const app = express();

const mcpServer = new RagWebBrowserServer();
let transport: SSEServerTransport;

const HELP_MESSAGE = `Send a GET request to ${process.env.ACTOR_STANDBY_URL}/search?query=hello+world`
+ ` or to ${process.env.ACTOR_STANDBY_URL}/messages to use Model context protocol.`;

app.get('/', async (req, res) => {
log.info(`Received GET message at: ${req.url}`);
res.status(200).json({ message: `Actor is running in Standby mode. ${HELP_MESSAGE}` });
});

app.get(Routes.SEARCH, async (req: Request, res: Response) => {
log.info(`Received GET message at: ${req.url}`);
await handleSearchRequest(req, res);
});

app.head(Routes.SEARCH, async (req: Request, res: Response) => {
log.info(`Received HEAD message at: ${req.url}`);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end();
});

app.get(Routes.SSE, async (req: Request, res: Response) => {
log.info(`Received GET message at: ${req.url}`);
transport = new SSEServerTransport(Routes.MESSAGE, res);
await mcpServer.connect(transport);
});

app.post(Routes.MESSAGE, async (req: Request, res: Response) => {
log.info(`Received POST message at: ${req.url}`);
await transport.handlePostMessage(req, res);
});

// Catch-all for undefined routes
app.use((req, res) => {
res.status(404).json({ message: `The is nothing at route ${req.method} ${req.originalUrl}. ${HELP_MESSAGE}` });
});

const originalInput = await Actor.getInput<Partial<Input>>() ?? {} as Input;

if (isActorStandby()) {
log.info('Actor is running in the STANDBY mode.');

const host = Actor.isAtHome() ? process.env.ACTOR_STANDBY_URL : 'http://localhost';
const port = Actor.isAtHome() ? process.env.ACTOR_STANDBY_PORT : 3000;
const host = Actor.isAtHome() ? process.env.ACTOR_STANDBY_URL as string : 'http://localhost';
const port = Actor.isAtHome() ? Number(process.env.ACTOR_STANDBY_PORT) : 3000;

const {
input,
Expand All @@ -79,16 +36,20 @@ if (isActorStandby()) {
contentScraperSettings ${JSON.stringify(contentScraperSettings)}
`);

app.listen(port, async () => {
log.info(`The Actor web server is listening for user requests at ${host}:${port}`);
const app = createServer();

const startCrawlers = async () => {
const promises: Promise<unknown>[] = [];
promises.push(createAndStartSearchCrawler(searchCrawlerOptions));
for (const settings of contentCrawlerOptions) {
promises.push(createAndStartContentCrawler(settings));
}

await Promise.all(promises);
};

app.listen(port, async () => {
log.info(`The Actor web server is listening for user requests at ${host}:${port}`);
await startCrawlers();
});
} else {
log.info('Actor is running in the NORMAL mode.');
Expand Down
31 changes: 25 additions & 6 deletions src/request-handler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Actor } from 'apify';
import { load } from 'cheerio';
import { CheerioCrawlingContext, htmlToText, log, PlaywrightCrawlingContext, sleep, Request } from 'crawlee';
import { type CheerioCrawlingContext, htmlToText, log, type PlaywrightCrawlingContext, sleep, type Request } from 'crawlee';

import { ContentCrawlerStatus, ContentCrawlerTypes } from './const.js';
import { addResultToResponse, responseData, sendResponseIfFinished } from './responses.js';
import { Output, ContentCrawlerUserData } from './types.js';
import type { Output, ContentCrawlerUserData, ResultCallbackInput } from './types.js';
import { addTimeMeasureEvent, isActorStandby, transformTimeMeasuresToRelative } from './utils.js';
import { processHtml } from './website-content-crawler/html-processing.js';
import { htmlToMarkdown } from './website-content-crawler/markdown.js';
Expand Down Expand Up @@ -107,11 +107,25 @@ async function checkValidResponse(
return true;
}

async function pushResultToDataset(
input: ResultCallbackInput,
) {
const { result, context, request } = input;
if (!context) throw new Error('Context must be passed to the pushResultToDataset function');
if (!request) throw new Error('Request must be passed to the pushResultToDataset function');

log.info(`Adding result to the Apify dataset, url: ${request.url}`);
await context.pushData(result);
}

async function handleContent(
$: CheerioCrawlingContext['$'],
crawlerType: ContentCrawlerTypes,
statusCode: number | undefined,
context: PlaywrightCrawlingContext<ContentCrawlerUserData> | CheerioCrawlingContext<ContentCrawlerUserData>,
resultCallback: (
input: ResultCallbackInput,
) => Promise<void> = pushResultToDataset,
) {
const { request } = context;
const { responseId, contentScraperSettings: settings } = request.userData;
Expand Down Expand Up @@ -150,8 +164,7 @@ async function handleContent(
if (settings.debugMode) {
result.crawl.debug = { timeMeasures: transformTimeMeasuresToRelative(request.userData.timeMeasures!) };
}
log.info(`Adding result to the Apify dataset, url: ${request.url}`);
await context.pushData(result);
await resultCallback({ result, context, request });

// Get responseId from the request.userData, which corresponds to the original search request
if (responseId) {
Expand All @@ -162,6 +175,9 @@ async function handleContent(

export async function requestHandlerPlaywright(
context: PlaywrightCrawlingContext<ContentCrawlerUserData>,
resultCallback: (
input: ResultCallbackInput,
) => Promise<void> = pushResultToDataset,
) {
const { request, response, page, closeCookieModals } = context;
const { contentScraperSettings: settings, responseId } = request.userData;
Expand Down Expand Up @@ -191,11 +207,14 @@ export async function requestHandlerPlaywright(

const statusCode = response?.status();

await handleContent($, ContentCrawlerTypes.PLAYWRIGHT, statusCode, context);
await handleContent($, ContentCrawlerTypes.PLAYWRIGHT, statusCode, context, resultCallback);
}

export async function requestHandlerCheerio(
context: CheerioCrawlingContext<ContentCrawlerUserData>,
resultCallback: (
input: ResultCallbackInput,
) => Promise<void> = pushResultToDataset,
) {
const { $, request, response } = context;
const { responseId } = request.userData;
Expand All @@ -210,7 +229,7 @@ export async function requestHandlerCheerio(

const statusCode = response?.statusCode;

await handleContent($, ContentCrawlerTypes.CHEERIO, statusCode, context);
await handleContent($, ContentCrawlerTypes.CHEERIO, statusCode, context, resultCallback);
}

export async function failedRequestHandler(request: Request, err: Error, crawlerType: ContentCrawlerTypes) {
Expand Down
4 changes: 2 additions & 2 deletions src/responses.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { log } from 'apify';
import { RequestOptions } from 'crawlee';
import type { RequestOptions } from 'crawlee';

import { ContentCrawlerStatus } from './const.js';
import { Output, ContentCrawlerUserData } from './types.js';
import type { Output, ContentCrawlerUserData } from './types.js';

type ResponseData = {
resultsMap: Map<string, Output>;
Expand Down
6 changes: 3 additions & 3 deletions src/search.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { CheerioCrawlerOptions, log } from 'crawlee';
import { IncomingMessage, ServerResponse } from 'http';
import { type CheerioCrawlerOptions, log } from 'crawlee';
import type { IncomingMessage, ServerResponse } from 'node:http';

import { PLAYWRIGHT_REQUEST_TIMEOUT_NORMAL_MODE_SECS, Routes } from './const.js';
import { addContentCrawlRequest, addSearchRequest, createAndStartSearchCrawler, createAndStartContentCrawler } from './crawlers.js';
import { UserInputError } from './errors.js';
import { processInput } from './input.js';
import { createResponsePromise } from './responses.js';
import { Input, Output, ContentScraperSettings, ContentCrawlerOptions } from './types.js';
import type { Input, Output, ContentScraperSettings, ContentCrawlerOptions } from './types.js';
import {
addTimeMeasureEvent,
createRequest,
Expand Down
50 changes: 50 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { log } from 'crawlee';
import express, { type Request, type Response } from 'express';

import { Routes } from './const.js';
import { RagWebBrowserServer } from './mcp/server.js';
import { handleSearchRequest } from './search.js';

export function createServer(): express.Express {
const app = express();
const mcpServer = new RagWebBrowserServer();
let transport: SSEServerTransport;

const HELP_MESSAGE = `Send a GET request to ${process.env.ACTOR_STANDBY_URL}/search?query=hello+world`
+ ` or to ${process.env.ACTOR_STANDBY_URL}/messages to use Model context protocol.`;

app.get('/', async (req, res) => {
log.info(`Received GET message at: ${req.url}`);
res.status(200).json({ message: `Actor is running in Standby mode. ${HELP_MESSAGE}` });
});

app.get(Routes.SEARCH, async (req: Request, res: Response) => {
log.info(`Received GET message at: ${req.url}`);
await handleSearchRequest(req, res);
});

app.head(Routes.SEARCH, async (req: Request, res: Response) => {
log.info(`Received HEAD message at: ${req.url}`);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end();
});

app.get(Routes.SSE, async (req: Request, res: Response) => {
log.info(`Received GET message at: ${req.url}`);
transport = new SSEServerTransport(Routes.MESSAGE, res);
await mcpServer.connect(transport);
});

app.post(Routes.MESSAGE, async (req: Request, res: Response) => {
log.info(`Received POST message at: ${req.url}`);
await transport.handlePostMessage(req, res);
});

// Catch-all for undefined routes
app.use((req, res) => {
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${HELP_MESSAGE}` });
});

return app;
}
Loading
Loading