From 195165fda2d7a5834fb60f5b7047bf0238c81b5e Mon Sep 17 00:00:00 2001 From: Simon Beyzerov Date: Thu, 31 Jul 2025 14:40:20 -0400 Subject: [PATCH 1/8] add preliminary unset field semantics (disc #536) annotate Struct fields as optional and Structs as strict add validation logic add preliminary testing Signed-off-by: Simon Beyzerov --- cpp/csp/adapters/kafka/KafkaInputAdapter.cpp | 2 + .../parquet/ParquetReaderColumnAdapter.cpp | 1 + .../utils/JSONMessageStructConverter.cpp | 3 + .../adapters/websocket/ClientInputAdapter.cpp | 1 + cpp/csp/cppnodes/baselibimpl.cpp | 3 +- cpp/csp/engine/Struct.cpp | 48 ++- cpp/csp/engine/Struct.h | 45 ++- cpp/csp/python/PyStruct.cpp | 99 ++++- cpp/csp/python/PyStruct.h | 2 +- csp/impl/struct.py | 18 +- csp/impl/types/typing_utils.py | 14 + csp/impl/wiring/edge.py | 4 + csp/tests/test_strict_structs.py | 353 ++++++++++++++++++ 13 files changed, 549 insertions(+), 44 deletions(-) create mode 100644 csp/tests/test_strict_structs.py diff --git a/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp b/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp index 296df8daa..6cbeb8335 100644 --- a/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp +++ b/cpp/csp/adapters/kafka/KafkaInputAdapter.cpp @@ -113,6 +113,8 @@ void KafkaInputAdapter::processMessage( RdKafka::Message* message, bool live, cs if( m_tickTimestampField ) msgTime = m_tickTimestampField->value(tick.get()); + tick.get() -> validate(); + bool pushLive = shouldPushLive(live, msgTime); if( shouldProcessMessage( pushLive, msgTime ) ) pushTick(pushLive, msgTime, std::move(tick), batch); diff --git a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp index b8380ce53..73837d0cb 100644 --- a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp +++ b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp @@ -520,6 +520,7 @@ void ParquetStructAdapter::dispatchValue( const utils::Symbol *symbol, bool isNu { fieldSetter( s ); } + s -> validate(); dispatchedValue = &s; } diff --git a/cpp/csp/adapters/utils/JSONMessageStructConverter.cpp b/cpp/csp/adapters/utils/JSONMessageStructConverter.cpp index 574f21084..5f81b2aac 100644 --- a/cpp/csp/adapters/utils/JSONMessageStructConverter.cpp +++ b/cpp/csp/adapters/utils/JSONMessageStructConverter.cpp @@ -145,6 +145,8 @@ StructPtr JSONMessageStructConverter::convertJSON( const char * fieldname, const } ); } + struct_ -> validate(); + return struct_; } @@ -251,6 +253,7 @@ csp::StructPtr JSONMessageStructConverter::asStruct( void * bytes, size_t size ) } ); } + // root struct validation (validate()) deferred to adapter level return data; } diff --git a/cpp/csp/adapters/websocket/ClientInputAdapter.cpp b/cpp/csp/adapters/websocket/ClientInputAdapter.cpp index e4b0b7ff7..d413b9dd1 100644 --- a/cpp/csp/adapters/websocket/ClientInputAdapter.cpp +++ b/cpp/csp/adapters/websocket/ClientInputAdapter.cpp @@ -31,6 +31,7 @@ void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch ) if( dataType() -> type() == CspType::Type::STRUCT ) { auto tick = m_converter -> asStruct( c, t ); + tick.get() -> validate(); pushTick( std::move(tick), batch ); } else if ( dataType() -> type() == CspType::Type::STRING ) { diff --git a/cpp/csp/cppnodes/baselibimpl.cpp b/cpp/csp/cppnodes/baselibimpl.cpp index 52a5537d9..9f4e1636f 100644 --- a/cpp/csp/cppnodes/baselibimpl.cpp +++ b/cpp/csp/cppnodes/baselibimpl.cpp @@ -705,6 +705,7 @@ DECLARE_CPPNODE( struct_fromts ) ); } + out.get() -> validate( ); CSP_OUTPUT( std::move( out ) ); } @@ -758,7 +759,7 @@ DECLARE_CPPNODE( struct_collectts ) } ); } - + out.get() -> validate( ); CSP_OUTPUT( std::move( out ) ); } diff --git a/cpp/csp/engine/Struct.cpp b/cpp/csp/engine/Struct.cpp index 42830357e..9e5e884fe 100644 --- a/cpp/csp/engine/Struct.cpp +++ b/cpp/csp/engine/Struct.cpp @@ -1,12 +1,14 @@ #include #include #include +#include +#include namespace csp { StructField::StructField( CspTypePtr type, const std::string & fieldname, - size_t size, size_t alignment ) : + size_t size, size_t alignment, bool isOptional ) : m_fieldname( fieldname ), m_offset( 0 ), m_size( size ), @@ -14,7 +16,8 @@ StructField::StructField( CspTypePtr type, const std::string & fieldname, m_maskOffset( 0 ), m_maskBit( 0 ), m_maskBitMask( 0 ), - m_type( type ) + m_type( type ), + m_isOptional( isOptional ) { } @@ -33,8 +36,8 @@ and adjustments required for the hidden fields */ -StructMeta::StructMeta( const std::string & name, const Fields & fields, - std::shared_ptr base ) : m_name( name ), m_base( base ), m_fields( fields ), +StructMeta::StructMeta( const std::string & name, const Fields & fields, bool isStrict, + std::shared_ptr base ) : m_name( name ), m_base( base ), m_isStrict( isStrict ), m_fields( fields ), m_size( 0 ), m_partialSize( 0 ), m_partialStart( 0 ), m_nativeStart( 0 ), m_basePadding( 0 ), m_maskLoc( 0 ), m_maskSize( 0 ), m_firstPartialField( 0 ), m_firstNativePartialField( 0 ), m_isPartialNative( true ), m_isFullyNative( true ) @@ -494,6 +497,43 @@ void StructMeta::destroy( Struct * s ) const m_base -> destroy( s ); } +void StructMeta::validate( const Struct * s ) const +{ + bool encountered_non_strict = false; + + for( const StructMeta * cur = this; cur; cur = cur -> m_base.get() ) + { + encountered_non_strict |= !cur -> isStrict(); + if( !cur -> isStrict() ) { + continue; + } + + // rule 1: a non-strict struct may not inherit (directly or indirectly) from a strict base + if (encountered_non_strict) + CSP_THROW( ValueError, "Struct '" << s -> meta() -> name() << "' has non-strict inheritance of strict base '" << cur -> name() << "'" ); + + // rule 2: all local fields are set + std::string missing_fields; + for(const auto& field : cur->m_fields) { + if(!field->isSet(s)) { + if(missing_fields.empty()) { + missing_fields = field->fieldname(); + } else { + missing_fields += ", " + field->fieldname(); + } + } + } + + // raise error if any fields are missing + if (!missing_fields.empty()) { + CSP_THROW(ValueError, + "Strict struct '" << cur->name() << "' missing required fields: " << missing_fields); + } + } +} + + + Struct::Struct( const std::shared_ptr & meta ) { //Initialize meta shared_ptr diff --git a/cpp/csp/engine/Struct.h b/cpp/csp/engine/Struct.h index 64b51ecae..ff78f7b0c 100644 --- a/cpp/csp/engine/Struct.h +++ b/cpp/csp/engine/Struct.h @@ -35,6 +35,8 @@ class StructField bool isNative() const { return m_type -> type() <= CspType::Type::MAX_NATIVE_TYPE; } + bool isOptional() const { return m_isOptional; } + void setOffset( size_t off ) { m_offset = off; } void setMaskOffset( size_t off, uint8_t bit ) { @@ -75,7 +77,7 @@ class StructField protected: StructField( CspTypePtr type, const std::string & fieldname, - size_t size, size_t alignment ); + size_t size, size_t alignment, bool isOptional ); void setIsSet( Struct * s ) const { @@ -108,6 +110,7 @@ class StructField uint8_t m_maskBit; uint8_t m_maskBitMask; CspTypePtr m_type; + const bool m_isOptional; }; using StructFieldPtr = std::shared_ptr; @@ -120,7 +123,7 @@ class NativeStructField : public StructField public: NativeStructField() {} - NativeStructField( const std::string & fieldname ) : NativeStructField( CspType::fromCType::type(), fieldname ) + NativeStructField( const std::string & fieldname, bool isOptional ) : NativeStructField( CspType::fromCType::type(), fieldname, isOptional ) { } @@ -157,7 +160,7 @@ class NativeStructField : public StructField } protected: - NativeStructField( CspTypePtr type, const std::string & fieldname ) : StructField( type, fieldname, sizeof( T ), alignof( T ) ) + NativeStructField( CspTypePtr type, const std::string & fieldname, bool isOptional ) : StructField( type, fieldname, sizeof( T ), alignof( T ), isOptional ) {} }; @@ -179,7 +182,8 @@ using TimeStructField = NativeStructField