From 7ea1fe53d1a126c13b8d0b4e05bea184abe00a39 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Tue, 28 Jan 2025 06:03:38 +0000
Subject: [PATCH 1/7] rebase

---
 cpp/src/io/json/read_json.cu    | 67 +++++++++++++++++++++------------
 cpp/tests/io/json/json_test.cpp | 19 +++++++++-
 2 files changed, 60 insertions(+), 26 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 419e7bb120f..36b63af834e 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -165,23 +165,31 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 }
 
 /**
- * @brief Return the upper bound on the batch size for the JSON reader.
+ * @brief Return the batch size for the JSON reader.
  *
- * The datasources passed to the JSON reader are split into batches demarcated by byte range
- * offsets and read iteratively. The batch size is capped at INT_MAX bytes, which is the
- * default value returned by the function. This value can be overridden at runtime using the
- * environment variable LIBCUDF_JSON_BATCH_SIZE
+ * The datasources passed to the JSON reader are read iteratively in batches demarcated by byte
+ * range offsets. The tokenizer requires the JSON buffer read in each batch to be of size at most
+ * INT_MAX bytes. Since the byte range corresponding to a given batch can cause the last JSON line
+ * in the batch to be incomplete, the batch size returned by this function allows for an additional
+ * `max_subchunks_prealloced` subchunks to be allocated beyond the byte range offsets. Since the
+ * size of the subchunk depends on the size of the byte range, the batch size is variable and cannot
+ * be directly controlled by the user. As a workaround, the environment variable
+ * LIBCUDF_JSON_BATCH_SIZE can be used to set a fixed batch size at runtime.
  *
  * @return size in bytes
  */
-std::size_t get_batch_size_upper_bound()
+std::size_t get_batch_size(std::size_t chunk_size)
 {
-  auto const batch_size_str         = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
-  int64_t const batch_size          = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
-  auto const batch_limit            = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
-  auto const batch_size_upper_bound = static_cast<std::size_t>(
-    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
-  return batch_size_upper_bound;
+  std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
+  auto const batch_size_str           = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
+  int64_t const batch_size            = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
+  auto const batch_limit              = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
+  auto const batch_size_upper_bound   = static_cast<std::size_t>(
+    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
+  if (batch_size_str != nullptr) return batch_size_upper_bound;
+  return batch_size_upper_bound <= (max_subchunks_prealloced * size_per_subchunk)
+           ? batch_size_upper_bound
+           : batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
 }
 
 /**
@@ -295,6 +303,10 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
     }
   }
 
+  auto const batch_limit = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
+  CUDF_EXPECTS(static_cast<std::size_t>(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) <
+                 batch_limit,
+               "The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes");
   return datasource::owning_buffer<rmm::device_buffer>(
     std::move(buffer),
     reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
@@ -365,17 +377,11 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> sources,
     reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits<int32_t>::max(),
     "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported");
 
-  std::size_t chunk_offset = reader_opts.get_byte_range_offset();
-  std::size_t chunk_size   = reader_opts.get_byte_range_size();
-  chunk_size               = !chunk_size ? total_source_size - chunk_offset
-                                         : std::min(chunk_size, total_source_size - chunk_offset);
-
-  std::size_t const size_per_subchunk      = estimate_size_per_subchunk(chunk_size);
-  std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
-  std::size_t const batch_size =
-    batch_size_upper_bound < (max_subchunks_prealloced * size_per_subchunk)
-      ? batch_size_upper_bound
-      : batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+  std::size_t chunk_offset = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size   = reader_opts.get_byte_range_size();
+  chunk_size               = !chunk_size ? total_source_size - chunk_offset
+                                         : std::min(chunk_size, total_source_size - chunk_offset);
+  std::size_t const batch_size = get_batch_size(chunk_size);
 
   /*
    * Identify the position (zero-indexed) of starting source file from which to begin
@@ -490,9 +496,20 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> sources,
     // Dispatch individual batches to read_batch and push the resulting table into
     // partial_tables array. Note that the reader options need to be updated for each
     // batch to adjust byte range offset and byte range size.
-    for (std::size_t i = 1; i < batch_offsets.size() - 1; i++) {
-      batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
-      batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
+    std::size_t batch_offset_pos = 0;
+    for (; batch_offset_pos < batch_offsets.size() - 2; batch_offset_pos++) {
+      batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
+      batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
+                                              batch_offsets[batch_offset_pos]);
+      partial_tables.emplace_back(
+        read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
+    }
+    batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
+    batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
+                                            batch_offsets[batch_offset_pos]);
+    auto partial_table =
+      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
+    if (partial_table.tbl->num_columns() != 0 && partial_table.tbl->num_rows() != 0) {
       partial_tables.emplace_back(
         read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
     }
diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp
index 23ca5734ded..e9d03f79b37 100644
--- a/cpp/tests/io/json/json_test.cpp
+++ b/cpp/tests/io/json/json_test.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -3461,4 +3461,21 @@ TEST_F(JsonReaderTest, MismatchedBeginEndTokens)
   EXPECT_THROW(cudf::io::read_json(opts), cudf::logic_error);
 }
 
+TEST_F(JsonReaderTest, EmptyLastBatch)
+{
+  std::string data = R"(
+    {"a": "b"}
+    {"a": "b"}
+    {"a": "b"}
+    {"a": "b"}
+    )";
+  setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(data.size() - 5).c_str(), 1);
+  auto opts =
+    cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
+      .lines(true)
+      .build();
+  auto res = cudf::io::read_json(opts);
+  unsetenv("LIBCUDF_JSON_BATCH_SIZE");
+}
+
 CUDF_TEST_PROGRAM_MAIN()

From 3927b1f09492f79b7328d079f2f77896847cfbb6 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Tue, 28 Jan 2025 19:25:54 +0000
Subject: [PATCH 2/7] cleanup after review

---
 cpp/src/io/json/read_json.cu    |  7 ++-----
 cpp/tests/io/json/json_test.cpp | 18 +++++++++++++++---
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 36b63af834e..4a63aaf92aa 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -187,9 +187,7 @@ std::size_t get_batch_size(std::size_t chunk_size)
   auto const batch_size_upper_bound = static_cast<std::size_t>(
     (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
   if (batch_size_str != nullptr) return batch_size_upper_bound;
-  return batch_size_upper_bound <= (max_subchunks_prealloced * size_per_subchunk)
-           ? batch_size_upper_bound
-           : batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+  return batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
 }
 
 /**
@@ -510,8 +508,7 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> sources,
     auto partial_table =
       read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
     if (partial_table.tbl->num_columns() != 0 && partial_table.tbl->num_rows() != 0) {
-      partial_tables.emplace_back(
-        read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
+      partial_tables.emplace_back(std::move(partial_table));
     }
 
   auto expects_schema_equality =
diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp
index e9d03f79b37..1471bd25a83 100644
--- a/cpp/tests/io/json/json_test.cpp
+++ b/cpp/tests/io/json/json_test.cpp
@@ -3461,7 +3461,20 @@ TEST_F(JsonReaderTest, MismatchedBeginEndTokens)
   EXPECT_THROW(cudf::io::read_json(opts), cudf::logic_error);
 }
 
-TEST_F(JsonReaderTest, EmptyLastBatch)
+/**
+ * @brief Base test fixture for JSON batched reader tests
+ */
+struct JsonBatchedReaderTest : public cudf::test::BaseFixture {
+ public:
+  void set_batch_size(size_t batch_size_upper_bound)
+  {
+    setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(batch_size_upper_bound).c_str(), 1);
+  }
+
+  ~JsonBatchedReaderTest() { unsetenv("LIBCUDF_JSON_BATCH_SIZE"); }
+};
+
+TEST_F(JsonBatchedReaderTest, EmptyLastBatch)
 {
   std::string data = R"(
     {"a": "b"}
@@ -3469,13 +3482,12 @@ TEST_F(JsonReaderTest, EmptyLastBatch)
     {"a": "b"}
     {"a": "b"}
     )";
-  setenv("LIBCUDF_JSON_BATCH_SIZE", std::to_string(data.size() - 5).c_str(), 1);
+  this->set_batch_size(data.size() - 5);
   auto opts =
     cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
       .lines(true)
       .build();
   auto res = cudf::io::read_json(opts);
-  unsetenv("LIBCUDF_JSON_BATCH_SIZE");
 }
 
 CUDF_TEST_PROGRAM_MAIN()
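Patch 3 below folds the hand-rolled LIBCUDF_JSON_BATCH_SIZE handling into a single
clamped expression built on cudf's getenv_or helper. A minimal standalone sketch of
that clamping behavior (getenv_or_sketch, get_batch_size_sketch, and both parameters
are illustrative stand-ins, not the cudf API):

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <limits>

// Illustrative stand-in for cudf's getenv_or: parse the environment variable
// if it is set, otherwise fall back to the supplied default.
std::size_t getenv_or_sketch(char const* name, std::size_t default_value)
{
  char const* str = std::getenv(name);
  return str != nullptr ? static_cast<std::size_t>(std::atoll(str)) : default_value;
}

// Reserve room for the prealloced subchunks below INT_MAX so the buffer handed
// to the tokenizer stays legal; the environment variable can lower the cap but
// never raise it past the limit.
std::size_t get_batch_size_sketch(std::size_t size_per_subchunk,
                                  std::size_t max_subchunks_prealloced)
{
  auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max()) -
                           max_subchunks_prealloced * size_per_subchunk;
  return std::min(batch_limit, getenv_or_sketch("LIBCUDF_JSON_BATCH_SIZE", batch_limit));
}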
From 373b06d112e5c6907b2b193a66a2a3ec285198d8 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Tue, 28 Jan 2025 21:47:06 +0000
Subject: [PATCH 3/7] simplifying the batch size logic

---
 cpp/src/io/json/read_json.cu | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 4a63aaf92aa..592be8ada58 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -181,13 +181,11 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 std::size_t get_batch_size(std::size_t chunk_size)
 {
   std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
-  auto const batch_size_str           = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
-  int64_t const batch_size            = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
-  auto const batch_limit              = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
-  auto const batch_size_upper_bound   = static_cast<std::size_t>(
-    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
-  if (batch_size_str != nullptr) return batch_size_upper_bound;
-  return batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+  auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max());
+  auto const extra_space = max_subchunks_prealloced * size_per_subchunk;
+  return std::min(
+    batch_limit - extra_space,
+    getenv_or("LIBCUDF_JSON_BATCH_SIZE", batch_limit - extra_space));
 }
 
 /**

From af8d052831ffb29da16caee6494e615da10e65b2 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Tue, 28 Jan 2025 22:03:08 +0000
Subject: [PATCH 4/7] removing unnecessary variables

---
 cpp/src/io/json/read_json.cu | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 592be8ada58..ef1f4072d5f 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -181,11 +181,10 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 std::size_t get_batch_size(std::size_t chunk_size)
 {
   std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
-  auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max());
-  auto const extra_space = max_subchunks_prealloced * size_per_subchunk;
-  return std::min(
-    batch_limit - extra_space,
-    getenv_or("LIBCUDF_JSON_BATCH_SIZE", batch_limit - extra_space));
+  auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max()) -
+                           (max_subchunks_prealloced * size_per_subchunk);
+  return std::min(batch_limit,
+                  getenv_or("LIBCUDF_JSON_BATCH_SIZE", batch_limit));
 }
 
 /**
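Patch 5 below restores the dispatch loop's starting index, which the rebase in
patch 1 had changed from 1 to 0. A standalone sketch of how consecutive entries
of batch_offsets become (offset, size) byte ranges, assuming (as the pre-rebase
loop implies) that the first byte range is handled before this loop runs; the
offsets are illustrative:

#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  // Hypothetical batch boundaries, in bytes, produced elsewhere in the reader.
  std::vector<std::size_t> batch_offsets{0, 2'000'000'000, 4'000'000'000, 5'500'000'000};
  // Starting at index 1 matches the dispatch loop that patch 5 repairs.
  for (std::size_t pos = 1; pos + 1 < batch_offsets.size(); pos++) {
    std::printf("batch %zu: offset=%zu size=%zu\n",
                pos,
                batch_offsets[pos],
                batch_offsets[pos + 1] - batch_offsets[pos]);
  }
  return 0;
}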
From b7a6533db1f6dacf93bd7bb5201f34a0bb4f8098 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Fri, 31 Jan 2025 02:57:13 +0000
Subject: [PATCH 5/7] fixing bad rebase

---
 cpp/src/io/json/read_json.cu | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index ef1f4072d5f..497cc0c0d28 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -491,7 +491,7 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> sources,
     // Dispatch individual batches to read_batch and push the resulting table into
     // partial_tables array. Note that the reader options need to be updated for each
     // batch to adjust byte range offset and byte range size.
-    std::size_t batch_offset_pos = 0;
+    std::size_t batch_offset_pos = 1;
     for (; batch_offset_pos < batch_offsets.size() - 2; batch_offset_pos++) {
       batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
       batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -

From 7b2c64c6a863cec75dff69ca9740413deed70082 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Mon, 3 Feb 2025 19:26:39 +0000
Subject: [PATCH 6/7] pre reviews

---
 cpp/src/io/json/read_json.cu | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 497cc0c0d28..5617760b703 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -169,7 +169,8 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
  *
  * The datasources passed to the JSON reader are read iteratively in batches demarcated by byte
  * range offsets. The tokenizer requires the JSON buffer read in each batch to be of size at most
- * INT_MAX bytes. Since the byte range corresponding to a given batch can cause the last JSON line
+ * INT_MAX bytes.
+ * Since the byte range corresponding to a given batch can cause the last JSON line
  * in the batch to be incomplete, the batch size returned by this function allows for an additional
  * `max_subchunks_prealloced` subchunks to be allocated beyond the byte range offsets. Since the
  * size of the subchunk depends on the size of the byte range, the batch size is variable and cannot
  * be directly controlled by the user. As a workaround, the environment variable
  * LIBCUDF_JSON_BATCH_SIZE can be used to set a fixed batch size at runtime.
@@ -298,7 +299,7 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
     }
   }
 
-  auto const batch_limit = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
+  auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max());
   CUDF_EXPECTS(static_cast<std::size_t>(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) <
                  batch_limit,
                "The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes");
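Patch 7 below folds the special-cased final batch back into the dispatch loop:
every batch takes the same path, and an empty partial table is accepted only
when it comes from the last batch. The control flow, reduced to a standalone
sketch (PartialTable and read_one are illustrative stand-ins, not cudf types):

#include <cstddef>
#include <functional>
#include <stdexcept>
#include <vector>

struct PartialTable {
  std::size_t num_rows;  // illustrative stand-in for a cudf partial table
};

std::vector<PartialTable> dispatch_batches(
  std::vector<std::size_t> const& batch_offsets,
  std::function<PartialTable(std::size_t, std::size_t)> const& read_one)
{
  std::vector<PartialTable> partial_tables;
  for (std::size_t pos = 1; pos < batch_offsets.size() - 1; pos++) {
    auto partial_table = read_one(batch_offsets[pos], batch_offsets[pos + 1] - batch_offsets[pos]);
    if (partial_table.num_rows == 0) {
      // Mirrors the CUDF_EXPECTS in patch 7: only the final byte range may come
      // back empty, because its last line was pulled into the previous batch.
      if (pos != batch_offsets.size() - 2) {
        throw std::logic_error("Only the last batch can produce an empty table");
      }
      break;
    }
    partial_tables.push_back(partial_table);
  }
  return partial_tables;
}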
From 3e83d6010220397ff05811962371dfb63d0fabc0 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Tue, 4 Feb 2025 00:36:27 +0000
Subject: [PATCH 7/7] pre reviews

---
 cpp/src/io/json/read_json.cu    | 27 ++++++++++++---------------
 cpp/tests/io/json/json_test.cpp | 19 ++++++++++++++++---
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 5617760b703..4b0af7d6e81 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -181,11 +181,10 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 std::size_t get_batch_size(std::size_t chunk_size)
 {
-  std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
-  auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<int32_t>::max()) -
+  auto const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
+  auto const batch_limit       = static_cast<std::size_t>(std::numeric_limits<int32_t>::max()) -
                            (max_subchunks_prealloced * size_per_subchunk);
-  return std::min(batch_limit,
-                  getenv_or("LIBCUDF_JSON_BATCH_SIZE", batch_limit));
+  return std::min(batch_limit, getenv_or("LIBCUDF_JSON_BATCH_SIZE", batch_limit));
 }
 
 /**
@@ -492,20 +491,18 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> sources,
     // Dispatch individual batches to read_batch and push the resulting table into
     // partial_tables array. Note that the reader options need to be updated for each
     // batch to adjust byte range offset and byte range size.
-    std::size_t batch_offset_pos = 1;
-    for (; batch_offset_pos < batch_offsets.size() - 2; batch_offset_pos++) {
+    for (std::size_t batch_offset_pos = 1; batch_offset_pos < batch_offsets.size() - 1;
+         batch_offset_pos++) {
       batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
       batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
                                               batch_offsets[batch_offset_pos]);
-      partial_tables.emplace_back(
-        read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
-    }
-    batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
-    batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
-                                            batch_offsets[batch_offset_pos]);
-    auto partial_table =
-      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
-    if (partial_table.tbl->num_columns() != 0 && partial_table.tbl->num_rows() != 0) {
+      auto partial_table =
+        read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
+      if (partial_table.tbl->num_columns() == 0 && partial_table.tbl->num_rows() == 0) {
+        CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2,
+                     "Only the partial table generated by the last batch can be empty");
+        break;
+      }
       partial_tables.emplace_back(std::move(partial_table));
     }
diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp
index 1471bd25a83..00f46975fdc 100644
--- a/cpp/tests/io/json/json_test.cpp
+++ b/cpp/tests/io/json/json_test.cpp
@@ -3476,18 +3476,31 @@ struct JsonBatchedReaderTest : public cudf::test::BaseFixture {
 
 TEST_F(JsonBatchedReaderTest, EmptyLastBatch)
 {
-  std::string data = R"(
+  std::string data          = R"(
     {"a": "b"}
     {"a": "b"}
     {"a": "b"}
     {"a": "b"}
    )";
+  size_t size_of_last_batch = 5;
+  // This test constructs two batches by setting the batch size such that the last batch is an
+  // incomplete line. The JSON string corresponding to the first batch is
+  // '\n{"a": "b"}\n{"a": "b"}\n{"a": "b"}\n{"a": '
+  // The JSON string corresponding to the second batch is
+  // '"b"}\n'
-  this->set_batch_size(data.size() - 5);
+  this->set_batch_size(data.size() - size_of_last_batch);
   auto opts =
     cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
       .lines(true)
       .build();
-  auto res = cudf::io::read_json(opts);
+  auto result = cudf::io::read_json(opts);
+
+  EXPECT_EQ(result.tbl->num_columns(), 1);
+  EXPECT_EQ(result.tbl->num_rows(), 4);
+  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING);
+  EXPECT_EQ(result.metadata.schema_info[0].name, "a");
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0),
+                                 cudf::test::strings_column_wrapper{{"b", "b", "b", "b"}});
 }
 
 CUDF_TEST_PROGRAM_MAIN()
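As a closing check on the arithmetic in the final test's comment, the split at
data.size() - 5 can be verified in isolation. A standalone sketch using a
simplified stand-in for the test's raw string (the real one also carries
indentation whitespace inside each line):

#include <cassert>
#include <cstddef>
#include <string>

int main()
{
  std::string data = "\n{\"a\": \"b\"}\n{\"a\": \"b\"}\n{\"a\": \"b\"}\n{\"a\": \"b\"}\n";
  std::size_t const size_of_last_batch = 5;

  // With LIBCUDF_JSON_BATCH_SIZE set to data.size() - 5, the first batch ends
  // five bytes short of the end, cutting the last record after '{"a": ', and
  // the second batch holds only the tail of that line.
  std::string const first_batch  = data.substr(0, data.size() - size_of_last_batch);
  std::string const second_batch = data.substr(data.size() - size_of_last_batch);
  assert(second_batch == "\"b\"}\n");
  assert(first_batch.back() == ' ');
  return 0;
}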