diff --git a/.golangci.yml b/.golangci.yml index 0fedd0d8..3a28cc38 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -133,8 +133,6 @@ linters: - nilerr - nilnil - # Forces newlines in some places. - - nlreturn # Finds sending HTTP request without context.Context. - noctx @@ -226,6 +224,7 @@ linters: # I'm fine to check the error from json.Marshal - errchkjson + # All SQL queries MUST BE covered with tests. - execinquery diff --git a/Makefile b/Makefile index 5b1f0289..c69d7dbf 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ // TODO: add commands for build and run in dev/produciton mode -ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST)))) +export ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST)))) OS := $(shell uname -s) @@ -35,6 +35,7 @@ ifneq (,$(findstring NT,$(OS))) protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/internalevent/internalevent.proto protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/project/project.proto protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/source/source.proto + protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/destination/destination.proto protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user.proto protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user_info.proto else diff --git a/adapter/scylladb/scyllainitialize/init.go b/adapter/scylladb/scyllainitialize/init.go index 64577e19..bd5e2f79 100644 --- a/adapter/scylladb/scyllainitialize/init.go +++ b/adapter/scylladb/scyllainitialize/init.go @@ -36,6 +36,7 @@ func CreateKeySpace(consistency gocql.Consistency, keyspace string, hosts ...str func RunMigrations(dbConn *ScyllaDBConnection, dir string) error { logger.L().Debug("running migrations...") for _, host := range dbConn.hosts { + logger.L().Info(dir) migration := New(dir, host, dbConn.keyspace) err := migration.Run() if err != nil { diff --git a/adapter/scylladb/sessionx.go b/adapter/scylladb/sessionx.go index 74c8e03e..ed9dd7a9 100644 --- a/adapter/scylladb/sessionx.go +++ b/adapter/scylladb/sessionx.go @@ -17,6 +17,8 @@ type SessionxInterface interface { ExecStmt(stmt string) error AwaitSchemaAgreement(ctx context.Context) error Close() + NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch + ExecuteBatch(batch *gocql.Batch) error } type Session struct { @@ -43,6 +45,16 @@ func (s *Session) Close() { s.S.Close() } +func (s *Session) NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch { + batch := s.S.NewBatch(batchType) + + return batch.WithContext(ctx) +} + +func (s *Session) ExecuteBatch(batch *gocql.Batch) error { + return s.S.ExecuteBatch(batch) +} + func NewSession(session *gocql.Session) SessionxInterface { gocqlxSession := gocqlx.NewSession(session) diff --git a/cmd/source/faker/undeliveredevents_publisher/main.go b/cmd/source/faker/undeliveredevents_publisher/main.go new file mode 100644 index 00000000..b3186c34 --- /dev/null +++ b/cmd/source/faker/undeliveredevents_publisher/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "sync" + + "github.com/ormushq/ormus/adapter/otela" + "github.com/ormushq/ormus/config" + "github.com/ormushq/ormus/contract/go/destination" + "github.com/ormushq/ormus/pkg/channel" + "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmqchannel" + "github.com/ormushq/ormus/pkg/encoder" +) + +func main() { + cfg := config.C() + done := make(chan bool) + wg := &sync.WaitGroup{} + bufferSize := cfg.Source.BufferSize + maxRetryPolicy := cfg.Source.MaxRetry + testCount := 1 + + err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole}) + if err != nil { + panic(err.Error()) + } + inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq) + err = inputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.InputOnlyMode, bufferSize, maxRetryPolicy) + if err != nil { + panic(err.Error()) + } + inputChannel, err := inputAdapter.GetInputChannel(cfg.Source.UndeliveredEventsQueueName) + if err != nil { + panic(err.Error()) + } + + wg.Add(1) + go func() { + defer wg.Done() + for messageID := 0; messageID < testCount; messageID++ { + msg := encoder.EncodeProcessedEvent(&destination.DeliveredEventsList{ + Events: []*destination.DeliveredEvent{ + { + MessageId: "d5aacd53-f866-4406-8e2f-d6f1dbc96975", + }, + }, + }) + inputChannel <- []byte(msg) + + } + }() + wg.Wait() +} diff --git a/cmd/source/main.go b/cmd/source/main.go index 1ce378ea..02dbfe64 100644 --- a/cmd/source/main.go +++ b/cmd/source/main.go @@ -74,9 +74,9 @@ func main() { } cfg := config.C() - _, Consumer, eventSvc, eventValidator := SetupSourceServices(cfg) - Consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, Consumer.ProcessNewSourceEvent) - + _, consumer, eventSvc, eventValidator := SetupSourceServices(cfg) + consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, consumer.ProcessNewSourceEvent) + consumer.Consume(context.Background(), cfg.Source.UndeliveredEventsQueueName, done, wg, consumer.EventHasDeliveredToDestination) //----------------- Setup Tracer -----------------// otelcfg := otela.Config{ Endpoint: config.C().Source.Otel.Endpoint, @@ -108,7 +108,7 @@ func main() { wg.Wait() } -func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, eventHandler sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) { +func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, consumer sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) { done := make(chan bool) wg := &sync.WaitGroup{} @@ -118,6 +118,11 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event panic(err) } + err = outputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.OutputOnly, cfg.Source.BufferSize, cfg.Source.MaxRetry) + if err != nil { + panic(err) + } + inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq) err = inputAdapter.NewChannel(cfg.Source.NewEventQueueName, channel.InputOnlyMode, cfg.Source.BufferSize, cfg.Source.MaxRetry) if err != nil { @@ -135,7 +140,6 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter) writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source) - eventHandler = *sourceevent.NewConsumer(outputAdapter, writeKeySvc) DB, err := scylladb.New(cfg.Source.ScyllaDBConfig) if err != nil { @@ -146,5 +150,7 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event eventValidator = eventvalidator.New(&writeKeyRepo, cfg.Source) - return writeKeySvc, eventHandler, eventSvc, eventValidator + consumer = *sourceevent.NewConsumer(outputAdapter, writeKeySvc, eventSvc, cfg.Source.RetryNumber) + + return writeKeySvc, consumer, eventSvc, eventValidator } diff --git a/config.yml b/config.yml index 9c39ad90..f50071d9 100644 --- a/config.yml +++ b/config.yml @@ -5,9 +5,11 @@ source: port: 8082 network: "tcp" write_key_validation_address: "127.0.0.1:8081" + undelivered_events_queue_name: "undelivered_events" new_event_queue_name: "new-event-received" write_key_expiration: 120 undelivered_event_retransmit_period: 1 + retry_number: 1 new_source_event_name: "new-source-event" buffersize: 100 number_instants: 10 diff --git a/contract/go/destination/destination.pb.go b/contract/go/destination/destination.pb.go new file mode 100644 index 00000000..31cde812 --- /dev/null +++ b/contract/go/destination/destination.pb.go @@ -0,0 +1,184 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.5 +// protoc v3.6.1 +// source: destination/destination.proto + +package destination + +import ( + reflect "reflect" + sync "sync" + unsafe "unsafe" + + _ "github.com/golang/protobuf/ptypes/timestamp" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type DeliveredEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + MessageId string `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DeliveredEvent) Reset() { + *x = DeliveredEvent{} + mi := &file_destination_destination_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DeliveredEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeliveredEvent) ProtoMessage() {} + +func (x *DeliveredEvent) ProtoReflect() protoreflect.Message { + mi := &file_destination_destination_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeliveredEvent.ProtoReflect.Descriptor instead. +func (*DeliveredEvent) Descriptor() ([]byte, []int) { + return file_destination_destination_proto_rawDescGZIP(), []int{0} +} + +func (x *DeliveredEvent) GetMessageId() string { + if x != nil { + return x.MessageId + } + return "" +} + +type DeliveredEventsList struct { + state protoimpl.MessageState `protogen:"open.v1"` + Events []*DeliveredEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DeliveredEventsList) Reset() { + *x = DeliveredEventsList{} + mi := &file_destination_destination_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DeliveredEventsList) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeliveredEventsList) ProtoMessage() {} + +func (x *DeliveredEventsList) ProtoReflect() protoreflect.Message { + mi := &file_destination_destination_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeliveredEventsList.ProtoReflect.Descriptor instead. +func (*DeliveredEventsList) Descriptor() ([]byte, []int) { + return file_destination_destination_proto_rawDescGZIP(), []int{1} +} + +func (x *DeliveredEventsList) GetEvents() []*DeliveredEvent { + if x != nil { + return x.Events + } + return nil +} + +var File_destination_destination_proto protoreflect.FileDescriptor + +var file_destination_destination_proto_rawDesc = string([]byte{ + 0x0a, 0x1d, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x64, 0x65, + 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x1f, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2f, 0x0a, + 0x0e, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, + 0x1d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x4a, + 0x0a, 0x13, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x73, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x33, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x72, 0x6d, 0x75, 0x73, 0x68, 0x71, + 0x2f, 0x6f, 0x72, 0x6d, 0x75, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x2f, + 0x67, 0x6f, 0x2f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +}) + +var ( + file_destination_destination_proto_rawDescOnce sync.Once + file_destination_destination_proto_rawDescData []byte +) + +func file_destination_destination_proto_rawDescGZIP() []byte { + file_destination_destination_proto_rawDescOnce.Do(func() { + file_destination_destination_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_destination_destination_proto_rawDesc), len(file_destination_destination_proto_rawDesc))) + }) + return file_destination_destination_proto_rawDescData +} + +var file_destination_destination_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_destination_destination_proto_goTypes = []any{ + (*DeliveredEvent)(nil), // 0: destination.DeliveredEvent + (*DeliveredEventsList)(nil), // 1: destination.DeliveredEventsList +} +var file_destination_destination_proto_depIdxs = []int32{ + 0, // 0: destination.DeliveredEventsList.events:type_name -> destination.DeliveredEvent + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_destination_destination_proto_init() } +func file_destination_destination_proto_init() { + if File_destination_destination_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_destination_destination_proto_rawDesc), len(file_destination_destination_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_destination_destination_proto_goTypes, + DependencyIndexes: file_destination_destination_proto_depIdxs, + MessageInfos: file_destination_destination_proto_msgTypes, + }.Build() + File_destination_destination_proto = out.File + file_destination_destination_proto_goTypes = nil + file_destination_destination_proto_depIdxs = nil +} diff --git a/contract/protobuf/destination/destination.proto b/contract/protobuf/destination/destination.proto new file mode 100644 index 00000000..babc0c8d --- /dev/null +++ b/contract/protobuf/destination/destination.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package destination; +option go_package = "github.com/ormushq/ormus/contract/go/destination"; + +import "google/protobuf/timestamp.proto"; + +message DeliveredEvent { + string message_id = 1; +} + +message DeliveredEventsList { + repeated DeliveredEvent events = 1; +} diff --git a/doc/swagger/manager_docs.go b/doc/swagger/manager_docs.go index 75bb9c48..44c5b44a 100644 --- a/doc/swagger/manager_docs.go +++ b/doc/swagger/manager_docs.go @@ -1186,10 +1186,6 @@ const docTemplatemanager = `{ "name": { "type": "string", "example": "updated name" - }, - "status": { - "type": "string", - "example": "active" } } }, diff --git a/doc/swagger/manager_swagger.json b/doc/swagger/manager_swagger.json index 0c9ba288..7c39ccad 100644 --- a/doc/swagger/manager_swagger.json +++ b/doc/swagger/manager_swagger.json @@ -1175,10 +1175,6 @@ "name": { "type": "string", "example": "updated name" - }, - "status": { - "type": "string", - "example": "active" } } }, diff --git a/doc/swagger/manager_swagger.yaml b/doc/swagger/manager_swagger.yaml index ed51ba91..c0110ba9 100644 --- a/doc/swagger/manager_swagger.yaml +++ b/doc/swagger/manager_swagger.yaml @@ -234,9 +234,6 @@ definitions: name: example: updated name type: string - status: - example: active - type: string type: object sourceparam.UpdateResponse: properties: diff --git a/go.mod b/go.mod index c5a51497..b0261bd0 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module github.com/ormushq/ormus go 1.22.0 require ( + github.com/bxcodec/faker/v3 v3.8.1 github.com/go-co-op/gocron v1.37.0 github.com/go-ozzo/ozzo-validation/v4 v4.3.0 github.com/gocql/gocql v1.6.0 + github.com/gofrs/uuid/v5 v5.3.1 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/golang-migrate/migrate/v4 v4.17.1 github.com/golang/protobuf v1.5.4 @@ -34,9 +36,8 @@ require ( go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/sdk/metric v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 - go.uber.org/zap v1.17.0 golang.org/x/crypto v0.23.0 - golang.org/x/sync v0.6.0 + golang.org/x/sync v0.7.0 google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.34.1 gopkg.in/natefinch/lumberjack.v2 v2.2.1 @@ -95,6 +96,7 @@ require ( go.opentelemetry.io/proto/otlp v1.2.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.17.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/go.sum b/go.sum index 3fb7d73f..ed6b8a61 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bxcodec/faker/v3 v3.8.1 h1:qO/Xq19V6uHt2xujwpaetgKhraGCapqY2CRWGD/SqcM= +github.com/bxcodec/faker/v3 v3.8.1/go.mod h1:DdSDccxF5msjFo5aO4vrobRQ8nIApg8kq3QWPEQD6+o= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -121,6 +123,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6 github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU= github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid/v5 v5.3.1 h1:aPx49MwJbekCzOyhZDjJVb0hx3A0KLjlbLx6p2gY0p0= +github.com/gofrs/uuid/v5 v5.3.1/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -492,8 +496,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/manager/delivery/grpcserver/sourcehandler/writekey_validation.go b/manager/delivery/grpcserver/sourcehandler/writekey_validation.go index 76906dc5..b1721592 100644 --- a/manager/delivery/grpcserver/sourcehandler/writekey_validation.go +++ b/manager/delivery/grpcserver/sourcehandler/writekey_validation.go @@ -4,6 +4,7 @@ import ( "context" source_proto "github.com/ormushq/ormus/contract/go/source" + "github.com/ormushq/ormus/logger" "github.com/ormushq/ormus/manager/service/sourceservice" "github.com/ormushq/ormus/pkg/richerror" ) @@ -22,6 +23,7 @@ func New(soureSvc sourceservice.Service) *WriteKeyValidationHandler { func (w WriteKeyValidationHandler) IsWriteKeyValid(_ context.Context, req *source_proto.ValidateWriteKeyReq) (*source_proto.ValidateWriteKeyResp, error) { resp, err := w.SourceSvc.IsWriteKeyValid(req) if err != nil { + logger.L().Error(err.Error()) return nil, richerror.New("delivery.grpc_server").WithWrappedError(err).WhitKind(richerror.KindUnexpected) } diff --git a/manager/repository/scyllarepo/scyllasource/get.go b/manager/repository/scyllarepo/scyllasource/get.go index 743ad732..50a0b794 100644 --- a/manager/repository/scyllarepo/scyllasource/get.go +++ b/manager/repository/scyllarepo/scyllasource/get.go @@ -46,7 +46,7 @@ func (r Repository) IsWriteKeyValid(writeKey string) (*source_proto.ValidateWrit "write_key": writeKey, }) var writeKeyResp source_proto.ValidateWriteKeyResp - if err := query.Get(&writeKeyResp); err != nil { + if err := query.Get(&writeKeyResp); err != nil && err.Error() != "not found" { return nil, err } diff --git a/pkg/channel/adapter/rabbitmqchannel/adapter.go b/pkg/channel/adapter/rabbitmqchannel/adapter.go index 32a8e682..66879eb5 100644 --- a/pkg/channel/adapter/rabbitmqchannel/adapter.go +++ b/pkg/channel/adapter/rabbitmqchannel/adapter.go @@ -60,6 +60,7 @@ func New(done <-chan bool, wg *sync.WaitGroup, config Config) *ChannelAdapter { } logger.WithGroup(loggerGroupName).Error("rabbitmq connection failed", slog.String("error", err.Error())) + time.Sleep(time.Second) } return c @@ -168,3 +169,21 @@ func WaitForConnection(rabbitmq *Rabbitmq) { rabbitmq.cond.Wait() } } + +func (ca *ChannelAdapter) PurgeTheChannel(name string) (int, error) { + ch, err := ca.rabbitmq.connection.Channel() + if err != nil { + logger.WithGroup(loggerGroupName).Error(errmsg.ErrFailedToOpenChannel, + slog.String("error", err.Error())) + return 0, err + + } + purged, err := ch.QueueDelete(name+"-queue", false, false, false) + if err != nil { + logger.WithGroup(loggerGroupName).Error(errmsg.ErrFailedToCloseChannel, + slog.String("error", err.Error())) + return 0, err + + } + return purged, err +} diff --git a/pkg/encoder/processedevent.go b/pkg/encoder/processedevent.go new file mode 100644 index 00000000..d82247aa --- /dev/null +++ b/pkg/encoder/processedevent.go @@ -0,0 +1,32 @@ +package encoder + +import ( + "encoding/base64" + + "github.com/ormushq/ormus/contract/go/destination" + "google.golang.org/protobuf/proto" +) + +func EncodeProcessedEvent(event *destination.DeliveredEventsList) string { + payload, err := proto.Marshal(event) + if err != nil { + return "" + } + + return base64.StdEncoding.EncodeToString(payload) +} + +func DecodeProcessedEvent(event string) *destination.DeliveredEventsList { + payload, err := base64.StdEncoding.DecodeString(event) + if err != nil { + return nil + } + mu := destination.DeliveredEventsList{} + if err := proto.Unmarshal(payload, &mu); err != nil { + return nil + } + + return &destination.DeliveredEventsList{ + Events: mu.Events, + } +} diff --git a/pkg/errmsg/message.go b/pkg/errmsg/message.go index 80da4522..1bbb7379 100644 --- a/pkg/errmsg/message.go +++ b/pkg/errmsg/message.go @@ -17,4 +17,5 @@ const ( ErrFailedToSetQosOnChannel = "failed to set QOS on channel" ErrFailedToCloseChannel = "failed to close rabbitmq channel" ErrAccessDenied = "Access denied" + ErrFailedToPurgeChannel = "failed to purge channel" ) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go new file mode 100644 index 00000000..b6b7c1aa --- /dev/null +++ b/pkg/retry/retry.go @@ -0,0 +1,15 @@ +package retry + +type RetryableFunc func() error + +func Do(fn RetryableFunc, maxAttemps int) error { + var err error + for i := 0; i < maxAttemps; i++ { + err = fn() + if err == nil { + break + } + } + + return err +} diff --git a/source/config.go b/source/config.go index 3eb49f0a..142d3f94 100644 --- a/source/config.go +++ b/source/config.go @@ -24,4 +24,6 @@ type Config struct { ScyllaDBConfig scylladb.Config `koanf:"scylla_db_config"` NewEventQueueName string `koanf:"new_event_queue_name"` UndeliveredEventRetransmitPeriod int `koanf:"undelivered_event_retransmit_period"` + RetryNumber int `koanf:"retry_number"` + UndeliveredEventsQueueName string `koanf:"undelivered_events_queue_name"` } diff --git a/source/delivery/httpserver/eventhandler/event.go b/source/delivery/httpserver/eventhandler/event.go index 3aeaa6b0..300178d2 100644 --- a/source/delivery/httpserver/eventhandler/event.go +++ b/source/delivery/httpserver/eventhandler/event.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/labstack/echo/v4" + "github.com/ormushq/ormus/logger" "github.com/ormushq/ormus/pkg/errmsg" "github.com/ormushq/ormus/pkg/httpmsg" "github.com/ormushq/ormus/pkg/httputil" @@ -12,14 +13,15 @@ import ( ) func (h Handler) NewEvent(ctx echo.Context) error { - var req params.TrackEventRequest + var req []params.TrackEventRequest if err := ctx.Bind(&req); err != nil { + logger.L().Error(err.Error()) return httputil.NewError(ctx, http.StatusBadRequest, errmsg.ErrBadRequest) } - resp, err := h.eventSvc.CreateNewEvent(context.Background(), req) + resp, err := h.eventSvc.CreateNewEvent(context.Background(), req, ctx.Get("invalid_write_keys").([]string)) if err != nil { msg, code := httpmsg.Error(err) - + logger.L().Error(err.Error()) return ctx.JSON(code, echo.Map{ "message": msg, "errors": err, diff --git a/source/delivery/middlewares/writekey.go b/source/delivery/middlewares/writekey.go index fac4aa32..3ffdbfc6 100644 --- a/source/delivery/middlewares/writekey.go +++ b/source/delivery/middlewares/writekey.go @@ -7,6 +7,7 @@ import ( "net/http" "github.com/labstack/echo/v4" + "github.com/ormushq/ormus/logger" "github.com/ormushq/ormus/source/params" "github.com/ormushq/ormus/source/validator/eventvalidator/eventvalidator" ) @@ -16,26 +17,42 @@ func WriteKeyMiddleware(validator eventvalidator.Validator) echo.MiddlewareFunc return func(c echo.Context) error { body, err := io.ReadAll(c.Request().Body) if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, "Failed to read body") } - c.Request().Body = io.NopCloser(bytes.NewReader(body)) - - var req params.TrackEventRequest + var req []params.TrackEventRequest if err := json.Unmarshal(body, &req); err != nil { + logger.L().Error(err.Error()) + return echo.NewHTTPError(http.StatusBadRequest, "Failed to unmarshal body") } + invalidWriteKeys := make([]string, 0) + filteredRequests := make([]params.TrackEventRequest, 0, len(req)) + for _, r := range req { + isValid, err := validator.ValidateWriteKey(c.Request().Context(), r.WriteKey) + if err != nil { - isValid, err := validator.ValidateWriteKey(c.Request().Context(), req.WriteKey) - if err != nil { - return c.JSON(http.StatusInternalServerError, echo.Map{ - "message": "something went wrong", - }) + return c.JSON(http.StatusInternalServerError, echo.Map{ + "message": "something went wrong", + }) + + } + if isValid { + filteredRequests = append(filteredRequests, r) + } else { + invalidWriteKeys = append(invalidWriteKeys, r.WriteKey) + } } - if !isValid { - return c.JSON(http.StatusForbidden, "the write key is invalid") + + filteredBody, err := json.Marshal(filteredRequests) + if err != nil { + logger.L().Error(err.Error()) + + return echo.NewHTTPError(http.StatusBadRequest, "Failed to marshal body") } - c.Set("body", req) + c.Request().Body = io.NopCloser(bytes.NewReader(filteredBody)) + c.Set("invalid_write_keys", invalidWriteKeys) return next(c) } diff --git a/source/eventhandler/event_delivered.go b/source/eventhandler/event_delivered.go new file mode 100644 index 00000000..6ac55e0b --- /dev/null +++ b/source/eventhandler/event_delivered.go @@ -0,0 +1,33 @@ +package eventhandler + +import ( + "context" + "fmt" + + "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/pkg/channel" + "github.com/ormushq/ormus/pkg/encoder" + "github.com/ormushq/ormus/pkg/retry" +) + +func (c Consumer) EventHasDeliveredToDestination(ctx context.Context, msg channel.Message) error { + decodedEvent := encoder.DecodeProcessedEvent(string(msg.Body)) + err := c.eventService.EventHasDelivered(ctx, decodedEvent) + if err != nil { + logger.L().Error(fmt.Sprintf("err on change delivered status of events : %s", err.Error())) + fn := func() error { + return c.eventService.EventHasDelivered(ctx, decodedEvent) + } + e := retry.Do(fn, c.retryNumber) + if e != nil { + return e + } + } + logger.L().Info(fmt.Sprintf("processed event event array: %v has been retrieved", decodedEvent.Events)) + err = msg.Ack() + if err != nil { + logger.L().Debug(fmt.Sprintf("ack failed for message : %s", err.Error())) + return err + } + return nil +} diff --git a/source/eventhandler/eventhandler.go b/source/eventhandler/eventhandler.go index 154078b0..a11daf11 100644 --- a/source/eventhandler/eventhandler.go +++ b/source/eventhandler/eventhandler.go @@ -11,6 +11,7 @@ import ( "github.com/ormushq/ormus/pkg/channel" "github.com/ormushq/ormus/pkg/encoder" "github.com/ormushq/ormus/pkg/richerror" + "github.com/ormushq/ormus/source/service/event" "github.com/ormushq/ormus/source/service/writekey" ) @@ -19,42 +20,19 @@ type ProcessMessage func(ctx context.Context, msg channel.Message) error type Consumer struct { broker channel.Adapter writeKeyService writekey.Service + eventService event.Service + retryNumber int } -func NewConsumer(broker channel.Adapter, writeKeyService writekey.Service) *Consumer { +func NewConsumer(broker channel.Adapter, writeKeyService writekey.Service, eventService event.Service, retryNumber int) *Consumer { return &Consumer{ broker: broker, writeKeyService: writeKeyService, + eventService: eventService, + retryNumber: retryNumber, } } -func (c Consumer) ProcessNewSourceEvent(ctx context.Context, msg channel.Message) error { - decodedEvent := encoder.DecodeNewSourceEvent(string(msg.Body)) - - // Log retrieval - logger.L().Info(fmt.Sprintf("project id : %s, write key: %s, owner id: %s: has been retrieved", - decodedEvent.ProjectId, decodedEvent.WriteKey, decodedEvent.OwnerId)) - - err := c.writeKeyService.CreateNewWriteKey(ctx, decodedEvent.OwnerId, decodedEvent.ProjectId, decodedEvent.WriteKey) - if err != nil { - logger.L().Error(fmt.Sprintf("err on creating writekey in redis : %s ", err.Error())) - - // TODO support Nack in pkg - return err - } - - logger.L().Debug("the message has been received") - - err = msg.Ack() - if err != nil { - logger.L().Debug(fmt.Sprintf("ack failed for message : %s", err.Error())) - - return err - } - - return nil -} - func (c Consumer) Consume(ctx context.Context, queueName string, done <-chan bool, wg *sync.WaitGroup, processMessage ProcessMessage) { logger.L().Debug("Consumer started") wg.Add(1) @@ -64,7 +42,7 @@ func (c Consumer) Consume(ctx context.Context, queueName string, done <-chan boo msgChan, err := c.broker.GetOutputChannel(queueName) if err != nil { - logger.L().Debug(fmt.Sprintf("error while subscribing to source topic %s", err.Error())) + logger.L().Error(fmt.Sprintf("error while subscribing to source topic %s", err.Error())) } for { select { @@ -100,7 +78,6 @@ func (p Publisher) Publish(ctx context.Context, queueName string, wg *sync.WaitG WithKind(richerror.KindUnexpected). WithWrappedError(err) } - wg.Add(len(messages)) for _, message := range messages { go func(message *proto_source.NewEvent) { diff --git a/source/eventhandler/newsource.go b/source/eventhandler/newsource.go new file mode 100644 index 00000000..3e30f4c2 --- /dev/null +++ b/source/eventhandler/newsource.go @@ -0,0 +1,42 @@ +package eventhandler + +import ( + "context" + "fmt" + + "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/pkg/channel" + "github.com/ormushq/ormus/pkg/encoder" + "github.com/ormushq/ormus/pkg/retry" +) + +func (c Consumer) ProcessNewSourceEvent(ctx context.Context, msg channel.Message) error { + decodedEvent := encoder.DecodeNewSourceEvent(string(msg.Body)) + + logger.L().Info(fmt.Sprintf("project id : %s, write key: %s, owner id: %s: has been retrieved", + decodedEvent.ProjectId, decodedEvent.WriteKey, decodedEvent.OwnerId)) + + err := c.writeKeyService.CreateNewWriteKey(ctx, decodedEvent.OwnerId, decodedEvent.ProjectId, decodedEvent.WriteKey) + if err != nil { + logger.L().Error(fmt.Sprintf("err on creating writekey in redis : %s ", err.Error())) + fn := func() error { + return c.writeKeyService.CreateNewWriteKey(ctx, decodedEvent.OwnerId, decodedEvent.ProjectId, decodedEvent.WriteKey) + } + e := retry.Do(fn, c.retryNumber) + if e != nil { + return e + } + } + + logger.L().Debug("the message has been received") + + err = msg.Ack() + if err != nil { + logger.L().Debug(fmt.Sprintf("ack failed for message : %s", err.Error())) + + return err + + } + + return nil +} diff --git a/source/params/event.go b/source/params/event.go index 07166ba8..b053e985 100644 --- a/source/params/event.go +++ b/source/params/event.go @@ -16,5 +16,8 @@ type TrackEventRequest struct { } type TrackEventResponse struct { - ID string `json:"id"` + ID []string `json:"ids"` + InvalidWriteKeys []string `json:"invalid_write_keys"` + Success int `json:"success"` + FAIL int `json:"failed"` } diff --git a/source/repository/scylladb/db.go b/source/repository/scylladb/db.go index 77b36b7f..428f117b 100644 --- a/source/repository/scylladb/db.go +++ b/source/repository/scylladb/db.go @@ -1,9 +1,12 @@ package scylladb import ( + "context" "fmt" "log" + "os" + "github.com/gocql/gocql" "github.com/ormushq/ormus/adapter/scylladb" "github.com/ormushq/ormus/adapter/scylladb/scyllainitialize" "github.com/ormushq/ormus/logger" @@ -42,8 +45,9 @@ func New(scylladbConfig scylladb.Config) (*DB, error) { if err != nil { log.Fatal("Failed to create ScyllaDB keyspace:", err) } - err = scyllainitialize.RunMigrations(Sconn, "./source/repository/scylladb/") + err = scyllainitialize.RunMigrations(Sconn, fmt.Sprintf("%s/source/repository/scylladb/", os.Getenv("ROOT"))) if err != nil { + logger.L().Error(fmt.Sprintf("Failed to run migrations: %v", err)) panic(err) } Session, Err := scyllainitialize.GetConnection(Sconn) @@ -54,6 +58,7 @@ func New(scylladbConfig scylladb.Config) (*DB, error) { return &DB{ conn: Session, }, nil + } func (d *DB) GetConn() scylladb.SessionxInterface { @@ -71,8 +76,8 @@ func (d *DB) GetStatement(state Statement) (gocqlx.Queryx, error) { if statement, ok := statements[state.Query]; ok { return statement, nil } - return gocqlx.Queryx{}, richerror.New("db.GetStatement").WhitKind(richerror.KindNotFound).WithMessage("statement not found") + } func (d *DB) RegisterStatements(states map[string]Statement) { @@ -80,3 +85,12 @@ func (d *DB) RegisterStatements(states map[string]Statement) { d.RegisterStatement(stat) } } + +func (d *DB) NewBatch(ctx context.Context) *gocql.Batch { + + return d.GetConn().NewBatch(ctx, gocql.UnloggedBatch) +} +func (d *DB) ExecuteBatch(batch *gocql.Batch) error { + + return d.GetConn().ExecuteBatch(batch) +} diff --git a/source/repository/scylladb/event/create.go b/source/repository/scylladb/event/create.go index f1f01663..bc438beb 100644 --- a/source/repository/scylladb/event/create.go +++ b/source/repository/scylladb/event/create.go @@ -2,12 +2,14 @@ package event import ( "context" + "fmt" "sync" "time" "github.com/google/uuid" proto "github.com/ormushq/ormus/contract/go/source" "github.com/ormushq/ormus/event" + "github.com/ormushq/ormus/logger" "github.com/ormushq/ormus/pkg/errmsg" "github.com/ormushq/ormus/pkg/richerror" "github.com/ormushq/ormus/source/repository/scylladb" @@ -17,52 +19,91 @@ import ( func init() { statements["create"] = scylladb.Statement{ - Query: `INSERT INTO event (id, type, name, send_at, received_at, timestamp, event, write_key, created_at, updated_at, properties,delivered) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,false)`, - Values: []string{"id", "type", "name", "send_at", "received_at", "timestamp", "event", "write_key", "created_at", "updated_at", "properties"}, + Query: `INSERT INTO event ( + write_key, + id, + type, + name, + send_at, + received_at, + event, + timestamp, + created_at, + updated_at, + delivered + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)`, + Values: []string{ + "write_key", + "id", + "type", + "name", + "send_at", + "received_at", + "event", + "timestamp", + "created_at", + "updated_at", + "delivered", + }, } } -func (r Repository) CreateNewEvent(ctx context.Context, evt event.CoreEvent, wg *sync.WaitGroup, queueName string) (string, error) { - query, err := r.db.GetStatement(statements["create"]) +func (r Repository) CreateNewEvent(ctx context.Context, evt []event.CoreEvent, wg *sync.WaitGroup, queueName string) ([]string, error) { + messages := make([]*proto.NewEvent, 0) + ids := make([]string, 0) + batch := r.db.NewBatch(ctx) + stmt, err := r.db.GetStatement(statements["create"]) if err != nil { - return "", richerror.New("source.repository").WithWrappedError(err).WithKind(richerror.KindUnexpected).WithMessage(errmsg.ErrSomeThingWentWrong) - } + logger.L().Error(err.Error()) - id := uuid.New().String() - query.BindMap(qb.M{ - "write_key": evt.WriteKey, - "id": id, - "type": evt.Type, - "name": evt.Name, - "send_at": evt.SendAt, - "received_at": evt.ReceivedAt, - "event": evt.Event, - "timestamp": evt.Timestamp, - "created_at": time.Now(), - "updated_at": time.Now(), - "properties": evt.Properties, - }) - - if err := query.Exec(); err != nil { - return "", richerror.New("source.repository").WithWrappedError(err).WithKind(richerror.KindUnexpected).WithMessage(errmsg.ErrSomeThingWentWrong) + return nil, richerror.New("source.repository").WithWrappedError(err).WithMessage("failed to get prepared statement") } - messages := []*proto.NewEvent{ - { + for _, e := range evt { + id := uuid.New().String() + ids = append(ids, id) + stmt.BindMap(qb.M{ + "write_key": e.WriteKey, + "id": id, + "type": e.Type, + "name": e.Name, + "send_at": e.SendAt, + "received_at": e.ReceivedAt, + "event": e.Event, + "timestamp": e.Timestamp, + "created_at": time.Now(), + "updated_at": time.Now(), + "properties": e.Properties, + "delivered": false, + }) + batch.Query(stmt.Statement(), stmt.Values()...) + + messages = append(messages, &proto.NewEvent{ Id: id, - Name: evt.Name, - WriteKey: evt.WriteKey, - Event: evt.Event, - SendAt: timestamppb.New(evt.SendAt), - ReceivedAt: timestamppb.New(evt.ReceivedAt), - Timestamp: timestamppb.New(evt.Timestamp), - Type: string(evt.Type), - Properties: *(evt.Properties), + Name: e.Name, + WriteKey: e.WriteKey, + Event: e.Event, + SendAt: timestamppb.New(e.SendAt), + ReceivedAt: timestamppb.New(e.ReceivedAt), + Timestamp: timestamppb.New(e.Timestamp), + Type: string(e.Type), + Properties: *(e.Properties), }, + ) + } + logger.L().Info(fmt.Sprintf("event %s has been received", messages)) + err = r.db.ExecuteBatch(batch) + if err != nil { + logger.L().Error(err.Error()) + + return nil, richerror.New("source.repository").WithWrappedError(err). + WithMessage("failed to get insert statement"). + WithKind(richerror.KindUnexpected) + } err = r.eventBroker.Publish(ctx, queueName, wg, messages) if err != nil { - return "", richerror.New("source.repository").WithWrappedError(err).WithKind(richerror.KindUnexpected).WithMessage(errmsg.ErrSomeThingWentWrong) + return []string{}, richerror.New("source.repository").WithWrappedError(err).WithKind(richerror.KindUnexpected).WithMessage(errmsg.ErrSomeThingWentWrong) } - return id, nil + return ids, nil } diff --git a/source/repository/scylladb/event/delivered_event.go b/source/repository/scylladb/event/delivered_event.go new file mode 100644 index 00000000..7cbcedb1 --- /dev/null +++ b/source/repository/scylladb/event/delivered_event.go @@ -0,0 +1,45 @@ +package event + +import ( + "context" + + "github.com/ormushq/ormus/contract/go/destination" + "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/pkg/richerror" + "github.com/ormushq/ormus/source/repository/scylladb" + "github.com/scylladb/gocqlx/v2/qb" +) + +func init() { + statements["delivered_event"] = scylladb.Statement{ + Query: `UPDATE event SET delivered = true WHERE id = ?`, + Values: []string{"id"}, + } +} + +func (r Repository) EventHasDelivered(ctx context.Context, evt *destination.DeliveredEventsList) error { + query, err := r.db.GetStatement(statements["delivered_event"]) + batch := r.db.NewBatch(ctx) + if err != nil { + logger.L().Error(err.Error()) + return richerror.New("source.repository").WithWrappedError(err). + WithMessage("failed to update delivered status due to delivered_event statement"). + WithKind(richerror.KindUnexpected) + + } + for _, e := range evt.Events { + query.BindMap(qb.M{ + "id": e.MessageId, + }) + batch.Query(query.Statement(), query.Values()...) + } + err = r.db.ExecuteBatch(batch) + if err != nil { + logger.L().Error(err.Error()) + return richerror.New("source.repository").WithWrappedError(err). + WithMessage("failed to update delivered status due to bulk update failure"). + WithKind(richerror.KindUnexpected) + + } + return nil +} diff --git a/source/service/event/service.go b/source/service/event/service.go index 67185abf..da054384 100644 --- a/source/service/event/service.go +++ b/source/service/event/service.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/ormushq/ormus/contract/go/destination" "github.com/ormushq/ormus/event" "github.com/ormushq/ormus/logger" "github.com/ormushq/ormus/pkg/errmsg" @@ -13,7 +14,8 @@ import ( ) type Repository interface { - CreateNewEvent(ctx context.Context, evt event.CoreEvent, wg *sync.WaitGroup, queueName string) (string, error) + CreateNewEvent(ctx context.Context, evt []event.CoreEvent, wg *sync.WaitGroup, queueName string) ([]string, error) + EventHasDelivered(ctx context.Context, evt *destination.DeliveredEventsList) error } type Service struct { @@ -30,19 +32,22 @@ func New(eventRepo Repository, config source.Config, wg *sync.WaitGroup) *Servic } } -func (s Service) CreateNewEvent(ctx context.Context, newEvent params.TrackEventRequest) (*params.TrackEventResponse, error) { - e := event.CoreEvent{ - Name: newEvent.Name, - WriteKey: newEvent.WriteKey, - Event: newEvent.Event, - SendAt: newEvent.SendAt, - ReceivedAt: newEvent.ReceivedAt, - Timestamp: newEvent.Timestamp, - Type: event.Type(newEvent.Type), - Properties: (*event.Properties)(&newEvent.Properties), +func (s Service) CreateNewEvent(ctx context.Context, newEvent []params.TrackEventRequest, invalidWriteKeys []string) (*params.TrackEventResponse, error) { + events := make([]event.CoreEvent, 0) + for _, e := range newEvent { + events = append(events, event.CoreEvent{ + Name: e.Name, + WriteKey: e.WriteKey, + Event: e.Event, + SendAt: e.SendAt, + ReceivedAt: e.ReceivedAt, + Timestamp: e.Timestamp, + Type: event.Type(e.Type), + Properties: (*event.Properties)(&e.Properties), + }) } - id, err := s.eventRepo.CreateNewEvent(ctx, e, s.wg, s.config.NewEventQueueName) + ids, err := s.eventRepo.CreateNewEvent(ctx, events, s.wg, s.config.NewEventQueueName) if err != nil { logger.L().Error(err.Error()) @@ -50,6 +55,20 @@ func (s Service) CreateNewEvent(ctx context.Context, newEvent params.TrackEventR } return ¶ms.TrackEventResponse{ - ID: id, + ID: ids, + InvalidWriteKeys: invalidWriteKeys, + Success: len(ids), + FAIL: len(invalidWriteKeys), }, nil } + +func (s Service) EventHasDelivered(ctx context.Context, evt *destination.DeliveredEventsList) error { + err := s.eventRepo.EventHasDelivered(ctx, evt) + if err != nil { + logger.L().Error(err.Error()) + + return richerror.New("source.service").WithMessage(errmsg.ErrSomeThingWentWrong).WhitKind(richerror.KindUnexpected).WithWrappedError(err) + } + + return nil +} diff --git a/source/service/event/service_test.go b/source/service/event/service_test.go index 3ed8c6b0..7b0b6b55 100644 --- a/source/service/event/service_test.go +++ b/source/service/event/service_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" + "github.com/ormushq/ormus/contract/go/destination" "github.com/ormushq/ormus/event" "github.com/ormushq/ormus/pkg/richerror" "github.com/ormushq/ormus/source" @@ -17,30 +18,51 @@ type MockRepository struct { mock.Mock } -func (m *MockRepository) CreateNewEvent(ctx context.Context, evt event.CoreEvent, wg *sync.WaitGroup, queueName string) (string, error) { +func (m *MockRepository) CreateNewEvent(ctx context.Context, evt []event.CoreEvent, wg *sync.WaitGroup, queueName string) ([]string, error) { args := m.Called(ctx, evt, wg, queueName) - return args.String(0), args.Error(1) + return args.Get(0).([]string), args.Error(1) +} + +func (m *MockRepository) EventHasDelivered(ctx context.Context, evt *destination.DeliveredEventsList) error { + args := m.Called(ctx, evt) + return args.Error(0) } func TestCreateNewEvent_Success(T *testing.T) { var wg *sync.WaitGroup mockRepo := new(MockRepository) - mockRepo.On("CreateNewEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("2sdf121fa-dfe21", nil) + mockRepo.On("CreateNewEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"2sdf121fa-dfe21"}, nil) svc := New(mockRepo, source.Config{}, wg) - resp, err := svc.CreateNewEvent(context.Background(), params.TrackEventRequest{}) + resp, err := svc.CreateNewEvent(context.Background(), []params.TrackEventRequest{ + { + Type: "test_type", + Name: "test_name", + }, + }, []string{"1234"}) + mockRepo.AssertCalled(T, "CreateNewEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything) assert.Nil(T, err) assert.NotNil(T, resp) - assert.Equal(T, ¶ms.TrackEventResponse{ID: "2sdf121fa-dfe21"}, resp) + assert.Equal(T, ¶ms.TrackEventResponse{ + ID: []string{"2sdf121fa-dfe21"}, + InvalidWriteKeys: []string{"1234"}, + FAIL: 1, + Success: 1, + }, resp) } func TestCreateNewEvent_Has_Error(T *testing.T) { var wg *sync.WaitGroup mockRepo := new(MockRepository) - mockRepo.On("CreateNewEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", richerror.New("sample")) + mockRepo.On("CreateNewEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, richerror.New("sample")) svc := New(mockRepo, source.Config{}, wg) - resp, err := svc.CreateNewEvent(context.Background(), params.TrackEventRequest{}) + resp, err := svc.CreateNewEvent(context.Background(), []params.TrackEventRequest{ + { + Type: "test_type", + Name: "test_name", + }, + }, []string{}) assert.Nil(T, resp) assert.NotNil(T, err) } diff --git a/source/tests/create_events_test.go b/source/tests/create_events_test.go new file mode 100644 index 00000000..25eb36dd --- /dev/null +++ b/source/tests/create_events_test.go @@ -0,0 +1,68 @@ +package tests + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/bxcodec/faker/v3" + "github.com/ormushq/ormus/event" + "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/pkg/encoder" + "github.com/stretchr/testify/assert" +) + +func TestCreateEvents(t *testing.T) { + c := GetConfigs() + wg := sync.WaitGroup{} + ctx := context.Background() + events := []event.CoreEvent{ + { + WriteKey: faker.NAME, + Type: faker.NAME, + Name: "event1", + SendAt: time.Now(), + ReceivedAt: time.Now(), + Event: faker.NAME, + Timestamp: time.Now(), + Properties: (*event.Properties)(&map[string]string{ + "user": "ali", + }), + }, + { + WriteKey: faker.ID, + Type: faker.NAME, + Name: "event2", + SendAt: time.Now(), + ReceivedAt: time.Now(), + Event: faker.NAME, + Timestamp: time.Now(), + Properties: (*event.Properties)(&map[string]string{ + "user": "ali", + }), + }, + } + + _, err := c.EventRepo.CreateNewEvent(ctx, events, &wg, c.Cfg.Source.NewEventQueueName) + if err != nil { + logger.L().Error(err.Error()) + } + result := []struct { + Count int `db:"count"` + }{} + + q := c.SylladbConn.Query("SELECT count(*) FROM test_source.event;", []string{}) + err = q.Select(&result) + if err != nil { + logger.L().Error(err.Error()) + } + assert.Equal(t, len(events), result[0].Count) + ch, _ := c.brokerAdapter.GetOutputChannel(c.Cfg.Source.NewEventQueueName) + for range 2 { + tmp := <-ch + decodedEvent := encoder.DecodeNewEvent(string(tmp.Body)) + assert.Contains(t, []string{events[0].Name, events[1].Name}, decodedEvent.Name) + assert.Contains(t, []string{events[0].WriteKey, events[1].WriteKey}, decodedEvent.WriteKey) + } +} diff --git a/source/tests/requirements_test.go b/source/tests/requirements_test.go new file mode 100644 index 00000000..3e1774b2 --- /dev/null +++ b/source/tests/requirements_test.go @@ -0,0 +1,98 @@ +package tests + +import ( + "fmt" + "os" + "sync" + "testing" + + "github.com/ormushq/ormus/adapter/redis" + scylladbsession "github.com/ormushq/ormus/adapter/scylladb" + "github.com/ormushq/ormus/config" + "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/pkg/channel" + "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmqchannel" + "github.com/ormushq/ormus/source/adapter/manager" + sourceevent "github.com/ormushq/ormus/source/eventhandler" + writekeyrepo "github.com/ormushq/ormus/source/repository/redis/rediswritekey" + "github.com/ormushq/ormus/source/repository/scylladb" + eventrepo "github.com/ormushq/ormus/source/repository/scylladb/event" + eventsvc "github.com/ormushq/ormus/source/service/event" + "github.com/ormushq/ormus/source/service/writekey" +) + +type TestConfig struct { + EventRepo eventrepo.Repository + Cfg config.Config + SylladbConn scylladbsession.SessionxInterface + brokerAdapter *rabbitmqchannel.ChannelAdapter + Consumer sourceevent.Consumer + writeKeyRepo writekeyrepo.DB + redisAdapter redis.Adapter +} + +func ClearUp() { + c := GetConfigs() + err := c.SylladbConn.ExecStmt("TRUNCATE test_source.event") + if err != nil { + logger.L().Error(err.Error()) + } + _, err = c.brokerAdapter.PurgeTheChannel(c.Cfg.Source.NewEventQueueName) + if err != nil { + logger.L().Error(err.Error()) + } +} + +func TestMain(m *testing.M) { + ClearUp() + exitVal := m.Run() + ClearUp() + os.Exit(exitVal) +} + +func GetConfigs() TestConfig { + done := make(chan bool) + wg := &sync.WaitGroup{} + cfg := config.New(config.Option{ + Prefix: "ORMUS_", + Delimiter: ".", + Separator: "__", + YamlFilePath: fmt.Sprintf("%s/test_config.yml", os.Getenv("ROOT")), + }) + Adapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq) + err := Adapter.NewChannel(cfg.Source.NewSourceEventName, channel.BothMode, cfg.Source.BufferSize, cfg.Source.MaxRetry) + if err != nil { + panic(err) + } + err = Adapter.NewChannel(cfg.Source.NewEventQueueName, channel.BothMode, cfg.Source.BufferSize, cfg.Source.MaxRetry) + if err != nil { + panic(err) + } + + Publisher := sourceevent.NewPublisher(Adapter) + DB, err := scylladb.New(cfg.Source.ScyllaDBConfig) + if err != nil { + panic(err) + } + + repo := eventrepo.New(DB, Publisher) + redisAdapter, err := redis.New(cfg.Redis) + if err != nil { + panic(err) + } + ManagerAdapter := manager.New(cfg.Source) + writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter) + writeKeySvc := writekey.New(&writeKeyRepo, cfg.Source) + eventRepo := eventrepo.New(DB, Publisher) + eventSvc := *eventsvc.New(eventRepo, cfg.Source, wg) + Consumer := *sourceevent.NewConsumer(Adapter, writeKeySvc, eventSvc, cfg.Source.RetryNumber) + return TestConfig{ + EventRepo: *repo, + Cfg: cfg, + SylladbConn: DB.GetConn(), + brokerAdapter: Adapter, + Consumer: Consumer, + writeKeyRepo: writeKeyRepo, + redisAdapter: redisAdapter, + } +} diff --git a/source/tests/write_key_validation_test.go b/source/tests/write_key_validation_test.go new file mode 100644 index 00000000..7d3d5a38 --- /dev/null +++ b/source/tests/write_key_validation_test.go @@ -0,0 +1,21 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWriteKeyValidation(t *testing.T) { + c := GetConfigs() + c.redisAdapter.Client().Set(context.Background(), "valid_fake_writekey", "sample", time.Minute) + isValid, err := c.writeKeyRepo.IsWriteKeyValid(context.Background(), "valid_fake_writekey", 3) + assert.Nil(t, err) + assert.True(t, isValid) + // Assume that manager service is not available + isValid, err = c.writeKeyRepo.IsWriteKeyValid(context.Background(), "inivalid_fake_writekey", 3) + assert.Nil(t, err) + assert.False(t, isValid) +} diff --git a/test_config.yml b/test_config.yml new file mode 100644 index 00000000..264c7789 --- /dev/null +++ b/test_config.yml @@ -0,0 +1,49 @@ +type: yml + +source: + http_server: + port: 8082 + network: "tcp" + write_key_validation_address: "127.0.0.1:8081" + new_event_queue_name: "test-new-event-received" + write_key_expiration: 120 + undelivered_event_retransmit_period: 1 + retry_number: 5 + new_source_event_name: "test-new-source-event" + buffersize: 0 + undelivered_events_queue_name: "undelivered_events" + number_instants: 10 + maxretry: 0 + otel: + endpoint: "otel_collector:4317" + service_name: "source" + enable_metric_expose: true + metric_expose_port: 8081 + metric_expose_path: "metrics" + scylla_db_config: + hosts: + - 127.0.0.1:9042 + consistency: 4 + keyspace: "test_source" + timeout_cluster: 5s + num_retries: 10 + min_retry_delay: 1s + max_retry_delay: 10s +scylladb: + hosts: + - 127.0.0.1:9042 + keyspace: ormus +rabbitmq: + host: "localhost" + port: 5672 + password: "guest" + user: "guest" + reconnect_second: 1 + vhost: "/" +redis: + port: 6379 + host: 127.0.0.1 + db: 0 + password: "" + +