Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions broker/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/archive
/broker
/data
8 changes: 7 additions & 1 deletion broker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ RUN --mount=type=cache,sharing=shared,target=/root/.cache/go-build \
GOOS=linux \
go build -o /broker ./cmd/broker

RUN --mount=type=cache,sharing=shared,target=/root/.cache/go-build \
CGO_ENABLED=0 \
GOOS=linux \
go build -o /archive ./cmd/archive

# create runtime user
RUN adduser \
--disabled-password \
Expand All @@ -52,8 +57,9 @@ COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=build /etc/passwd /etc/passwd
COPY --from=build /etc/group /etc/group

# copy the binary
# copy binaries
COPY --from=build /broker .
COPY --from=build /archive .
# copy migrations
COPY --from=build /app/broker/migrations /migrations

Expand Down
9 changes: 7 additions & 2 deletions broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ OAPI_GEN = $(OAPI_DIR)/openapi_gen.go

.PHONY: all docker generate generate-sqlc generate-commit-id check run fmt fmt-check vet clean view-coverage

all: $(BINARY)
all: $(BINARY) archive

docker: generate
cd .. && $(DOCKER) build -t indexdata/$(MODULE):latest -f ./$(MODULE)/Dockerfile .
Expand All @@ -57,6 +57,9 @@ $(COMMIT_ID):
$(BINARY): $(COMMIT_ID) $(GEN_SCHEMA_OUT) $(SQL_GEN_OUT) $(OAPI_GEN) $(GOFILES)
$(GO) build -v -o $(BINARY) ./$(MAIN_PACKAGE)

archive: $(COMMIT_ID) $(GEN_SCHEMA_OUT) $(SQL_GEN_OUT) $(OAPI_GEN) $(GOFILES)
$(GO) build -v -o archive ./cmd/archive

check: generate
$(GO) test -v -cover -coverpkg=./... -coverprofile=$(COVERAGE) ./...

Expand All @@ -79,7 +82,9 @@ check-coverage: check
$(GO) run github.com/vladopajic/go-test-coverage/v2@latest --config=./.testcoverage.yaml

clean:
rm -f $(BINARY)
$(GO) clean -testcache
$(GO) clean -cache
rm -f $(BINARY) archive
rm -f $(COVERAGE)
rm -f $(COMMIT_ID)
rm -f $(SQL_GEN_OUT)
Expand Down
9 changes: 7 additions & 2 deletions broker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,17 @@ Configuration is provided via environment variables:

# Build

Generate sources and compile the main program with:
Generate sources and compile the main programs with:

```
make
```

This will build the following binaries:

* `broker` — the main program for the ILL service
* `archive` — a utility for archiving old ILL transactions

You can also run included tests with:

```
Expand All @@ -100,7 +105,7 @@ go test -v -coverpkg=./.. -cover ./cmd/broker

# Run locally

You can run the program locally with:
You can run the `broker` program locally with:

```
make run
Expand Down
17 changes: 17 additions & 0 deletions broker/api/api-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/indexdata/crosslink/broker/adapter"
"github.com/indexdata/crosslink/broker/service"

"github.com/indexdata/go-utils/utils"

Expand All @@ -34,6 +35,7 @@ var LOCATED_SUPPLIERS_PATH = "/located_suppliers"
var PEERS_PATH = "/peers"
var ILL_TRANSACTION_QUERY = "ill_transaction_id="
var LIMIT_DEFAULT int32 = 10
var ARCHIVE_PROCESS_STARTED = "Archive process started"

