Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions broker/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ const KindConfirmationError = "confirmation-error"

const BrokerInfoStatus = iso18626.TypeStatusExpectToSupply

// TODO this must be removed and saved from the initial request
var BrokerSymbol = utils.GetEnv("BROKER_SYMBOL", "ISIL:BROKER")
var brokerSymbol = utils.GetEnv("BROKER_SYMBOL", "ISIL:BROKER")
var appendSupplierInfo, _ = utils.GetEnvBool("SUPPLIER_INFO", true)
var appendRequestingAgencyInfo, _ = utils.GetEnvBool("REQ_AGENCY_INFO", true)
var appendReturnInfo, _ = utils.GetEnvBool("RETURN_INFO", true)
Expand Down Expand Up @@ -486,14 +485,14 @@ func (c *Iso18626Client) getSelectedSupplierAndPeer(ctx extctx.ExtendedContext,
}

func (c *Iso18626Client) createMessageHeader(transaction ill_db.IllTransaction, sup *ill_db.LocatedSupplier, isRequestingMessage bool, brokerMode string) iso18626.Header {
requesterSymbol := strings.SplitN(BrokerSymbol, ":", 2)
requesterSymbol := strings.SplitN(brokerSymbol, ":", 2)
if !isRequestingMessage || brokerMode == string(extctx.BrokerModeTransparent) {
requesterSymbol = strings.SplitN(transaction.RequesterSymbol.String, ":", 2)
}
if len(requesterSymbol) < 2 {
requesterSymbol = append(requesterSymbol, "")
}
supplierSymbol := strings.SplitN(BrokerSymbol, ":", 2)
supplierSymbol := strings.SplitN(brokerSymbol, ":", 2)
if sup != nil && sup.SupplierSymbol != "" && (isRequestingMessage || brokerMode == string(extctx.BrokerModeTransparent)) {
supplierSymbol = strings.SplitN(sup.SupplierSymbol, ":", 2)
}
Expand Down
68 changes: 46 additions & 22 deletions broker/handler/iso18626-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)

var brokerSymbol = utils.GetEnv("BROKER_SYMBOL", "ISIL:BROKER")

const HANDLER_COMP = "iso18626_handler"

type ErrorValue string

const (
ReqIdAlreadyExists ErrorValue = "requestingAgencyRequestId: request with a given ID already exists"
ReqIdIsEmpty ErrorValue = "requestingAgencyRequestId: cannot be empty"
ReqIdNotFound ErrorValue = "requestingAgencyRequestId: request with a given ID not found"
RetryNotPossible ErrorValue = "requestType: Retry not possible"
SupplierNotFound ErrorValue = "supplyingAgencyId: located supplier cannot be found"
IncorrectSupplier ErrorValue = "supplyingAgencyId: not a selected supplier for this request"
UnsupportedRequestType ErrorValue = "requestType: unsupported value"
ReqAgencyNotFound ErrorValue = "requestingAgencyId: requesting agency not found"
CouldNotSendReqToPeer ErrorValue = "Could not send request to peer"
InvalidAction ErrorValue = "%v is not a valid action"
InvalidStatus ErrorValue = "%v is not a valid status"
InvalidReason ErrorValue = "%v is not a valid reason"
ReqIdAlreadyExists ErrorValue = "requestingAgencyRequestId: request with a given ID already exists"
ReqIdIsEmpty ErrorValue = "requestingAgencyRequestId: cannot be empty"
ReqIdNotFound ErrorValue = "requestingAgencyRequestId: request with a given ID not found"
RetryNotPossible ErrorValue = "requestType: Retry not possible"
SupplierNotFoundOrInvalid ErrorValue = "supplyingAgencyId: supplying agency not found or invalid"
UnsupportedRequestType ErrorValue = "requestType: unsupported value"
ReqAgencyNotFound ErrorValue = "requestingAgencyId: requesting agency not found"
CouldNotSendReqToPeer ErrorValue = "Could not send request to peer"
InvalidAction ErrorValue = "%v is not a valid action"
InvalidStatus ErrorValue = "%v is not a valid status"
InvalidReason ErrorValue = "%v is not a valid reason"
)

