Skip to content

Commit

Permalink
feat(source): create bulk event
Browse files Browse the repository at this point in the history
  • Loading branch information
alishazaee committed Feb 14, 2025
1 parent 0f8d847 commit ee9730f
Show file tree
Hide file tree
Showing 31 changed files with 861 additions and 155 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// TODO: add commands for build and run in dev/produciton mode

ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST))))
export ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST))))

OS := $(shell uname -s)

Expand Down Expand Up @@ -35,6 +35,7 @@ ifneq (,$(findstring NT,$(OS)))
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/internalevent/internalevent.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/project/project.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/source/source.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/destination/destination.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user_info.proto
else
Expand Down
1 change: 1 addition & 0 deletions adapter/scylladb/scyllainitialize/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func CreateKeySpace(consistency gocql.Consistency, keyspace string, hosts ...str
func RunMigrations(dbConn *ScyllaDBConnection, dir string) error {
logger.L().Debug("running migrations...")
for _, host := range dbConn.hosts {
logger.L().Info(dir)
migration := New(dir, host, dbConn.keyspace)
err := migration.Run()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions adapter/scylladb/sessionx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type SessionxInterface interface {
ExecStmt(stmt string) error
AwaitSchemaAgreement(ctx context.Context) error
Close()
NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch
ExecuteBatch(batch *gocql.Batch) error
}

type Session struct {
Expand All @@ -42,6 +44,14 @@ func (s *Session) AwaitSchemaAgreement(ctx context.Context) error {
func (s *Session) Close() {
s.S.Close()
}
func (s *Session) NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch {
batch := s.S.NewBatch(batchType)
return batch.WithContext(ctx)

Check failure on line 49 in adapter/scylladb/sessionx.go

View workflow job for this annotation

GitHub Actions / build

return with no blank line before (nlreturn)
}

func (s *Session) ExecuteBatch(batch *gocql.Batch) error {
return s.S.ExecuteBatch(batch)
}

func NewSession(session *gocql.Session) SessionxInterface {
gocqlxSession := gocqlx.NewSession(session)
Expand Down
52 changes: 52 additions & 0 deletions cmd/source/faker/undeliveredevents_publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"sync"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/contract/go/destination"
"github.com/ormushq/ormus/pkg/channel"
"github.com/ormushq/ormus/pkg/channel/adapter/rabbitmqchannel"
"github.com/ormushq/ormus/pkg/encoder"
)

func main() {
cfg := config.C()
done := make(chan bool)
wg := &sync.WaitGroup{}
bufferSize := cfg.Source.BufferSize
maxRetryPolicy := cfg.Source.MaxRetry
testCount := 1

err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole})
if err != nil {
panic(err.Error())
}
inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err = inputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.InputOnlyMode, bufferSize, maxRetryPolicy)
if err != nil {
panic(err.Error())
}
inputChannel, err := inputAdapter.GetInputChannel(cfg.Source.UndeliveredEventsQueueName)
if err != nil {
panic(err.Error())
}

wg.Add(1)
go func() {
defer wg.Done()
for messageID := 0; messageID < testCount; messageID++ {
msg := encoder.EncodeProcessedEvent(&destination.DeliveredEventsList{
Events: []*destination.DeliveredEvent{
{
MessageId: "d5aacd53-f866-4406-8e2f-d6f1dbc96975",
},
},
})
inputChannel <- []byte(msg)

}
}()
wg.Wait()
}
14 changes: 10 additions & 4 deletions cmd/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func main() {
cfg := config.C()
_, Consumer, eventSvc, eventValidator := SetupSourceServices(cfg)
Consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, Consumer.ProcessNewSourceEvent)

Consumer.Consume(context.Background(), cfg.Source.UndeliveredEventsQueueName, done, wg, Consumer.EventHasDeliveredToDestination)
//----------------- Setup Tracer -----------------//
otelcfg := otela.Config{
Endpoint: config.C().Source.Otel.Endpoint,
Expand Down Expand Up @@ -108,7 +108,7 @@ func main() {
wg.Wait()
}

func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, eventHandler sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) {
func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, Consumer sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) {

Check failure on line 111 in cmd/source/main.go

View workflow job for this annotation

GitHub Actions / build

captLocal: `Consumer' should not be capitalized (gocritic)
done := make(chan bool)
wg := &sync.WaitGroup{}

Expand All @@ -118,6 +118,11 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event
panic(err)
}

err = outputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.OutputOnly, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
panic(err)
}

inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err = inputAdapter.NewChannel(cfg.Source.NewEventQueueName, channel.InputOnlyMode, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
Expand All @@ -135,7 +140,6 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter)
writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source)
eventHandler = *sourceevent.NewConsumer(outputAdapter, writeKeySvc)

DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
Expand All @@ -146,5 +150,7 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

eventValidator = eventvalidator.New(&writeKeyRepo, cfg.Source)

return writeKeySvc, eventHandler, eventSvc, eventValidator
Consumer = *sourceevent.NewConsumer(outputAdapter, writeKeySvc, eventSvc, cfg.Source.RetryNumber)

return writeKeySvc, Consumer, eventSvc, eventValidator
}
2 changes: 2 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ source:
port: 8082
network: "tcp"
write_key_validation_address: "127.0.0.1:8081"
undelivered_events_queue_name: "undelivered_events"
new_event_queue_name: "new-event-received"
write_key_expiration: 120
undelivered_event_retransmit_period: 1
retry_number: 1
new_source_event_name: "new-source-event"
buffersize: 100
number_instants: 10
Expand Down
183 changes: 183 additions & 0 deletions contract/go/destination/destination.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions contract/protobuf/destination/destination.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package destination;
option go_package = "github.com/ormushq/ormus/contract/go/destination";

import "google/protobuf/timestamp.proto";

message DeliveredEvent {
string message_id = 1;
}

message DeliveredEventsList {
repeated DeliveredEvent events = 1;
}
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ module github.com/ormushq/ormus
go 1.22.0

require (
github.com/bxcodec/faker/v3 v3.8.1
github.com/go-co-op/gocron v1.37.0
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/gocql/gocql v1.6.0
github.com/gofrs/uuid/v5 v5.3.1
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/golang/protobuf v1.5.4
Expand Down Expand Up @@ -34,9 +36,8 @@ require (
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.23.0
golang.org/x/sync v0.6.0
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1
Expand Down Expand Up @@ -95,6 +96,7 @@ require (
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.15.0 // indirect
Expand Down
Loading

0 comments on commit ee9730f

Please sign in to comment.