Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions broker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
prapi "github.com/indexdata/crosslink/broker/patron_request/api"
pr_db "github.com/indexdata/crosslink/broker/patron_request/db"
proapi "github.com/indexdata/crosslink/broker/patron_request/oapi"
prservice "github.com/indexdata/crosslink/broker/patron_request/service"
"log/slog"
"math"
"net/http"
Expand Down Expand Up @@ -75,11 +76,12 @@ var ServeMux *http.ServeMux
var appCtx = common.CreateExtCtxWithLogArgsAndHandler(context.Background(), nil, configLog())

type Context struct {
EventBus events.EventBus
IllRepo ill_db.IllRepo
EventRepo events.EventRepo
DirAdapter adapter.DirectoryLookupAdapter
PrRepo pr_db.PrRepo
EventBus events.EventBus
IllRepo ill_db.IllRepo
EventRepo events.EventRepo
DirAdapter adapter.DirectoryLookupAdapter
PrRepo pr_db.PrRepo
PrApiHandler prapi.PatronRequestApiHandler
}

func configLog() slog.Handler {
Expand Down Expand Up @@ -148,22 +150,27 @@ func Init(ctx context.Context) (Context, error) {
eventBus := CreateEventBus(eventRepo)
illRepo := CreateIllRepo(pool)
prRepo := CreatePrRepo(pool)
iso18626Client := client.CreateIso18626Client(eventBus, illRepo, MAX_MESSAGE_SIZE, delay)
iso18626Handler := handler.CreateIso18626Handler(eventBus, eventRepo)

prMessageHandler := prservice.CreatePatronRequestMessageHandler(prRepo, eventRepo, illRepo, eventBus)
iso18626Client := client.CreateIso18626Client(eventBus, illRepo, prMessageHandler, MAX_MESSAGE_SIZE, delay)
iso18626Handler := handler.CreateIso18626Handler(eventBus, eventRepo, illRepo, dirAdapter)
supplierLocator := service.CreateSupplierLocator(eventBus, illRepo, dirAdapter, holdingsAdapter)
workflowManager := service.CreateWorkflowManager(eventBus, illRepo, service.WorkflowConfig{})
AddDefaultHandlers(eventBus, iso18626Client, supplierLocator, workflowManager, iso18626Handler)
prActionService := prservice.CreatePatronRequestActionService(prRepo, illRepo, eventBus, &iso18626Handler)
prApiHandler := prapi.NewApiHandler(prRepo, eventBus)

AddDefaultHandlers(eventBus, iso18626Client, supplierLocator, workflowManager, iso18626Handler, prActionService, prApiHandler, prMessageHandler)
err = StartEventBus(ctx, eventBus)
if err != nil {
return Context{}, err
}
return Context{
EventBus: eventBus,
IllRepo: illRepo,
EventRepo: eventRepo,
DirAdapter: dirAdapter,
PrRepo: prRepo,
EventBus: eventBus,
IllRepo: illRepo,
EventRepo: eventRepo,
DirAdapter: dirAdapter,
PrRepo: prRepo,
PrApiHandler: prApiHandler,
}, nil
}

Expand Down Expand Up @@ -194,8 +201,8 @@ func StartServer(ctx Context) error {
apiHandler := api.NewApiHandler(ctx.EventRepo, ctx.IllRepo, TENANT_TO_SYMBOL, API_PAGE_SIZE)
oapi.HandlerFromMuxWithBaseURL(&apiHandler, ServeMux, "/broker")
}
prApiHandler := prapi.NewApiHandler(ctx.PrRepo)
proapi.HandlerFromMux(&prApiHandler, ServeMux)

proapi.HandlerFromMux(&ctx.PrApiHandler, ServeMux)
signatureHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", vcs.GetSignature())
ServeMux.ServeHTTP(w, r)
Expand Down Expand Up @@ -264,7 +271,8 @@ func CreateEventBus(eventRepo events.EventRepo) events.EventBus {
}

func AddDefaultHandlers(eventBus events.EventBus, iso18626Client client.Iso18626Client,
supplierLocator service.SupplierLocator, workflowManager service.WorkflowManager, iso18626Handler handler.Iso18626Handler) {
supplierLocator service.SupplierLocator, workflowManager service.WorkflowManager, iso18626Handler handler.Iso18626Handler,
prActionService prservice.PatronRequestActionService, prApiHandler prapi.PatronRequestApiHandler, prMessageHandler prservice.PatronRequestMessageHandler) {
eventBus.HandleEventCreated(events.EventNameMessageSupplier, iso18626Client.MessageSupplier)
eventBus.HandleEventCreated(events.EventNameMessageRequester, iso18626Client.MessageRequester)
eventBus.HandleEventCreated(events.EventNameConfirmRequesterMsg, iso18626Handler.ConfirmRequesterMsg)
Expand All @@ -280,6 +288,9 @@ func AddDefaultHandlers(eventBus events.EventBus, iso18626Client client.Iso18626
eventBus.HandleTaskCompleted(events.EventNameSelectSupplier, workflowManager.OnSelectSupplierComplete)
eventBus.HandleTaskCompleted(events.EventNameMessageSupplier, workflowManager.OnMessageSupplierComplete)
eventBus.HandleTaskCompleted(events.EventNameMessageRequester, workflowManager.OnMessageRequesterComplete)

eventBus.HandleEventCreated(events.EventNameInvokeAction, prActionService.InvokeAction)
eventBus.HandleTaskCompleted(events.EventNameInvokeAction, prApiHandler.ConfirmActionProcess)
}
func StartEventBus(ctx context.Context, eventBus events.EventBus) error {
err := eventBus.Start(common.CreateExtCtxWithArgs(ctx, nil))
Expand Down
37 changes: 24 additions & 13 deletions broker/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"errors"
"fmt"
prservice "github.com/indexdata/crosslink/broker/patron_request/service"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -43,11 +44,12 @@ var appendReturnInfo, _ = utils.GetEnvBool("RETURN_INFO", true)
var prependVendor, _ = utils.GetEnvBool("VENDOR_INFO", true)

type Iso18626Client struct {
eventBus events.EventBus
illRepo ill_db.IllRepo
client *http.Client
maxMsgSize int
sendDelay time.Duration
eventBus events.EventBus
illRepo ill_db.IllRepo
prMessageHandler prservice.PatronRequestMessageHandler
client *http.Client
maxMsgSize int
sendDelay time.Duration
}

type transactionContext struct {
Expand All @@ -67,13 +69,14 @@ type messageTarget struct {
problemDetails *string
}

func CreateIso18626Client(eventBus events.EventBus, illRepo ill_db.IllRepo, maxMsgSize int, delay time.Duration) Iso18626Client {
func CreateIso18626Client(eventBus events.EventBus, illRepo ill_db.IllRepo, prMessageHandler prservice.PatronRequestMessageHandler, maxMsgSize int, delay time.Duration) Iso18626Client {
return Iso18626Client{
eventBus: eventBus,
illRepo: illRepo,
client: http.DefaultClient,
maxMsgSize: maxMsgSize,
sendDelay: delay,
eventBus: eventBus,
illRepo: illRepo,
prMessageHandler: prMessageHandler,
client: http.DefaultClient,
maxMsgSize: maxMsgSize,
sendDelay: delay,
}
}

Expand Down Expand Up @@ -437,6 +440,14 @@ func (c *Iso18626Client) checkConfirmationError(ctx common.ExtendedContext, resp
return status
}

func (c *Iso18626Client) HandleIllMessage(ctx common.ExtendedContext, peer *ill_db.Peer, msg *iso18626.ISO18626Message) (*iso18626.ISO18626Message, error) {
if strings.Contains(peer.Name, "local") { // TODO Implement real check of local peer
return c.prMessageHandler.HandleMessage(ctx, msg)
} else {
return c.SendHttpPost(peer, msg)
}
}

func (c *Iso18626Client) SendHttpPost(peer *ill_db.Peer, msg *iso18626.ISO18626Message) (*iso18626.ISO18626Message, error) {
httpClient := httpclient.NewClient().
WithMaxSize(int64(c.maxMsgSize)).
Expand Down Expand Up @@ -694,7 +705,7 @@ func (c *Iso18626Client) sendAndUpdateStatus(ctx common.ExtendedContext, trCtx t
resData.CustomData = map[string]any{common.DO_NOT_SEND: true}
resData.OutgoingMessage = nil
} else {
response, err := c.SendHttpPost(trCtx.requester, message)
response, err := c.HandleIllMessage(ctx, trCtx.requester, message)
if response != nil {
resData.IncomingMessage = response
}
Expand Down Expand Up @@ -789,7 +800,7 @@ func (c *Iso18626Client) sendAndUpdateSupplier(ctx common.ExtendedContext, trCtx
resData := events.EventResult{}
resData.OutgoingMessage = message
if !isDoNotSend(trCtx.event) {
response, err := c.SendHttpPost(trCtx.selectedPeer, message)
response, err := c.HandleIllMessage(ctx, trCtx.selectedPeer, message)
if response != nil {
resData.IncomingMessage = response
}
Expand Down
Loading
Loading