Skip to content

Commit 8925b0f

Browse files
authored
feat(worker): extract worker back to its own process (#987)
Because - Now that client and worker are decoupled, we can use a separate process to run the Temporal workers, which will allow us to scale them independently. This commit - Moves back the Temporal worker logic to `cmd/worker`. - Uses worker sessions to guarantee that activities share the workflow memory. - Makes sure the worfklow memory (in-memory and in MinIO) is cleaned up.
1 parent fc36c6c commit 8925b0f

File tree

10 files changed

+822
-607
lines changed

10 files changed

+822
-607
lines changed

cmd/main/main.go

+7-163
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,16 @@ import (
99
"os"
1010
"os/signal"
1111
"regexp"
12-
"strconv"
1312
"strings"
1413
"syscall"
1514
"time"
1615

17-
"github.com/gofrs/uuid"
1816
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
1917
"github.com/redis/go-redis/v9"
2018
"go.opentelemetry.io/contrib/propagators/b3"
2119
"go.opentelemetry.io/otel"
2220
"go.opentelemetry.io/otel/propagation"
23-
"go.temporal.io/api/workflowservice/v1"
2421
"go.temporal.io/sdk/client"
25-
"go.temporal.io/sdk/worker"
2622
"go.uber.org/zap"
2723
"golang.org/x/net/http2"
2824
"golang.org/x/net/http2/h2c"
@@ -31,7 +27,6 @@ import (
3127
"google.golang.org/grpc/credentials/insecure"
3228
"google.golang.org/grpc/reflection"
3329
"google.golang.org/protobuf/encoding/protojson"
34-
"google.golang.org/protobuf/types/known/durationpb"
3530

3631
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
3732
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
@@ -57,7 +52,6 @@ import (
5752
componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
5853
database "github.com/instill-ai/pipeline-backend/pkg/db"
5954
customotel "github.com/instill-ai/pipeline-backend/pkg/logger/otel"
60-
pipelineworker "github.com/instill-ai/pipeline-backend/pkg/worker"
6155
pb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta"
6256
)
6357

@@ -87,32 +81,6 @@ func grpcHandlerFunc(grpcServer *grpc.Server, gwHandler http.Handler) http.Handl
8781
)
8882
}
8983

90-
// InitPipelinePublicServiceClient initialises a PipelineServiceClient instance
91-
func InitPipelinePublicServiceClient(ctx context.Context) (pb.PipelinePublicServiceClient, *grpc.ClientConn) {
92-
logger, _ := logger.GetZapLogger(ctx)
93-
94-
var clientDialOpts grpc.DialOption
95-
var creds credentials.TransportCredentials
96-
var err error
97-
if config.Config.Server.HTTPS.Cert != "" && config.Config.Server.HTTPS.Key != "" {
98-
creds, err = credentials.NewServerTLSFromFile(config.Config.Server.HTTPS.Cert, config.Config.Server.HTTPS.Key)
99-
if err != nil {
100-
logger.Fatal(err.Error())
101-
}
102-
clientDialOpts = grpc.WithTransportCredentials(creds)
103-
} else {
104-
clientDialOpts = grpc.WithTransportCredentials(insecure.NewCredentials())
105-
}
106-
107-
clientConn, err := grpc.NewClient(fmt.Sprintf(":%v", config.Config.Server.PublicPort), clientDialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(constant.MaxPayloadSize), grpc.MaxCallSendMsgSize(constant.MaxPayloadSize)))
108-
if err != nil {
109-
logger.Error(err.Error())
110-
return nil, nil
111-
}
112-
113-
return pb.NewPipelinePublicServiceClient(clientConn), clientConn
114-
}
115-
11684
func main() {
11785
if err := config.Init(config.ParseConfigFlag()); err != nil {
11886
log.Fatal(err.Error())
@@ -121,14 +89,15 @@ func main() {
12189
// setup tracing and metrics
12290
ctx, cancel := context.WithCancel(context.Background())
12391

124-
if tp, err := customotel.SetupTracing(ctx, "pipeline-backend"); err != nil {
92+
tp, err := customotel.SetupTracing(ctx, "pipeline-backend")
93+
if err != nil {
12594
panic(err)
126-
} else {
127-
defer func() {
128-
err = tp.Shutdown(ctx)
129-
}()
13095
}
13196

97+
defer func() {
98+
err = tp.Shutdown(ctx)
99+
}()
100+
132101
ctx, span := otel.Tracer("main-tracer").Start(ctx,
133102
"main",
134103
)
@@ -147,7 +116,6 @@ func main() {
147116
defer database.Close(db)
148117

149118
var temporalClientOptions client.Options
150-
var err error
151119
if config.Config.Temporal.Ca != "" && config.Config.Temporal.Cert != "" && config.Config.Temporal.Key != "" {
152120
if temporalClientOptions, err = temporal.GetTLSClientOption(
153121
config.Config.Temporal.HostPort,
@@ -244,7 +212,7 @@ func main() {
244212
grpcServerOpts = append(grpcServerOpts, grpc.MaxRecvMsgSize(constant.MaxPayloadSize))
245213
grpcServerOpts = append(grpcServerOpts, grpc.MaxSendMsgSize(constant.MaxPayloadSize))
246214

247-
pipelinePublicServiceClient, pipelinePublicServiceClientConn := InitPipelinePublicServiceClient(ctx)
215+
pipelinePublicServiceClient, pipelinePublicServiceClientConn := external.InitPipelinePublicServiceClient(ctx)
248216
if pipelinePublicServiceClientConn != nil {
249217
defer pipelinePublicServiceClientConn.Close()
250218
}
@@ -296,7 +264,6 @@ func main() {
296264
BinaryFetcher: binaryFetcher,
297265
TemporalClient: temporalClient,
298266
})
299-
workerUID, _ := uuid.NewV4()
300267

301268
pubsub := pubsub.NewRedisPubSub(redisClient)
302269
ms := memory.NewStore(pubsub, minIOClient.WithLogger(logger))
@@ -320,7 +287,6 @@ func main() {
320287
minIOClient,
321288
compStore,
322289
ms,
323-
workerUID,
324290
retentionHandler,
325291
binaryFetcher,
326292
artifactPublicServiceClient,
@@ -468,81 +434,6 @@ func main() {
468434
span.End()
469435
logger.Info("gRPC server is running.")
470436

471-
// for only local temporal cluster
472-
if config.Config.Temporal.Ca == "" && config.Config.Temporal.Cert == "" && config.Config.Temporal.Key == "" {
473-
initTemporalNamespace(ctx, temporalClient)
474-
}
475-
476-
timeseries := repository.MustNewInfluxDB(ctx)
477-
defer timeseries.Close()
478-
479-
cw := pipelineworker.NewWorker(
480-
pipelineworker.WorkerConfig{
481-
Repository: repo,
482-
RedisClient: redisClient,
483-
InfluxDBWriteClient: timeseries.WriteAPI(),
484-
Component: compStore,
485-
MinioClient: minIOClient,
486-
MemoryStore: ms,
487-
WorkerUID: workerUID,
488-
ArtifactPublicServiceClient: artifactPublicServiceClient,
489-
ArtifactPrivateServiceClient: artifactPrivateServiceClient,
490-
BinaryFetcher: binaryFetcher,
491-
PipelinePublicServiceClient: pipelinePublicServiceClient,
492-
},
493-
)
494-
495-
w := worker.New(temporalClient, pipelineworker.TaskQueue, worker.Options{
496-
WorkflowPanicPolicy: worker.BlockWorkflow,
497-
WorkerStopTimeout: gracefulShutdownTimeout,
498-
MaxConcurrentWorkflowTaskExecutionSize: 100,
499-
})
500-
lw := worker.New(temporalClient, workerUID.String(), worker.Options{
501-
WorkflowPanicPolicy: worker.BlockWorkflow,
502-
WorkerStopTimeout: gracefulShutdownTimeout,
503-
MaxConcurrentActivityExecutionSize: 100,
504-
})
505-
mw := worker.New(temporalClient, fmt.Sprintf("%s-minio", workerUID.String()), worker.Options{
506-
WorkflowPanicPolicy: worker.BlockWorkflow,
507-
WorkerStopTimeout: gracefulShutdownTimeout,
508-
MaxConcurrentActivityExecutionSize: 50,
509-
})
510-
511-
w.RegisterWorkflow(cw.TriggerPipelineWorkflow)
512-
w.RegisterWorkflow(cw.SchedulePipelineWorkflow)
513-
514-
lw.RegisterActivity(cw.LoadWorkflowMemory)
515-
lw.RegisterActivity(cw.ComponentActivity)
516-
lw.RegisterActivity(cw.OutputActivity)
517-
lw.RegisterActivity(cw.PreIteratorActivity)
518-
lw.RegisterActivity(cw.PostIteratorActivity)
519-
lw.RegisterActivity(cw.InitComponentsActivity)
520-
lw.RegisterActivity(cw.SendStartedEventActivity)
521-
lw.RegisterActivity(cw.PostTriggerActivity)
522-
lw.RegisterActivity(cw.ClosePipelineActivity)
523-
lw.RegisterActivity(cw.IncreasePipelineTriggerCountActivity)
524-
lw.RegisterActivity(cw.UpdatePipelineRunActivity)
525-
lw.RegisterActivity(cw.UpsertComponentRunActivity)
526-
527-
mw.RegisterActivity(cw.UploadOutputsToMinIOActivity)
528-
mw.RegisterActivity(cw.UploadRecipeToMinIOActivity)
529-
mw.RegisterActivity(cw.UploadComponentInputsActivity)
530-
mw.RegisterActivity(cw.UploadComponentOutputsActivity)
531-
532-
err = w.Start()
533-
if err != nil {
534-
logger.Fatal(fmt.Sprintf("Unable to start worker: %s", err))
535-
}
536-
err = lw.Start()
537-
if err != nil {
538-
logger.Fatal(fmt.Sprintf("Unable to start local worker: %s", err))
539-
}
540-
err = mw.Start()
541-
if err != nil {
542-
logger.Fatal(fmt.Sprintf("Unable to start minio worker: %s", err))
543-
}
544-
logger.Info("worker is running.")
545-
546437
// kill (no param) default send syscall.SIGTERM
547438
// kill -2 is syscall.SIGINT
548439
// kill -9 is syscall.SIGKILL but can't be catch, so don't need add it
@@ -571,11 +462,6 @@ func main() {
571462
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout)
572463
defer shutdownCancel()
573464

574-
logger.Info("Shutting down worker...")
575-
w.Stop()
576-
lw.Stop()
577-
mw.Stop()
578-
579465
logger.Info("Shutting down HTTP server...")
580466
_ = privateHTTPServer.Shutdown(shutdownCtx)
581467
_ = publicHTTPServer.Shutdown(shutdownCtx)
@@ -585,45 +471,3 @@ func main() {
585471

586472
}
587473
}
588-
589-
func initTemporalNamespace(ctx context.Context, client client.Client) {
590-
logger, _ := logger.GetZapLogger(ctx)
591-
592-
resp, err := client.WorkflowService().ListNamespaces(ctx, &workflowservice.ListNamespacesRequest{})
593-
if err != nil {
594-
logger.Fatal(fmt.Sprintf("Unable to list namespaces: %s", err))
595-
}
596-
597-
found := false
598-
for _, n := range resp.GetNamespaces() {
599-
if n.NamespaceInfo.Name == config.Config.Temporal.Namespace {
600-
found = true
601-
}
602-
}
603-
604-
if !found {
605-
if _, err := client.WorkflowService().RegisterNamespace(ctx,
606-
&workflowservice.RegisterNamespaceRequest{
607-
Namespace: config.Config.Temporal.Namespace,
608-
WorkflowExecutionRetentionPeriod: func() *durationpb.Duration {
609-
// Check if the string ends with "d" for day.
610-
s := config.Config.Temporal.Retention
611-
if strings.HasSuffix(s, "d") {
612-
// Parse the number of days.
613-
days, err := strconv.Atoi(s[:len(s)-1])
614-
if err != nil {
615-
logger.Fatal(fmt.Sprintf("Unable to parse retention period in day: %s", err))
616-
}
617-
// Convert days to hours and then to a duration.
618-
t := time.Hour * 24 * time.Duration(days)
619-
return durationpb.New(t)
620-
}
621-
logger.Fatal(fmt.Sprintf("Unable to parse retention period in day: %s", err))
622-
return nil
623-
}(),
624-
},
625-
); err != nil {
626-
logger.Fatal(fmt.Sprintf("Unable to register namespace: %s", err))
627-
}
628-
}
629-
}

0 commit comments

Comments
 (0)