From 1a6570fc554e1e61d7807e7df7e2fa9c135ec10b Mon Sep 17 00:00:00 2001 From: Robert Underwood Date: Thu, 6 Feb 2025 16:58:54 -0500 Subject: [PATCH] libpressio version 1.0.5 + added support for MSZ + refactored timer code to be reuseable throughout libpressio + fixed set_name for cast when passed an empty name --- CMakeLists.txt | 73 +++--- src/plugins/compressors/cast.cc | 3 +- src/plugins/compressors/msz.cc | 379 ++++++++++++++++++++++++++++++++ src/plugins/metrics/time.cc | 61 +++-- src/timer.cc | 16 ++ src/timer.h | 23 ++ 6 files changed, 490 insertions(+), 65 deletions(-) create mode 100644 src/plugins/compressors/msz.cc create mode 100644 src/timer.cc create mode 100644 src/timer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index a464d6f1..5032afb6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.15 FATAL_ERROR) -project(libpressio VERSION "1.0.4" LANGUAGES CXX C) +project(libpressio VERSION "1.0.5" LANGUAGES CXX C) #correct was to set a default build type # https://blog.kitware.com/cmake-and-the-default-build-type/ @@ -98,37 +98,40 @@ add_library(libpressio ./src/plugins/domains/malloc.cc ./src/plugins/domains/nonowning.cc ./src/plugins/domains/user.cc + + #private utilities headers + ./src/timer.cc ./src/iless.cc + ./src/cleanup.h + ./src/timer.h + ./src/iless.h + ./src/external_parse.h + ./src/multi_dimensional_iterator.h #public headers - include/libpressio.h - include/libpressio_ext/cpp/compressor.h - include/libpressio_ext/cpp/configurable.h - include/libpressio_ext/cpp/data.h - include/libpressio_ext/cpp/errorable.h - include/libpressio_ext/cpp/io.h - include/libpressio_ext/cpp/libpressio.h - include/libpressio_ext/cpp/metrics.h - include/libpressio_ext/cpp/options.h - include/libpressio_ext/cpp/pressio.h - include/libpressio_ext/cpp/printers.h - include/libpressio_ext/cpp/subgroup_manager.h - include/libpressio_ext/cpp/versionable.h - include/libpressio_ext/io/posix.h - include/libpressio_ext/io/pressio_io.h - include/pressio.h - include/pressio_compressor.h - include/pressio_data.h - include/pressio_dtype.h - include/pressio_metrics.h - include/pressio_option.h - include/pressio_options.h - include/pressio_options_iter.h - - #private headers - src/external_parse.h - src/multi_dimensional_iterator.h - src/cleanup.h + ./include/libpressio.h + ./include/libpressio_ext/cpp/compressor.h + ./include/libpressio_ext/cpp/configurable.h + ./include/libpressio_ext/cpp/data.h + ./include/libpressio_ext/cpp/errorable.h + ./include/libpressio_ext/cpp/io.h + ./include/libpressio_ext/cpp/libpressio.h + ./include/libpressio_ext/cpp/metrics.h + ./include/libpressio_ext/cpp/options.h + ./include/libpressio_ext/cpp/pressio.h + ./include/libpressio_ext/cpp/printers.h + ./include/libpressio_ext/cpp/subgroup_manager.h + ./include/libpressio_ext/cpp/versionable.h + ./include/libpressio_ext/io/posix.h + ./include/libpressio_ext/io/pressio_io.h + ./include/pressio.h + ./include/pressio_compressor.h + ./include/pressio_data.h + ./include/pressio_dtype.h + ./include/pressio_metrics.h + ./include/pressio_option.h + ./include/pressio_options.h + ./include/pressio_options_iter.h ) if(NOT LIBPRESSIO_BUILD_MODE) @@ -762,6 +765,18 @@ if(LIBPRESSIO_HAS_GRIB) ) endif() + +option(LIBPRESSIO_HAS_MSz "build support for MSz for feature preservation" OFF) +if(LIBPRESSIO_HAS_MSz) + set(LIBPRESSIO_FEATURES "${LIBPRESSIO_FEATURES} MSz") + find_package(MSz REQUIRED) + target_link_libraries(libpressio PRIVATE MSz::MSz) + target_sources(libpressio + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/src/plugins/compressors/msz.cc + ) +endif() + configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/src/pressio_version.h.in ${CMAKE_CURRENT_BINARY_DIR}/include/pressio_version.h diff --git a/src/plugins/compressors/cast.cc b/src/plugins/compressors/cast.cc index bf2b7025..baa0a9bf 100644 --- a/src/plugins/compressors/cast.cc +++ b/src/plugins/compressors/cast.cc @@ -88,7 +88,8 @@ class cast_compressor_plugin : public libpressio_compressor_plugin { const char* prefix() const override { return "cast"; } void set_name_impl(std::string const& new_name) override { - compressor->set_name(new_name + "/casted"); + if(!new_name.empty()) compressor->set_name(new_name + "/casted"); + else compressor->set_name(""); } pressio_options get_metrics_results_impl() const override { return compressor->get_metrics_results(); diff --git a/src/plugins/compressors/msz.cc b/src/plugins/compressors/msz.cc new file mode 100644 index 00000000..b61e9059 --- /dev/null +++ b/src/plugins/compressors/msz.cc @@ -0,0 +1,379 @@ + +#include "std_compat/memory.h" +#include "libpressio_ext/cpp/compressor.h" +#include "libpressio_ext/cpp/data.h" +#include "libpressio_ext/cpp/options.h" +#include "libpressio_ext/cpp/pressio.h" +#include "libpressio_ext/cpp/domain_manager.h" +#include "cleanup.h" +#include "timer.h" +#include "api/MSz.h" +#include + +namespace libpressio { namespace msz_ns { + using namespace std::string_literals; + +class msz_compressor_plugin : public libpressio_compressor_plugin { +public: + static constexpr size_t HEADER_LEN = 2; + static constexpr size_t EDITS_SIZE = 1; + static constexpr size_t COMPRESSED_SIZE = 0; + static constexpr int FULL_CONNECTION_TYPE = 1; + static constexpr int PIECEWISE_CONNECTION_TYPE = 0; + struct pressio_options get_options_impl() const override + { + struct pressio_options options; + set_meta(options, "msz:compressor", compressor_id, compressor); + set(options, "msz:preservation_options",preservation_options); + set(options, "msz:connectivity_type",connectivity_type); + set(options, "msz:accelerator", accelerator); + set_type(options, "msz:preservation_options_str", pressio_option_charptr_array_type); + set_type(options, "msz:connectivity_type_str",pressio_option_charptr_type); + set_type(options, "msz:accelerator_str", pressio_option_charptr_type); + + set(options, "pressio:rel", rel_error_bound); + set(options, "msz:rel_error_bound", rel_error_bound); + + set(options, "msz:cuda_device_id",device_id); + set(options, "pressio:nthreads",num_threads); + set(options, "msz:omp_num_threads",num_threads); + + set(options, "msz:count_faults",count_faults); + return options; + } + + struct pressio_options get_configuration_impl() const override + { + struct pressio_options options; + set(options, "pressio:thread_safe", pressio_thread_safety_multiple); + set(options, "pressio:stability", "experimental"); + std::vector invalidations {}; + std::vector invalidation_children {&*compressor}; + set(options, "predictors:error_dependent", get_accumulate_configuration("predictors:error_dependent", invalidation_children, invalidations)); + set(options, "predictors:error_agnostic", get_accumulate_configuration("predictors:error_agnostic", invalidation_children, invalidations)); + set(options, "predictors:runtime", get_accumulate_configuration("predictors:runtime", invalidation_children, invalidations)); + set(options, "pressio:highlevel", get_accumulate_configuration("pressio:highlevel", invalidation_children, std::vector{})); + + set(options, "msz:preservation_options_str",std::vector{"min"s, "max"s, "path"s}); + set(options, "msz:connectivity_type_str",std::vector{"piecewise"s, "full"s}); + set(options, "msz:accelerator_str", std::vector{"cpu"s, "openmp"s, "cuda"s}); + return options; + } + + struct pressio_options get_documentation_impl() const override + { + struct pressio_options options; + set_meta(options, "msz:compressor", compressor_id, compressor); + set(options, "pressio:description", R"()"); + set(options, "msz:preservation_options","what feature to preserve"); + set(options, "msz:connectivity_type", "how does the points connect to adjacent points"); + set(options, "msz:accelerator", "what hardware to use"); + set(options, "msz:preservation_options_str", "what feature to preserve"); + set(options, "msz:connectivity_type_str", "how does the points connect to adjacent points"); + set(options, "msz:accelerator_str", "what hardware to use"); + set(options, "msz:rel_error_bound", "value range relative error bound"); + set(options, "msz:cuda_device_id","what cuda device to use"); + set(options, "pressio:nthreads","how many threads to use"); + set(options, "msz:omp_num_threads","how many threads to use"); + set(options, "msz:count_faults","count faults on compression; adds overhead"); + set(options, "msz:num_false_labels", "number of false segmentation labels"); + set(options, "msz:num_false_min", "number of false minima after decompression"); + set(options, "msz:num_false_max", "number of false maxima after decompression"); + set(options, "msz:time_count_faults", "time to count faults in ms"); + set(options, "msz:time_derive_edits", "time to derive edits in ms"); + set(options, "msz:time_compress_edits", "time to compress edits in ms"); + set(options, "msz:time_compressed_to_outputs", "time to copy data to outputs in ms"); + set(options, "msz:time_apply_edits", "time to apply edits to decompressed data in ms"); + set(options, "msz:time_decompress_edits", "time to decompress edits in ms"); + set(options, "msz:num_edits", "number of edits"); + set(options, "msz:edits_size", "the size of the compressed edits"); + return options; + } + + + int set_options_impl(struct pressio_options const& options) override + { + get_meta(options, "msz:compressor", compressor_plugins(), compressor_id, compressor); + + { + std::vector tmp; + if(get(options, "msz:preservation_options_str", &tmp) == pressio_options_key_set) { + uint32_t new_setting = 0; + if(std::find(tmp.begin(), tmp.end(), "min"s) != tmp.end()) new_setting |= MSZ_PRESERVE_MIN; + if(std::find(tmp.begin(), tmp.end(), "max"s) != tmp.end()) new_setting |= MSZ_PRESERVE_MAX; + if(std::find(tmp.begin(), tmp.end(), "path"s) != tmp.end()) new_setting |= MSZ_PRESERVE_PATH; + preservation_options = new_setting; + } + } + { + std::string tmp; + if(get(options, "msz:connectivity_type_str",&tmp) == pressio_options_key_set) { + uint32_t new_setting = 0; + if(tmp == "piecewise") new_setting = 0; + if(tmp == "full") new_setting = 1; + connectivity_type = new_setting; + } + if(get(options, "msz:accelerator_str", &tmp) == pressio_options_key_set) { + uint32_t new_setting = 0; + if(tmp == "cpu") new_setting = MSZ_ACCELERATOR_NONE; + if(tmp == "omp") new_setting = MSZ_ACCELERATOR_OMP; + if(tmp == "cuda") new_setting = MSZ_ACCELERATOR_CUDA; + accelerator = new_setting; + } + } + + get(options, "msz:preservation_options",&preservation_options); + get(options, "msz:connectivity_type",&connectivity_type); + get(options, "msz:accelerator", &accelerator); + get(options, "msz:count_faults", &count_faults); + + if(preservation_options & MSZ_PRESERVE_MIN & MSZ_PRESERVE_PATH & ~MSZ_PRESERVE_MAX ) { + return set_error(1, "both preserve min and path cannot be set together without max"); + } + else if(preservation_options & MSZ_PRESERVE_MAX & MSZ_PRESERVE_PATH & ~MSZ_PRESERVE_MIN) { + return set_error(1, "both preserve max and path cannot be set together without min"); + } + + if(connectivity_type == FULL_CONNECTION_TYPE && preservation_options & MSZ_PRESERVE_PATH) { + return set_error(1, "cannot use full connection type with preserve path"); + } + + if(get(options, "msz:rel_error_bound", &rel_error_bound) == pressio_options_key_set) {} + else get(options, "pressio:rel", &rel_error_bound); + + get(options, "msz:cuda_device_id", &device_id); + + if(get(options, "msz:omp_num_threads",&num_threads) == pressio_options_key_set) { + //intentional no-op + } + else if(get(options, "pressio:nthreads",&num_threads) == pressio_options_key_set) { + accelerator = MSZ_ACCELERATOR_OMP; + } + return 0; + } + + int compress_impl(const pressio_data* real_input, + struct pressio_data* real_output) override + { + pressio_data output = pressio_data::empty(pressio_byte_dtype, {}); + if(compressor->compress(real_input, &output) > 0) { + return set_error(compressor->error_code(), compressor->error_msg()); + } + + //MSz on compression needs the decompressed data to make its patches + pressio_data decompressed = pressio_data::owning(*real_input, domain_plugins().build("malloc")); + if(compressor->decompress(&output, &decompressed) > 0) { + return set_error(compressor->error_code(), compressor->error_msg()); + } + + auto input = domain_manager().make_readable(domain_plugins().build("malloc"), *real_input); + input = input.cast(pressio_double_dtype); + decompressed = domain_manager().make_readable(domain_plugins().build("malloc"), std::move(decompressed)); + decompressed = decompressed.cast(pressio_double_dtype); + auto msz_dims = input.normalized_dims(3, 1); + + if(count_faults) { + int32_t tmp_num_false_min, tmp_num_false_max, tmp_num_false_labels; + + start(timers.count_faults); + int status = MSz_count_faults( + static_cast(input.data()), + static_cast(decompressed.data()), + tmp_num_false_min, tmp_num_false_max, tmp_num_false_labels, + connectivity_type, + msz_dims[0], msz_dims[1], msz_dims[2], + accelerator, + device_id, + num_threads + ); + if(status != MSZ_ERR_NO_ERROR) { + return msz_error_msg(status); + } + num_false_min = tmp_num_false_min; + num_false_max = tmp_num_false_max; + num_false_labels = tmp_num_false_labels; + stop(timers.count_faults); + } else { + num_false_min = std::nullopt; + num_false_max = std::nullopt; + num_false_labels = std::nullopt; + } + + int num_edits = 0; + MSz_edit_t* edits = nullptr; + auto cleanup_edits = make_cleanup([&]{free(edits);}); + start(timers.derive_edits); + int status = MSz_derive_edits( + static_cast(input.data()), + static_cast(decompressed.data()), + nullptr, + num_edits, &edits, + preservation_options, + connectivity_type, + msz_dims[0], msz_dims[1], msz_dims[2], + rel_error_bound, + accelerator, + device_id, + num_threads + ); + stop(timers.derive_edits); + this->num_edits = num_edits; + if(status != MSZ_ERR_NO_ERROR) { + return msz_error_msg(status); + } + + char* compressed_buffer = nullptr; + auto cleanup_compressed_buffer = make_cleanup([&]{if(compressed_buffer != nullptr)free(compressed_buffer);}); + size_t compressed_buffer_size = 0; + if(num_edits != 0){ + start(timers.compress_edits); + int status = MSz_compress_edits_zstd( + num_edits, + edits, + &compressed_buffer, + compressed_buffer_size + ); + stop(timers.compress_edits); + this->edits_size = compressed_buffer_size; + if (status != MSZ_ERR_NO_ERROR) { + return msz_error_msg(status); + } + } + + start(timers.compressed_to_output); + *real_output = pressio_data::owning(pressio_byte_dtype, {2*sizeof(uint64_t) + output.size_in_bytes() + compressed_buffer_size}); + uint8_t* ptr = static_cast(real_output->data()); + uint64_t o_size = static_cast(output.size_in_bytes()); + memcpy(ptr, &o_size, sizeof(uint64_t)); ptr += sizeof(uint64_t); + memcpy(ptr, &compressed_buffer_size, sizeof(uint64_t)); ptr += sizeof(uint64_t); + memcpy(ptr, static_cast(output.data()), output.size_in_bytes()); ptr += output.size_in_bytes(); + if(num_edits != 0) { + memcpy(ptr, compressed_buffer, compressed_buffer_size); + } + stop(timers.compressed_to_output); + + + return 0; + } + + int decompress_impl(const pressio_data* real_input, + struct pressio_data* output) override + { + auto input = domain_manager().make_readable(domain_plugins().build("malloc"), *real_input); + auto header = static_cast(input.data()); + pressio_data compressed = pressio_data::nonowning(pressio_byte_dtype, header+HEADER_LEN, {header[COMPRESSED_SIZE]}); + char* compressed_edits = reinterpret_cast(((uint8_t*)header+HEADER_LEN)+header[COMPRESSED_SIZE]); + + compressor->decompress(&compressed, output); + *output = domain_manager().make_readable(domain_plugins().build("malloc"), std::move(*output)); + pressio_dtype output_dtype = output->dtype(); + *output = output->cast(pressio_double_dtype); + + if(header[EDITS_SIZE] != 0) { + int num_edits = 0; + MSz_edit_t* edits = nullptr; + auto cleanup_edits = make_cleanup([&]{free(edits);}); + start(timers.decompress_edits); + MSz_decompress_edits_zstd( + compressed_edits, + header[1], + num_edits, + &edits + ); + stop(timers.decompress_edits); + + + auto msz_dims = output->normalized_dims(3, 1); + start(timers.apply_edits); + MSz_apply_edits( + static_cast(output->data()), + num_edits, edits, + msz_dims[0], msz_dims[1], msz_dims[2], + accelerator + ); + stop(timers.apply_edits); + *output = output->cast(output_dtype); + } + + return 0; + } + + int major_version() const override { return 0; } + int minor_version() const override { return 0; } + int patch_version() const override { return 2; } + const char* version() const override { return "0.0.2"; } + const char* prefix() const override { return "msz"; } + + pressio_options get_metrics_results_impl() const override { + auto metrics = compressor->get_metrics_results(); + set(metrics, "msz:num_false_labels", num_false_labels); + set(metrics, "msz:num_false_min", num_false_min); + set(metrics, "msz:num_false_max", num_false_max); + set(metrics, "msz:time_count_faults", elapsed(timers.count_faults)); + set(metrics, "msz:time_derive_edits", elapsed(timers.derive_edits)); + set(metrics, "msz:time_compress_edits", elapsed(timers.compress_edits)); + set(metrics, "msz:time_compressed_to_outputs", elapsed(timers.compressed_to_output)); + set(metrics, "msz:time_apply_edits", elapsed(timers.apply_edits)); + set(metrics, "msz:time_decompress_edits", elapsed(timers.decompress_edits)); + set(metrics, "msz:num_edits", num_edits); + set(metrics, "msz:edits_size", edits_size); + return metrics; + } + + std::shared_ptr clone() override + { + return compat::make_unique(*this); + } + + void set_name_impl(std::string const& new_name) override { + if(!new_name.empty()) compressor->set_name(new_name + "/casted"); + else compressor->set_name(""); + } + + uint32_t preservation_options = 0; + uint32_t connectivity_type = PIECEWISE_CONNECTION_TYPE; + double rel_error_bound = 1e-3; + int32_t accelerator = MSZ_ACCELERATOR_NONE; + int32_t device_id = 0; + uint32_t num_threads = 0; + bool count_faults = false; + + + std::string compressor_id = "noop"; + pressio_compressor compressor = compressor_plugins().build("noop"); + + //metrics + std::optional num_false_min, num_false_max, num_false_labels, edits_size, num_edits; + struct { + utils::timer count_faults; + utils::timer derive_edits; + utils::timer compress_edits; + utils::timer compressed_to_output; + + utils::timer apply_edits; + utils::timer decompress_edits; + } timers; + + int msz_error_msg(int status) { + std::string msg; + switch(status) { + case MSZ_ERR_INVALID_INPUT: msg = "invalid input"; break; + case MSZ_ERR_INVALID_CONNECTIVITY_TYPE: msg = "invalid connectivity"; break; + case MSZ_ERR_NO_AVAILABLE_GPU: msg = "no GPU"; break; + case MSZ_ERR_OUT_OF_MEMORY: msg = "out of memory"; break; + case MSZ_ERR_UNKNOWN_ERROR: msg = "unknown error"; break; + case MSZ_ERR_EDITS_COMPRESSION_FAILED: msg = "edits compression failed"; break; + case MSZ_ERR_EDITS_DECOMPRESSION_FAILED:msg = "edits decompression failed"; break; + case MSZ_ERR_NOT_IMPLEMENTED: msg = "not implemented"; break; + case MSZ_ERR_INVALID_THREAD_COUNT:msg = "invalid thread count"; break; + } + return set_error(2, msg); + } +}; + +static pressio_register compressor_many_fields_plugin(compressor_plugins(), "msz", []() { + return compat::make_unique(); +}); + +} } + diff --git a/src/plugins/metrics/time.cc b/src/plugins/metrics/time.cc index 61988efb..a6da6cb5 100644 --- a/src/plugins/metrics/time.cc +++ b/src/plugins/metrics/time.cc @@ -1,51 +1,42 @@ -#include #include "pressio_options.h" #include "pressio_compressor.h" #include "libpressio_ext/cpp/metrics.h" #include "libpressio_ext/cpp/options.h" #include "libpressio_ext/cpp/pressio.h" #include "std_compat/memory.h" +#include "timer.h" namespace libpressio { -using std::chrono::high_resolution_clock; -using std::chrono::time_point; -using std::chrono::duration_cast; -using std::chrono::milliseconds; namespace time_metrics{ - struct time_range{ - time_point begin; - time_point end; - unsigned int elapsed() const { return duration_cast(end-begin).count(); } - }; - using timer = compat::optional; + using namespace libpressio::utils; struct compression_actions { - time_metrics::timer check_options; - time_metrics::timer set_options; - time_metrics::timer get_options; - time_metrics::timer get_configuration; - time_metrics::timer compress; - time_metrics::timer compress_many; - time_metrics::timer decompress; - time_metrics::timer decompress_many; + timer check_options; + timer set_options; + timer get_options; + timer get_configuration; + timer compress; + timer compress_many; + timer decompress; + timer decompress_many; }; struct metrics_actions { - time_metrics::timer begin_check_options; - time_metrics::timer begin_set_options; - time_metrics::timer begin_get_options; - time_metrics::timer begin_get_configuration; - time_metrics::timer begin_compress; - time_metrics::timer begin_compress_many; - time_metrics::timer begin_decompress; - time_metrics::timer begin_decompress_many; - time_metrics::timer end_check_options; - time_metrics::timer end_set_options; - time_metrics::timer end_get_options; - time_metrics::timer end_get_configuration; - time_metrics::timer end_compress; - time_metrics::timer end_compress_many; - time_metrics::timer end_decompress; - time_metrics::timer end_decompress_many; + timer begin_check_options; + timer begin_set_options; + timer begin_get_options; + timer begin_get_configuration; + timer begin_compress; + timer begin_compress_many; + timer begin_decompress; + timer begin_decompress_many; + timer end_check_options; + timer end_set_options; + timer end_get_options; + timer end_get_configuration; + timer end_compress; + timer end_compress_many; + timer end_decompress; + timer end_decompress_many; }; class time_plugin : public libpressio_metrics_plugin { diff --git a/src/timer.cc b/src/timer.cc new file mode 100644 index 00000000..1271c606 --- /dev/null +++ b/src/timer.cc @@ -0,0 +1,16 @@ +#include "timer.h" + +namespace libpressio { +namespace utils { +void start(timer& t) { + t = time_range(); + t->begin = high_resolution_clock::now(); +} +void stop(timer& t) { + t->end = high_resolution_clock::now(); +} +compat::optional elapsed(timer const& t) { + if(t) return t->elapsed(); + else return compat::nullopt; +} +}} diff --git a/src/timer.h b/src/timer.h new file mode 100644 index 00000000..9b20d195 --- /dev/null +++ b/src/timer.h @@ -0,0 +1,23 @@ +#ifndef LIBPRESSIO_TIMER_H +#define LIBPRESSIO_TIMER_H +#include +#include "std_compat/optional.h" + +namespace libpressio { +namespace utils { +using std::chrono::high_resolution_clock; +using std::chrono::time_point; +using std::chrono::duration_cast; +using std::chrono::milliseconds; +struct time_range{ + time_point begin; + time_point end; + unsigned int elapsed() const { return duration_cast(end-begin).count(); } +}; +using timer = compat::optional; +void start(timer& t); +void stop(timer& t); +compat::optional elapsed(timer const& t); +}} + +#endif /*LIBPRESSIO_TIMER_H*/