const PublicFailedToProcessReqMsg = "failed to process request"
Expand Down Expand Up @@ -333,33 +334,49 @@ func handleRequestingAgencyMessage(ctx extctx.ExtendedContext, illMessage *iso18
handleRequestingAgencyError(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, ReqIdIsEmpty)
return
}

symbol := getSupplierSymbol(&illMessage.RequestingAgencyMessage.Header)
if len(symbol) == 0 {
handleRequestingAgencyError(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, SupplierNotFoundOrInvalid)
return
}
eventData := events.EventData{
CommonEventData: events.CommonEventData{
IncomingMessage: illMessage,
},
}

var err error
var illTrans ill_db.IllTransaction
var action iso18626.TypeAction
errorValue := ReqIdNotFound
err = repo.WithTxFunc(ctx, func(repo ill_db.IllRepo) error {
illTrans, err = repo.GetIllTransactionByRequesterRequestIdForUpdate(ctx, createPgText(requestingRequestId))
if err != nil {
errorValue = ReqIdNotFound
return err
}
action = validateAction(ctx, illMessage, w, eventData, eventBus, illTrans)
if action == "" {
return nil
}
if symbol != brokerSymbol {
supp, err := repo.GetSelectedSupplierForIllTransaction(ctx, illTrans.ID)
if err != nil {
errorValue = SupplierNotFoundOrInvalid
return err
}
if supp.SupplierSymbol != symbol {
errorValue = SupplierNotFoundOrInvalid
return pgx.ErrNoRows
}
}
illTrans.PrevRequesterAction = illTrans.LastRequesterAction
illTrans.LastRequesterAction = createPgText(string(action))
_, err = repo.SaveIllTransaction(ctx, ill_db.SaveIllTransactionParams(illTrans))
return err
})
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
handleRequestingAgencyError(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, ReqIdNotFound)
handleRequestingAgencyError(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, errorValue)
return
}
ctx.Logger().Error(InternalFailedToSaveTx, "error", err)
Expand All @@ -383,6 +400,14 @@ func handleRequestingAgencyMessage(ctx extctx.ExtendedContext, illMessage *iso18
wg.Wait()
}

func getSupplierSymbol(header *iso18626.Header) string {
if len(header.SupplyingAgencyId.AgencyIdType.Text) == 0 || len(header.SupplyingAgencyId.AgencyIdValue) == 0 {
return ""
}
return header.SupplyingAgencyId.AgencyIdType.Text + ":" +
header.SupplyingAgencyId.AgencyIdValue
}

