Skip to content

Commit 5a4d2a1

Browse files
authored
feat: add worker (#4)
* chore: add blocked user and allowed user * chore: refine task * chore: add worker * chore: add worker
1 parent a7395f5 commit 5a4d2a1

File tree

10 files changed

+96
-23
lines changed

10 files changed

+96
-23
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
)
1313

1414
require (
15+
github.com/alitto/pond/v2 v2.5.0 // indirect
1516
github.com/benbjohnson/clock v1.3.5 // indirect
1617
github.com/bytedance/gopkg v0.1.3 // indirect
1718
github.com/bytedance/sonic v1.14.1 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/alitto/pond/v2 v2.5.0 h1:vPzS5GnvSDRhWQidmj2djHllOmjFExVFbDGCw1jdqDw=
2+
github.com/alitto/pond/v2 v2.5.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE=
13
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
24
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
35
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=

internal/adapters/llm/openai/openai.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package openai
33
import (
44
"context"
55
"log/slog"
6-
"time"
76

87
"github.com/nduyhai/valjean/internal/app/entities"
98
goopenai "github.com/openai/openai-go/v3"
@@ -19,27 +18,32 @@ func NewClient(ai goopenai.Client, logger *slog.Logger) *Client {
1918
}
2019

2120
func (c *Client) Evaluate(ctx context.Context, in entities.EvalInput) (entities.EvalOutput, error) {
22-
ctxTimeout, cancelFunc := context.WithTimeout(ctx, 10*time.Second)
23-
defer cancelFunc()
2421

25-
var messages []goopenai.ChatCompletionMessageParamUnion
26-
27-
for _, contextText := range in.ContextSnips {
28-
if contextText != "" {
29-
messages = append(messages, goopenai.UserMessage(contextText))
22+
// Combine context snippets with the main message
23+
var fullMessage string
24+
if len(in.ContextSnips) > 0 {
25+
fullMessage = "Context:\n"
26+
for _, contextText := range in.ContextSnips {
27+
if contextText != "" {
28+
fullMessage += contextText + "\n"
29+
}
3030
}
31+
fullMessage += "\nUser message:\n" + in.Text
32+
} else {
33+
fullMessage = in.Text
3134
}
3235

33-
messages = append(messages, goopenai.UserMessage(in.Text))
36+
messages := []goopenai.ChatCompletionMessageParamUnion{
37+
goopenai.UserMessage(fullMessage),
38+
}
3439

35-
chatCompletion, err := c.ai.Chat.Completions.New(ctxTimeout, goopenai.ChatCompletionNewParams{
40+
chatCompletion, err := c.ai.Chat.Completions.New(ctx, goopenai.ChatCompletionNewParams{
3641
Messages: messages,
3742
Model: goopenai.ChatModelGPT4o,
3843
})
39-
msg := "Please try again"
4044

45+
msg := "Please try again"
4146
if err != nil {
42-
c.logger.Error("failed to evaluate", slog.Any("error", err))
4347
return entities.EvalOutput{
4448
Summary: msg,
4549
}, err

internal/adapters/worker/memory.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package worker
2+
3+
import "github.com/alitto/pond/v2"
4+
5+
type Memory struct {
6+
pool pond.Pool
7+
}
8+
9+
func NewMemory() *Memory {
10+
pool := pond.NewPool(100)
11+
return &Memory{pool: pool}
12+
}
13+
14+
func (m *Memory) Submit(f func()) {
15+
m.pool.Submit(f)
16+
}
17+
18+
func (m *Memory) Shutdown() {
19+
m.pool.StopAndWait()
20+
}

internal/app/service/moderation.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package service
22

33
import (
44
"context"
5+
"slices"
56
"strings"
67

78
"github.com/nduyhai/valjean/internal/app/entities"
89
"github.com/nduyhai/valjean/internal/infra/config"
910
)
1011

12+
const PrivateChat = "private"
13+
1114
type Moderation interface {
1215
Allowed(ctx context.Context, in entities.EvalInput) bool // ok, reason
1316
}
@@ -22,6 +25,15 @@ func NewModeration(config config.Config) Moderation {
2225

2326
func (m *moderation) Allowed(ctx context.Context, in entities.EvalInput) bool {
2427
text := strings.TrimSpace(in.Text)
28+
29+
if len(m.telegram.BlockedUsers) > 0 && slices.Contains(m.telegram.BlockedUsers, in.UserHandle) {
30+
return false
31+
}
32+
33+
if len(m.telegram.AllowedUsers) > 0 && !slices.Contains(m.telegram.AllowedUsers, in.UserHandle) {
34+
return false
35+
}
36+
2537
if m.telegram.Prefix != "" && strings.HasPrefix(text, m.telegram.Prefix) {
2638
return true
2739
}
@@ -34,7 +46,7 @@ func (m *moderation) Allowed(ctx context.Context, in entities.EvalInput) bool {
3446
return true
3547
}
3648

37-
if in.ChatType == "private" {
49+
if in.ChatType == PrivateChat {
3850
return true
3951
}
4052

internal/app/usecase/evaluate.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,23 @@ type EvaluateUseCase struct {
1818
moderation service.Moderation
1919
rateLimiter ports.RateLimiter
2020
eventProducer ports.EventProducer
21+
worker ports.Worker
2122
telegram config.Telegram
2223
cooldown time.Duration
2324
logger *slog.Logger
2425
}
2526

26-
func NewEvaluateUseCase(evaluator ports.Evaluator, moderation service.Moderation, rateLimiter ports.RateLimiter, eventProducer ports.EventProducer, config config.Config, logger *slog.Logger) *EvaluateUseCase {
27-
return &EvaluateUseCase{evaluator: evaluator, moderation: moderation, rateLimiter: rateLimiter, eventProducer: eventProducer, telegram: config.Telegram, cooldown: 2 * time.Second, logger: logger}
27+
func NewEvaluateUseCase(evaluator ports.Evaluator, moderation service.Moderation, rateLimiter ports.RateLimiter, eventProducer ports.EventProducer, worker ports.Worker, config config.Config, logger *slog.Logger) *EvaluateUseCase {
28+
return &EvaluateUseCase{
29+
evaluator: evaluator,
30+
moderation: moderation,
31+
rateLimiter: rateLimiter,
32+
eventProducer: eventProducer,
33+
worker: worker,
34+
telegram: config.Telegram,
35+
cooldown: 60 * time.Second,
36+
logger: logger,
37+
}
2838

2939
}
3040

@@ -38,11 +48,23 @@ func (e *EvaluateUseCase) Handle(ctx context.Context, in entities.EvalInput) err
3848

3949
return errors.New("cooling down—try again in a moment")
4050
}
51+
52+
e.worker.Submit(func() {
53+
e.process(in)
54+
})
55+
56+
return nil
57+
}
58+
59+
func (e *EvaluateUseCase) process(in entities.EvalInput) {
60+
ctx, cancelFunc := context.WithTimeout(context.Background(), e.cooldown)
61+
defer cancelFunc()
62+
4163
// moderation
4264
allowed := e.moderation.Allowed(ctx, in)
4365
if !allowed {
4466
e.logger.Warn("message skipped")
45-
return errors.New("message skipped")
67+
return
4668
}
4769
out, err := e.evaluator.Evaluate(ctx, in)
4870
if err != nil || out.Summary == "" {
@@ -51,12 +73,10 @@ func (e *EvaluateUseCase) Handle(ctx context.Context, in entities.EvalInput) err
5173

5274
e.sendMsg(ctx, in, "i couldn’t evaluate that right now")
5375

54-
return errors.New("i couldn’t evaluate that right now")
76+
return
5577
}
5678

5779
e.sendMsg(ctx, in, out.Summary)
58-
59-
return nil
6080
}
6181

6282
func (e *EvaluateUseCase) sendMsg(ctx context.Context, in entities.EvalInput, replyMsg string) {

internal/infra/config/config.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ type HTTPConfig struct {
1313
}
1414

1515
type Telegram struct {
16-
Prefix string `env:"TELEGRAM_PREFIX" envDefault:"!eval"` // "!eval "
17-
BotUsername string `env:"TELEGRAM_BOT_USERNAME" envDefault:"valjean"`
18-
Token string `env:"TELEGRAM_BOT_TOKEN"`
19-
WebhookSecret string `env:"TELEGRAM_WEBHOOK_SECRET"`
16+
Prefix string `env:"TELEGRAM_PREFIX" envDefault:"!eval"` // "!eval "
17+
BotUsername string `env:"TELEGRAM_BOT_USERNAME" envDefault:"valjean"`
18+
Token string `env:"TELEGRAM_BOT_TOKEN"`
19+
WebhookSecret string `env:"TELEGRAM_WEBHOOK_SECRET"`
20+
BlockedUsers []string `env:"TELEGRAM_BLOCKED_USERS"`
21+
AllowedUsers []string `env:"TELEGRAM_ALLOWED_USERS"`
2022
}
2123

2224
type OpenAI struct {

internal/infra/fxmodules/modules.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/nduyhai/valjean/internal/adapters/limiter"
99
"github.com/nduyhai/valjean/internal/adapters/llm/openai"
1010
"github.com/nduyhai/valjean/internal/adapters/producer"
11+
"github.com/nduyhai/valjean/internal/adapters/worker"
1112
"github.com/nduyhai/valjean/internal/app/service"
1213
"github.com/nduyhai/valjean/internal/app/usecase"
1314
"github.com/nduyhai/valjean/internal/infra/config"
@@ -43,6 +44,7 @@ var LimiterModule = fx.Module("adapters",
4344
fx.Annotate(openai.NewClient, fx.As(new(ports.Evaluator))),
4445
NewTelegramSdk,
4546
fx.Annotate(producer.NewTelegram, fx.As(new(ports.EventProducer))),
47+
fx.Annotate(worker.NewMemory, fx.As(new(ports.Worker))),
4648
),
4749
)
4850

internal/infra/fxmodules/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/nduyhai/valjean/internal/adapters/http"
99
"github.com/nduyhai/valjean/internal/infra/config"
1010
"github.com/nduyhai/valjean/internal/infra/httpserver"
11+
"github.com/nduyhai/valjean/internal/ports"
1112
"go.uber.org/fx"
1213
)
1314

@@ -58,6 +59,7 @@ var ServerModule = fx.Options(
5859
type ServerParams struct {
5960
fx.In
6061
HTTPServer *httpserver.Server
62+
Worker ports.Worker
6163
}
6264

6365
func ServerLifecycle(lc fx.Lifecycle, servers ServerParams) {
@@ -79,6 +81,8 @@ func ServerLifecycle(lc fx.Lifecycle, servers ServerParams) {
7981
// Log error
8082
}
8183

84+
servers.Worker.Shutdown()
85+
8286
return nil
8387
},
8488
})

internal/ports/worker.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package ports
2+
3+
type Worker interface {
4+
Submit(func())
5+
Shutdown()
6+
}

0 commit comments

Comments
 (0)