type ApiHandler struct {
limitDefault int32
Expand Down Expand Up @@ -627,6 +629,21 @@ func (a *ApiHandler) GetLocatedSuppliers(w http.ResponseWriter, r *http.Request,
writeJsonResponse(w, resp)
}

func (a *ApiHandler) PostArchiveIllTransactions(w http.ResponseWriter, r *http.Request, params oapi.PostArchiveIllTransactionsParams) {
logParams := map[string]string{"method": "PostArchiveIllTransactions", "ArchiveDelay": params.ArchiveDelay, "ArchiveStatus": params.ArchiveStatus}
ctx := extctx.CreateExtCtxWithArgs(context.Background(), &extctx.LoggerArgs{
Other: logParams,
})
err := service.Archive(ctx, a.illRepo, params.ArchiveStatus, params.ArchiveDelay, true)
if err != nil {
addBadRequestError(ctx, w, err)
return
}
writeJsonResponse(w, oapi.StatusMessage{
Status: ARCHIVE_PROCESS_STARTED,
})
}

func writeJsonResponse(w http.ResponseWriter, resp any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Expand Down
13 changes: 13 additions & 0 deletions broker/call_archive.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
ACTUAL_PORT="${HTTP_PORT:8081}"
URL="http://localhost:${ACTUAL_PORT}/archive_ill_transactions?archive_delay=240h&archive_status=LoanCompleted,CopyCompleted,Unfilled"

RESPONSE=$(curl -X POST -s -o /dev/null -w "%{http_code}" "${URL}")

if [ "$RESPONSE" -eq 200 ]; then
echo "Success! HTTP Status Code: ${RESPONSE}"
else
echo "Error! HTTP Status Code: ${RESPONSE}"
echo "Check server logs for more details."
fi

echo "Script finished."
40 changes: 40 additions & 0 deletions broker/cmd/archive/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"context"
"flag"

"fmt"
"os"

"github.com/indexdata/crosslink/broker/app"
extctx "github.com/indexdata/crosslink/broker/common"
"github.com/indexdata/crosslink/broker/service"
)

func main() {
err := run()
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
}

func run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var statusList string
var duration string
flag.StringVar(&statusList, "statuses", "LoanCompleted,CopyCompleted,Unfilled", "comma separated list of statuses to archive")
flag.StringVar(&duration, "duration", "5d", "archive transactions older than this duration, for example 2d")
flag.Parse()
context, err := app.Init(ctx)
if err != nil {
return err
}
logParams := map[string]string{"method": "PostArchiveIllTransactions", "ArchiveDelay": duration, "ArchiveStatus": statusList}
ectx := extctx.CreateExtCtxWithArgs(ctx, &extctx.LoggerArgs{
Other: logParams,
})
return service.Archive(ectx, context.IllRepo, statusList, duration, false)
}
45 changes: 45 additions & 0 deletions broker/cmd/archive/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"context"
"os"
"testing"
"time"

"github.com/indexdata/crosslink/broker/app"
test "github.com/indexdata/crosslink/broker/test/utils"
_ "github.com/lib/pq" // PostgreSQL driver
"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)

func TestMain(m *testing.M) {
ctx := context.Background()

pgContainer, err := postgres.Run(ctx, "postgres",
postgres.WithDatabase("crosslink"),
postgres.WithUsername("crosslink"),
postgres.WithPassword("crosslink"),
testcontainers.WithWaitStrategy(
wait.ForLog("database system is ready to accept connections").
WithOccurrence(2).WithStartupTimeout(5*time.Second)),
)
test.Expect(err, "failed to start db container")

connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
test.Expect(err, "failed to get conn string")
app.ConnectionString = connStr
app.MigrationsFolder = "file://../../migrations"

code := m.Run()

test.Expect(pgContainer.Terminate(ctx), "failed to stop db container")
os.Exit(code)
}

func TestMainOK(t *testing.T) {
err := run()
assert.NoError(t, err)
}
6 changes: 6 additions & 0 deletions broker/ill_db/illrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type IllRepo interface {
SaveBranchSymbol(ctx extctx.ExtendedContext, params SaveBranchSymbolParams) (BranchSymbol, error)
GetBranchSymbolsByPeerId(ctx extctx.ExtendedContext, peerId string) ([]BranchSymbol, error)
DeleteBranchSymbolByPeerId(ctx extctx.ExtendedContext, peerId string) error
CallArchiveIllTransactionByDateAndStatus(ctx extctx.ExtendedContext, toDate time.Time, statuses []string) error
}

