Skip to content

Commit

Permalink
feat(source): create bulk event
Browse files Browse the repository at this point in the history
  • Loading branch information
alishazaee committed Feb 13, 2025
1 parent 0f8d847 commit 3004e3c
Show file tree
Hide file tree
Showing 30 changed files with 758 additions and 148 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ ifneq (,$(findstring NT,$(OS)))
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/internalevent/internalevent.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/project/project.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/source/source.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/destination/destination.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user_info.proto
else
Expand Down
1 change: 1 addition & 0 deletions adapter/scylladb/scyllainitialize/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func CreateKeySpace(consistency gocql.Consistency, keyspace string, hosts ...str
func RunMigrations(dbConn *ScyllaDBConnection, dir string) error {
logger.L().Debug("running migrations...")
for _, host := range dbConn.hosts {
logger.L().Info(dir)
migration := New(dir, host, dbConn.keyspace)
err := migration.Run()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions adapter/scylladb/sessionx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type SessionxInterface interface {
ExecStmt(stmt string) error
AwaitSchemaAgreement(ctx context.Context) error
Close()
NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch
ExecuteBatch(batch *gocql.Batch) error
}

type Session struct {
Expand All @@ -42,6 +44,14 @@ func (s *Session) AwaitSchemaAgreement(ctx context.Context) error {
func (s *Session) Close() {
s.S.Close()
}
func (s *Session) NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch {
batch := s.S.NewBatch(batchType)
return batch.WithContext(ctx)
}

func (s *Session) ExecuteBatch(batch *gocql.Batch) error {
return s.S.ExecuteBatch(batch)
}

func NewSession(session *gocql.Session) SessionxInterface {
gocqlxSession := gocqlx.NewSession(session)
Expand Down
3 changes: 2 additions & 1 deletion cmd/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter)
writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source)
eventHandler = *sourceevent.NewConsumer(outputAdapter, writeKeySvc)

DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
Expand All @@ -146,5 +145,7 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

eventValidator = eventvalidator.New(&writeKeyRepo, cfg.Source)

eventHandler = *sourceevent.NewConsumer(outputAdapter, writeKeySvc, eventSvc, cfg.Source.RetryNumber)

return writeKeySvc, eventHandler, eventSvc, eventValidator
}
1 change: 1 addition & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ source:
new_event_queue_name: "new-event-received"
write_key_expiration: 120
undelivered_event_retransmit_period: 1
retry_number: 5
new_source_event_name: "new-source-event"
buffersize: 100
number_instants: 10
Expand Down
193 changes: 193 additions & 0 deletions contract/go/destination/destination.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions contract/protobuf/destination/destination.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package destination;
option go_package = "github.com/ormushq/ormus/contract/go/destination";

import "google/protobuf/timestamp.proto";

message DeliveredEvent {
string message_id = 1;
string write_key = 9;
}

message DeliveredEventsList {
repeated DeliveredEvent events = 1;
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/rabbitmq/amqp091-go v1.10.0
github.com/redis/go-redis/v9 v9.5.1
github.com/scylladb/gocqlx/v2 v2.8.0
github.com/scylladb/gocqlx/v3 v3.0.1
github.com/spf13/cobra v1.8.1
github.com/streadway/amqp v1.1.0
github.com/stretchr/testify v1.9.0
Expand All @@ -34,9 +35,8 @@ require (
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.23.0
golang.org/x/sync v0.6.0
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1
Expand Down Expand Up @@ -95,6 +95,7 @@ require (
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.15.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ github.com/scylladb/go-reflectx v1.0.1 h1:b917wZM7189pZdlND9PbIJ6NQxfDPfBvUaQ7cj
github.com/scylladb/go-reflectx v1.0.1/go.mod h1:rWnOfDIRWBGN0miMLIcoPt/Dhi2doCMZqwMCJ3KupFc=
github.com/scylladb/gocqlx/v2 v2.8.0 h1:f/oIgoEPjKDKd+RIoeHqexsIQVIbalVmT+axwvUqQUg=
github.com/scylladb/gocqlx/v2 v2.8.0/go.mod h1:4/+cga34PVqjhgSoo5Nr2fX1MQIqZB5eCE5DK4xeDig=
github.com/scylladb/gocqlx/v3 v3.0.1 h1:JBvOUBz62LQ2lbIgJqQbwVMiDftbtrJSi63KVxvRYOQ=
github.com/scylladb/gocqlx/v3 v3.0.1/go.mod h1:EjbSZM0VR2a57ZUxCRQ3v3CSoWIkH1WTMwxeDbFQorY=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down Expand Up @@ -492,8 +494,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sourcehandler

import (
"context"
"github.com/ormushq/ormus/logger"

source_proto "github.com/ormushq/ormus/contract/go/source"
"github.com/ormushq/ormus/manager/service/sourceservice"
Expand All @@ -22,6 +23,7 @@ func New(soureSvc sourceservice.Service) *WriteKeyValidationHandler {
func (w WriteKeyValidationHandler) IsWriteKeyValid(_ context.Context, req *source_proto.ValidateWriteKeyReq) (*source_proto.ValidateWriteKeyResp, error) {
resp, err := w.SourceSvc.IsWriteKeyValid(req)
if err != nil {
logger.L().Error(err.Error())
return nil, richerror.New("delivery.grpc_server").WithWrappedError(err).WhitKind(richerror.KindUnexpected)
}

Expand Down
2 changes: 1 addition & 1 deletion manager/repository/scyllarepo/scyllasource/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r Repository) IsWriteKeyValid(writeKey string) (*source_proto.ValidateWrit
"write_key": writeKey,
})
var writeKeyResp source_proto.ValidateWriteKeyResp
if err := query.Get(&writeKeyResp); err != nil {
if err := query.Get(&writeKeyResp); err != nil && err.Error() != "not found" {
return nil, err
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/channel/adapter/rabbitmqchannel/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,18 @@ func WaitForConnection(rabbitmq *Rabbitmq) {
rabbitmq.cond.Wait()
}
}

func (ca *ChannelAdapter) PurgeTheChannel(name string) (int, error) {
ch, err := ca.rabbitmq.connection.Channel()
if err != nil {
logger.WithGroup(loggerGroupName).Error(errmsg.ErrFailedToOpenChannel,
slog.String("error", err.Error()))
return 0, err
}
purged, err := ch.QueuePurge(name, true)
if err != nil {
logger.WithGroup(loggerGroupName).Error(errmsg.ErrFailedToCloseChannel,
slog.String("error", err.Error()))
}
return purged, err
}
31 changes: 31 additions & 0 deletions pkg/encoder/processedevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package encoder

import (
"encoding/base64"
"github.com/ormushq/ormus/contract/go/destination"
"google.golang.org/protobuf/proto"
)

func EncodeProcessedEvent(event *destination.DeliveredEventsList) string {
payload, err := proto.Marshal(event)
if err != nil {
return ""
}

return base64.StdEncoding.EncodeToString(payload)
}

func DecodeProcessedEvent(event string) *destination.DeliveredEventsList {
payload, err := base64.StdEncoding.DecodeString(event)
if err != nil {
return nil
}
mu := destination.DeliveredEventsList{}
if err := proto.Unmarshal(payload, &mu); err != nil {
return nil
}

return &destination.DeliveredEventsList{
Events: mu.Events,
}
}
Loading

0 comments on commit 3004e3c

Please sign in to comment.