func validateAction(ctx extctx.ExtendedContext, illMessage *iso18626.ISO18626Message, w http.ResponseWriter, eventData events.EventData, eventBus events.EventBus, illTrans ill_db.IllTransaction) iso18626.TypeAction {
action, ok := iso18626.ActionMap[string(illMessage.RequestingAgencyMessage.Action)]
if !ok {
Expand Down Expand Up @@ -435,10 +460,9 @@ func handleSupplyingAgencyMessage(ctx extctx.ExtendedContext, illMessage *iso186
http.Error(w, PublicFailedToProcessReqMsg, http.StatusInternalServerError)
return
}
symbol := illMessage.SupplyingAgencyMessage.Header.SupplyingAgencyId.AgencyIdType.Text + ":" +
illMessage.SupplyingAgencyMessage.Header.SupplyingAgencyId.AgencyIdValue
if len(symbol) < 3 {
handleSupplyingAgencyError(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, IncorrectSupplier)
symbol := getSupplierSymbol(&illMessage.SupplyingAgencyMessage.Header)
if len(symbol) == 0 {
handleSupplyingAgencyError(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, SupplierNotFoundOrInvalid)
return
}
requester, err := repo.GetPeerById(ctx, illTrans.RequesterID.String)
Expand Down Expand Up @@ -468,18 +492,18 @@ func handleSupplyingAgencyMessage(ctx extctx.ExtendedContext, illMessage *iso186
return
}
if supplier.SupplierStatus != ill_db.SupplierStateSkippedPg {
handleSupplyingAgencyErrorWithNotice(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, SupplierNotFound,
handleSupplyingAgencyErrorWithNotice(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, SupplierNotFoundOrInvalid,
eventBus, illTrans.ID)
return
}
} else {
handleSupplyingAgencyErrorWithNotice(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, IncorrectSupplier,
handleSupplyingAgencyErrorWithNotice(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, SupplierNotFoundOrInvalid,
eventBus, illTrans.ID)
return
}
}
if supplier.SupplierSymbol != symbol { //ensure we found the correct supplier
handleSupplyingAgencyErrorWithNotice(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, IncorrectSupplier,
handleSupplyingAgencyErrorWithNotice(ctx, w, illMessage, iso18626.TypeErrorTypeUnrecognisedDataValue, SupplierNotFoundOrInvalid,
eventBus, illTrans.ID)
return
}
Expand Down
28 changes: 28 additions & 0 deletions broker/handler/iso18626-handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package handler

import (
"testing"

"github.com/indexdata/crosslink/iso18626"
"github.com/stretchr/testify/assert"
)

func TestGetSupplierSymbol(t *testing.T) {
header := &iso18626.Header{
SupplyingAgencyId: iso18626.TypeAgencyId{
AgencyIdType: iso18626.TypeSchemeValuePair{
Text: "ISIL",
},
AgencyIdValue: "12345",
},
}
symbol := getSupplierSymbol(header)
assert.Equal(t, "ISIL:12345", symbol)
header.SupplyingAgencyId.AgencyIdType.Text = ""
symbol = getSupplierSymbol(header)
assert.Equal(t, "", symbol)
header.SupplyingAgencyId.AgencyIdType.Text = "ISIL"
header.SupplyingAgencyId.AgencyIdValue = ""
symbol = getSupplierSymbol(header)
assert.Equal(t, "", symbol)
}
60 changes: 44 additions & 16 deletions broker/test/handler/iso18626-handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestIso18626PostSupplyingMessageIncorrectSupplier(t *testing.T) {
assert.Equal(t, http.StatusOK, rr.Code)
msgError := "<messageStatus>ERROR</messageStatus>"
assert.Contains(t, rr.Body.String(), msgError)
errorValue := "<errorValue>supplyingAgencyId: not a selected supplier for this request</errorValue>"
errorValue := "<errorValue>supplyingAgencyId: supplying agency not found or invalid</errorValue>"
assert.Contains(t, rr.Body.String(), errorValue)
}

Expand Down Expand Up @@ -288,11 +288,13 @@ func TestIso18626PostSupplyingMessageReqNotFound(t *testing.T) {

func TestIso18626PostRequestingMessage(t *testing.T) {
tests := []struct {
name string
status int
contains string
urlEnding string
useMock bool
name string
status int
contains string
urlEnding string
supplierSymbol string
skipped bool
useMock bool
}{
{
name: "ResponseSuccessful",
Expand Down Expand Up @@ -322,35 +324,61 @@ func TestIso18626PostRequestingMessage(t *testing.T) {
urlEnding: "/notExists",
useMock: false,
},
{
name: "ResponseSupplierNotFoundOrInvalid-WrongSymbol",
status: 200,
contains: "<errorValue>supplyingAgencyId: supplying agency not found or invalid</errorValue>",
urlEnding: "",
supplierSymbol: "ISIL:SLNP_TWO_B",
useMock: true,
},
{
name: "ResponseSupplierNotFoundOrInvalid-Skipped",
status: 200,
contains: "<errorValue>supplyingAgencyId: supplying agency not found or invalid</errorValue>",
urlEnding: "",
supplierSymbol: "ISIL:SLNP_TWO_A",
skipped: true,
useMock: true,
},
}
appCtx := extctx.CreateExtCtxWithArgs(context.Background(), nil)
data, _ := os.ReadFile("../testdata/reqmsg-ok.xml")
data, _ := os.ReadFile("../testdata/reqmsg-notification.xml")
illId := uuid.NewString()
requester := apptest.CreatePeer(t, illRepo, "isil:requester1", adapter.MOCK_CLIENT_URL)
requester := apptest.CreatePeer(t, illRepo, "ISIL:SLNP_ONE", adapter.MOCK_CLIENT_URL)
_, err := illRepo.SaveIllTransaction(appCtx, ill_db.SaveIllTransactionParams{
ID: illId,
Timestamp: test.GetNow(),
RequesterRequestID: apptest.CreatePgText("reqid"),
RequesterSymbol: apptest.CreatePgText("isil:requester1"),
RequesterSymbol: apptest.CreatePgText("ISIL:SLNP_ONE"),
RequesterID: apptest.CreatePgText(requester.ID),
})
if err != nil {
t.Errorf("failed to create ill transaction: %s", err)
}
peer := apptest.CreatePeer(t, illRepo, "isil:reqTest", adapter.MOCK_CLIENT_URL)
apptest.CreateLocatedSupplier(t, illRepo, illId, peer.ID, "isil:reqTest", "selected")

supplier := apptest.CreatePeer(t, illRepo, "ISIL:SLNP_TWO_A", adapter.MOCK_CLIENT_URL)
locSup := apptest.CreateLocatedSupplier(t, illRepo, illId, supplier.ID, "ISIL:SLNP_TWO_A", "selected")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.useMock {
peer.Url = adapter.MOCK_CLIENT_URL + tt.urlEnding
supplier.Url = adapter.MOCK_CLIENT_URL + tt.urlEnding
} else {
port, _ := test.GetFreePort()
peer.Url = "http:localhost:" + strconv.Itoa(port) + tt.urlEnding
supplier.Url = "http:localhost:" + strconv.Itoa(port) + tt.urlEnding
}
supplier, err = illRepo.SavePeer(appCtx, ill_db.SavePeerParams(supplier))
if err != nil {
t.Errorf("failed to update supplier peer : %s", err)
}
if tt.supplierSymbol != "" {
locSup.SupplierSymbol = tt.supplierSymbol
}
if tt.skipped {
locSup.SupplierStatus = ill_db.SupplierStateSkippedPg
}
peer, err = illRepo.SavePeer(appCtx, ill_db.SavePeerParams(peer))
_, err := illRepo.SaveLocatedSupplier(appCtx, ill_db.SaveLocatedSupplierParams(locSup))
if err != nil {
t.Errorf("failed to update peer : %s", err)
t.Errorf("failed to update located supplier : %s", err)
}
url := "http://localhost:" + strconv.Itoa(app.HTTP_PORT) + "/iso18626"
req, _ := http.NewRequest("POST", url, bytes.NewReader(data))
Expand Down
25 changes: 25 additions & 0 deletions broker/test/testdata/reqmsg-notification.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ISO18626Message
xmlns="http://illtransactions.org/2013/iso18626"
xmlns:ill="http://illtransactions.org/2013/iso18626"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
ill:version="1.2"
xsi:schemaLocation="http://illtransactions.org/2013/iso18626 https://illtransactions.org/schemas/ISO-18626-v1_2.xsd">
<requestingAgencyMessage>
<header>
<supplyingAgencyId>
<agencyIdType>ISIL</agencyIdType>
<agencyIdValue>SLNP_TWO_A</agencyIdValue>
</supplyingAgencyId>
<requestingAgencyId>
<agencyIdType>ISIL</agencyIdType>
<agencyIdValue>SLNP_ONE</agencyIdValue>
</requestingAgencyId>
<multipleItemRequestId></multipleItemRequestId>
<timestamp>2024-12-12T20:12:00.62959771Z</timestamp>
<requestingAgencyRequestId>reqid</requestingAgencyRequestId>
<supplyingAgencyRequestId>c488bef0-5735-4c33-ba13-a9f343447574</supplyingAgencyRequestId>
</header>
<action>Notification</action>
</requestingAgencyMessage>
</ISO18626Message>
Loading