From 8532294205be3f9d83f0d6859b50e925fdaf1ba1 Mon Sep 17 00:00:00 2001 From: ogzhanolguncu Date: Fri, 3 May 2024 18:04:16 +0300 Subject: [PATCH] feat: allow ratelimitting --- bun.lockb | Bin 218654 -> 218654 bytes index.ts | 1 + package.json | 4 +-- src/clients/redis/index.test.ts | 3 +- src/clients/redis/index.ts | 3 +- src/clients/vector/index.test.ts | 3 +- src/clients/vector/index.ts | 3 +- src/config.test.ts | 3 +- src/config.ts | 24 ++++---------- src/constants.ts | 10 ++++++ src/error/index.ts | 1 + src/error/internal.ts | 2 +- src/error/model.ts | 2 +- src/error/ratelimit.ts | 14 ++++++++ src/rag-chat.ts | 54 +++++++++++++++++++------------ src/services/history.ts | 2 +- src/services/ratelimit.ts | 22 +++++++++++++ src/types.ts | 29 +++++++++++++++++ src/utils.ts | 10 ++++++ 19 files changed, 140 insertions(+), 50 deletions(-) create mode 100644 src/constants.ts create mode 100644 src/error/index.ts create mode 100644 src/error/ratelimit.ts create mode 100644 src/services/ratelimit.ts diff --git a/bun.lockb b/bun.lockb index 98a9e9d50b1b2c42f804e66feb348179a8ee05f6..d0edba9b5d6cf3e4f34e47ed854e9344b01b50dc 100755 GIT binary patch delta 142 zcmV;90CE4GtPP&54UjG%kJ;7eb=y-K$X;48zP*MeJ~$mj9jPuVyiJrsB8;P9u}-Qp z0UERUGqklp#QVh^NUuGTa*OuV)KZZ>S~s#j>9X`)I6GF79HjwT`esznXu&0%hj#rh wsJUv05|!ItP~xpP9d9G&Fqft!gYD_J?dbuuzX3M4qNxHd69G22(YyjpnX69*O{^8D> { + const options_ = appendDefaultsIfNeeded(options); + const { success, resetTime } = await this.ratelimitService.checkLimit( + options_.ratelimitSessionId + ); + + if (!success) { + throw new RatelimitUpstashError("Couldn't process chat due to ratelimit.", { + error: "ERR:USER_RATELIMITED", + resetTime: resetTime, + }); + } + const { question, facts } = await this.prepareChat(input, options.similarityThreshold); return options.stream - ? this.streamingChainCall(question, facts, options) - : this.chainCall(options, question, facts); + ? this.streamingChainCall(options_, question, facts) + : this.chainCall(options_, question, facts); } private streamingChainCall = ( + chatOptions: ChatOptions, question: string, - facts: string, - chatOptions: ChatOptions + facts: string ): StreamingTextResponse => { const { stream, handlers } = LangChainStream(); void this.chainCall(chatOptions, question, facts, [handlers]); @@ -75,7 +84,7 @@ export class RAGChat { }; private chainCall( - chatOptions: { sessionId: string; includeHistory?: number }, + chatOptions: ChatOptions, question: string, facts: string, handlers?: Callbacks @@ -113,7 +122,9 @@ export class RAGChat { ); } - static async initialize(config: Config): Promise { + static async initialize( + config: RAGChatConfig & { email: string; token: string } + ): Promise { const clientFactory = new ClientFactory( new Config(config.email, config.token, { redis: config.redis, @@ -125,12 +136,13 @@ export class RAGChat { const historyService = new HistoryService(redis); const retrievalService = new RetrievalService(index); + const ratelimitService = new RateLimitService(config.ratelimit); if (!config.model) { throw new UpstashModelError("Model can not be undefined!"); } - return new RAGChat(retrievalService, historyService, { + return new RAGChat(retrievalService, historyService, ratelimitService, { model: config.model, template: config.template ?? QA_TEMPLATE, }); diff --git a/src/services/history.ts b/src/services/history.ts index c541591..d876eab 100644 --- a/src/services/history.ts +++ b/src/services/history.ts @@ -1,8 +1,8 @@ import type { Redis } from "@upstash/sdk"; import { CustomUpstashRedisChatMessageHistory } from "./redis-custom-history"; -import type { RAGChatConfig } from "../config"; import { Config } from "../config"; import { ClientFactory } from "../client-factory"; +import type { RAGChatConfig } from "../types"; const DAY_IN_SECONDS = 86_400; const TOP_6 = 5; diff --git a/src/services/ratelimit.ts b/src/services/ratelimit.ts new file mode 100644 index 0000000..ca131b3 --- /dev/null +++ b/src/services/ratelimit.ts @@ -0,0 +1,22 @@ +import type { Ratelimit } from "@upstash/sdk"; + +export class RateLimitService { + private ratelimit?: Ratelimit; + + constructor(ratelimit?: Ratelimit) { + this.ratelimit = ratelimit; + } + + async checkLimit(sessionId: string): Promise<{ success: boolean; resetTime?: number }> { + if (!this.ratelimit) { + // If no ratelimit object is provided, always allow the operation. + return { success: true }; + } + + const result = await this.ratelimit.limit(sessionId); + if (!result.success) { + return { success: false, resetTime: result.reset }; + } + return { success: true }; + } +} diff --git a/src/types.ts b/src/types.ts index c0e75f8..bc4a451 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1 +1,30 @@ +import type { BaseLanguageModelInterface } from "@langchain/core/language_models/base"; +import type { PromptTemplate } from "@langchain/core/prompts"; +import type { Index, Ratelimit, Redis } from "@upstash/sdk"; + export type PreferredRegions = "eu-west-1" | "us-east-1"; + +export type ChatOptions = { + stream: boolean; + sessionId?: string; + includeHistory?: number; + similarityThreshold?: number; + ratelimitSessionId?: string; +}; + +export type PrepareChatResult = { + question: string; + facts: string; +}; + +type RAGChatConfigCommon = { + model?: BaseLanguageModelInterface; + template?: PromptTemplate; + region?: PreferredRegions; + ratelimit?: Ratelimit; +}; + +export type RAGChatConfig = { + vector?: string | Index; + redis?: string | Redis; +} & RAGChatConfigCommon; diff --git a/src/utils.ts b/src/utils.ts index 3c5ac68..2a3a624 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,6 @@ import type { BaseMessage } from "@langchain/core/messages"; +import type { ChatOptions } from "./types"; +import { DEFAULT_CHAT_SESSION_ID, DEFAULT_CHAT_RATELIMIT_SESSION_ID } from "./constants"; export const sanitizeQuestion = (question: string) => { return question.trim().replaceAll("\n", " "); @@ -17,3 +19,11 @@ export const formatChatHistory = (chatHistory: BaseMessage[]) => { return formatFacts(formattedDialogueTurns); }; + +export function appendDefaultsIfNeeded(options: ChatOptions) { + return { + ...options, + sessionId: options.sessionId ?? DEFAULT_CHAT_SESSION_ID, + ratelimitSessionId: options.ratelimitSessionId ?? DEFAULT_CHAT_RATELIMIT_SESSION_ID, + } satisfies ChatOptions; +}