Skip to content

Commit 2afa6fb

Browse files
CROSSLINK-181 Create notice and not task
1 parent c342298 commit 2afa6fb

File tree

7 files changed

+96
-90
lines changed

7 files changed

+96
-90
lines changed

broker/app/app.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,6 @@ func AddDefaultHandlers(eventBus events.EventBus, iso18626Client client.Iso18626
291291

292292
eventBus.HandleEventCreated(events.EventNameInvokeAction, prActionService.InvokeAction)
293293
eventBus.HandleTaskCompleted(events.EventNameInvokeAction, prApiHandler.ConfirmActionProcess)
294-
eventBus.HandleEventCreated(events.EventNamePatronRequestMessage, prMessageHandler.PatronRequestMessage)
295294
}
296295
func StartEventBus(ctx context.Context, eventBus events.EventBus) error {
297296
err := eventBus.Start(common.CreateExtCtxWithArgs(ctx, nil))

broker/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ services:
1111
ports:
1212
- "25432:5432"
1313
volumes:
14-
- ./data:/var/lib/postgresql/data
14+
- ./pg_data:/var/lib/postgresql

broker/events/eventbus.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ const DEFAULT_PATRON_REQUEST_ID = "00000000-0000-0000-0000-000000000002"
2323

2424
type EventBus interface {
2525
Start(ctx common.ExtendedContext) error
26-
CreateTask(id string, eventName EventName, data EventData, eventClass EventDomain, parentId *string) (string, error)
27-
CreateTaskBroadcast(id string, eventName EventName, data EventData, eventClass EventDomain, parentId *string) (string, error)
28-
CreateNotice(id string, eventName EventName, data EventData, status EventStatus, eventClass EventDomain) (string, error)
29-
CreateNoticeBroadcast(id string, eventName EventName, data EventData, status EventStatus, eventClass EventDomain) (string, error)
26+
CreateTask(id string, eventName EventName, data EventData, eventDomain EventDomain, parentId *string) (string, error)
27+
CreateTaskBroadcast(id string, eventName EventName, data EventData, eventDomain EventDomain, parentId *string) (string, error)
28+
CreateNotice(id string, eventName EventName, data EventData, status EventStatus, eventDomain EventDomain) (string, error)
29+
CreateNoticeBroadcast(id string, eventName EventName, data EventData, status EventStatus, eventDomain EventDomain) (string, error)
3030
BeginTask(eventId string) (Event, error)
3131
CompleteTask(eventId string, result *EventResult, status EventStatus) (Event, error)
3232
HandleEventCreated(eventName EventName, f func(ctx common.ExtendedContext, event Event))
@@ -132,6 +132,7 @@ func (p *PostgresEventBus) Start(ctx common.ExtendedContext) error {
132132
if err != nil {
133133
ctx.Logger().Error("failed to unmarshal notification", "error", err, "payload", notification.Payload)
134134
}
135+
// TODO We could run this method in separate go routine
135136
go p.handleNotify(notifyData)
136137
}
137138
}()
@@ -185,17 +186,17 @@ func triggerHandlers(eventCtx common.ExtendedContext, event Event, handlersMap m
185186
eventCtx.Logger().Debug("all handlers finished", "eventName", event.EventName, "signal", signal)
186187
}
187188

188-
func (p *PostgresEventBus) CreateTask(classId string, eventName EventName, data EventData, eventClass EventDomain, parentId *string) (string, error) {
189-
return p.createTask(classId, eventName, data, eventClass, parentId, false)
189+
func (p *PostgresEventBus) CreateTask(classId string, eventName EventName, data EventData, eventDomain EventDomain, parentId *string) (string, error) {
190+
return p.createTask(classId, eventName, data, eventDomain, parentId, false)
190191
}
191192

192-
func (p *PostgresEventBus) CreateTaskBroadcast(illTransactionID string, eventName EventName, data EventData, eventClass EventDomain, parentId *string) (string, error) {
193-
return p.createTask(illTransactionID, eventName, data, eventClass, parentId, true)
193+
func (p *PostgresEventBus) CreateTaskBroadcast(illTransactionID string, eventName EventName, data EventData, eventDomain EventDomain, parentId *string) (string, error) {
194+
return p.createTask(illTransactionID, eventName, data, eventDomain, parentId, true)
194195
}
195196

196-
func (p *PostgresEventBus) createTask(classId string, eventName EventName, data EventData, eventClass EventDomain, parentId *string, broadcast bool) (string, error) {
197+
func (p *PostgresEventBus) createTask(classId string, eventName EventName, data EventData, eventDomain EventDomain, parentId *string, broadcast bool) (string, error) {
197198
id := uuid.New().String()
198-
illTransactionID, patronRequestID := getIllTransactionAndPatronRequestId(classId, eventClass)
199+
illTransactionID, patronRequestID := getIllTransactionAndPatronRequestId(classId, eventDomain)
199200
return id, p.repo.WithTxFunc(p.ctx, func(eventRepo EventRepo) error {
200201
event, err := eventRepo.SaveEvent(p.ctx, SaveEventParams{
201202
ID: id,
@@ -219,17 +220,17 @@ func (p *PostgresEventBus) createTask(classId string, eventName EventName, data
219220
})
220221
}
221222

222-
func (p *PostgresEventBus) CreateNotice(classId string, eventName EventName, data EventData, status EventStatus, eventClass EventDomain) (string, error) {
223-
return p.createNotice(classId, eventName, data, status, eventClass, false)
223+
func (p *PostgresEventBus) CreateNotice(classId string, eventName EventName, data EventData, status EventStatus, eventDomain EventDomain) (string, error) {
224+
return p.createNotice(classId, eventName, data, status, eventDomain, false)
224225
}
225226

226-
func (p *PostgresEventBus) CreateNoticeBroadcast(classId string, eventName EventName, data EventData, status EventStatus, eventClass EventDomain) (string, error) {
227-
return p.createNotice(classId, eventName, data, status, eventClass, true)
227+
func (p *PostgresEventBus) CreateNoticeBroadcast(classId string, eventName EventName, data EventData, status EventStatus, eventDomain EventDomain) (string, error) {
228+
return p.createNotice(classId, eventName, data, status, eventDomain, true)
228229
}
229230

230-
func (p *PostgresEventBus) createNotice(classId string, eventName EventName, data EventData, status EventStatus, eventClass EventDomain, broadcast bool) (string, error) {
231+
func (p *PostgresEventBus) createNotice(classId string, eventName EventName, data EventData, status EventStatus, eventDomain EventDomain, broadcast bool) (string, error) {
231232
id := uuid.New().String()
232-
illTransactionID, patronRequestID := getIllTransactionAndPatronRequestId(classId, eventClass)
233+
illTransactionID, patronRequestID := getIllTransactionAndPatronRequestId(classId, eventDomain)
233234
return id, p.repo.WithTxFunc(p.ctx, func(eventRepo EventRepo) error {
234235
event, err := eventRepo.SaveEvent(p.ctx, SaveEventParams{
235236
ID: id,
@@ -412,8 +413,8 @@ func getPgText(value *string) pgtype.Text {
412413
}
413414
}
414415

415-
func getIllTransactionAndPatronRequestId(classId string, eventClass EventDomain) (string, string) {
416-
if eventClass == EventDomainPatronRequest {
416+
func getIllTransactionAndPatronRequestId(classId string, eventDomain EventDomain) (string, string) {
417+
if eventDomain == EventDomainPatronRequest {
417418
return DEFAULT_ILL_TRANSACTION_ID, classId
418419
} else {
419420
return classId, DEFAULT_PATRON_REQUEST_ID

broker/migrations/013_allow_patron_request_events.up.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ ALTER TABLE event ADD COLUMN patron_request_id VARCHAR NOT NULL DEFAULT '000000
88
INSERT INTO event_config (event_name, event_type, retry_count)
99
VALUES ('invoke-action', 'TASK', 1);
1010
INSERT INTO event_config (event_name, event_type, retry_count)
11-
VALUES ('patron-request-message', 'TASK', 1);
11+
VALUES ('patron-request-message', 'NOTICE', 1);

broker/patron_request/service/action_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,13 @@ func (m *MockEventBus) CreateTask(id string, eventName events.EventName, data ev
321321
return id, nil
322322
}
323323

324+
func (m *MockEventBus) CreateNotice(id string, eventName events.EventName, data events.EventData, status events.EventStatus, eventDomain events.EventDomain) (string, error) {
325+
if id == "error" {
326+
return "", errors.New("event bus error")
327+
}
328+
return id, nil
329+
}
330+
324331
type MockPrRepo struct {
325332
mock.Mock
326333
pr_db.PgPrRepo

broker/patron_request/service/message-handler.go

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,11 @@ import (
99
"github.com/indexdata/crosslink/iso18626"
1010
"github.com/jackc/pgx/v5/pgtype"
1111
"strings"
12-
"sync"
1312
)
1413

1514
const COMP_MESSAGE = "pr_massage_handler"
1615
const RESHARE_ADD_LOAN_CONDITION = "#ReShareAddLoanCondition#"
1716

18-
var waitings = map[string]*sync.WaitGroup{}
19-
2017
type PatronRequestMessageHandler struct {
2118
prRepo pr_db.PrRepo
2219
eventRepo events.EventRepo
@@ -43,41 +40,31 @@ func (m *PatronRequestMessageHandler) HandleMessage(ctx common.ExtendedContext,
4340
if err != nil {
4441
return nil, err
4542
}
46-
47-
eventId, err := m.eventBus.CreateTask(pr.ID, events.EventNamePatronRequestMessage, events.EventData{CommonEventData: events.CommonEventData{IncomingMessage: msg}}, events.EventDomainPatronRequest, nil)
43+
// Create notice with result
44+
status, response, err := m.handlePatronRequestMessage(ctx, msg)
45+
eventData := events.EventData{CommonEventData: events.CommonEventData{IncomingMessage: msg, OutgoingMessage: response}}
4846
if err != nil {
49-
return nil, err
47+
eventData.EventError = &events.EventError{
48+
Message: err.Error(),
49+
}
5050
}
51-
52-
var wg sync.WaitGroup
53-
wg.Add(1)
54-
waitings[eventId] = &wg
55-
wg.Wait()
56-
57-
event, err := m.eventRepo.GetEvent(ctx, eventId)
51+
_, err = m.eventBus.CreateNotice(pr.ID, events.EventNamePatronRequestMessage, eventData, status, events.EventDomainPatronRequest)
5852
if err != nil {
5953
return nil, err
6054
}
61-
return event.ResultData.OutgoingMessage, nil
62-
}
6355

64-
func (m *PatronRequestMessageHandler) PatronRequestMessage(ctx common.ExtendedContext, event events.Event) {
65-
ctx = ctx.WithArgs(ctx.LoggerArgs().WithComponent(COMP_MESSAGE))
66-
_, _ = m.eventBus.ProcessTask(ctx, event, m.handlePatronRequestMessage)
67-
if waiting, ok := waitings[event.ID]; ok {
68-
waiting.Done()
69-
}
56+
return response, err
7057
}
7158

72-
func (m *PatronRequestMessageHandler) handlePatronRequestMessage(ctx common.ExtendedContext, event events.Event) (events.EventStatus, *events.EventResult) {
73-
if event.EventData.IncomingMessage.SupplyingAgencyMessage != nil {
74-
return m.handleSupplyingAgencyMessage(ctx, *event.EventData.IncomingMessage.SupplyingAgencyMessage)
75-
} else if event.EventData.IncomingMessage.RequestingAgencyMessage != nil {
76-
return events.EventStatusError, &events.EventResult{CommonEventData: events.CommonEventData{Note: "requesting agency message handling is not implemented yet"}}
77-
} else if event.EventData.IncomingMessage.Request != nil {
78-
return events.EventStatusError, &events.EventResult{CommonEventData: events.CommonEventData{Note: "request handling is not implemented yet"}}
59+
func (m *PatronRequestMessageHandler) handlePatronRequestMessage(ctx common.ExtendedContext, msg *iso18626.ISO18626Message) (events.EventStatus, *iso18626.ISO18626Message, error) {
60+
if msg.SupplyingAgencyMessage != nil {
61+
return m.handleSupplyingAgencyMessage(ctx, *msg.SupplyingAgencyMessage)
62+
} else if msg.RequestingAgencyMessage != nil {
63+
return events.EventStatusError, nil, errors.New("requesting agency message handling is not implemented yet")
64+
} else if msg.Request != nil {
65+
return events.EventStatusError, nil, errors.New("request handling is not implemented yet")
7966
} else {
80-
return events.EventStatusError, &events.EventResult{CommonEventData: events.CommonEventData{Note: "cannot process message without content"}}
67+
return events.EventStatusError, nil, errors.New("cannot process message without content")
8168
}
8269
}
8370

@@ -93,13 +80,13 @@ func getPatronRequestId(msg iso18626.ISO18626Message) string {
9380
}
9481
}
9582

96-
func (m *PatronRequestMessageHandler) handleSupplyingAgencyMessage(ctx common.ExtendedContext, sam iso18626.SupplyingAgencyMessage) (events.EventStatus, *events.EventResult) {
83+
func (m *PatronRequestMessageHandler) handleSupplyingAgencyMessage(ctx common.ExtendedContext, sam iso18626.SupplyingAgencyMessage) (events.EventStatus, *iso18626.ISO18626Message, error) {
9784
pr, err := m.prRepo.GetPatronRequestById(ctx, sam.Header.RequestingAgencyRequestId)
9885
if err != nil {
9986
return createSAMResponse(sam, iso18626.TypeMessageStatusERROR, &iso18626.ErrorData{
10087
ErrorType: iso18626.TypeErrorTypeUnrecognisedDataValue,
101-
ErrorValue: "could not find patron request",
102-
})
88+
ErrorValue: "could not find patron request: " + err.Error(),
89+
}, err)
10390
}
10491
// TODO handle notifications
10592
switch sam.StatusInfo.Status {
@@ -110,8 +97,8 @@ func (m *PatronRequestMessageHandler) handleSupplyingAgencyMessage(ctx common.Ex
11097
if err != nil {
11198
return createSAMResponse(sam, iso18626.TypeMessageStatusERROR, &iso18626.ErrorData{
11299
ErrorType: iso18626.TypeErrorTypeUnrecognisedDataValue,
113-
ErrorValue: "could not find supplier",
114-
})
100+
ErrorValue: "could not find supplier: " + err.Error(),
101+
}, err)
115102
}
116103
pr.LendingPeerID = pgtype.Text{
117104
String: supplier.ID,
@@ -143,27 +130,26 @@ func (m *PatronRequestMessageHandler) handleSupplyingAgencyMessage(ctx common.Ex
143130
return createSAMResponse(sam, iso18626.TypeMessageStatusERROR, &iso18626.ErrorData{
144131
ErrorType: iso18626.TypeErrorTypeBadlyFormedMessage,
145132
ErrorValue: "status change no allowed",
146-
})
133+
}, errors.New("status change no allowed"))
147134
}
148135

149-
func (m *PatronRequestMessageHandler) updatePatronRequestAndCreateSamResponse(ctx common.ExtendedContext, pr pr_db.PatronRequest, sam iso18626.SupplyingAgencyMessage) (events.EventStatus, *events.EventResult) {
136+
func (m *PatronRequestMessageHandler) updatePatronRequestAndCreateSamResponse(ctx common.ExtendedContext, pr pr_db.PatronRequest, sam iso18626.SupplyingAgencyMessage) (events.EventStatus, *iso18626.ISO18626Message, error) {
150137
_, err := m.prRepo.SavePatronRequest(ctx, pr_db.SavePatronRequestParams(pr))
151138
if err != nil {
152139
return createSAMResponse(sam, iso18626.TypeMessageStatusERROR, &iso18626.ErrorData{
153140
ErrorType: iso18626.TypeErrorTypeUnrecognisedDataValue,
154141
ErrorValue: err.Error(),
155-
})
142+
}, err)
156143
}
157-
return createSAMResponse(sam, iso18626.TypeMessageStatusOK, nil)
144+
return createSAMResponse(sam, iso18626.TypeMessageStatusOK, nil, nil)
158145
}
159146

160-
func createSAMResponse(sam iso18626.SupplyingAgencyMessage, messageStatus iso18626.TypeMessageStatus, errorData *iso18626.ErrorData) (events.EventStatus, *events.EventResult) {
147+
func createSAMResponse(sam iso18626.SupplyingAgencyMessage, messageStatus iso18626.TypeMessageStatus, errorData *iso18626.ErrorData, err error) (events.EventStatus, *iso18626.ISO18626Message, error) {
161148
eventStatus := events.EventStatusSuccess
162149
if messageStatus != iso18626.TypeMessageStatusOK {
163150
eventStatus = events.EventStatusProblem
164151
}
165-
return eventStatus, &events.EventResult{CommonEventData: events.CommonEventData{
166-
OutgoingMessage: &iso18626.ISO18626Message{
152+
return eventStatus, &iso18626.ISO18626Message{
167153
SupplyingAgencyMessageConfirmation: &iso18626.SupplyingAgencyMessageConfirmation{
168154
ConfirmationHeader: iso18626.ConfirmationHeader{
169155
SupplyingAgencyId: &sam.Header.SupplyingAgencyId,
@@ -175,5 +161,5 @@ func createSAMResponse(sam iso18626.SupplyingAgencyMessage, messageStatus iso186
175161
ErrorData: errorData,
176162
},
177163
},
178-
}}
164+
err
179165
}

0 commit comments

Comments
 (0)