Use rdf-connect/ldes-client #45

Draft
wants to merge 10 commits into
base: master
1 change: 0 additions & 1 deletion Dockerfile
@@ -2,7 +2,6 @@ FROM semtech/mu-javascript-template:1.7.0
LABEL maintainer="[email protected]"
ENV PERSIST_STATE "false"
ENV LDES_ENDPOINT_VIEW "http://ldes-time-fragmenter:3000/example/1"
ENV LDES_STREAM "http://example.org/example-stream"
ENV REPLACE_VERSIONS "true"
ENV DEBUG_AUTH_HEADERS "false"
ENV LOG_SPARQL_ALL "false"
64 changes: 42 additions & 22 deletions README.md
@@ -11,32 +11,52 @@ Add the following snippet to your docker-compose.yml to include the consumer ser

```
consumer:
  image: redpencil/ldes-consumer
  volumes:
    - ./data/ldes-consumer:/data
```


## Configuration

The service can be configured with the following environment variables:

- `BLANK_NODE_NAMESPACE` [string]: namespace to use for skolemizing blank nodes (default 'http://mu.semte.ch/blank#')
- `CRON_PATTERN` [string]: the cron pattern which the cronjob should use. (default: `* 0 * * * *`)
- `DEBUG_AUTH_HEADERS`: Debugging of [mu-authorization](https://github.com/mu-semtech/mu-authorization) access-control related headers (default `false`)
- `LDES_DEREFERENCE_MEMBERS`: whether to dereference members, because the collection pages do not contain all information (default: false)
- `LDES_ENDPOINT_HEADER_<key>` [string]: A header key-value combination which should be sent as part of the headers to the LDES endpoint. E.g. `LDES_ENDPOINT_HEADER_X-API-KEY: <api_key>`.
- `LDES_ENDPOINT_VIEW` [string]: the LDES endpoint containing the view (the first page) of the stream.
- `LDES_POLLING_INTERVAL`: Number of milliseconds before refetching uncacheable fragments (default: 60000)
- `LDES_REQUESTS_PER_MINUTE`: how many requests per minute may be sent to the same host (optional, any positive number)
- `LDES_STREAM` [string]: the URI which should be used as a subject to store the latest page and timestamp consumed in the database. (default: `http://mu.semte.ch/example-stream`)
- `LDES_TIMESTAMP_PATH` [string]: the predicate to be used to find the timestamp of an object (default: `prov:generatedAtTime`)
- `LDES_VERSION_OF_PATH` [string]: the predicate to be used to find the link to the non-version object (default: `dcterms:isVersionOf`)
- `LOG_SPARQL_ALL` [boolean]: log executed SPARQL queries (default: `false`)
- `MU_APPLICATION_GRAPH` [string]: The graph where the data should be ingested. (default: see [semantic.works default graph](https://github.com/mu-semtech/mu-javascript-template/blob/d3281b8dff24502919a75147f7737b83d4dd724f/Dockerfile#L8))
- `MU_SPARQL_ENDPOINT` [string]: SPARQL endpoint to connect to, defaults to 'http://database:8890/sparql'
- `REPLACE_VERSIONS` [boolean]: boolean which indicates whether to remove old versions of a resource when adding a new version or not (default: `true`)
- `RUNONCE` [boolean]: set to true to run the consumer only once (e.g. when running the service as a Kubernetes CronJob). (default: `false`)
- `SPARQL_AUTH_PASSWORD` [string]: (optional) provide a password to be used in a digest auth to be sent to the SPARQL endpoint.
- `SPARQL_AUTH_USER` [string]: (optional) provide a username to be used in a digest auth to be sent to the SPARQL endpoint.
- `SPARQL_BATCH_SIZE` [integer]: number of triples sent per query, to work around triplestore query-length limitations (default: `0` - disabled).
- `SPARQL_ENDPOINT_HEADER_<key>` [string]: A header key-value combination which should be sent as part of the headers to the SPARQL endpoint.
- `PERSIST_STATE` [boolean]: whether to persist (and restore) state of the ldes client on runs. (default: `false`)
| Environment variable | Default | Description |
|----------------------|---------|-------------|
| `INGEST_MODE` | `ALL` | How the LDES feed should be ingested. Valid options are `ALL` and `MATERIALIZE`. `ALL` will ingest all versioned members as-is and store them in the triplestore. `MATERIALIZE` will store the [materializations of the members](https://semiceu.github.io/LinkedDataEventStreams/#version-materializations). |
| `REPLACE_VERSIONS` | `true` | Whether to remove old versions of a resource when adding a new version or not. |
| `PERSIST_STATE` | `false` | Whether to persist the state of the LDES client. The state is stored as a file in `/data/$LDES_ENDPOINT_VIEW-state.json`, make sure to mount the data folder to have access to store the state across container rebuilds! |
Contributor

won't the slashes in the feed uri create extra directories?

Collaborator Author

I was too hasty writing this. What the code does is the following:

const url = new URL(LDES_ENDPOINT_VIEW);
stateFilePath = `/data/${url.host}-state.json`;

So it uses the host part, which can't contain slashes.
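
A minimal sketch of the host-based naming described above, assuming the path is built exactly as in the two quoted lines. `stateFilePathFor` and its `dataDir` parameter are hypothetical names used only for illustration, not part of the PR.

```typescript
// Hypothetical helper mirroring the two lines quoted in the comment above.
function stateFilePathFor(endpointView: string, dataDir: string = "/data"): string {
  // `new URL` throws a TypeError when the input cannot be parsed as a URL.
  const url = new URL(endpointView);
  // `host` is the hostname plus the port (if a non-default one is given),
  // so it never contains a slash.
  return `${dataDir}/${url.host}-state.json`;
}
```

Note that `url.host` includes the port when one is given, so `http://ldes-time-fragmenter:3000/example/1` would map to `/data/ldes-time-fragmenter:3000-state.json`. Under this scheme, two views served from the same host would share one state file.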

| `LDES_ENDPOINT_VIEW` | N/A (required) | The view of the LDES endpoint that will be ingested. If not set, the service will not start. |
| `LDES_POLLING_INTERVAL` | `60000` | Number of milliseconds before refetching uncacheable fragments |
| `LDES_REQUESTS_PER_MINUTE` | `0` (unlimited) | How many requests per minute may be sent to the same host. This is optional, but any passed in value must be a positive number. |
| `LDES_ENDPOINT_HEADERS` | `{}` (no headers will be added) | Extra headers that will be added to the requests sent to the LDES endpoint. Recommended syntax:<pre>environment:<br> LDES_ENDPOINT_HEADERS: ><br> { "HEADER-NAME": "header-value" } # The leading whitespace is important!</pre> |
| `SPARQL_ENDPOINT_HEADER_<key>` | N/A | A header key-value combination which should be sent as part of the headers to the SPARQL endpoint. |
Contributor

Guessing this will also need to move to a JSON structure, similar to LDES_ENDPOINT_HEADERS.
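
If the SPARQL headers do move to a JSON structure, both variables could share one parser. The sketch below is only an illustration of that idea: `parseHeaderJson` and `HeaderMap` are hypothetical names, and a `SPARQL_ENDPOINT_HEADERS` variable does not exist in this PR.

```typescript
// Hypothetical shared parser for JSON-encoded header variables.
type HeaderMap = Record<string, string>;

function parseHeaderJson(raw: string | undefined, variableName: string): HeaderMap {
  // An unset or blank variable means "no extra headers".
  if (raw === undefined || raw.trim() === "") {
    return {};
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (e) {
    throw new Error(`Failed to parse ${variableName} as JSON. Faulty content: ${raw}`);
  }
  // Only a plain JSON object makes sense as a set of headers.
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new Error(`${variableName} must be a JSON object of header names to values`);
  }
  const headers: HeaderMap = {};
  for (const [name, value] of Object.entries(parsed)) {
    if (typeof value !== "string") {
      throw new Error(`${variableName}: value for header "${name}" must be a string`);
    }
    headers[name] = value;
  }
  return headers;
}
```

Usage would look like `parseHeaderJson(process.env.LDES_ENDPOINT_HEADERS, "LDES_ENDPOINT_HEADERS")`; rejecting non-object and non-string values up front keeps a malformed variable from surfacing later as a confusing request error.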

| `SPARQL_BATCH_SIZE` | `0` (disabled) | The amount of triples sent per query, used to work around triplestore query-length limitations. Value must be a non-negative integer. If set to 0, no batching will be applied. |
| `SPARQL_AUTH_USER` | N/A | Optional value to provide a username to be used in a digest auth to be sent to the SPARQL endpoint. |
| `SPARQL_AUTH_PASSWORD` | N/A | Optional value to provide a password to be used in a digest auth to be sent to the SPARQL endpoint. |
| `BLANK_NODE_NAMESPACE` | `http://mu.semte.ch/blank#` | Namespace to use for skolemizing blank nodes. |
| `CRON_PATTERN` | `* 0 * * * *` | The cron pattern which the cronjob should use. |
| `RUN_ONCE` | `false` | Set to true to run the consumer only once (e.g. when running the service as a Kubernetes CronJob). |
| `MU_APPLICATION_GRAPH` | See [semantic.works default graph](https://github.com/mu-semtech/mu-javascript-template/blob/d3281b8dff24502919a75147f7737b83d4dd724f/Dockerfile#L8) | The graph where the data should be ingested. |
| `MU_SPARQL_ENDPOINT` | `http://database:8890/sparql` | SPARQL endpoint to connect to. |
| `LOG_SPARQL_ALL` | `false` | Log executed SPARQL queries |
| `DEBUG_AUTH_HEADERS` | `false` | Debugging of [mu-authorization](https://github.com/mu-semtech/mu-authorization) access-control related headers |


> [!WARNING]
> The following environment variables are **deprecated** and slated to be removed at a later time, but still supported:

| Environment variable | Default | Description |
|----------------------|---------|-------------|
| `RUNONCE` | `false` | Set to true to run the consumer only once (e.g. when running the service as a Kubernetes CronJob). Replaced with `RUN_ONCE` environment variable. |

> [!CAUTION]
> The following environment variables are **no longer supported**:

| Environment variable | Reason | Description |
|----------------------|--------|-------------|
| `LDES_DEREFERENCE_MEMBERS` | The underlying library does not make dereferencing optional. | Whether to dereference members, because the collection pages do not contain all information. |
Collaborator Author

As stated, this is always enabled by the library and cannot be disabled as of now, or at least I could not manage to disable it.

This might be an issue for https://github.com/redpencilio/ldes-consumer-manager

Contributor

I do wonder how they decide what to dereference and what that means for us. I can see cases where you explicitly don't want this. This needs further investigation into what it means in practice.

| `LDES_STREAM` | The underlying library stores the LDES stream state in a file and we no longer store this info in the triplestore. | The uri which should be used as a subject to store the latest page and timestamp consumed in the database. |
| `LDES_TIMESTAMP_PATH` | Materialization and versioning support is provided by the underlying library, which expects to find this information attached to the LDES feed. | The predicate to be used to find the timestamp of an object. |
| `LDES_VERSION_OF_PATH` | Materialization and versioning support is provided by the underlying library, which expects to find this information attached to the LDES feed.| The predicate to be used to find the link to the non version object. |
| `LDES_ENDPOINT_HEADER_<key>` | Newer versions of Node.js do not support environment variables with dashes in their name. Names such as `LDES_ENDPOINT_HEADER_X-API-KEY` are no longer supported. | A header key-value combination which should be sent as part of the headers to the LDES endpoint. E.g. `LDES_ENDPOINT_HEADER_X-API-KEY: <api_key>`. |
143 changes: 82 additions & 61 deletions app.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,90 @@
import { CronJob } from "cron";
import { enhanced_fetch, intoConfig, LDESInfo, replicateLDES } from 'ldes-client';
import {
CRON_PATTERN,
LDES_DEREFERENCE_MEMBERS,
LDES_ENDPOINT_HEADER_PREFIX,
INGEST_MODE,
REPLACE_VERSIONS,
PERSIST_STATE,
LDES_ENDPOINT_VIEW,
LDES_POLLING_INTERVAL,
LDES_REQUESTS_PER_MINUTE,
LDES_STREAM,
LDES_TIMESTAMP_PATH,
LDES_VERSION_OF_PATH,
REPLACE_VERSIONS,
RUNONCE
} from "./config";
import { ConfigurableLDESOptions } from "./consumer";
import LdesPipeline from "./ldes-pipeline";
import { NamedNode } from "n3";
let taskIsRunning = false;
RUN_ONCE,
NODE_ENV,
logConfig,
} from "./cfg";
import { memberProcessor } from './lib/member-processor';
import { custom_fetch } from './lib/fetch/custom-fetch';
import { getLoggerFor } from './lib/logger';

const consumerJob = new CronJob(CRON_PATTERN, async () => {
Collaborator Author

The usage of the Cronjob wasn't entirely clear to me, so I left it out for now.

The ldes-client works by either ingesting the whole feed in one go, however long that may take, or by continuously polling the feed with a given interval.

Adding a cronjob seems like a second way to enable polling of the LDES feed? There may be a good reason to have the cronjob approach that I'm missing, so we can just add it back.

Contributor

I'm fine with leaving this out; the use case is not entirely clear to me either. I think this was to only start syncing at a given time, e.g. to make sure syncs happen outside of busy hours, but there are other ways around that.
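
One possible way around it, sketched here purely as an assumption and not something this PR implements: delay the first start until a chosen quiet hour, then let the client's own polling take over. `msUntilNextStart` and its `startHourUtc` parameter are hypothetical.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical helper: milliseconds from `now` until the next occurrence of
// `startHourUtc`:00 UTC. Returns 0 when `now` is exactly on the hour.
function msUntilNextStart(now: Date, startHourUtc: number): number {
  if (!Number.isInteger(startHourUtc) || startHourUtc < 0 || startHourUtc > 23) {
    throw new RangeError("startHourUtc must be an integer between 0 and 23");
  }
  // Today's start time in UTC (UTC has no daylight saving shifts).
  const todayStart = Date.UTC(
    now.getUTCFullYear(),
    now.getUTCMonth(),
    now.getUTCDate(),
    startHourUtc,
  );
  const delta = todayStart - now.getTime();
  // If today's start has already passed, wait for tomorrow's.
  return delta >= 0 ? delta : delta + DAY_MS;
}
```

A caller could then write something like `setTimeout(main, msUntilNextStart(new Date(), 2))` to begin the first sync at 02:00 UTC.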

if (taskIsRunning) {
console.log("Another task is still running");
return;
}
logConfig();

if (NODE_ENV === "production") {
main();
} else {
const timeout = 10_000; // Make this configurable?
console.log(`Starting LDES consumer in ${timeout}ms, connect to your debugger now :)`);
setTimeout(main, timeout);
}

async function main() {
let stateFilePath;
try {
taskIsRunning = true;
const endpoint = LDES_ENDPOINT_VIEW;
if (endpoint) {
const ldesOptions: ConfigurableLDESOptions = {
dereferenceMembers: LDES_DEREFERENCE_MEMBERS,
pollingInterval: LDES_POLLING_INTERVAL
};
if (LDES_REQUESTS_PER_MINUTE) {
ldesOptions.requestsPerMinute = LDES_REQUESTS_PER_MINUTE;
}
const datasetIri = new NamedNode(LDES_STREAM);
const consumer = new LdesPipeline({ datasetIri, endpoint, ldesOptions });
console.log("Started processing " + endpoint);
await consumer.consumeStream();
console.log("Finished processing " + endpoint);
if (RUNONCE) {
console.log("Job is complete.");
process.exit();
}
} else {
throw new Error("No endpoint provided");
}
} catch (e) {
console.error(e);
} finally {
taskIsRunning = false;
const url = new URL(LDES_ENDPOINT_VIEW);
stateFilePath = `/data/${url.host}-state.json`;
} catch (e: any) {
throw new Error("Provided endpoint couldn't be parsed as URL, double check your settings.");
}
});

console.log("config", {
CRON_PATTERN,
LDES_DEREFERENCE_MEMBERS,
LDES_ENDPOINT_HEADER_PREFIX,
LDES_ENDPOINT_VIEW,
LDES_POLLING_INTERVAL,
LDES_REQUESTS_PER_MINUTE,
LDES_STREAM,
LDES_TIMESTAMP_PATH,
LDES_VERSION_OF_PATH,
REPLACE_VERSIONS,
RUNONCE
});
const client = replicateLDES(
intoConfig({
url: LDES_ENDPOINT_VIEW,
urlIsView: true,
polling: !RUN_ONCE,
pollInterval: LDES_POLLING_INTERVAL,
stateFile: PERSIST_STATE ? stateFilePath : undefined,
materialize: INGEST_MODE === 'MATERIALIZE',
lastVersionOnly: REPLACE_VERSIONS, // Won't emit members if they're known to be older than what is already in the state file
loose: true, // Make this configurable? IPDC needs this to be true
fetch: enhanced_fetch({
/* In comment are the default values, perhaps we want to make these configurable
concurrent: 10, // Amount of concurrent requests to a single domain
retry: {
codes: [408, 425, 429, 500, 502, 503, 504], // Which faulty HTTP status codes will trigger retry
base: 500, // Seems to be unused in the client code
maxRetries: 5,
}*/
Comment on lines +43 to +52
Collaborator Author

The comments already mention it, but some of these things could be made configurable via environment variables.
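
A rough sketch of what that could look like, assuming the option shape matches the commented defaults in the diff (the unused `base` field is left out). The variable names `LDES_FETCH_CONCURRENT`, `LDES_FETCH_RETRY_CODES` and `LDES_FETCH_MAX_RETRIES` are hypothetical, and whether `enhanced_fetch` accepts exactly this object should be checked against the library.

```typescript
// Option shape assumed from the commented defaults in the diff.
interface FetchOptions {
  concurrent: number;
  retry: { codes: number[]; maxRetries: number };
}

const FETCH_DEFAULTS: FetchOptions = {
  concurrent: 10,
  retry: { codes: [408, 425, 429, 500, 502, 503, 504], maxRetries: 5 },
};

// Hypothetical reader; `env` is passed in so the function is easy to test.
function fetchOptionsFromEnv(env: Record<string, string | undefined>): FetchOptions {
  const readInt = (name: string, fallback: number): number => {
    const raw = env[name];
    if (raw === undefined || raw.trim() === "") return fallback;
    const value = Number(raw);
    if (!Number.isInteger(value) || value < 0) {
      throw new Error(`${name} must be a non-negative integer, got "${raw}"`);
    }
    return value;
  };

  // Retry codes are given as a comma-separated list, e.g. "429, 503".
  const rawCodes = env["LDES_FETCH_RETRY_CODES"];
  let codes = FETCH_DEFAULTS.retry.codes;
  if (rawCodes !== undefined && rawCodes.trim() !== "") {
    codes = rawCodes.split(",").map((part) => {
      const code = Number(part.trim());
      if (!Number.isInteger(code) || code < 100 || code > 599) {
        throw new Error(`LDES_FETCH_RETRY_CODES contains an invalid status code: "${part}"`);
      }
      return code;
    });
  }

  return {
    concurrent: readInt("LDES_FETCH_CONCURRENT", FETCH_DEFAULTS.concurrent),
    retry: {
      codes,
      maxRetries: readInt("LDES_FETCH_MAX_RETRIES", FETCH_DEFAULTS.retry.maxRetries),
    },
  };
}
```

Unset variables fall back to the defaults listed in the code comment, so existing deployments would behave the same as before.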

}, custom_fetch),

consumerJob.start();
}),
"none",
);

const getLDESInfo = async (): Promise<LDESInfo> => {
return new Promise(
(resolve, reject) => {
try {
client.on('description', (info: LDESInfo) => {
resolve(info);
});
} catch (e) {
reject(e);
}
}
)
};

const logger = getLoggerFor('main');
try {
logger.info('Starting stream...');

const ldesStream = client.stream({ highWaterMark: 10 });

logger.info('Waiting for LDES info...');
const { isVersionOfPath: versionOfPath, timestampPath } = await getLDESInfo();
logger.info(`Received LDES info: ${JSON.stringify({ versionOfPath, timestampPath })}`);

await ldesStream.pipeTo(memberProcessor(versionOfPath, timestampPath));

logger.info('Finished processing stream');
} catch (e) {
logger.error('Processing stream failed');
logger.error(e);
}
}
65 changes: 65 additions & 0 deletions cfg.ts
@@ -0,0 +1,65 @@
import env from "env-var";
import { getLoggerFor } from "./lib/logger";

const logger = getLoggerFor("config");

export const LDES_ENDPOINT_VIEW = env.get("LDES_ENDPOINT_VIEW").required().asString();
export const LDES_POLLING_INTERVAL = env.get("LDES_POLLING_INTERVAL").default(60000).asIntPositive();
export const LDES_REQUESTS_PER_MINUTE = env.get("LDES_REQUESTS_PER_MINUTE").default(0).asIntPositive();
export const LDES_ENDPOINT_HEADERS_STRING = env.get("LDES_ENDPOINT_HEADERS").default("{}").asString();
export let LDES_ENDPOINT_HEADERS = {};

try {
LDES_ENDPOINT_HEADERS = JSON.parse(LDES_ENDPOINT_HEADERS_STRING);
} catch (e: any) {
logger.error(`Failed to parse contents of LDES_ENDPOINT_HEADERS. Faulty content: ${LDES_ENDPOINT_HEADERS_STRING}`);
logger.error(e);
throw e;
}


export const INGEST_MODE = env.get("INGEST_MODE").default("ALL").asEnum(["ALL", "MATERIALIZE"]);
export const REPLACE_VERSIONS = env.get("REPLACE_VERSIONS").default("true").asBool();
export const PERSIST_STATE = env.get("PERSIST_STATE").default("false").asBool();

export const SPARQL_ENDPOINT_HEADER_PREFIX = "SPARQL_ENDPOINT_HEADER_";

export const SPARQL_BATCH_SIZE = env.get("SPARQL_BATCH_SIZE").default(0).asIntPositive();
export const ENABLE_SPARQL_BATCHING = SPARQL_BATCH_SIZE > 0;

export const SPARQL_AUTH_USER = env.get("SPARQL_AUTH_USER").asString();
export const SPARQL_AUTH_PASSWORD = env.get("SPARQL_AUTH_PASSWORD").asString();

export const BLANK_NODE_NAMESPACE = env.get("BLANK_NODE_NAMESPACE").default("http://mu.semte.ch/blank#").asString();
export const CRON_PATTERN = env.get("CRON_PATTERN").default("* 0 * * * *").asString();

const RUN_ONCE_VAR = env.get("RUN_ONCE").default("false").asBool();
const RUNONCE_VAR = env.get("RUNONCE").default("false").asBool();
export const RUN_ONCE = RUN_ONCE_VAR || RUNONCE_VAR;

export const MU_APPLICATION_GRAPH = env.get("MU_APPLICATION_GRAPH").required().asString(); // Provided by template
export const MU_SPARQL_ENDPOINT = env.get("MU_SPARQL_ENDPOINT").required().asString(); // Provided by template
export const DEBUG_AUTH_HEADERS = env.get("DEBUG_AUTH_HEADERS").default("false").asBool();

export const NODE_ENV = env.get("NODE_ENV").default("production").asEnum(["development", "production"]);

export function logConfig() {
// Should this use the logger instead?
console.log("Config", {
INGEST_MODE,
REPLACE_VERSIONS,
PERSIST_STATE,
LDES_ENDPOINT_VIEW,
LDES_POLLING_INTERVAL,
LDES_REQUESTS_PER_MINUTE,
SPARQL_BATCH_SIZE,
SPARQL_AUTH_USER,
SPARQL_AUTH_PASSWORD,
BLANK_NODE_NAMESPACE,
CRON_PATTERN,
RUN_ONCE,
MU_APPLICATION_GRAPH,
MU_SPARQL_ENDPOINT,
DEBUG_AUTH_HEADERS,
});
}
24 changes: 0 additions & 24 deletions config.ts

This file was deleted.
