Skip to content

Commit 5d7a549

Browse files
authored Aug 20, 2024
Fix source path in Lambda distrib (#5327)
* Fix source file path
* Expose Lambda tests in Makefile
1 parent c59be63 commit 5d7a549

File tree

9 files changed: +150 -23 lines changed
 

‎Makefile

+4-1
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,10 @@ test-all: docker-compose-up
6262
test-failpoints:
6363
@$(MAKE) -C $(QUICKWIT_SRC) test-failpoints
6464

65+
test-lambda: DOCKER_SERVICES=localstack
66+
test-lambda: docker-compose-up
67+
@$(MAKE) -C $(QUICKWIT_SRC) test-lambda
68+
6569
# This will build and push all custom cross images for cross-compilation.
6670
# You will need to login into Docker Hub with the `quickwit` account.
6771
IMAGE_TAGS = x86_64-unknown-linux-gnu aarch64-unknown-linux-gnu x86_64-unknown-linux-musl aarch64-unknown-linux-musl
@@ -104,4 +108,3 @@ build-rustdoc:
104108
.PHONY: build-ui
105109
build-ui:
106110
$(MAKE) -C $(QUICKWIT_SRC) build-ui
107-

‎quickwit/Makefile

+8
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,14 @@ test-all:
3636
test-failpoints:
3737
cargo nextest run --test failpoints --features fail/failpoints
3838

39+
test-lambda:
40+
AWS_ACCESS_KEY_ID=ignored \
41+
AWS_SECRET_ACCESS_KEY=ignored \
42+
AWS_REGION=us-east-1 \
43+
QW_S3_ENDPOINT=http://localhost:4566 \
44+
QW_S3_FORCE_PATH_STYLE_ACCESS=1 \
45+
cargo nextest run --all-features -p quickwit-lambda --retries 1
46+
3947
# TODO: to be replaced by https://github.com/quickwit-oss/quickwit/issues/237
4048
TARGET ?= x86_64-unknown-linux-gnu
4149
.PHONY: build

‎quickwit/quickwit-lambda/Cargo.toml

+3
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,9 @@ path = "src/bin/indexer.rs"
1818
name = "searcher"
1919
path = "src/bin/searcher.rs"
2020

21+
[features]
22+
s3-localstack-tests = []
23+
2124
[dependencies]
2225
anyhow = { workspace = true }
2326
aws_lambda_events = "0.15.0"

‎quickwit/quickwit-lambda/src/indexer/environment.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -26,8 +26,8 @@ pub const CONFIGURATION_TEMPLATE: &str = r#"
2626
version: 0.8
2727
node_id: lambda-indexer
2828
cluster_id: lambda-ephemeral
29-
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index
30-
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
29+
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index}
30+
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index}
3131
data_dir: /tmp
3232
"#;
3333

‎quickwit/quickwit-lambda/src/indexer/handler.rs

-1
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,6 @@ async fn indexer_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
3636
let ingest_res = ingest(IngestArgs {
3737
input_path: payload.uri()?,
3838
input_format: quickwit_config::SourceInputFormat::Json,
39-
overwrite: false,
4039
vrl_script: None,
4140
// TODO: instead of clearing the cache, we use a cache and set its max
4241
// size with indexer_config.split_store_max_num_bytes

‎quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs

+2-15
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,6 @@ use quickwit_config::{
3535
load_index_config_from_user_config, ConfigFormat, IndexConfig, NodeConfig, SourceConfig,
3636
SourceInputFormat, SourceParams, TransformConfig,
3737
};
38-
use quickwit_index_management::IndexService;
3938
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
4039
use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, SpawnPipeline};
4140
use quickwit_indexing::IndexingPipeline;
@@ -154,15 +153,14 @@ pub(super) async fn configure_source(
154153
})
155154
}
156155

