Commit 6fa3c29

Add File source with SQS notifications (#5148)
* Refactor file source
* More explicit response for read_batch
* Fix clippy
* Remove batch logic from DocFileReader
* Address styling comments
* Replace FileSourceParams path with URI
* Additional URI related cleanup
* Fix unit tests to now use the URI
* Add queue source with SQS implementation
* Fix the publish_token strategy
* Fix never-used warnings
* Fix unit tests
* Abort visibility task after acknowledging
* Address smaller review comments
* Shorten visibility extension task
* Fix pulsar tests
* Adjustments after rebase
* Move object backed source to file source
* Simpler flow for adhoc file processing
* Fix tests and refactor batch creation to BatchReader
* Add max_messages param to Queue.receive
* Move use_shard_api to the metastore crate
* Dedup within batches
* Improve visibility task
* Re-acquire partitions aggressively
* Address simpler review comments
* Add test for visibility actor (failing)
* Fix visibility actor drop
* Fix reader edge case
* Add end to end tests
* Improve integration test scenario
* Chunk receive future to avoid hanging actor
* Improve error handling
* Fix flaky test
* New SourceConfig format with notifications field
* Improvements to error handling
* Clarification of Queue contract
* Address new round of review comments
* Remove SqsSource for now
* Fix panic
* Revert to forbidding any adhoc file source through the API
* Add docs
* Fix panic on empty file
* Documentation improvements
* Improve documentation
* Improve error handling code and associated docs
* Nitpick and TODO cleanup
* Add tip about ingesting directly from object stores
* Ack notifications of undesired type
* Add docs about situations where messages require a DLQ
* Fix integ test after rebase
1 parent 0caaf07 commit 6fa3c29

File tree

62 files changed: +4968 −773 lines changed


Diff for: docker-compose.yml

+2 −2

@@ -27,7 +27,7 @@ networks:
 
 services:
   localstack:
-    image: localstack/localstack:${LOCALSTACK_VERSION:-2.3.2}
+    image: localstack/localstack:${LOCALSTACK_VERSION:-3.5.0}
     container_name: localstack
     ports:
       - "${MAP_HOST_LOCALSTACK:-127.0.0.1}:4566:4566"
@@ -37,7 +37,7 @@ services:
       - all
       - localstack
     environment:
-      SERVICES: kinesis,s3
+      SERVICES: kinesis,s3,sqs
       PERSISTENCE: 1
     volumes:
       - .localstack:/etc/localstack/init/ready.d
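With `sqs` added to LocalStack's `SERVICES`, an S3 notification queue can be simulated locally when developing against this compose file. A minimal sketch with the AWS CLI (the queue name and dummy credentials are illustrative, not part of the commit):

```bash
# LocalStack accepts arbitrary credentials; point the CLI at its endpoint on port 4566.
export AWS_ACCESS_KEY_ID=test AWS_SECRET_ACCESS_KEY=test AWS_DEFAULT_REGION=us-east-1

# Create a local queue to stand in for the S3 event notification queue.
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name qw-local-s3-notifications

# Retrieve its URL for use in a file source's `notifications` configuration.
aws --endpoint-url=http://localhost:4566 sqs get-queue-url --queue-name qw-local-s3-notifications
```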

Diff for: docs/assets/sqs-file-source.tf

+134

@@ -0,0 +1,134 @@
terraform {
  required_version = "1.7.5"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.39.1"
    }
  }
}

provider "aws" {
  region = "us-east-1"
  default_tags {
    tags = {
      provisioner = "terraform"
      author      = "Quickwit"
    }
  }
}

locals {
  sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
  source_bucket_name          = "qw-tuto-source-bucket"
}

resource "aws_s3_bucket" "file_source" {
  bucket_prefix = local.source_bucket_name
  force_destroy = true
}

data "aws_iam_policy_document" "sqs_notification" {
  statement {
    effect = "Allow"

    principals {
      type        = "*"
      identifiers = ["*"]
    }

    actions   = ["sqs:SendMessage"]
    resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]

    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"
      values   = [aws_s3_bucket.file_source.arn]
    }
  }
}

resource "aws_sqs_queue" "s3_events" {
  name   = local.sqs_notification_queue_name
  policy = data.aws_iam_policy_document.sqs_notification.json

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
    maxReceiveCount     = 5
  })
}

resource "aws_sqs_queue" "s3_events_deadletter" {
  name = "${local.sqs_notification_queue_name}-deadletter"
}

resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
  queue_url = aws_sqs_queue.s3_events_deadletter.id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue",
    sourceQueueArns   = [aws_sqs_queue.s3_events.arn]
  })
}

resource "aws_s3_bucket_notification" "bucket_notification" {
  bucket = aws_s3_bucket.file_source.id

  queue {
    queue_arn = aws_sqs_queue.s3_events.arn
    events    = ["s3:ObjectCreated:*"]
  }
}

data "aws_iam_policy_document" "quickwit_node" {
  statement {
    effect = "Allow"
    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:ChangeMessageVisibility",
      "sqs:GetQueueAttributes",
    ]
    resources = [aws_sqs_queue.s3_events.arn]
  }
  statement {
    effect    = "Allow"
    actions   = ["s3:GetObject"]
    resources = ["${aws_s3_bucket.file_source.arn}/*"]
  }
}

resource "aws_iam_user" "quickwit_node" {
  name = "quickwit-filesource-tutorial"
  path = "/system/"
}

resource "aws_iam_user_policy" "quickwit_node" {
  name   = "quickwit-filesource-tutorial"
  user   = aws_iam_user.quickwit_node.name
  policy = data.aws_iam_policy_document.quickwit_node.json
}

resource "aws_iam_access_key" "quickwit_node" {
  user = aws_iam_user.quickwit_node.name
}

output "source_bucket_name" {
  value = aws_s3_bucket.file_source.bucket
}

output "notification_queue_url" {
  value = aws_sqs_queue.s3_events.id
}

output "quickwit_node_access_key_id" {
  value     = aws_iam_access_key.quickwit_node.id
  sensitive = true
}

output "quickwit_node_secret_access_key" {
  value     = aws_iam_access_key.quickwit_node.secret
  sensitive = true
}
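As a usage sketch (not part of the committed file): the configuration above could be applied and its outputs read back as follows, assuming Terraform and AWS credentials are already set up in the shell.

```bash
# Provision the bucket, notification queue, dead letter queue, and IAM user.
terraform init
terraform apply

# Read back the values needed to configure the Quickwit node and its file source.
terraform output notification_queue_url
terraform output -raw quickwit_node_access_key_id
terraform output -raw quickwit_node_secret_access_key
```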

Diff for: docs/configuration/source-config.md

+51 −4

@@ -29,15 +29,62 @@ The source type designates the kind of source being configured. As of version 0.
 
 The source parameters indicate how to connect to a data store and are specific to the source type.
 
-### File source (CLI only)
+### File source
 
-A file source reads data from a local file. The file must consist of JSON objects separated by a newline (NDJSON).
-As of version 0.5, a file source can only be ingested with the [CLI command](/docs/reference/cli.md#tool-local-ingest). Compressed files (bz2, gzip, ...) and remote files (Amazon S3, HTTP, ...) are not supported.
+A file source reads data from files containing JSON objects separated by newlines (NDJSON). Gzip compression is supported provided that the file name ends with the `.gz` suffix.
+
+#### Ingest a single file (CLI only)
+
+To ingest a specific file, run the indexing directly in an adhoc CLI process with:
+
+```bash
+./quickwit tool local-ingest --index <index> --input-path <input-path>
+```
+
+Both local files and object store URIs are supported, provided that the environment is configured with the appropriate permissions. A tutorial is available [here](/docs/ingest-data/ingest-local-file.md).
+
+#### Notification based file ingestion (beta)
+
+Quickwit can automatically ingest all new files that are uploaded to an S3 bucket. This requires creating and configuring an [SQS notification queue](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html). A complete example can be found [in this tutorial](/docs/ingest-data/sqs-files.md).
+
+The `notifications` parameter takes an array of notification settings. Currently, one notifier can be configured per source and only the SQS notification `type` is supported.
+
+Required fields for the SQS `notifications` parameter items:
+- `type`: `sqs`
+- `queue_url`: complete URL of the SQS queue (e.g. `https://sqs.us-east-1.amazonaws.com/123456789012/queue-name`)
+- `message_type`: format of the message payload, either
+  - `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html)
+  - `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
+
+*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*
 
 ```bash
-./quickwit tool local-ingest --input-path <INPUT_PATH>
+cat << EOF > source-config.yaml
+version: 0.8
+source_id: my-sqs-file-source
+source_type: file
+num_pipelines: 2
+params:
+  notifications:
+    - type: sqs
+      queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
+      message_type: s3_notification
+EOF
+./quickwit source create --index my-index --source-config source-config.yaml
 ```
 
+:::note
+
+- Quickwit does not automatically delete the source files after a successful ingestion. You can use [S3 object expiration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html) to configure how long they should be retained in the bucket.
+- Configure the notification to only forward events of type `s3:ObjectCreated:*`. Other events are acknowledged by the source without further processing and a warning is logged.
+- We strongly recommend using a [dead letter queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) to receive all messages that couldn't be processed by the file source. A `maxReceiveCount` of 5 is a good default value. Here are some common situations where the notification message ends up in the dead letter queue:
+  - the notification message could not be parsed (e.g. it is not a valid S3 notification)
+  - the file was not found
+  - the file is corrupted (e.g. unexpected compression)
+
+:::
+
 ### Ingest API source
 
 An ingest API source reads data from the [Ingest API](/docs/reference/rest-api.md#ingest-data-into-an-index). This source is automatically created at the index creation and cannot be deleted nor disabled.
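For reference, a `raw_uri` notification carries nothing but the object URI in its body, so a test message can be pushed to the queue by hand. A sketch reusing the placeholder queue URL and bucket from the example above:

```bash
# Send a `raw_uri` style notification: the message body is just the object URI.
aws sqs send-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/queue-name \
  --message-body "s3://mybucket/mykey"

# With `message_type: s3_notification`, the body is instead the JSON document that S3
# publishes for `s3:ObjectCreated:*` events (a `Records` array whose entries carry the
# bucket name and object key).
```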

Diff for: docs/ingest-data/ingest-local-file.md

+6

@@ -72,6 +72,12 @@ Clearing local cache directory...
 ✔ Documents successfully indexed.
 ```
 
+:::tip
+
+Object store URIs like `s3://mybucket/mykey.json` are also supported as `--input-path`, provided that your environment is configured with the appropriate permissions.
+
+:::
+
 ## Tear down resources (optional)
 
 That's it! You can now tear down the resources you created. You can do so by running the following command:
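Following the tip added above, an ad hoc ingestion from an object store could look like this sketch (the index name, bucket, and key are placeholders):

```bash
./quickwit tool local-ingest --index my-index --input-path s3://mybucket/mykey.json
```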
