Skip to content

Commit

Permalink
feat(source): create bulk event
Browse files Browse the repository at this point in the history
  • Loading branch information
alishazaee committed Feb 18, 2025
1 parent 0f8d847 commit 43942db
Show file tree
Hide file tree
Showing 36 changed files with 915 additions and 127 deletions.
3 changes: 1 addition & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ linters:
- nilerr
- nilnil

# Forces newlines in some places.
- nlreturn

# Finds sending HTTP request without context.Context.
- noctx
Expand Down Expand Up @@ -226,6 +224,7 @@ linters:
# I'm fine to check the error from json.Marshal
- errchkjson


# All SQL queries MUST BE covered with tests.
- execinquery

Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// TODO: add commands for build and run in dev/produciton mode

ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST))))
export ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST))))

OS := $(shell uname -s)

Expand Down Expand Up @@ -35,6 +35,7 @@ ifneq (,$(findstring NT,$(OS)))
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/internalevent/internalevent.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/project/project.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/source/source.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/destination/destination.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user_info.proto
else
Expand Down
1 change: 1 addition & 0 deletions adapter/scylladb/scyllainitialize/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func CreateKeySpace(consistency gocql.Consistency, keyspace string, hosts ...str
func RunMigrations(dbConn *ScyllaDBConnection, dir string) error {
logger.L().Debug("running migrations...")
for _, host := range dbConn.hosts {
logger.L().Info(dir)
migration := New(dir, host, dbConn.keyspace)
err := migration.Run()
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions adapter/scylladb/sessionx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type SessionxInterface interface {
ExecStmt(stmt string) error
AwaitSchemaAgreement(ctx context.Context) error
Close()
NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch
ExecuteBatch(batch *gocql.Batch) error
}

type Session struct {
Expand All @@ -43,6 +45,16 @@ func (s *Session) Close() {
s.S.Close()
}

func (s *Session) NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch {
batch := s.S.NewBatch(batchType)

return batch.WithContext(ctx)
}

func (s *Session) ExecuteBatch(batch *gocql.Batch) error {
return s.S.ExecuteBatch(batch)
}

func NewSession(session *gocql.Session) SessionxInterface {
gocqlxSession := gocqlx.NewSession(session)

Expand Down
52 changes: 52 additions & 0 deletions cmd/source/faker/undeliveredevents_publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"sync"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/contract/go/destination"
"github.com/ormushq/ormus/pkg/channel"
"github.com/ormushq/ormus/pkg/channel/adapter/rabbitmqchannel"
"github.com/ormushq/ormus/pkg/encoder"
)

func main() {
cfg := config.C()
done := make(chan bool)
wg := &sync.WaitGroup{}
bufferSize := cfg.Source.BufferSize
maxRetryPolicy := cfg.Source.MaxRetry
testCount := 1

err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole})
if err != nil {
panic(err.Error())
}
inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err = inputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.InputOnlyMode, bufferSize, maxRetryPolicy)
if err != nil {
panic(err.Error())
}
inputChannel, err := inputAdapter.GetInputChannel(cfg.Source.UndeliveredEventsQueueName)
if err != nil {
panic(err.Error())
}

wg.Add(1)
go func() {
defer wg.Done()
for messageID := 0; messageID < testCount; messageID++ {
msg := encoder.EncodeProcessedEvent(&destination.DeliveredEventsList{
Events: []*destination.DeliveredEvent{
{
MessageId: "d5aacd53-f866-4406-8e2f-d6f1dbc96975",
},
},
})
inputChannel <- []byte(msg)

}
}()
wg.Wait()
}
18 changes: 12 additions & 6 deletions cmd/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func main() {
}

cfg := config.C()
_, Consumer, eventSvc, eventValidator := SetupSourceServices(cfg)
Consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, Consumer.ProcessNewSourceEvent)

_, consumer, eventSvc, eventValidator := SetupSourceServices(cfg)
consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, consumer.ProcessNewSourceEvent)
consumer.Consume(context.Background(), cfg.Source.UndeliveredEventsQueueName, done, wg, consumer.EventHasDeliveredToDestination)
//----------------- Setup Tracer -----------------//
otelcfg := otela.Config{
Endpoint: config.C().Source.Otel.Endpoint,
Expand Down Expand Up @@ -108,7 +108,7 @@ func main() {
wg.Wait()
}

func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, eventHandler sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) {
func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, consumer sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) {
done := make(chan bool)
wg := &sync.WaitGroup{}

Expand All @@ -118,6 +118,11 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event
panic(err)
}

err = outputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.OutputOnly, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
panic(err)
}

inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err = inputAdapter.NewChannel(cfg.Source.NewEventQueueName, channel.InputOnlyMode, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
Expand All @@ -135,7 +140,6 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter)
writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source)
eventHandler = *sourceevent.NewConsumer(outputAdapter, writeKeySvc)

DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
Expand All @@ -146,5 +150,7 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

eventValidator = eventvalidator.New(&writeKeyRepo, cfg.Source)

return writeKeySvc, eventHandler, eventSvc, eventValidator
consumer = *sourceevent.NewConsumer(outputAdapter, writeKeySvc, eventSvc, cfg.Source.RetryNumber)

return writeKeySvc, consumer, eventSvc, eventValidator
}
2 changes: 2 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ source:
port: 8082
network: "tcp"
write_key_validation_address: "127.0.0.1:8081"
undelivered_events_queue_name: "undelivered_events"
new_event_queue_name: "new-event-received"
write_key_expiration: 120
undelivered_event_retransmit_period: 1
retry_number: 1
new_source_event_name: "new-source-event"
buffersize: 100
number_instants: 10
Expand Down
184 changes: 184 additions & 0 deletions contract/go/destination/destination.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions contract/protobuf/destination/destination.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package destination;
option go_package = "github.com/ormushq/ormus/contract/go/destination";

import "google/protobuf/timestamp.proto";

message DeliveredEvent {
string message_id = 1;
}

message DeliveredEventsList {
repeated DeliveredEvent events = 1;
}
4 changes: 0 additions & 4 deletions doc/swagger/manager_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,10 +1186,6 @@ const docTemplatemanager = `{
"name": {
"type": "string",
"example": "updated name"
},
"status": {
"type": "string",
"example": "active"
}
}
},
Expand Down
Loading

0 comments on commit 43942db

Please sign in to comment.