Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/helm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,39 @@ jobs:
- chart_name: crosslink-broker
chart_version: "0.1.0-main.${{ github.run_number }}"
chart_path: ./broker/chart
md_path: ./broker/descriptors/ModuleDescriptor-template.json
- chart_name: crosslink-illmock
chart_version: "0.1.0-main.${{ github.run_number }}"
chart_path: ./illmock/chart
steps:
- uses: actions/checkout@v4

- name: Calculate chart and app version with short SHA
run: |
SHORT_SHA=$(echo $GITHUB_SHA | cut -c1-7)
CHART_VERSION="${{ matrix.chart_base_version }}+sha.$SHORT_SHA"
APP_VERSION="sha-$SHORT_SHA"
echo "CHART_VERSION=$CHART_VERSION" >> $GITHUB_ENV
echo "APP_VERSION=$APP_VERSION" >> $GITHUB_ENV

- name: Process MD for okapi-hooks
if: ${{ matrix.md_path }}
run: |
sed -i "s/@version@/$CHART_VERSION/" ${{ matrix.md_path }}
sed 's/^/ /' ${{ matrix.md_path }} > indented.json
sed -i -e '/@descriptor@/{
r indented.json
d
}' ${{ matrix.chart_path }}/values.yaml
- name: helm lint
run: |
helm lint ${{ matrix.chart_path }}
- name: helm login
run: |
echo ${{ secrets.GITHUB_TOKEN }} | helm registry login ghcr.io -u $ --password-stdin
- name: helm dependency
run: |
helm dependency build ${{ matrix.chart_path }}
- name: helm package
run: |
helm package ${{ matrix.chart_path }} --version "${{ matrix.chart_version }}+sha.$(echo $GITHUB_SHA | cut -c1-7)" --app-version "sha-$(echo $GITHUB_SHA | cut -c1-7)"
Expand Down
5 changes: 5 additions & 0 deletions broker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The broker's API uses hyperlinks to connect JSON resources.
If you're using Chrome or another browser to explore the API,
consider using an extension like [JSON Formatter](https://chromewebstore.google.com/detail/json-formatter/bcjindcccaagfpapjjmafapmmgkkhgoa) which allows to easily navigate hyperlinked JSON.

Note that API is also available with base path `/broker` if env
`TENANT_TO_SYMBOL` is defined.

# Configuration

Expand All @@ -39,6 +41,9 @@ Configuration is provided via environment variables:
| DIRECTORY_API_URL | Comma separated list of URLs when DIRECTORY_ADAPTER is `api` | `http://localhost:8081/directory/entries` |
| BROKER_MODE | Should broker forward supplier/requester symbols: `opaque` or `transparent` | `opaque` |
| LOCAL_SUPPLY | Should we check if requester can supply item: `true` or `false` | `false` |
| TENANT_TO_SYMBOL | Limits results to include only transactions with `requesterSymbol`matching | `` |
| | TENANT_TO_SYMBOL with {tenant} being replaced by X-Okapi-Tenant value | |


# Build

Expand Down
133 changes: 104 additions & 29 deletions broker/api/api-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/indexdata/go-utils/utils"
"net/http"
"reflect"
"strings"
"time"

"github.com/indexdata/go-utils/utils"

"github.com/google/uuid"
icql "github.com/indexdata/cql-go/cql"
extctx "github.com/indexdata/crosslink/broker/common"
Expand All @@ -27,15 +28,32 @@ var PEERS_PATH = "/peers"
var ILL_TRANSACTION_QUERY = "ill_transaction_id="

type ApiHandler struct {
eventRepo events.EventRepo
illRepo ill_db.IllRepo
eventRepo events.EventRepo
illRepo ill_db.IllRepo
tenantToSymbol string // non-empty if in /broker mode
}

func NewApiHandler(eventRepo events.EventRepo, illRepo ill_db.IllRepo) ApiHandler {
func NewApiHandler(eventRepo events.EventRepo, illRepo ill_db.IllRepo, tenentToSymbol string) ApiHandler {
return ApiHandler{
eventRepo: eventRepo,
illRepo: illRepo,
eventRepo: eventRepo,
illRepo: illRepo,
tenantToSymbol: tenentToSymbol,
}
}

func (a *ApiHandler) TenantFilter(trans *ill_db.IllTransaction, tenant *string, requesterSymbol *string) bool {
if tenant == nil && requesterSymbol != nil {
return trans.RequesterSymbol.String == *requesterSymbol
}
if a.tenantToSymbol == "" {
return true
}
// this is the /broker mode
if tenant == nil {
return false
}
full := strings.ReplaceAll(a.tenantToSymbol, "{tenant}", strings.ToUpper(*tenant))
return trans.RequesterSymbol.String == full
}

func (a *ApiHandler) GetEvents(w http.ResponseWriter, r *http.Request, params oapi.GetEventsParams) {
Expand All @@ -46,18 +64,35 @@ func (a *ApiHandler) GetEvents(w http.ResponseWriter, r *http.Request, params oa
ctx := extctx.CreateExtCtxWithArgs(context.Background(), &extctx.LoggerArgs{
Other: logParams,
})
resp := []oapi.Event{}
var eventList []events.Event
var err error
if params.IllTransactionId != nil {
eventList, err = a.eventRepo.GetIllTransactionEvents(ctx, *params.IllTransactionId)
} else {
if params.RequesterReqId != nil {
var tran ill_db.IllTransaction
tran, err = a.illRepo.GetIllTransactionByRequesterRequestId(ctx, pgtype.Text{
String: *params.RequesterReqId,
Valid: true,
})
if err == nil && a.TenantFilter(&tran, params.XOkapiTenant, params.RequesterSymbol) {
eventList, err = a.eventRepo.GetIllTransactionEvents(ctx, tran.ID)
}
} else if params.IllTransactionId != nil {
var tran ill_db.IllTransaction
tran, err = a.illRepo.GetIllTransactionById(ctx, *params.IllTransactionId)
if err == nil && a.TenantFilter(&tran, params.XOkapiTenant, params.RequesterSymbol) {
eventList, err = a.eventRepo.GetIllTransactionEvents(ctx, tran.ID)
}
} else if a.tenantToSymbol == "" {
eventList, err = a.eventRepo.ListEvents(ctx)
}
if err != nil {
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
addInternalError(ctx, w, err)
return
}
if len(eventList) == 0 && a.tenantToSymbol != "" {
addForbiddenError(ctx, w)
return
}
resp := []oapi.Event{}
for _, event := range eventList {
resp = append(resp, toApiEvent(event))
}
Expand All @@ -78,33 +113,43 @@ func (a *ApiHandler) GetIllTransactions(w http.ResponseWriter, r *http.Request,
addInternalError(ctx, w, err)
return
}
resp = append(resp, toApiIllTransaction(r, tran))
} else {
if a.TenantFilter(&tran, params.XOkapiTenant, params.RequesterSymbol) {
resp = append(resp, toApiIllTransaction(r, tran))
}
} else if a.tenantToSymbol == "" {
trans, err := a.illRepo.ListIllTransactions(ctx)
if err != nil {
addInternalError(ctx, w, err)
return
}
for _, t := range trans {
resp = append(resp, toApiIllTransaction(r, t))
if a.TenantFilter(&t, params.XOkapiTenant, params.RequesterSymbol) {
resp = append(resp, toApiIllTransaction(r, t))
}
}
}
if len(resp) == 0 && a.tenantToSymbol != "" {
addForbiddenError(ctx, w)
return
}
writeJsonResponse(w, resp)
}

func (a *ApiHandler) GetIllTransactionsId(w http.ResponseWriter, r *http.Request, id string) {
func (a *ApiHandler) GetIllTransactionsId(w http.ResponseWriter, r *http.Request, id string, params oapi.GetIllTransactionsIdParams) {
ctx := extctx.CreateExtCtxWithArgs(context.Background(), &extctx.LoggerArgs{
Other: map[string]string{"method": "GetIllTransactionsId", "id": id},
})
trans, err := a.illRepo.GetIllTransactionById(ctx, id)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
addNotFoundError(w)
return
} else {
addInternalError(ctx, w, err)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
addInternalError(ctx, w, err)
return
}
if err != nil || !a.TenantFilter(&trans, params.XOkapiTenant, params.RequesterSymbol) {
if a.tenantToSymbol != "" {
addForbiddenError(ctx, w)
return
}
addNotFoundError(w)
}
writeJsonResponse(w, toApiIllTransaction(r, trans))
}
Expand Down Expand Up @@ -275,7 +320,7 @@ func (a *ApiHandler) PostPeers(w http.ResponseWriter, r *http.Request) {
return
}
for _, s := range newPeer.Symbols {
if s == "" || !strings.Contains(s, ":") {
if !strings.Contains(s, ":") {
addBadRequestError(ctx, w, fmt.Errorf("symbol should be in \"ISIL:SYMBOL\" format but got %v", s))
return
}
Expand Down Expand Up @@ -450,20 +495,37 @@ func (a *ApiHandler) GetLocatedSuppliers(w http.ResponseWriter, r *http.Request,
ctx := extctx.CreateExtCtxWithArgs(context.Background(), &extctx.LoggerArgs{
Other: logParams,
})
resp := []oapi.LocatedSupplier{}
var supList []ill_db.LocatedSupplier
var err error
if params.IllTransactionId != nil {
supList, err = a.illRepo.GetLocatedSupplierByIllTransition(ctx, *params.IllTransactionId)
} else {
if params.RequesterReqId != nil {
var tran ill_db.IllTransaction
tran, err = a.illRepo.GetIllTransactionByRequesterRequestId(ctx, pgtype.Text{
String: *params.RequesterReqId,
Valid: true,
})
if err == nil && a.TenantFilter(&tran, params.XOkapiTenant, params.RequesterSymbol) {
supList, err = a.illRepo.GetLocatedSupplierByIllTransition(ctx, tran.ID)
}
} else if params.IllTransactionId != nil {
var tran ill_db.IllTransaction
tran, err = a.illRepo.GetIllTransactionById(ctx, *params.IllTransactionId)
if err == nil && a.TenantFilter(&tran, params.XOkapiTenant, params.RequesterSymbol) {
supList, err = a.illRepo.GetLocatedSupplierByIllTransition(ctx, *params.IllTransactionId)
}
} else if a.tenantToSymbol == "" {
supList, err = a.illRepo.ListLocatedSuppliers(ctx)
}
if err != nil {
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
addInternalError(ctx, w, err)
return
}
for _, event := range supList {
resp = append(resp, toApiLocatedSupplier(r, event))
if len(supList) == 0 && a.tenantToSymbol != "" {
addForbiddenError(ctx, w)
return
}
resp := []oapi.LocatedSupplier{}
for _, supplier := range supList {
resp = append(resp, toApiLocatedSupplier(r, supplier))
}
writeJsonResponse(w, resp)
}
Expand Down Expand Up @@ -500,6 +562,16 @@ func addInternalError(ctx extctx.ExtendedContext, w http.ResponseWriter, err err
_ = json.NewEncoder(w).Encode(resp)
}

func addForbiddenError(ctx extctx.ExtendedContext, w http.ResponseWriter) {
resp := ErrorMessage{
Error: "forbidden",
}
ctx.Logger().Error("error serving api request", "error", "forbidden")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusForbidden)
_ = json.NewEncoder(w).Encode(resp)
}

func addBadRequestError(ctx extctx.ExtendedContext, w http.ResponseWriter, err error) {
resp := ErrorMessage{
Error: err.Error(),
Expand Down Expand Up @@ -690,6 +762,9 @@ func toLink(r *http.Request, path string, id string, query string) string {
if strings.Contains(urlHost, "localhost") {
urlScheme = "http"
}
if strings.Contains(r.RequestURI, "/broker/") {
path = "/broker" + path
}
if id != "" {
path = path + "/" + id
}
Expand Down
5 changes: 3 additions & 2 deletions broker/api/api-handler_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package api

import (
"github.com/indexdata/cql-go/cql"
"github.com/stretchr/testify/assert"
"strings"
"testing"

"github.com/indexdata/cql-go/cql"
"github.com/stretchr/testify/assert"
)

func TestMatchQueries(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion broker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var MAX_MESSAGE_SIZE, _ = utils.GetEnvAny("MAX_MESSAGE_SIZE", int(100*1024), fun
})
var BROKER_MODE = utils.GetEnv("BROKER_MODE", "opaque")
var LOCAL_SUPPLY = utils.Must(utils.GetEnvBool("LOCAL_SUPPLY", false))
var TENANT_TO_SYMBOL = os.Getenv("TENANT_TO_SYMBOL")

var appCtx = extctx.CreateExtCtxWithLogArgsAndHandler(context.Background(), nil, configLog())

Expand Down Expand Up @@ -128,8 +129,12 @@ func StartServer(context Context) error {
http.ServeFile(w, r, "handler/open-api.yaml")
})

apiHandler := api.NewApiHandler(context.EventRepo, context.IllRepo)
apiHandler := api.NewApiHandler(context.EventRepo, context.IllRepo, "")
oapi.HandlerFromMux(&apiHandler, mux)
if TENANT_TO_SYMBOL != "" {
apiHandler := api.NewApiHandler(context.EventRepo, context.IllRepo, TENANT_TO_SYMBOL)
oapi.HandlerFromMuxWithBaseURL(&apiHandler, mux, "/broker")
}

appCtx.Logger().Info("Server started on http://localhost:" + strconv.Itoa(HTTP_PORT))
return http.ListenAndServe(":"+strconv.Itoa(HTTP_PORT), mux)
Expand Down
5 changes: 5 additions & 0 deletions broker/chart/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ version: 0.1.0

# appVersion is used as the Docker image tag by the chart
appVersion: "main"

dependencies:
- name: okapi-hooks
repository: oci://ghcr.io/indexdata/charts
version: ">0.1.0-0"
5 changes: 5 additions & 0 deletions broker/chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ image:
# Overrides the image tag whose default is the chart appVersion.
tag: ""

okapi-hooks:
moduleUrl: http://crosslink-broker:80
moduleDescriptor: |
@descriptor@

containerPort: 8080
# envvars passed to the container
env: {}
Expand Down
Loading
Loading