From 146b7ca7ad6343f795eb8d29ea9c2061053d152f Mon Sep 17 00:00:00 2001 From: David Rochow Date: Mon, 24 Mar 2025 15:02:16 +0100 Subject: [PATCH 1/2] feat(nats): initial adding Nats --- Makefile | 3 + cmd/event_handler/main.go | 60 +++++++++++++ cmd/heureka/main.go | 2 +- docker-compose.yaml | 12 +++ go.mod | 3 + go.sum | 6 ++ .../app/activity/activity_handler_events.go | 57 +++++++++++- .../app/activity/activity_handler_test.go | 3 +- .../app/component/component_handler_events.go | 33 ++++++- .../app/component/component_handler_test.go | 3 +- .../component_instance_handler_events.go | 35 +++++++- .../component_instance_handler_test.go | 3 +- .../component_version_handler_events.go | 27 +++++- .../component_version_handler_test.go | 3 +- internal/app/event/events.go | 11 --- internal/app/event/interface.go | 15 ++++ .../app/evidence/evidence_handler_events.go | 27 +++++- .../app/evidence/evidence_handler_test.go | 3 +- internal/app/heureka.go | 46 ++++++++-- internal/app/issue/issue_handler_events.go | 57 +++++++++++- internal/app/issue/issue_handler_test.go | 3 +- .../issue_match/issue_match_handler_events.go | 57 ++++++++++-- .../issue_match/issue_match_handler_test.go | 3 +- .../issue_match_change_handler_test.go | 3 +- .../issue_match_handler_events.go | 27 +++++- .../issue_repository_handler_events.go | 27 +++++- .../issue_repository_handler_test.go | 3 +- .../issue_variant_handler_events.go | 33 ++++++- .../issue_variant_handler_test.go | 3 +- .../app/scanner_run/scanner_run_events.go | 15 +++- .../app/scanner_run/scanner_run_handler.go | 4 +- internal/app/scanner_run/scanner_run_test.go | 3 +- .../app/service/service_handler_events.go | 63 ++++++++++++- internal/app/service/service_handler_test.go | 9 +- .../app/severity/severity_handler_events.go | 9 +- .../app/severity/severity_handler_test.go | 3 +- .../support_group_handler_events.go | 63 ++++++++++++- .../support_group_handler_test.go | 3 +- internal/app/user/user_handler_events.go | 39 +++++++- internal/app/user/user_handler_test.go | 3 +- internal/{app => }/event/event_registry.go | 21 +---- .../{app => }/event/event_registry_test.go | 6 +- internal/event/events.go | 33 +++++++ internal/event/nats/event_registry.go | 90 +++++++++++++++++++ internal/server/server.go | 21 ++++- internal/util/config.go | 1 + tools/oidc_provider_mock/go.mod | 10 +-- tools/oidc_provider_mock/go.sum | 9 ++ 48 files changed, 884 insertions(+), 89 deletions(-) create mode 100644 cmd/event_handler/main.go delete mode 100644 internal/app/event/events.go create mode 100644 internal/app/event/interface.go rename internal/{app => }/event/event_registry.go (71%) rename internal/{app => }/event/event_registry_test.go (97%) create mode 100644 internal/event/events.go create mode 100644 internal/event/nats/event_registry.go diff --git a/Makefile b/Makefile index a75ce7ff..67c68fed 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,9 @@ run-%: mockery gqlgen start: stop docker-compose --profile db up +start-with-queue: stop + docker-compose --profile storage up + # Start all container. This expects the heureka bin to be amd64 because the image in the docker-compose is amd64 start-all-%: stop docker-compose --profile db --profile heureka up --build --force-recreate diff --git a/cmd/event_handler/main.go b/cmd/event_handler/main.go new file mode 100644 index 00000000..b34c169b --- /dev/null +++ b/cmd/event_handler/main.go @@ -0,0 +1,60 @@ +// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Greenhouse contributors +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "fmt" + "github.com/cloudoperators/heureka/internal/app" + "github.com/cloudoperators/heureka/internal/database/mariadb" + "github.com/cloudoperators/heureka/internal/event/nats" + "os" + "os/signal" + "syscall" + + "github.com/cloudoperators/heureka/internal/util" + "github.com/cloudoperators/heureka/pkg/log" + "github.com/kelseyhightower/envconfig" + "github.com/sirupsen/logrus" +) + +var ( + mode string +) + +func main() { + fmt.Println(util.HeurekaFiglet) + var cfg util.Config + log.InitLog() + + err := envconfig.Process("heureka", &cfg) + if err != nil { + logrus.WithField("error", err).Fatal("Error while reading env config %s", "test") + return + } + cfg.ConfigToConsole() + + // initialize the database + db, err := mariadb.NewSqlDatabase(cfg) + if err != nil { + logrus.WithError(err).Fatalln("Error while Creating Db") + } + defer db.CloseConnection() + + er := nats.NewEventRegistry(db) + defer er.Shutdown() + + // initialize the application + application := app.NewHeurekaApp(db, er) + defer application.Shutdown() + + application.SubscribeHandlers() + + // Create a channel to listen for OS signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Block until a signal is received + sig := <-sigChan + fmt.Printf("Received signal: %s, shutting down...\n", sig) +} diff --git a/cmd/heureka/main.go b/cmd/heureka/main.go index 6240434e..73de17f4 100644 --- a/cmd/heureka/main.go +++ b/cmd/heureka/main.go @@ -5,7 +5,6 @@ package main import ( "fmt" - "github.com/cloudoperators/heureka/internal/database/mariadb/test" "github.com/cloudoperators/heureka/internal/server" "github.com/cloudoperators/heureka/internal/util" @@ -37,6 +36,7 @@ func main() { if err != nil { logrus.WithError(err).Fatalln("Error while resetting database schema.") } + err = dbManager.Setup() if err != nil { logrus.WithError(err).Fatalln("Error while setting up database.") diff --git a/docker-compose.yaml b/docker-compose.yaml index 637c2c67..c13be5de 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -10,6 +10,7 @@ services: restart: always profiles: - db + - storage environment: MARIADB_USER: ${DB_USER} MARIADB_PASSWORD: ${DB_PASSWORD} @@ -31,6 +32,17 @@ services: ports: - "3306:3306" + heureka-nats: + image: nats:latest + container_name: nats + restart: always + profiles: + - queue + - storage + ports: + - "4222:4222" + - "8222:8222" + heureka-app: build: . container_name: heureka-app diff --git a/go.mod b/go.mod index 7ee7bac2..d3bf7236 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,9 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/nats.go v1.39.1 // indirect + github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect diff --git a/go.sum b/go.sum index b80462a4..bdac3de4 100644 --- a/go.sum +++ b/go.sum @@ -160,6 +160,12 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk= +github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo/v2 v2.23.0 h1:FA1xjp8ieYDzlgS5ABTpdUDB7wtngggONc8a7ku2NqQ= diff --git a/internal/app/activity/activity_handler_events.go b/internal/app/activity/activity_handler_events.go index 4e9a6854..2aca3c6a 100644 --- a/internal/app/activity/activity_handler_events.go +++ b/internal/app/activity/activity_handler_events.go @@ -4,8 +4,9 @@ package activity import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -24,6 +25,12 @@ type ActivityCreateEvent struct { Activity *entity.Activity } +func (e ActivityCreateEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ActivityCreateEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (a *ActivityCreateEvent) Name() event.EventName { return ActivityCreateEventName } @@ -32,6 +39,12 @@ type ActivityUpdateEvent struct { Activity *entity.Activity } +func (e ActivityUpdateEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ActivityUpdateEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (a *ActivityUpdateEvent) Name() event.EventName { return ActivityUpdateEventName } @@ -40,6 +53,12 @@ type ActivityDeleteEvent struct { ActivityID int64 } +func (e ActivityDeleteEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ActivityDeleteEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (a *ActivityDeleteEvent) Name() event.EventName { return ActivityDeleteEventName } @@ -49,6 +68,12 @@ type AddServiceToActivityEvent struct { ServiceID int64 } +func (e AddServiceToActivityEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddServiceToActivityEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (a *AddServiceToActivityEvent) Name() event.EventName { return AddServiceToActivityEventName } @@ -58,6 +83,12 @@ type RemoveServiceFromActivityEvent struct { ServiceID int64 } +func (e RemoveServiceFromActivityEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveServiceFromActivityEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (a *RemoveServiceFromActivityEvent) Name() event.EventName { return RemoveServiceFromActivityEventName } @@ -67,6 +98,12 @@ type AddIssueToActivityEvent struct { IssueID int64 } +func (e AddIssueToActivityEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddIssueToActivityEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (a *AddIssueToActivityEvent) Name() event.EventName { return AddIssueToActivityEventName } @@ -76,6 +113,12 @@ type RemoveIssueFromActivityEvent struct { IssueID int64 } +func (e RemoveIssueFromActivityEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveIssueFromActivityEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (a *RemoveIssueFromActivityEvent) Name() event.EventName { return RemoveIssueFromActivityEventName } @@ -86,6 +129,12 @@ type ListActivitiesEvent struct { Activities *entity.List[entity.ActivityResult] } +func (e ListActivitiesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListActivitiesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (l *ListActivitiesEvent) Name() event.EventName { return ListActivitiesEventName } @@ -95,6 +144,12 @@ type GetActivityEvent struct { Activity *entity.Activity } +func (e GetActivityEvent) Unmarshal(data []byte) (event.Event, error) { + event := &GetActivityEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (g *GetActivityEvent) Name() event.EventName { return GetActivityEventName } diff --git a/internal/app/activity/activity_handler_test.go b/internal/app/activity/activity_handler_test.go index ea3f920c..b2d15347 100644 --- a/internal/app/activity/activity_handler_test.go +++ b/internal/app/activity/activity_handler_test.go @@ -4,6 +4,7 @@ package activity_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -29,7 +30,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func activityFilter() *entity.ActivityFilter { diff --git a/internal/app/component/component_handler_events.go b/internal/app/component/component_handler_events.go index 1a155577..fcc516ae 100644 --- a/internal/app/component/component_handler_events.go +++ b/internal/app/component/component_handler_events.go @@ -4,8 +4,9 @@ package component import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -22,6 +23,12 @@ type ListComponentsEvent struct { Components *entity.List[entity.ComponentResult] } +func (e ListComponentsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListComponentsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListComponentsEvent) Name() event.EventName { return ListComponentsEventName } @@ -30,6 +37,12 @@ type CreateComponentEvent struct { Component *entity.Component } +func (e CreateComponentEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateComponentEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateComponentEvent) Name() event.EventName { return CreateComponentEventName } @@ -38,6 +51,12 @@ type UpdateComponentEvent struct { Component *entity.Component } +func (e UpdateComponentEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateComponentEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateComponentEvent) Name() event.EventName { return UpdateComponentEventName } @@ -46,6 +65,12 @@ type DeleteComponentEvent struct { ComponentID int64 } +func (e DeleteComponentEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteComponentEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteComponentEvent) Name() event.EventName { return DeleteComponentEventName } @@ -56,6 +81,12 @@ type ListComponentCcrnsEvent struct { CCRNs []string } +func (e ListComponentCcrnsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListComponentCcrnsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListComponentCcrnsEvent) Name() event.EventName { return ListComponentCcrnsEventName } diff --git a/internal/app/component/component_handler_test.go b/internal/app/component/component_handler_test.go index b5db7c05..b39dbe05 100644 --- a/internal/app/component/component_handler_test.go +++ b/internal/app/component/component_handler_test.go @@ -4,6 +4,7 @@ package component_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -28,7 +29,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getComponentFilter() *entity.ComponentFilter { diff --git a/internal/app/component_instance/component_instance_handler_events.go b/internal/app/component_instance/component_instance_handler_events.go index 600436fd..13e61ffa 100644 --- a/internal/app/component_instance/component_instance_handler_events.go +++ b/internal/app/component_instance/component_instance_handler_events.go @@ -4,8 +4,9 @@ package component_instance import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -17,11 +18,17 @@ const ( ) type ListComponentInstancesEvent struct { - Filter *entity.ComponentInstanceFilter + Filter *entity.ComponentInstanceFilter `json:"filter"` Options *entity.ListOptions ComponentInstances *entity.List[entity.ComponentInstanceResult] } +func (e ListComponentInstancesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListComponentInstancesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListComponentInstancesEvent) Name() event.EventName { return ListComponentInstancesEventName } @@ -30,6 +37,12 @@ type CreateComponentInstanceEvent struct { ComponentInstance *entity.ComponentInstance } +func (e CreateComponentInstanceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateComponentInstanceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateComponentInstanceEvent) Name() event.EventName { return CreateComponentInstanceEventName } @@ -38,6 +51,12 @@ type UpdateComponentInstanceEvent struct { ComponentInstance *entity.ComponentInstance } +func (e UpdateComponentInstanceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateComponentInstanceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateComponentInstanceEvent) Name() event.EventName { return UpdateComponentInstanceEventName } @@ -46,6 +65,12 @@ type DeleteComponentInstanceEvent struct { ComponentInstanceID int64 } +func (e DeleteComponentInstanceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteComponentInstanceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteComponentInstanceEvent) Name() event.EventName { return DeleteComponentInstanceEventName } @@ -55,6 +80,12 @@ type ListCcrnEvent struct { Ccrn []string } +func (e ListCcrnEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListCcrnEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListCcrnEvent) Name() event.EventName { return ListCcrnEventName } diff --git a/internal/app/component_instance/component_instance_handler_test.go b/internal/app/component_instance/component_instance_handler_test.go index 4780dbab..8c5c0a38 100644 --- a/internal/app/component_instance/component_instance_handler_test.go +++ b/internal/app/component_instance/component_instance_handler_test.go @@ -4,6 +4,7 @@ package component_instance_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -28,7 +29,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func componentInstanceFilter() *entity.ComponentInstanceFilter { diff --git a/internal/app/component_version/component_version_handler_events.go b/internal/app/component_version/component_version_handler_events.go index 9dbdc5d9..e0bf91d6 100644 --- a/internal/app/component_version/component_version_handler_events.go +++ b/internal/app/component_version/component_version_handler_events.go @@ -4,8 +4,9 @@ package component_version import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -21,6 +22,12 @@ type ListComponentVersionsEvent struct { ComponentVersions *entity.List[entity.ComponentVersionResult] } +func (e ListComponentVersionsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListComponentVersionsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListComponentVersionsEvent) Name() event.EventName { return ListComponentVersionsEventName } @@ -29,6 +36,12 @@ type CreateComponentVersionEvent struct { ComponentVersion *entity.ComponentVersion } +func (e CreateComponentVersionEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateComponentVersionEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateComponentVersionEvent) Name() event.EventName { return CreateComponentVersionEventName } @@ -37,6 +50,12 @@ type UpdateComponentVersionEvent struct { ComponentVersion *entity.ComponentVersion } +func (e UpdateComponentVersionEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateComponentVersionEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateComponentVersionEvent) Name() event.EventName { return UpdateComponentVersionEventName } @@ -45,6 +64,12 @@ type DeleteComponentVersionEvent struct { ComponentVersionID int64 } +func (e DeleteComponentVersionEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteComponentVersionEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteComponentVersionEvent) Name() event.EventName { return DeleteComponentVersionEventName } diff --git a/internal/app/component_version/component_version_handler_test.go b/internal/app/component_version/component_version_handler_test.go index 312c8fa8..60554d01 100644 --- a/internal/app/component_version/component_version_handler_test.go +++ b/internal/app/component_version/component_version_handler_test.go @@ -4,6 +4,7 @@ package component_version_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -27,7 +28,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getComponentVersionFilter() *entity.ComponentVersionFilter { diff --git a/internal/app/event/events.go b/internal/app/event/events.go deleted file mode 100644 index 9ee29477..00000000 --- a/internal/app/event/events.go +++ /dev/null @@ -1,11 +0,0 @@ -// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Greenhouse contributors -// SPDX-License-Identifier: Apache-2.0 - -package event - -// EventName type -type EventName string - -type Event interface { - Name() EventName -} diff --git a/internal/app/event/interface.go b/internal/app/event/interface.go new file mode 100644 index 00000000..b9cade70 --- /dev/null +++ b/internal/app/event/interface.go @@ -0,0 +1,15 @@ +package event + +import ( + "context" + "github.com/cloudoperators/heureka/internal/event" +) + +// EventRegistry is the central point for managing handlers for all kind of events +type EventRegistry interface { + RegisterEventHandler(event.EventName, event.EventHandler) + PushEvent(event.Event) + Run(ctx context.Context) + // todo: add shutdown + // Shutdown stops the event registry +} diff --git a/internal/app/evidence/evidence_handler_events.go b/internal/app/evidence/evidence_handler_events.go index b46c9363..0363f8ce 100644 --- a/internal/app/evidence/evidence_handler_events.go +++ b/internal/app/evidence/evidence_handler_events.go @@ -4,8 +4,9 @@ package evidence import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -21,6 +22,12 @@ type ListEvidencesEvent struct { Results *entity.List[entity.EvidenceResult] } +func (e ListEvidencesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListEvidencesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListEvidencesEvent) Name() event.EventName { return ListEvidencesEventName } @@ -29,6 +36,12 @@ type CreateEvidenceEvent struct { Evidence *entity.Evidence } +func (e CreateEvidenceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateEvidenceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateEvidenceEvent) Name() event.EventName { return CreateEvidenceEventName } @@ -37,6 +50,12 @@ type UpdateEvidenceEvent struct { Evidence *entity.Evidence } +func (e UpdateEvidenceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateEvidenceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateEvidenceEvent) Name() event.EventName { return UpdateEvidenceEventName } @@ -45,6 +64,12 @@ type DeleteEvidenceEvent struct { EvidenceID int64 } +func (e DeleteEvidenceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteEvidenceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteEvidenceEvent) Name() event.EventName { return DeleteEvidenceEventName } diff --git a/internal/app/evidence/evidence_handler_test.go b/internal/app/evidence/evidence_handler_test.go index c4744571..0cdeda28 100644 --- a/internal/app/evidence/evidence_handler_test.go +++ b/internal/app/evidence/evidence_handler_test.go @@ -4,6 +4,7 @@ package evidence_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -27,7 +28,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func evidenceFilter() *entity.EvidenceFilter { diff --git a/internal/app/heureka.go b/internal/app/heureka.go index 90786458..b5aefb10 100644 --- a/internal/app/heureka.go +++ b/internal/app/heureka.go @@ -4,7 +4,8 @@ package app import ( - "context" + events "github.com/cloudoperators/heureka/internal/event" + log "github.com/sirupsen/logrus" "github.com/cloudoperators/heureka/internal/app/activity" "github.com/cloudoperators/heureka/internal/app/component" @@ -46,12 +47,11 @@ type HeurekaApp struct { database database.Database } -func NewHeurekaApp(db database.Database) *HeurekaApp { - er := event.NewEventRegistry(db) +// todo: inject event registry +func NewHeurekaApp(db database.Database, er event.EventRegistry) *HeurekaApp { rh := issue_repository.NewIssueRepositoryHandler(db, er) ivh := issue_variant.NewIssueVariantHandler(db, er, rh) sh := severity.NewSeverityHandler(db, er, ivh) - er.Run(context.Background()) heureka := &HeurekaApp{ ActivityHandler: activity.NewActivityHandler(db, er), ComponentHandler: component.NewComponentHandler(db, er), @@ -71,34 +71,62 @@ func NewHeurekaApp(db database.Database) *HeurekaApp { eventRegistry: er, database: db, } - heureka.SubscribeHandlers() return heureka } +// todo: move to event package of app package func (h *HeurekaApp) SubscribeHandlers() { + h.eventRegistry.RegisterEventHandler( + component_instance.ListComponentInstancesEventName, + events.EventHandler{ + func(db database.Database, event events.Event) { + //do nothing + log.Info("Received ListComponentInstancesEvent and calling handler....") + if listEvent, ok := event.(*component_instance.ListComponentInstancesEvent); ok { + log.WithField("event", listEvent).Infof("Marshalled event") + } + return + }, + component_instance.ListComponentInstancesEvent{}.Unmarshal, + }, + ) + // Event handlers for Components h.eventRegistry.RegisterEventHandler( component_instance.CreateComponentInstanceEventName, - event.EventHandlerFunc(issue_match.OnComponentInstanceCreate), + events.EventHandler{ + //todo: move handler to component_instance? + issue_match.OnComponentInstanceCreate, + component_instance.CreateComponentInstanceEvent{}.Unmarshal, + }, ) // Event handlers for Services h.eventRegistry.RegisterEventHandler( service.CreateServiceEventName, - event.EventHandlerFunc(service.OnServiceCreate), + events.EventHandler{ + service.OnServiceCreate, + service.CreateServiceEvent{}.Unmarshal, + }, ) // Event handlers for IssueRepositories h.eventRegistry.RegisterEventHandler( issue_repository.CreateIssueRepositoryEventName, - event.EventHandlerFunc(issue_repository.OnIssueRepositoryCreate), + events.EventHandler{ + issue_repository.OnIssueRepositoryCreate, + issue_repository.CreateIssueRepositoryEvent{}.Unmarshal, + }, ) // Event handlers for ComponentVersion attachments to Issues h.eventRegistry.RegisterEventHandler( issue.AddComponentVersionToIssueEventName, - event.EventHandlerFunc(issue.OnComponentVersionAttachmentToIssue), + events.EventHandler{ + issue.OnComponentVersionAttachmentToIssue, + issue.AddComponentVersionToIssueEvent{}.Unmarshal, + }, ) } diff --git a/internal/app/issue/issue_handler_events.go b/internal/app/issue/issue_handler_events.go index 596e9b63..ac8f18de 100644 --- a/internal/app/issue/issue_handler_events.go +++ b/internal/app/issue/issue_handler_events.go @@ -4,10 +4,11 @@ package issue import ( + "github.com/cloudoperators/heureka/internal/event" "time" + "encoding/json" "github.com/cloudoperators/heureka/internal/app/common" - "github.com/cloudoperators/heureka/internal/app/event" "github.com/cloudoperators/heureka/internal/app/shared" "github.com/cloudoperators/heureka/internal/database" "github.com/cloudoperators/heureka/internal/entity" @@ -30,6 +31,12 @@ type CreateIssueEvent struct { Issue *entity.Issue } +func (e CreateIssueEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateIssueEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateIssueEvent) Name() event.EventName { return CreateIssueEventName } @@ -38,6 +45,12 @@ type UpdateIssueEvent struct { Issue *entity.Issue } +func (e UpdateIssueEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateIssueEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateIssueEvent) Name() event.EventName { return UpdateIssueEventName } @@ -46,6 +59,12 @@ type DeleteIssueEvent struct { IssueID int64 } +func (e DeleteIssueEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteIssueEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteIssueEvent) Name() event.EventName { return DeleteIssueEventName } @@ -55,6 +74,12 @@ type AddComponentVersionToIssueEvent struct { ComponentVersionID int64 } +func (e AddComponentVersionToIssueEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddComponentVersionToIssueEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *AddComponentVersionToIssueEvent) Name() event.EventName { return AddComponentVersionToIssueEventName } @@ -64,6 +89,12 @@ type RemoveComponentVersionFromIssueEvent struct { ComponentVersionID int64 } +func (e RemoveComponentVersionFromIssueEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveComponentVersionFromIssueEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *RemoveComponentVersionFromIssueEvent) Name() event.EventName { return RemoveComponentVersionFromIssueEventName } @@ -74,6 +105,12 @@ type ListIssuesEvent struct { Issues *entity.IssueList } +func (e ListIssuesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListIssuesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListIssuesEvent) Name() event.EventName { return ListIssuesEventName } @@ -83,6 +120,12 @@ type GetIssueEvent struct { Issue *entity.Issue } +func (e GetIssueEvent) Unmarshal(data []byte) (event.Event, error) { + event := &GetIssueEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *GetIssueEvent) Name() event.EventName { return GetIssueEventName } @@ -93,6 +136,12 @@ type ListIssueNamesEvent struct { Names []string } +func (e ListIssueNamesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListIssueNamesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListIssueNamesEvent) Name() event.EventName { return ListIssueNamesEventName } @@ -102,6 +151,12 @@ type GetIssueSeverityCountsEvent struct { Counts *entity.IssueSeverityCounts } +func (e GetIssueSeverityCountsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &GetIssueSeverityCountsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *GetIssueSeverityCountsEvent) Name() event.EventName { return GetIssueSeverityCountsEventName } diff --git a/internal/app/issue/issue_handler_test.go b/internal/app/issue/issue_handler_test.go index 388a96aa..fc8512e5 100644 --- a/internal/app/issue/issue_handler_test.go +++ b/internal/app/issue/issue_handler_test.go @@ -4,6 +4,7 @@ package issue_test import ( "errors" + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -28,7 +29,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getIssueFilter() *entity.IssueFilter { diff --git a/internal/app/issue_match/issue_match_handler_events.go b/internal/app/issue_match/issue_match_handler_events.go index 1460b30d..7e4627f1 100644 --- a/internal/app/issue_match/issue_match_handler_events.go +++ b/internal/app/issue_match/issue_match_handler_events.go @@ -4,10 +4,11 @@ package issue_match import ( + "github.com/cloudoperators/heureka/internal/event" "time" + "encoding/json" "github.com/cloudoperators/heureka/internal/app/component_instance" - "github.com/cloudoperators/heureka/internal/app/event" "github.com/cloudoperators/heureka/internal/app/shared" "github.com/cloudoperators/heureka/internal/database" "github.com/cloudoperators/heureka/internal/entity" @@ -31,6 +32,12 @@ type ListIssueMatchesEvent struct { Results *entity.List[entity.IssueMatchResult] } +func (e ListIssueMatchesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListIssueMatchesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListIssueMatchesEvent) Name() event.EventName { return ListIssueMatchesEventName } @@ -40,6 +47,12 @@ type GetIssueMatchEvent struct { Result *entity.IssueMatch } +func (e GetIssueMatchEvent) Unmarshal(data []byte) (event.Event, error) { + event := &GetIssueMatchEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *GetIssueMatchEvent) Name() event.EventName { return GetIssueMatchEventName } @@ -48,6 +61,18 @@ type CreateIssueMatchEvent struct { IssueMatch *entity.IssueMatch } +func (e CreateIssueMatchEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateIssueMatchEvent{} + err := json.Unmarshal(data, event) + return event, err +} + +func OnComponentInstanceCreate(db database.Database, event event.Event) { + if createEvent, ok := event.(*component_instance.CreateComponentInstanceEvent); ok { + OnComponentVersionAssignmentToComponentInstance(db, createEvent.ComponentInstance.Id, createEvent.ComponentInstance.ComponentVersionId) + } +} + func (e *CreateIssueMatchEvent) Name() event.EventName { return CreateIssueMatchEventName } @@ -56,6 +81,12 @@ type UpdateIssueMatchEvent struct { IssueMatch *entity.IssueMatch } +func (e UpdateIssueMatchEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateIssueMatchEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateIssueMatchEvent) Name() event.EventName { return UpdateIssueMatchEventName } @@ -64,6 +95,12 @@ type DeleteIssueMatchEvent struct { IssueMatchID int64 } +func (e DeleteIssueMatchEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteIssueMatchEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteIssueMatchEvent) Name() event.EventName { return DeleteIssueMatchEventName } @@ -73,6 +110,12 @@ type AddEvidenceToIssueMatchEvent struct { EvidenceID int64 } +func (e AddEvidenceToIssueMatchEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddEvidenceToIssueMatchEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *AddEvidenceToIssueMatchEvent) Name() event.EventName { return AddEvidenceToIssueMatchEventName } @@ -82,14 +125,14 @@ type RemoveEvidenceFromIssueMatchEvent struct { EvidenceID int64 } -func (e *RemoveEvidenceFromIssueMatchEvent) Name() event.EventName { - return RemoveEvidenceFromIssueMatchEventName +func (e RemoveEvidenceFromIssueMatchEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveEvidenceFromIssueMatchEvent{} + err := json.Unmarshal(data, event) + return event, err } -func OnComponentInstanceCreate(db database.Database, event event.Event) { - if createEvent, ok := event.(*component_instance.CreateComponentInstanceEvent); ok { - OnComponentVersionAssignmentToComponentInstance(db, createEvent.ComponentInstance.Id, createEvent.ComponentInstance.ComponentVersionId) - } +func (e *RemoveEvidenceFromIssueMatchEvent) Name() event.EventName { + return RemoveEvidenceFromIssueMatchEventName } // BuildIssueVariantMap builds a map of issue id to issue variant for the given issues and component instance id diff --git a/internal/app/issue_match/issue_match_handler_test.go b/internal/app/issue_match/issue_match_handler_test.go index 92ef4b81..2dd3e547 100644 --- a/internal/app/issue_match/issue_match_handler_test.go +++ b/internal/app/issue_match/issue_match_handler_test.go @@ -5,6 +5,7 @@ package issue_match_test import ( "errors" + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" "time" @@ -35,7 +36,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getIssueMatchFilter() *entity.IssueMatchFilter { diff --git a/internal/app/issue_match_change/issue_match_change_handler_test.go b/internal/app/issue_match_change/issue_match_change_handler_test.go index 4a0ca93d..cf72e569 100644 --- a/internal/app/issue_match_change/issue_match_change_handler_test.go +++ b/internal/app/issue_match_change/issue_match_change_handler_test.go @@ -4,6 +4,7 @@ package issue_match_change_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -27,7 +28,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getIssueMatchChangeFilter() *entity.IssueMatchChangeFilter { diff --git a/internal/app/issue_match_change/issue_match_handler_events.go b/internal/app/issue_match_change/issue_match_handler_events.go index 5b2eda3e..32cab135 100644 --- a/internal/app/issue_match_change/issue_match_handler_events.go +++ b/internal/app/issue_match_change/issue_match_handler_events.go @@ -4,8 +4,9 @@ package issue_match_change import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -21,6 +22,12 @@ type ListIssueMatchChangesEvent struct { Results *entity.List[entity.IssueMatchChangeResult] } +func (e ListIssueMatchChangesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListIssueMatchChangesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListIssueMatchChangesEvent) Name() event.EventName { return ListIssueMatchChangesEventName } @@ -29,6 +36,12 @@ type CreateIssueMatchChangeEvent struct { IssueMatchChange *entity.IssueMatchChange } +func (e CreateIssueMatchChangeEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateIssueMatchChangeEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateIssueMatchChangeEvent) Name() event.EventName { return CreateIssueMatchChangeEventName } @@ -37,6 +50,12 @@ type UpdateIssueMatchChangeEvent struct { IssueMatchChange *entity.IssueMatchChange } +func (e UpdateIssueMatchChangeEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateIssueMatchChangeEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateIssueMatchChangeEvent) Name() event.EventName { return UpdateIssueMatchChangeEventName } @@ -45,6 +64,12 @@ type DeleteIssueMatchChangeEvent struct { IssueMatchChangeID int64 } +func (e DeleteIssueMatchChangeEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteIssueMatchChangeEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteIssueMatchChangeEvent) Name() event.EventName { return DeleteIssueMatchChangeEventName } diff --git a/internal/app/issue_repository/issue_repository_handler_events.go b/internal/app/issue_repository/issue_repository_handler_events.go index 02a4cc33..60c2f824 100644 --- a/internal/app/issue_repository/issue_repository_handler_events.go +++ b/internal/app/issue_repository/issue_repository_handler_events.go @@ -4,9 +4,10 @@ package issue_repository import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/database" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" "github.com/sirupsen/logrus" ) @@ -23,6 +24,12 @@ type ListIssueRepositoriesEvent struct { Results *entity.List[entity.IssueRepositoryResult] } +func (e ListIssueRepositoriesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListIssueRepositoriesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListIssueRepositoriesEvent) Name() event.EventName { return ListIssueRepositoriesEventName } @@ -31,6 +38,12 @@ type CreateIssueRepositoryEvent struct { IssueRepository *entity.IssueRepository } +func (e CreateIssueRepositoryEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateIssueRepositoryEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateIssueRepositoryEvent) Name() event.EventName { return CreateIssueRepositoryEventName } @@ -39,6 +52,12 @@ type UpdateIssueRepositoryEvent struct { IssueRepository *entity.IssueRepository } +func (e UpdateIssueRepositoryEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateIssueRepositoryEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateIssueRepositoryEvent) Name() event.EventName { return UpdateIssueRepositoryEventName } @@ -47,6 +66,12 @@ type DeleteIssueRepositoryEvent struct { IssueRepositoryID int64 } +func (e DeleteIssueRepositoryEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteIssueRepositoryEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteIssueRepositoryEvent) Name() event.EventName { return DeleteIssueRepositoryEventName } diff --git a/internal/app/issue_repository/issue_repository_handler_test.go b/internal/app/issue_repository/issue_repository_handler_test.go index 517a3ea4..0ecacfb7 100644 --- a/internal/app/issue_repository/issue_repository_handler_test.go +++ b/internal/app/issue_repository/issue_repository_handler_test.go @@ -4,6 +4,7 @@ package issue_repository_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -27,7 +28,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getIssueRepositoryFilter() *entity.IssueRepositoryFilter { diff --git a/internal/app/issue_variant/issue_variant_handler_events.go b/internal/app/issue_variant/issue_variant_handler_events.go index 66d189e4..be7923a4 100644 --- a/internal/app/issue_variant/issue_variant_handler_events.go +++ b/internal/app/issue_variant/issue_variant_handler_events.go @@ -4,8 +4,9 @@ package issue_variant import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -22,6 +23,12 @@ type ListIssueVariantsEvent struct { Results *entity.List[entity.IssueVariantResult] } +func (e ListIssueVariantsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListIssueVariantsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListIssueVariantsEvent) Name() event.EventName { return ListIssueVariantsEventName } @@ -32,6 +39,12 @@ type ListEffectiveIssueVariantsEvent struct { Results *entity.List[entity.IssueVariantResult] } +func (e ListEffectiveIssueVariantsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListEffectiveIssueVariantsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListEffectiveIssueVariantsEvent) Name() event.EventName { return ListEffectiveIssueVariantsEventName } @@ -40,6 +53,12 @@ type CreateIssueVariantEvent struct { IssueVariant *entity.IssueVariant } +func (e CreateIssueVariantEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateIssueVariantEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateIssueVariantEvent) Name() event.EventName { return CreateIssueVariantEventName } @@ -48,6 +67,12 @@ type UpdateIssueVariantEvent struct { IssueVariant *entity.IssueVariant } +func (e UpdateIssueVariantEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateIssueVariantEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateIssueVariantEvent) Name() event.EventName { return UpdateIssueVariantEventName } @@ -56,6 +81,12 @@ type DeleteIssueVariantEvent struct { IssueVariantID int64 } +func (e DeleteIssueVariantEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteIssueVariantEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteIssueVariantEvent) Name() event.EventName { return DeleteIssueVariantEventName } diff --git a/internal/app/issue_variant/issue_variant_handler_test.go b/internal/app/issue_variant/issue_variant_handler_test.go index e80c25e8..d637b7d5 100644 --- a/internal/app/issue_variant/issue_variant_handler_test.go +++ b/internal/app/issue_variant/issue_variant_handler_test.go @@ -4,6 +4,7 @@ package issue_variant_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -28,7 +29,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) diff --git a/internal/app/scanner_run/scanner_run_events.go b/internal/app/scanner_run/scanner_run_events.go index 501fadea..8afdd6a5 100644 --- a/internal/app/scanner_run/scanner_run_events.go +++ b/internal/app/scanner_run/scanner_run_events.go @@ -4,8 +4,9 @@ package scanner_run import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -17,10 +18,22 @@ type CreateScannerRunEvent struct { ScannerRun *entity.ScannerRun } +func (e CreateScannerRunEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateScannerRunEvent{} + err := json.Unmarshal(data, event) + return event, err +} + type UpdateScannerRunEvent struct { successfulRun bool } +func (e UpdateScannerRunEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateScannerRunEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (csr *CreateScannerRunEvent) Name() event.EventName { return CreateScannerRunEventName } diff --git a/internal/app/scanner_run/scanner_run_handler.go b/internal/app/scanner_run/scanner_run_handler.go index 9e1120ea..b56f11bb 100644 --- a/internal/app/scanner_run/scanner_run_handler.go +++ b/internal/app/scanner_run/scanner_run_handler.go @@ -32,7 +32,9 @@ func (srh *scannerRunHandler) CreateScannerRun(sr *entity.ScannerRun) (bool, err return false, &ScannerRunHandlerError{msg: "Error creating scanner run"} } - srh.eventRegistry.PushEvent(&CreateScannerRunEvent{sr}) + srh.eventRegistry.PushEvent(&CreateScannerRunEvent{ + ScannerRun: sr, + }) return true, nil } diff --git a/internal/app/scanner_run/scanner_run_test.go b/internal/app/scanner_run/scanner_run_test.go index 323bfa60..1bb9e9db 100644 --- a/internal/app/scanner_run/scanner_run_test.go +++ b/internal/app/scanner_run/scanner_run_test.go @@ -4,6 +4,7 @@ package scanner_run import ( + event2 "github.com/cloudoperators/heureka/internal/event" "testing" "github.com/cloudoperators/heureka/internal/app/event" @@ -24,7 +25,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) var sre *entity.ScannerRun diff --git a/internal/app/service/service_handler_events.go b/internal/app/service/service_handler_events.go index d5789a93..67989491 100644 --- a/internal/app/service/service_handler_events.go +++ b/internal/app/service/service_handler_events.go @@ -4,9 +4,10 @@ package service import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/database" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" "github.com/sirupsen/logrus" ) @@ -27,6 +28,12 @@ type CreateServiceEvent struct { Service *entity.Service } +func (e CreateServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateServiceEvent) Name() event.EventName { return CreateServiceEventName } @@ -35,6 +42,12 @@ type UpdateServiceEvent struct { Service *entity.Service } +func (e UpdateServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateServiceEvent) Name() event.EventName { return UpdateServiceEventName } @@ -43,6 +56,12 @@ type DeleteServiceEvent struct { ServiceID int64 } +func (e DeleteServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteServiceEvent) Name() event.EventName { return DeleteServiceEventName } @@ -52,6 +71,12 @@ type AddOwnerToServiceEvent struct { OwnerID int64 } +func (e AddOwnerToServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddOwnerToServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *AddOwnerToServiceEvent) Name() event.EventName { return AddOwnerToServiceEventName } @@ -61,6 +86,12 @@ type RemoveOwnerFromServiceEvent struct { OwnerID int64 } +func (e RemoveOwnerFromServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveOwnerFromServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *RemoveOwnerFromServiceEvent) Name() event.EventName { return RemoveOwnerFromServiceEventName } @@ -71,6 +102,12 @@ type ListServicesEvent struct { Services *entity.List[entity.ServiceResult] } +func (e ListServicesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListServicesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListServicesEvent) Name() event.EventName { return ListServicesEventName } @@ -80,6 +117,12 @@ type GetServiceEvent struct { Service *entity.Service } +func (e GetServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &GetServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *GetServiceEvent) Name() event.EventName { return GetServiceEventName } @@ -90,6 +133,12 @@ type ListServiceCcrnsEvent struct { Ccrns []string } +func (e ListServiceCcrnsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListServiceCcrnsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListServiceCcrnsEvent) Name() event.EventName { return ListServiceCcrnsEventName } @@ -99,6 +148,12 @@ type AddIssueRepositoryToServiceEvent struct { RepositoryID int64 } +func (e AddIssueRepositoryToServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddIssueRepositoryToServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *AddIssueRepositoryToServiceEvent) Name() event.EventName { return AddIssueRepositoryToServiceEventName } @@ -108,6 +163,12 @@ type RemoveIssueRepositoryFromServiceEvent struct { RepositoryID int64 } +func (e RemoveIssueRepositoryFromServiceEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveIssueRepositoryFromServiceEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *RemoveIssueRepositoryFromServiceEvent) Name() event.EventName { return RemoveIssueRepositoryFromServiceEventName } diff --git a/internal/app/service/service_handler_test.go b/internal/app/service/service_handler_test.go index d96b8f63..51828c2a 100644 --- a/internal/app/service/service_handler_test.go +++ b/internal/app/service/service_handler_test.go @@ -5,6 +5,7 @@ package service_test import ( "errors" + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -31,7 +32,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getServiceFilter() *entity.ServiceFilter { @@ -261,7 +262,7 @@ var _ = Describe("When creating Service", Label("app", "CreateService"), func() } // Use type assertion to convert a CreateServiceEvent into an Event - var event event.Event = createEvent + var event event2.Event = createEvent // Create IssueRepository defaultRepoName := "nvd" @@ -288,7 +289,7 @@ var _ = Describe("When creating Service", Label("app", "CreateService"), func() invalidEvent := &s.UpdateServiceEvent{} // Use type assertion to convert - var event event.Event = invalidEvent + var event event2.Event = invalidEvent s.OnServiceCreate(db, event) @@ -307,7 +308,7 @@ var _ = Describe("When creating Service", Label("app", "CreateService"), func() } // Use type assertion to convert a CreateServiceEvent into an Event - var event event.Event = createEvent + var event event2.Event = createEvent defaultRepoName := "nvd" db.On("GetIssueRepositories", &entity.IssueRepositoryFilter{ diff --git a/internal/app/severity/severity_handler_events.go b/internal/app/severity/severity_handler_events.go index f7fe8ea5..70b2850e 100644 --- a/internal/app/severity/severity_handler_events.go +++ b/internal/app/severity/severity_handler_events.go @@ -4,8 +4,9 @@ package severity import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -17,6 +18,12 @@ type GetSeverityEvent struct { Result *entity.Severity } +func (e GetSeverityEvent) Unmarshal(data []byte) (event.Event, error) { + event := &GetSeverityEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *GetSeverityEvent) Name() event.EventName { return GetSeverityEventName } diff --git a/internal/app/severity/severity_handler_test.go b/internal/app/severity/severity_handler_test.go index b7a2e639..3b345352 100644 --- a/internal/app/severity/severity_handler_test.go +++ b/internal/app/severity/severity_handler_test.go @@ -6,6 +6,7 @@ package severity_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "testing" "github.com/cloudoperators/heureka/internal/app/event" @@ -29,7 +30,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func severityFilter() *entity.SeverityFilter { diff --git a/internal/app/support_group/support_group_handler_events.go b/internal/app/support_group/support_group_handler_events.go index bdd17bd5..f00029d3 100644 --- a/internal/app/support_group/support_group_handler_events.go +++ b/internal/app/support_group/support_group_handler_events.go @@ -4,8 +4,9 @@ package support_group import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -27,6 +28,12 @@ type ListSupportGroupsEvent struct { SupportGroups *entity.List[entity.SupportGroupResult] } +func (e ListSupportGroupsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListSupportGroupsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListSupportGroupsEvent) Name() event.EventName { return ListSupportGroupsEventName } @@ -36,6 +43,12 @@ type GetSupportGroupEvent struct { SupportGroup *entity.SupportGroup } +func (e GetSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &GetSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *GetSupportGroupEvent) Name() event.EventName { return GetSupportGroupEventName } @@ -44,6 +57,12 @@ type CreateSupportGroupEvent struct { SupportGroup *entity.SupportGroup } +func (e CreateSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateSupportGroupEvent) Name() event.EventName { return CreateSupportGroupEventName } @@ -52,6 +71,12 @@ type UpdateSupportGroupEvent struct { SupportGroup *entity.SupportGroup } +func (e UpdateSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateSupportGroupEvent) Name() event.EventName { return UpdateSupportGroupEventName } @@ -60,6 +85,12 @@ type DeleteSupportGroupEvent struct { SupportGroupID int64 } +func (e DeleteSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteSupportGroupEvent) Name() event.EventName { return DeleteSupportGroupEventName } @@ -69,6 +100,12 @@ type AddServiceToSupportGroupEvent struct { ServiceID int64 } +func (e AddServiceToSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddServiceToSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *AddServiceToSupportGroupEvent) Name() event.EventName { return AddServiceToSupportGroupEventName } @@ -78,6 +115,12 @@ type RemoveServiceFromSupportGroupEvent struct { ServiceID int64 } +func (e RemoveServiceFromSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveServiceFromSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *RemoveServiceFromSupportGroupEvent) Name() event.EventName { return RemoveServiceFromSupportGroupEventName } @@ -87,6 +130,12 @@ type AddUserToSupportGroupEvent struct { UserID int64 } +func (e AddUserToSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &AddUserToSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *AddUserToSupportGroupEvent) Name() event.EventName { return AddUserToSupportGroupEventName } @@ -96,6 +145,12 @@ type RemoveUserFromSupportGroupEvent struct { UserID int64 } +func (e RemoveUserFromSupportGroupEvent) Unmarshal(data []byte) (event.Event, error) { + event := &RemoveUserFromSupportGroupEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *RemoveUserFromSupportGroupEvent) Name() event.EventName { return RemoveUserFromSupportGroupEventName } @@ -106,6 +161,12 @@ type ListSupportGroupCcrnsEvent struct { Ccrns []string } +func (e ListSupportGroupCcrnsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListSupportGroupCcrnsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListSupportGroupCcrnsEvent) Name() event.EventName { return ListSupportGroupCcrnsEventName } diff --git a/internal/app/support_group/support_group_handler_test.go b/internal/app/support_group/support_group_handler_test.go index cd0f271b..83ac830b 100644 --- a/internal/app/support_group/support_group_handler_test.go +++ b/internal/app/support_group/support_group_handler_test.go @@ -4,6 +4,7 @@ package support_group_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -27,7 +28,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getSupportGroupFilter() *entity.SupportGroupFilter { diff --git a/internal/app/user/user_handler_events.go b/internal/app/user/user_handler_events.go index 08d85bd6..b310e8a3 100644 --- a/internal/app/user/user_handler_events.go +++ b/internal/app/user/user_handler_events.go @@ -4,8 +4,9 @@ package user import ( - "github.com/cloudoperators/heureka/internal/app/event" + "encoding/json" "github.com/cloudoperators/heureka/internal/entity" + "github.com/cloudoperators/heureka/internal/event" ) const ( @@ -23,6 +24,12 @@ type ListUsersEvent struct { Users *entity.List[entity.UserResult] } +func (e ListUsersEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListUsersEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListUsersEvent) Name() event.EventName { return ListUsersEventName } @@ -31,6 +38,12 @@ type CreateUserEvent struct { User *entity.User } +func (e CreateUserEvent) Unmarshal(data []byte) (event.Event, error) { + event := &CreateUserEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *CreateUserEvent) Name() event.EventName { return CreateUserEventName } @@ -39,6 +52,12 @@ type UpdateUserEvent struct { User *entity.User } +func (e UpdateUserEvent) Unmarshal(data []byte) (event.Event, error) { + event := &UpdateUserEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *UpdateUserEvent) Name() event.EventName { return UpdateUserEventName } @@ -47,6 +66,12 @@ type DeleteUserEvent struct { UserID int64 } +func (e DeleteUserEvent) Unmarshal(data []byte) (event.Event, error) { + event := &DeleteUserEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *DeleteUserEvent) Name() event.EventName { return DeleteUserEventName } @@ -57,6 +82,12 @@ type ListUserNamesEvent struct { Names []string } +func (e ListUserNamesEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListUserNamesEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListUserNamesEvent) Name() event.EventName { return ListUserNamesEventName } @@ -67,6 +98,12 @@ type ListUniqueUserIDsEvent struct { IDs []string } +func (e ListUniqueUserIDsEvent) Unmarshal(data []byte) (event.Event, error) { + event := &ListUniqueUserIDsEvent{} + err := json.Unmarshal(data, event) + return event, err +} + func (e *ListUniqueUserIDsEvent) Name() event.EventName { return ListUniqueUserIDsEventName } diff --git a/internal/app/user/user_handler_test.go b/internal/app/user/user_handler_test.go index 0053451b..d5a7e2e3 100644 --- a/internal/app/user/user_handler_test.go +++ b/internal/app/user/user_handler_test.go @@ -4,6 +4,7 @@ package user_test import ( + event2 "github.com/cloudoperators/heureka/internal/event" "math" "testing" @@ -27,7 +28,7 @@ var er event.EventRegistry var _ = BeforeSuite(func() { db := mocks.NewMockDatabase(GinkgoT()) - er = event.NewEventRegistry(db) + er = event2.NewEventRegistry(db) }) func getUserFilter() *entity.UserFilter { diff --git a/internal/app/event/event_registry.go b/internal/event/event_registry.go similarity index 71% rename from internal/app/event/event_registry.go rename to internal/event/event_registry.go index 4fe46a5d..5d989f09 100644 --- a/internal/app/event/event_registry.go +++ b/internal/event/event_registry.go @@ -5,28 +5,9 @@ package event import ( "context" - "github.com/cloudoperators/heureka/internal/database" ) -// Implement same logic as in the net/http std lib -type EventHandler interface { - HandleEvent(database.Database, Event) -} - -type EventHandlerFunc func(database.Database, Event) - -func (f EventHandlerFunc) HandleEvent(db database.Database, e Event) { - f(db, e) -} - -// EventRegistry is the central point for managing handlers for all kind of events -type EventRegistry interface { - RegisterEventHandler(EventName, EventHandler) - PushEvent(Event) - Run(ctx context.Context) -} - type eventRegistry struct { handlers map[EventName][]EventHandler db database.Database @@ -51,7 +32,7 @@ func (er *eventRegistry) PushEvent(event Event) { // NewEventRegistry returns an event registry where for each incoming event a list of // handlers is called. We use a buffered channel for the worker go routines. -func NewEventRegistry(db database.Database) EventRegistry { +func NewEventRegistry(db database.Database) *eventRegistry { return &eventRegistry{ handlers: make(map[EventName][]EventHandler), ch: make(chan Event, 1000), diff --git a/internal/app/event/event_registry_test.go b/internal/event/event_registry_test.go similarity index 97% rename from internal/app/event/event_registry_test.go rename to internal/event/event_registry_test.go index 6cda9d84..201739ec 100644 --- a/internal/app/event/event_registry_test.go +++ b/internal/event/event_registry_test.go @@ -34,9 +34,13 @@ func (e *TestEvent) Name() EventName { return EventName(e.name) } +func (e *TestEvent) Unmarshal(data []byte) (Event, error) { + return e, nil +} + var _ = Describe("EventRegistry", Label("app", "event", "EventRegistry"), func() { var ( - er EventRegistry + er *eventRegistry db *mocks.MockDatabase ctx context.Context cancel context.CancelFunc diff --git a/internal/event/events.go b/internal/event/events.go new file mode 100644 index 00000000..7a0df8e4 --- /dev/null +++ b/internal/event/events.go @@ -0,0 +1,33 @@ +// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Greenhouse contributors +// SPDX-License-Identifier: Apache-2.0 +package event + +import ( + "github.com/cloudoperators/heureka/internal/database" +) + +// EventName type +type EventName string + +type Event interface { + Name() EventName +} + +type EventHandlerFunc func(database.Database, Event) + +func (f EventHandlerFunc) HandleEvent(db database.Database, e Event) { + f(db, e) +} + +type EventHandler struct { + Handler func(database.Database, Event) + Unmarshaler func(data []byte) (Event, error) +} + +func (h *EventHandler) HandleEvent(db database.Database, e Event) { + h.Handler(db, e) +} + +func (h *EventHandler) Unmarshal(data []byte) (Event, error) { + return h.Unmarshaler(data) +} diff --git a/internal/event/nats/event_registry.go b/internal/event/nats/event_registry.go new file mode 100644 index 00000000..1c7deb1a --- /dev/null +++ b/internal/event/nats/event_registry.go @@ -0,0 +1,90 @@ +// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Greenhouse contributors +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "context" + "encoding/json" + "github.com/cloudoperators/heureka/internal/database" + "github.com/cloudoperators/heureka/internal/event" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +type eventRegistry struct { + db database.Database + nc *nats.Conn + subscriptions map[event.EventName]*nats.Subscription +} + +func (er *eventRegistry) RegisterEventHandler(subject event.EventName, handler event.EventHandler) { + if er.subscriptions == nil { + er.subscriptions = make(map[event.EventName]*nats.Subscription) + } + if er.subscriptions[subject] != nil { + log.Debugf("Handler for event %s already registered", subject) + } else { + log.Infof("Registering handler for event %s", subject) + + subscription, err := er.nc.QueueSubscribe(string(subject), "heureka-core", func(msg *nats.Msg) { + event, err := handler.Unmarshal(msg.Data) + if err != nil { + //todo: log error + return + } + //todo: should return error + handler.HandleEvent(er.db, event) + }) + if err != nil { + //todo: log error + return + } + er.subscriptions[subject] = subscription + } +} + +func (er *eventRegistry) PushEvent(event event.Event) { + + b, err := json.Marshal(event) + log.WithField("bytes", b).WithField("error", err).WithField("event", event).Infof("Pushing event %s", event.Name()) + if err != nil { + //todo: log error + return + } + + err = er.nc.Publish(string(event.Name()), b) + if err != nil { + //todo: log error + return + } +} + +// NewEventRegistry returns an event registry using nats as the event bus +func NewEventRegistry(db database.Database) *eventRegistry { + //todo: move initialization outside of this + //todo: adding options for secure & authenticated nats connection + nc, err := nats.Connect("nats://localhost:4222") + if err != nil { + log.Fatal("Failed to connect to nats server", err) + } + return &eventRegistry{ + db: db, + nc: nc, + } +} + +// todo: this is currently only needed for channel based version, we should remove it and start running once the first event handler +func (er *eventRegistry) Run(ctx context.Context) { + //todo: remove + er.subscriptions = make(map[event.EventName]*nats.Subscription) + return +} + +// todo: add to interface +func (er *eventRegistry) Shutdown() { + for _, s := range er.subscriptions { + s.Unsubscribe() + } + er.nc.Close() +} diff --git a/internal/server/server.go b/internal/server/server.go index 966eef88..4eb27876 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -6,6 +6,9 @@ package server import ( "context" "fmt" + "github.com/cloudoperators/heureka/internal/app/event" + events "github.com/cloudoperators/heureka/internal/event" + "github.com/cloudoperators/heureka/internal/event/nats" "log" "net/http" "os/signal" @@ -41,7 +44,23 @@ func NewServer(cfg util.Config) *Server { if err != nil { logrus.WithError(err).Fatalln("Error while Creating Db") } - application := app.NewHeurekaApp(db) + var er event.EventRegistry + switch cfg.EventRegistryType { + case "channel": + er = events.NewEventRegistry(db) + case "nats": + er = nats.NewEventRegistry(db) + default: + + logrus.WithField("event_registry_type", cfg.EventRegistryType).Fatalln("Unknown event registry type") + } + + application := app.NewHeurekaApp(db, er) + + if cfg.EventRegistryType == "channel" { + application.SubscribeHandlers() + er.Run(context.Background()) + } s := Server{ router: &gin.Engine{}, diff --git a/internal/util/config.go b/internal/util/config.go index ded00440..447db493 100644 --- a/internal/util/config.go +++ b/internal/util/config.go @@ -38,6 +38,7 @@ type Config struct { AuthOidcUrl string `envconfig:"AUTH_OIDC_URL" required:"false" json:"-"` DefaultIssuePriority int64 `envconfig:"DEFAULT_ISSUE_PRIORITY" default:"100" json:"defaultIssuePriority"` DefaultRepositoryName string `envconfig:"DEFAULT_REPOSITORY_NAME" default:"nvd" json:"defaultRepositoryName"` + EventRegistryType string `envconfig:"EVENT_REGISTRY_TYPE" required:"true" json:"eventRegistryType" default:"channel"` } func (c *Config) ConfigToConsole() { diff --git a/tools/oidc_provider_mock/go.mod b/tools/oidc_provider_mock/go.mod index c0c43fed..88407cde 100644 --- a/tools/oidc_provider_mock/go.mod +++ b/tools/oidc_provider_mock/go.mod @@ -31,10 +31,10 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect golang.org/x/arch v0.12.0 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/text v0.21.0 // indirect - google.golang.org/protobuf v1.36.1 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.37.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/tools/oidc_provider_mock/go.sum b/tools/oidc_provider_mock/go.sum index ffc820ef..ca647412 100644 --- a/tools/oidc_provider_mock/go.sum +++ b/tools/oidc_provider_mock/go.sum @@ -76,21 +76,30 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 276e40f5170cf99a18fd4b42c3d41711ca9eb492 Mon Sep 17 00:00:00 2001 From: License Bot Date: Mon, 24 Mar 2025 14:06:21 +0000 Subject: [PATCH 2/2] Automatic application of license header --- internal/app/event/interface.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/app/event/interface.go b/internal/app/event/interface.go index b9cade70..e7043d8c 100644 --- a/internal/app/event/interface.go +++ b/internal/app/event/interface.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Greenhouse contributors +// SPDX-License-Identifier: Apache-2.0 + package event import (