20 changes: 20 additions & 0 deletions README.md
@@ -147,6 +147,26 @@ spatialbench-cli --scale-factor 1 --mb-per-file 256 --output-dir sf1-parquet
spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet
```

#### Generate Data Directly to S3

You can generate data directly to Amazon S3 or S3-compatible storage by providing an S3 URI as the output directory:

```bash
# Set AWS credentials
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_REGION="us-west-2" # Must match your bucket's region

# Generate to S3
spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir s3://my-bucket/spatialbench/sf10

# For S3-compatible services (MinIO, etc.)
export AWS_ENDPOINT="http://localhost:9000"
spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
```

The S3 writer uses streaming multipart uploads, buffering data in 32 MB chunks and uploading each part as it fills, so memory usage stays bounded even for large datasets. All output formats (Parquet, CSV, TBL) are supported, and the generated files are byte-for-byte identical to those produced locally.
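
For orientation, the following sketch shows how a streaming multipart upload of this shape can be written against the `object_store` crate that the CLI already depends on. It is not the CLI's actual implementation; the bucket name, object key, and explicit 32 MB chunk size are illustrative placeholders.

```rust
use object_store::aws::AmazonS3Builder;
use object_store::path::Path;
use object_store::{ObjectStore, WriteMultipart};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Credentials, region, and optional endpoint are read from the AWS_* environment variables.
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("my-bucket") // placeholder bucket
        .build()?;

    // Start a multipart upload and wrap it in a writer that flushes a part
    // roughly every 32 MB, so only the current chunk is held in memory.
    let path = Path::from("spatialbench/sf10/trip.parquet"); // placeholder key
    let upload = store.put_multipart(&path).await?;
    let mut writer = WriteMultipart::new_with_chunk_size(upload, 32 * 1024 * 1024);

    // Append generated bytes as they are produced.
    writer.write(b"...generated bytes...");

    // Completing the upload makes the object visible in S3.
    writer.finish().await?;
    Ok(())
}
```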

#### Custom Spider Configuration

You can override these defaults at runtime by passing a YAML file via the `--config` flag:
3 changes: 2 additions & 1 deletion spatialbench-cli/Cargo.toml
@@ -24,10 +24,11 @@ serde = { version = "1.0.219", features = ["derive"] }
anyhow = "1.0.99"
serde_yaml = "0.9.33"
datafusion = "50.2"
object_store = { version = "0.12.4", features = ["http"] }
object_store = { version = "0.12.4", features = ["http", "aws"] }
arrow-array = "56"
arrow-schema = "56"
url = "2.5.7"
bytes = "1.10.1"

[dev-dependencies]
assert_cmd = "2.0"
83 changes: 83 additions & 0 deletions spatialbench-cli/src/generate.rs
@@ -36,6 +36,15 @@ pub trait Sink: Send {
fn flush(self) -> Result<(), io::Error>;
}

/// Async version of [`Sink`] for writers that need async finalization (like S3Writer)
pub trait AsyncSink: Send {
/// Write all data from the buffer to the sink
fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>;

/// Complete and flush any remaining data from the sink (async)
fn async_flush(self) -> impl std::future::Future<Output = Result<(), io::Error>> + Send;
}

/// Generates data in parallel from a series of [`Source`] and writes to a [`Sink`]
///
/// Each [`Source`] is a data generator that generates data directly into an in
@@ -135,6 +144,80 @@
writer_task.await.expect("writer task panicked")
}

/// Generates data in parallel from a series of [`Source`] and writes to an [`AsyncSink`]
///
/// This is similar to [`generate_in_chunks`] but handles async finalization for sinks like the S3 writer.
pub async fn generate_in_chunks_async<G, I, S>(
mut sink: S,
sources: I,
num_threads: usize,
) -> Result<(), io::Error>
where
G: Source + 'static,
I: Iterator<Item = G>,
S: AsyncSink + 'static,
{
let recycler = BufferRecycler::new();
let mut sources = sources.peekable();

debug!("Using {num_threads} threads (async sink)");

let (tx, mut rx) = tokio::sync::mpsc::channel(num_threads);

// write the header
let Some(first) = sources.peek() else {
return Ok(()); // no sources
};
let header = first.header(Vec::new());
tx.send(header)
.await
.expect("tx just created, it should not be closed");

let sources_and_recyclers = sources.map(|generator| (generator, recycler.clone()));

let mut stream = futures::stream::iter(sources_and_recyclers)
.map(async |(source, recycler)| {
let buffer = recycler.new_buffer(1024 * 1024 * 8);
let mut join_set = JoinSet::new();
join_set.spawn(async move { source.create(buffer) });
join_set
.join_next()
.await
.expect("had one item")
.expect("join_next join is infallible unless task panics")
})
.buffered(num_threads)
.map(async |buffer| {
if let Err(e) = tx.send(buffer).await {
debug!("Error sending buffer to writer: {e}");
}
});

let captured_recycler = recycler.clone();
let writer_task = tokio::task::spawn(async move {
while let Some(buffer) = rx.recv().await {
sink.sink(&buffer)?;
captured_recycler.return_buffer(buffer);
}
// No more input, flush the sink asynchronously
sink.async_flush().await
});

// drive the stream to completion
while let Some(write_task) = stream.next().await {
if writer_task.is_finished() {
debug!("writer task is done early, stopping writer");
break;
}
write_task.await;
}
drop(stream);
drop(tx);

debug!("waiting for writer task to complete");
writer_task.await.expect("writer task panicked")
}

/// A simple buffer recycler to avoid allocating new buffers for each part
///
/// Clones share the same underlying recycler, so it is not thread safe
37 changes: 37 additions & 0 deletions spatialbench-cli/src/main.rs
@@ -11,6 +11,7 @@ mod output_plan;
mod parquet;
mod plan;
mod runner;
mod s3_writer;
mod spatial_config_file;
mod statistics;
mod tbl;
@@ -384,6 +385,13 @@ impl IntoSize for BufWriter<File> {
}
}

impl IntoSize for s3_writer::S3Writer {
fn into_size(self) -> Result<usize, io::Error> {
// Return the buffer size before finishing
Ok(self.buffer_size())
}
}

/// Wrapper around a buffer writer that counts the number of buffers and bytes written
struct WriterSink<W: Write> {
statistics: WriteStatistics,
@@ -410,3 +418,32 @@ impl<W: Write + Send> Sink for WriterSink<W> {
self.inner.flush()
}
}

/// Async wrapper for S3Writer to handle async finalization
pub struct AsyncWriterSink {
statistics: WriteStatistics,
inner: s3_writer::S3Writer,
}

impl AsyncWriterSink {
pub fn new(inner: s3_writer::S3Writer) -> Self {
Self {
inner,
statistics: WriteStatistics::new("buffers"),
}
}
}

impl generate::AsyncSink for AsyncWriterSink {
fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error> {
self.statistics.increment_chunks(1);
self.statistics.increment_bytes(buffer.len());
self.inner.write_all(buffer)
}

async fn async_flush(mut self) -> Result<(), io::Error> {
self.inner.flush()?;
self.inner.finish().await?;
Ok(())
}
}
38 changes: 29 additions & 9 deletions spatialbench-cli/src/output_plan.rs
@@ -18,6 +18,8 @@ pub enum OutputLocation {
File(PathBuf),
/// Output to stdout
Stdout,
/// Output to S3
S3(String),
}

impl Display for OutputLocation {
@@ -31,6 +33,10 @@ impl Display for OutputLocation {
write!(f, "{}", file.to_string_lossy())
}
OutputLocation::Stdout => write!(f, "Stdout"),
OutputLocation::S3(uri) => {
// Display the S3 URI
write!(f, "{}", uri)
}
}
}
}
@@ -265,17 +271,31 @@ impl OutputPlanGenerator {
OutputFormat::Parquet => "parquet",
};

let mut output_path = self.output_dir.clone();
if let Some(part) = part {
// If a partition is specified, create a subdirectory for it
output_path.push(table.to_string());
self.ensure_directory_exists(&output_path)?;
output_path.push(format!("{table}.{part}.{extension}"));
// Check if output_dir is an S3 URI
let output_dir_str = self.output_dir.to_string_lossy();
if output_dir_str.starts_with("s3://") {
// Handle S3 path
let base_uri = output_dir_str.trim_end_matches('/');
let s3_uri = if let Some(part) = part {
format!("{base_uri}/{table}/{table}.{part}.{extension}")
} else {
format!("{base_uri}/{table}.{extension}")
};
Ok(OutputLocation::S3(s3_uri))
} else {
// No partition specified, output to a single file
output_path.push(format!("{table}.{extension}"));
// Handle local filesystem path
let mut output_path = self.output_dir.clone();
if let Some(part) = part {
// If a partition is specified, create a subdirectory for it
output_path.push(table.to_string());
self.ensure_directory_exists(&output_path)?;
output_path.push(format!("{table}.{part}.{extension}"));
} else {
// No partition specified, output to a single file
output_path.push(format!("{table}.{extension}"));
}
Ok(OutputLocation::File(output_path))
}
Ok(OutputLocation::File(output_path))
}
}

92 changes: 92 additions & 0 deletions spatialbench-cli/src/parquet.rs
@@ -1,5 +1,6 @@
//! Parquet output format

use crate::s3_writer::S3Writer;
use crate::statistics::WriteStatistics;
use arrow::datatypes::SchemaRef;
use futures::StreamExt;
@@ -124,6 +125,97 @@
Ok(())
}

/// Converts a set of RecordBatchIterators into a Parquet file for S3Writer
///
/// This is a specialized version that handles S3Writer's async finalization
pub async fn generate_parquet_s3<I>(
writer: S3Writer,
iter_iter: I,
num_threads: usize,
parquet_compression: Compression,
) -> Result<(), io::Error>
where
I: Iterator<Item: RecordBatchIterator> + 'static,
{
debug!(
"Generating Parquet for S3 with {num_threads} threads, using {parquet_compression} compression"
);
let mut iter_iter = iter_iter.peekable();

// get schema from the first iterator
let Some(first_iter) = iter_iter.peek() else {
return Ok(());
};
let schema = Arc::clone(first_iter.schema());

// Compute the parquet schema
let writer_properties = WriterProperties::builder()
.set_compression(parquet_compression)
.build();
let writer_properties = Arc::new(writer_properties);
let parquet_schema = Arc::new(
ArrowSchemaConverter::new()
.with_coerce_types(writer_properties.coerce_types())
.convert(&schema)
.unwrap(),
);

// create a stream that computes the data for each row group
let mut row_group_stream = futures::stream::iter(iter_iter)
.map(async |iter| {
let parquet_schema = Arc::clone(&parquet_schema);
let writer_properties = Arc::clone(&writer_properties);
let schema = Arc::clone(&schema);
tokio::task::spawn(async move {
encode_row_group(parquet_schema, writer_properties, schema, iter)
})
.await
.expect("Inner task panicked")
})
.buffered(num_threads);

let root_schema = parquet_schema.root_schema_ptr();
let writer_properties_captured = Arc::clone(&writer_properties);
let (tx, mut rx): (
Sender<Vec<ArrowColumnChunk>>,
Receiver<Vec<ArrowColumnChunk>>,
) = tokio::sync::mpsc::channel(num_threads);

let writer_task = tokio::task::spawn_blocking(move || {
let mut statistics = WriteStatistics::new("row groups");
let mut writer =
SerializedFileWriter::new(writer, root_schema, writer_properties_captured).unwrap();

while let Some(chunks) = rx.blocking_recv() {
let mut row_group_writer = writer.next_row_group().unwrap();
for chunk in chunks {
chunk.append_to_row_group(&mut row_group_writer).unwrap();
}
row_group_writer.close().unwrap();
statistics.increment_chunks(1);
}
// Return the S3Writer for async upload
let s3_writer = writer.into_inner()?;
Ok((s3_writer, statistics)) as Result<(S3Writer, WriteStatistics), io::Error>
});

// Drive the input stream
while let Some(chunks) = row_group_stream.next().await {
if let Err(e) = tx.send(chunks).await {
debug!("Error sending chunks to writer: {e}");
break;
}
}
drop(tx);

// Wait for writer task and upload to S3
let (s3_writer, mut statistics) = writer_task.await??;
let size = s3_writer.finish().await?;
statistics.increment_bytes(size);

Ok(())
}

/// Creates the data for a particular row group
///
/// Note at the moment it does not use multiple tasks/threads but it could
10 changes: 10 additions & 0 deletions spatialbench-cli/src/plan.rs
@@ -60,6 +60,16 @@ pub struct GenerationPlan

pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 128 * 1024 * 1024;

/// Buffer size for Parquet writing (32MB)
///
/// This buffer size is used for:
/// - Local file writing with BufWriter
/// - S3 multipart upload parts
///
/// The 32MB size provides good performance and is well above the AWS S3
/// minimum part size requirement of 5MB for multipart uploads.
pub const PARQUET_BUFFER_SIZE: usize = 32 * 1024 * 1024;
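
As a back-of-the-envelope check (not part of the codebase), the part size also caps the largest object a single multipart upload can produce, since AWS S3 allows at most 10,000 parts per upload:

```rust
// Illustrative arithmetic only; the 10,000-part cap is AWS S3's documented
// multipart-upload limit, and the 32 MiB part size mirrors PARQUET_BUFFER_SIZE.
const PART_SIZE_BYTES: usize = 32 * 1024 * 1024;
const S3_MAX_PARTS: usize = 10_000;

fn main() {
    // 32 MiB * 10,000 parts ≈ 312 GiB per generated file
    let max_object_bytes = PART_SIZE_BYTES * S3_MAX_PARTS;
    println!("{} GiB", max_object_bytes / (1024 * 1024 * 1024));
}
```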

impl GenerationPlan {
/// Returns a GenerationPlan number of parts to generate
///