157-
/// Check if the index exists, creating or overwriting it if necessary
156+
/// Check if the index exists, creating it if necessary
158157
///
159158
/// If the index exists but without the Lambda source ([`LAMBDA_SOURCE_ID`]),
160159
/// the source is added.
161160
pub(super) async fn init_index_if_necessary(
162161
metastore: &mut MetastoreServiceClient,
163162
storage_resolver: &StorageResolver,
164163
default_index_root_uri: &Uri,
165-
overwrite: bool,
166164
source_config: &SourceConfig,
167165
) -> anyhow::Result<IndexMetadata> {
168166
let metadata_result = metastore
@@ -171,23 +169,12 @@ pub(super) async fn init_index_if_necessary(
171169
let metadata = match metadata_result {
172170
Ok(metadata_resp) => {
173171
let current_metadata = metadata_resp.deserialize_index_metadata()?;
174-
let mut metadata_changed = false;
175-
if overwrite {
176-
info!(index_uid = %current_metadata.index_uid, "overwrite enabled, clearing existing index");
177-
let mut index_service =
178-
IndexService::new(metastore.clone(), storage_resolver.clone());
179-
index_service.clear_index(&INDEX_ID).await?;
180-
metadata_changed = true;
181-
}
182172
if !current_metadata.sources.contains_key(LAMBDA_SOURCE_ID) {
183173
let add_source_request = AddSourceRequest::try_from_source_config(
184174
current_metadata.index_uid.clone(),
185175
source_config,
186176
)?;
187177
metastore.add_source(add_source_request).await?;
188-
metadata_changed = true;
189-
}
190-
if metadata_changed {
191178
metastore
192179
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
193180
.await?
@@ -305,7 +292,7 @@ pub(super) async fn spawn_pipelines(
305292

306293
/// Prune old Lambda file checkpoints if there are too many
307294
///
308-
/// Without pruning checkpoints accumulate indifinitely. This is particularly
295+
/// Without pruning checkpoints accumulate indefinitely. This is particularly
309296
/// problematic when indexing a lot of small files, as the metastore will grow
310297
/// large even for a small index.
311298
///

‎quickwit/quickwit-lambda/src/indexer/ingest/mod.rs

+128-2
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,6 @@ use crate::utils::load_node_config;
4545
pub struct IngestArgs {
4646
pub input_path: Uri,
4747
pub input_format: SourceInputFormat,
48-
pub overwrite: bool,
4948
pub vrl_script: Option<String>,
5049
pub clear_cache: bool,
5150
}
@@ -65,7 +64,6 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
6564
&mut metastore,
6665
&storage_resolver,
6766
&config.default_index_root_uri,
68-
args.overwrite,
6967
&source_config,
7068
)
7169
.await?;
@@ -123,3 +121,131 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
123121
}
124122
Ok(statistics)
125123
}
124+
125+
#[cfg(all(test, feature = "s3-localstack-tests"))]
126+
mod tests {
127+
use std::path::PathBuf;
128+
use std::str::FromStr;
129+
130+
use quickwit_common::new_coolid;
131+
use quickwit_storage::StorageResolver;
132+
133+
use super::*;
134+
135+
async fn put_object(
136+
storage_resolver: StorageResolver,
137+
bucket: &str,
138+
prefix: &str,
139+
filename: &str,
140+
data: Vec<u8>,
141+
) -> Uri {
142+
let src_location = format!("s3://{}/{}", bucket, prefix);
143+
let storage_uri = Uri::from_str(&src_location).unwrap();
144+
let storage = storage_resolver.resolve(&storage_uri).await.unwrap();
145+
storage
146+
.put(&PathBuf::from(filename), Box::new(data))
147+
.await
148+
.unwrap();
149+
storage_uri.join(filename).unwrap()
150+
}
151+
152+
#[tokio::test]
153+
async fn test_ingest() -> anyhow::Result<()> {
154+
quickwit_common::setup_logging_for_tests();
155+
let bucket = "quickwit-integration-tests";
156+
let prefix = new_coolid("lambda-ingest-test");
157+
let storage_resolver = StorageResolver::unconfigured();
158+
159+
let index_config = br#"
160+
version: 0.8
161+
index_id: lambda-test
162+
doc_mapping:
163+
field_mappings:
164+
- name: timestamp
165+
type: datetime
166+
input_formats:
167+
- unix_timestamp
168+
fast: true
169+
timestamp_field: timestamp
170+
"#;
171+
let config_uri = put_object(
172+
storage_resolver.clone(),
173+
bucket,
174+
&prefix,
175+
"index-config.yaml",
176+
index_config.to_vec(),
177+
)
178+
.await;
179+
180+
// TODO use dependency injection instead of lazy static for env configs
181+
std::env::set_var("QW_LAMBDA_METASTORE_BUCKET", bucket);
182+
std::env::set_var("QW_LAMBDA_INDEX_BUCKET", bucket);
183+
std::env::set_var("QW_LAMBDA_METASTORE_PREFIX", &prefix);
184+
std::env::set_var("QW_LAMBDA_INDEX_PREFIX", &prefix);
185+
std::env::set_var("QW_LAMBDA_INDEX_CONFIG_URI", config_uri.as_str());
186+
std::env::set_var("QW_LAMBDA_INDEX_ID", "lambda-test");
187+
188+
// first ingestion creates the index metadata
189+
let test_data_1 = br#"{"timestamp": 1724140899, "field1": "value1"}"#;
190+
let test_data_1_uri = put_object(
191+
storage_resolver.clone(),
192+
bucket,
193+
&prefix,
194+
"data.json",
195+
test_data_1.to_vec(),
196+
)
197+
.await;
198+
199+
{
200+
let args = IngestArgs {
201+
input_path: test_data_1_uri.clone(),
202+
input_format: SourceInputFormat::Json,
203+
vrl_script: None,
204+
clear_cache: true,
205+
};
206+
let stats = ingest(args).await?;
207+
assert_eq!(stats.num_invalid_docs, 0);
208+
assert_eq!(stats.num_docs, 1);
209+
}
210+
211+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
212+
213+
{
214+
// ingesting the same data again is a no-op
215+
let args = IngestArgs {
216+
input_path: test_data_1_uri,
217+
input_format: SourceInputFormat::Json,
218+
vrl_script: None,
219+
clear_cache: true,
220+
};
221+
let stats = ingest(args).await?;
222+
assert_eq!(stats.num_invalid_docs, 0);
223+
assert_eq!(stats.num_docs, 0);
224+
}
225+
226+
{
227+
// second ingestion should not fail when metadata already exists
228+
let test_data = br#"{"timestamp": 1724149900, "field1": "value2"}"#;
229+
let test_data_uri = put_object(
230+
storage_resolver.clone(),
231+
bucket,
232+
&prefix,
233+
"data2.json",
234+
test_data.to_vec(),
235+
)
236+
.await;
237+
238+
let args = IngestArgs {
239+
input_path: test_data_uri,
240+
input_format: SourceInputFormat::Json,
241+
vrl_script: None,
242+
clear_cache: true,
243+
};
244+
let stats = ingest(args).await?;
245+
assert_eq!(stats.num_invalid_docs, 0);
246+
assert_eq!(stats.num_docs, 1);
247+
}
248+
249+
Ok(())
250+
}
251+
}

‎quickwit/quickwit-lambda/src/indexer/model.rs

+1
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ impl IndexerEvent {
3838
IndexerEvent::S3(event) => [
3939
"s3://",
4040
event.records[0].s3.bucket.name.as_ref().unwrap(),
41+
"/",
4142
event.records[0].s3.object.key.as_ref().unwrap(),
4243
]
4344
.join(""),

‎quickwit/quickwit-lambda/src/searcher/environment.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,8 @@
2020
pub(crate) const CONFIGURATION_TEMPLATE: &str = r#"
2121
version: 0.8
2222
node_id: lambda-searcher
23-
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s
24-
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
23+
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index}#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s
24+
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index}
2525
data_dir: /tmp
2626
searcher:
2727
partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M}

0 commit comments

Comments
 (0)