type PgIllRepo struct {
Expand Down Expand Up @@ -486,6 +487,11 @@ func (r *PgIllRepo) mapSymbolsAndFilterStale(ctx extctx.ExtendedContext, symbols
return symbolToPeer, symbolsToFetch
}

func (r *PgIllRepo) CallArchiveIllTransactionByDateAndStatus(ctx extctx.ExtendedContext, toDate time.Time, statuses []string) error {
_, err := r.queries.CallArchiveIllTransactionByDateAndStatus(ctx, r.GetConnOrTx(), CallArchiveIllTransactionByDateAndStatusParams{toDate, statuses})
return err
}

func getSliceFromMapInOrder(symbolToPeer map[string]Peer, symbols []string) []Peer {
peers := make([]Peer, 0, len(symbolToPeer))
// first add peers that match the original symbols
Expand Down
2 changes: 2 additions & 0 deletions broker/migrations/010_add_archive_table.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP FUNCTION archive_ill_transaction_by_date_and_status;
DROP TABLE archived_ill_transactions;
89 changes: 89 additions & 0 deletions broker/migrations/010_add_archive_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
CREATE TABLE archived_ill_transactions
(
ill_transaction JSONB NOT NULL,
events JSONB NOT NULL,
located_suppliers JSONB NOT NULL
);

CREATE OR REPLACE FUNCTION archive_ill_transaction_by_date_and_status(
t_cut_off_date TIMESTAMPTZ,
t_status_list TEXT[]
)
RETURNS INT AS $$
DECLARE
v_deleted_ids TEXT[];
v_deleted_count INT := 0;
lock_id BIGINT := 8372910465;
lock_acquired BOOLEAN;
BEGIN
SELECT pg_try_advisory_lock(lock_id) INTO lock_acquired;

IF NOT lock_acquired THEN
RAISE NOTICE 'Function archive_ill_transaction_by_date_and_status() is already running. Exiting.';
RETURN 0;
END IF;

SELECT array_agg(id) INTO v_deleted_ids
FROM ill_transaction
WHERE timestamp <= t_cut_off_date
AND last_supplier_status = ANY(t_status_list);

-- If no ill transactions match the criteria, exit early
IF v_deleted_ids IS NULL OR array_length(v_deleted_ids, 1) IS NULL THEN
RAISE NOTICE 'No ILL transactions found matching date % and statuses %', t_cut_off_date, t_status_list;
RETURN 0;
END IF;

INSERT INTO archived_ill_transactions (ill_transaction, events, located_suppliers)
SELECT
row_to_json(t)::jsonb as transaction_json,
COALESCE(pe.events, '[]'::jsonb) as events_json,
COALESCE(pls.contacts, '[]'::jsonb) as located_suppliers_json
FROM
ill_transaction AS t
LEFT JOIN LATERAL (
SELECT
e.ill_transaction_id,
jsonb_agg(row_to_json(e)) AS events
FROM
event AS e
WHERE
e.ill_transaction_id = t.id
GROUP BY
e.ill_transaction_id
) AS pe ON pe.ill_transaction_id = t.id
LEFT JOIN LATERAL (
SELECT
ls.ill_transaction_id,
jsonb_agg(row_to_json(ls)) AS contacts
FROM
located_supplier AS ls
WHERE
ls.ill_transaction_id = t.id
GROUP BY
ls.ill_transaction_id
) AS pls ON pls.ill_transaction_id = t.id
WHERE t.id = ANY(v_deleted_ids);

DELETE FROM located_supplier
WHERE ill_transaction_id = ANY(v_deleted_ids);
GET DIAGNOSTICS v_deleted_count = ROW_COUNT;
RAISE NOTICE 'Deleted % located_supplier rows.', v_deleted_count;

DELETE FROM event
WHERE ill_transaction_id = ANY(v_deleted_ids);
GET DIAGNOSTICS v_deleted_count = ROW_COUNT;
RAISE NOTICE 'Deleted % event rows.', v_deleted_count;

DELETE FROM ill_transaction
WHERE id = ANY(v_deleted_ids);
GET DIAGNOSTICS v_deleted_count = ROW_COUNT;
RAISE NOTICE 'Deleted % ill_transaction rows.', v_deleted_count;

RETURN v_deleted_count;
EXCEPTION
WHEN OTHERS THEN
PERFORM pg_advisory_unlock(lock_id);
RAISE;
END;
$$ LANGUAGE plpgsql;
Loading
Loading