Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/csp/adapters/kafka/KafkaInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ void KafkaInputAdapter::processMessage( RdKafka::Message* message, bool live, cs
if( m_tickTimestampField )
msgTime = m_tickTimestampField->value<DateTime>(tick.get());

if (!tick.get() -> validate())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processMessage is called from KafkaSubscriber.cpp, and the handler at line 37 there should catch this exception, I think.

CSP_THROW( ValueError, "Struct validation failed for Kafka message, fields missing" );


bool pushLive = shouldPushLive(live, msgTime);
if( shouldProcessMessage( pushLive, msgTime ) )
pushTick(pushLive, msgTime, std::move(tick), batch);
Expand Down
3 changes: 3 additions & 0 deletions cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ void ParquetStructAdapter::dispatchValue( const utils::Symbol *symbol, bool isNu
{
fieldSetter( s );
}

CSP_TRUE_OR_THROW_RUNTIME( s -> validate(), "Struct validation failed for Parquet message, some fields are missing" );
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be double-checked to make sure things are being caught properly. From the call graph of dispatchValue, its callers appear to be StructColumnAdapter::readCurValue() $\rightarrow$ SingleTableParquetReader::readNextRow() $\rightarrow$ SingleTableParquetReader::start(), SingleTableParquetReader::skipRow(), or MultipleFileParquetReader::dispatchRow, which eventually seem to end up in calls that already throw a runtime error of this same type (e.g., ParquetInputAdapterManager::processNextSimTimeSlice). So it seems like a runtime error would be caught OK?

All the way up at the AdapterManager::start level there's also a ValueError catch.


dispatchedValue = &s;
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/csp/adapters/utils/JSONMessageStructConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ StructPtr JSONMessageStructConverter::convertJSON( const char * fieldname, const
} );
}

if( !struct_ -> validate() )
Copy link
Author

@sim15 sim15 Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This propagates to the processMessage in this file via asStruct, and then to the websocket ClientAdapterManager, where we add a new try/catch block.

CSP_THROW( ValueError, "JSON conversion of struct " << sType.meta() -> name() << " failed; some required fields were not set" );

return struct_;
}

Expand Down Expand Up @@ -251,6 +254,7 @@ csp::StructPtr JSONMessageStructConverter::asStruct( void * bytes, size_t size )
}
);
}
// root struct validation (validate()) deferred to adapter level

return data;
}
Expand Down
11 changes: 9 additions & 2 deletions cpp/csp/adapters/websocket/ClientAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,15 @@ void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
if( m_inputAdapter ) {
m_endpoint -> setOnMessage(
[ this ]( void* c, size_t t ) {
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( c, t, &batch );
try
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super familiar with the most natural way of handling this, so this should be double-checked.

{
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( c, t, &batch );
}
catch( csp::Exception & err )
{
pushStatus( StatusLevel::ERROR, ClientStatusType::GENERIC_ERROR, err.what() );
}
}
);
} else {
Expand Down
2 changes: 2 additions & 0 deletions cpp/csp/adapters/websocket/ClientInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch )
if( dataType() -> type() == CspType::Type::STRUCT )
{
auto tick = m_converter -> asStruct( c, t );
if (!tick.get() -> validate())
CSP_THROW( ValueError, "Struct validation failed for WebSocket message, fields missing" );
pushTick( std::move(tick), batch );
} else if ( dataType() -> type() == CspType::Type::STRING )
{
Expand Down
6 changes: 6 additions & 0 deletions cpp/csp/cppnodes/baselibimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,9 @@ DECLARE_CPPNODE( struct_fromts )
);
}

if( !out.get() -> validate( ) )
CSP_THROW( ValueError, "Struct " << cls.value() -> name() << " is not valid; some required fields did not tick" );

CSP_OUTPUT( std::move( out ) );
}

Expand Down Expand Up @@ -758,6 +761,9 @@ DECLARE_CPPNODE( struct_collectts )
}
);
}

if( !out.get() -> validate( ) )
CSP_THROW( ValueError, "Struct " << cls.value() -> name() << " is not valid; some required fields did not tick" );

CSP_OUTPUT( std::move( out ) );
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/csp/engine/BasketInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ void DynamicOutputBasketInfo::addShapeChange( const DialectGenericType & key, bo
{
auto events = autogen::DynamicBasketEvents::create();
events -> set_events( {} );
if( !events -> validate() )
CSP_THROW( ValueError, "DynamicBasketEvents struct is not valid; some required fields were not set" );
m_shapeTs.outputTickTyped<StructPtr>( m_parentNode -> rootEngine() -> cycleCount(),
m_parentNode -> rootEngine() -> now(),
events, false );
Expand All @@ -171,6 +173,8 @@ void DynamicOutputBasketInfo::addShapeChange( const DialectGenericType & key, bo
auto event = autogen::DynamicBasketEvent::create();
event -> set_key( key );
event -> set_added( added );
if( !event -> validate() )
CSP_THROW( ValueError, "DynamicBasketEvent struct is not valid; some required fields were not set" );

const_cast<std::vector<autogen::DynamicBasketEvent::Ptr> &>( events ).emplace_back( event );
}
Expand Down
36 changes: 34 additions & 2 deletions cpp/csp/engine/Struct.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include <csp/core/System.h>
#include <csp/engine/Struct.h>
#include <algorithm>
#include <ranges>
#include <string>

namespace csp
{
Expand Down Expand Up @@ -33,8 +35,8 @@ and adjustments required for the hidden fields

*/

StructMeta::StructMeta( const std::string & name, const Fields & fields,
std::shared_ptr<StructMeta> base ) : m_name( name ), m_base( base ), m_fields( fields ),
StructMeta::StructMeta( const std::string & name, const Fields & fields, bool isStrict,
std::shared_ptr<StructMeta> base ) : m_name( name ), m_base( base ), m_isStrict( isStrict ), m_fields( fields ),
m_size( 0 ), m_partialSize( 0 ), m_partialStart( 0 ), m_nativeStart( 0 ), m_basePadding( 0 ),
m_maskLoc( 0 ), m_maskSize( 0 ), m_firstPartialField( 0 ), m_firstNativePartialField( 0 ),
m_isPartialNative( true ), m_isFullyNative( true )
Expand Down Expand Up @@ -128,6 +130,18 @@ StructMeta::StructMeta( const std::string & name, const Fields & fields,
if( !rv.second )
CSP_THROW( ValueError, "csp Struct " << name << " attempted to add existing field " << m_fields[ idx ] -> fieldname() );
}

// A non-strict struct may not inherit (directly or indirectly) from a strict base
bool encountered_non_strict = false;
for ( const StructMeta * cur = this; cur; cur = cur -> m_base.get() )
{
encountered_non_strict |= !cur -> isStrict();
if ( encountered_non_strict && cur -> isStrict() )
CSP_THROW( ValueError,
"Strict '" << m_name
<< "' has non-strict inheritance of strict base '"
<< cur -> name() << "'" );
}
}

StructMeta::~StructMeta()
Expand Down Expand Up @@ -494,6 +508,24 @@ void StructMeta::destroy( Struct * s ) const
m_base -> destroy( s );
}

[[nodiscard]] bool StructMeta::validate( const Struct * s ) const
{
for ( const StructMeta * cur = this; cur; cur = cur -> m_base.get() )
{
if ( !cur -> isStrict() )
continue;

// Note that we do not recursively validate nested struct.
// We assume after any creation on the C++ side, these structs
// are validated properly prior to being set as field values
if ( !cur -> allFieldsSet( s ) )
return false;
}
return true;
}



Struct::Struct( const std::shared_ptr<const StructMeta> & meta )
{
//Initialize meta shared_ptr
Expand Down
13 changes: 11 additions & 2 deletions cpp/csp/engine/Struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,21 +587,24 @@ class StructMeta : public std::enable_shared_from_this<StructMeta>
using FieldNames = std::vector<std::string>;

//Fields will be re-arranged and assigned their offsets in StructMeta for optimal performance
StructMeta( const std::string & name, const Fields & fields, std::shared_ptr<StructMeta> base = nullptr );
StructMeta( const std::string & name, const Fields & fields, bool isStrict, std::shared_ptr<StructMeta> base = nullptr );
virtual ~StructMeta();

const std::string & name() const { return m_name; }
size_t size() const { return m_size; }
size_t partialSize() const { return m_partialSize; }

bool isNative() const { return m_isFullyNative; }
// Returns the strict flag this meta was constructed with (m_isStrict).
// validate() only requires all fields to be set for metas where this is true.
bool isStrict() const { return m_isStrict; }

const Fields & fields() const { return m_fields; }
const FieldNames & fieldNames() const { return m_fieldnames; }

size_t maskLoc() const { return m_maskLoc; }
size_t maskSize() const { return m_maskSize; }

[[nodiscard]] bool validate( const Struct * s ) const;

const StructFieldPtr & field( const char * name ) const
{
static StructFieldPtr s_empty;
Expand Down Expand Up @@ -652,7 +655,8 @@ class StructMeta : public std::enable_shared_from_this<StructMeta>
std::shared_ptr<StructMeta> m_base;
StructPtr m_default;
FieldMap m_fieldMap;

bool m_isStrict;

//fields in order, memory owners of field objects which in turn own the key memory
//m_fields includes all base fields as well. m_fieldnames maintains the proper iteration order of fields
Fields m_fields;
Expand Down Expand Up @@ -738,6 +742,11 @@ class Struct
return meta() -> allFieldsSet( this );
}

// Validates this instance by delegating to its StructMeta's validate().
// Returns whatever StructMeta::validate reports for this struct.
[[nodiscard]] bool validate() const
{
    const bool isValid = meta() -> validate( this );
    return isValid;
}


//used to cache dialect representations of this struct, if needed
void * dialectPtr() const { return hidden() -> dialectPtr; }
Expand Down
30 changes: 24 additions & 6 deletions cpp/csp/python/PyStruct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class PyObjectStructField final : public DialectGenericStructField
public:
using BASE = DialectGenericStructField;
PyObjectStructField( const std::string & name,
PyTypeObjectPtr pytype ) : DialectGenericStructField( name, sizeof( PyObjectPtr ), alignof( PyObjectPtr ) ),
m_pytype( pytype )
PyTypeObjectPtr pytype ) : BASE( name, sizeof( PyObjectPtr ), alignof( PyObjectPtr ) ),
m_pytype( pytype )
{}


Expand All @@ -42,8 +42,8 @@ class PyObjectStructField final : public DialectGenericStructField
};

DialectStructMeta::DialectStructMeta( PyTypeObject * pyType, const std::string & name,
const Fields & flds, std::shared_ptr<StructMeta> base ) :
StructMeta( name, flds, base ),
const Fields & flds, bool isStrict, std::shared_ptr<StructMeta> base ) :
StructMeta( name, flds, isStrict, base ),
m_pyType( pyType )
{
}
Expand Down Expand Up @@ -110,12 +110,19 @@ static PyObject * PyStructMeta_new( PyTypeObject *subtype, PyObject *args, PyObj
{
PyObject *key, *type;
Py_ssize_t pos = 0;
PyObject *optional_fields = PyDict_GetItemString( dict, "__optional_fields__" );


while( PyDict_Next( metadata, &pos, &key, &type ) )
{
const char * keystr = PyUnicode_AsUTF8( key );
if( !keystr )
CSP_THROW( PythonPassthrough, "" );

if (!PySet_Check(optional_fields))
CSP_THROW( TypeError, "Struct metadata for key " << keystr << " expected a set, got " << PyObjectPtr::incref( optional_fields ) );


if( !PyType_Check( type ) && !PyList_Check( type ) )
CSP_THROW( TypeError, "Struct metadata for key " << keystr << " expected a type, got " << PyObjectPtr::incref( type ) );

Expand Down Expand Up @@ -151,7 +158,7 @@ static PyObject * PyStructMeta_new( PyTypeObject *subtype, PyObject *args, PyObj
default:
CSP_THROW( ValueError, "Unexpected csp type " << csptype -> type() << " on struct " << name );
}

fields.emplace_back( field );
}
}
Expand Down Expand Up @@ -188,7 +195,12 @@ static PyObject * PyStructMeta_new( PyTypeObject *subtype, PyObject *args, PyObj
| |
PyStruct --------------------------
*/
auto structMeta = std::make_shared<DialectStructMeta>( ( PyTypeObject * ) pymeta, name, fields, metabase );

PyObject * strict_enabled = PyDict_GetItemString( dict, "__strict_enabled__" );
if( !strict_enabled )
CSP_THROW( KeyError, "StructMeta missing __strict_enabled__" );
bool isStrict = strict_enabled == Py_True;
auto structMeta = std::make_shared<DialectStructMeta>( ( PyTypeObject * ) pymeta, name, fields, isStrict, metabase );

//Setup fast attr dict lookup
pymeta -> attrDict = PyObjectPtr::own( PyDict_New() );
Expand Down Expand Up @@ -347,6 +359,7 @@ static PyObject * PyStructMeta_metadata_info( PyStructMeta * m )
return out.release();
}


static PyMethodDef PyStructMeta_methods[] = {
{"_layout", (PyCFunction) PyStructMeta_layout, METH_NOARGS, "debug view of structs internal mem layout"},
{"_metadata_info", (PyCFunction) PyStructMeta_metadata_info, METH_NOARGS, "provide detailed information about struct layout"},
Expand Down Expand Up @@ -456,6 +469,9 @@ void PyStruct::setattr( Struct * s, PyObject * attr, PyObject * value )
if( !field )
CSP_THROW( AttributeError, "'" << s -> meta() -> name() << "' object has no attribute '" << PyUnicode_AsUTF8( attr ) << "'" );

if ( s -> meta() -> isStrict() && value == nullptr )
CSP_THROW( AttributeError, "Strict struct " << s -> meta() -> name() << " does not allow the deletion of field " << PyUnicode_AsUTF8( attr ) );

try
{
switchCspType( field -> type(), [field,&struct_=s,value]( auto tag )
Expand Down Expand Up @@ -795,6 +811,8 @@ int PyStruct_init( PyStruct * self, PyObject * args, PyObject * kwargs )
CSP_BEGIN_METHOD;

PyStruct_setattrs( self, args, kwargs, "__init__" );
if( !self -> struct_ -> validate() )
CSP_THROW( ValueError, "Struct " << self -> struct_ -> meta() -> name() << " is not valid; some required fields were not set on init" );

CSP_RETURN_INT;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/csp/python/PyStruct.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class CSPTYPESIMPL_EXPORT DialectStructMeta : public StructMeta
{
public:
DialectStructMeta( PyTypeObject * pyType, const std::string & name,
const Fields & fields, std::shared_ptr<StructMeta> base = nullptr );
const Fields & fields, bool isStrict, std::shared_ptr<StructMeta> base = nullptr );
~DialectStructMeta() {}

PyTypeObject * pyType() const { return m_pyType; }
Expand Down
22 changes: 19 additions & 3 deletions csp/impl/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


class StructMeta(_csptypesimpl.PyStructMeta):
def __new__(cls, name, bases, dct):
def __new__(cls, name, bases, dct, allow_unset=True):
full_metadata = {}
full_metadata_typed = {}
metadata = {}
Expand All @@ -29,12 +29,17 @@ def __new__(cls, name, bases, dct):
defaults.update(base.__defaults__)

annotations = dct.get("__annotations__", None)
optional_fields = set()
if annotations:
for k, v in annotations.items():
actual_type = v
# Lists need to be normalized too as potentially we need to add a boolean flag to use FastList
if v == FastList:
raise TypeError(f"{v} annotation is not supported without args")
if CspTypingUtils.is_optional_type(v):
if (not allow_unset) and (k not in dct):
raise TypeError(f"Optional field {k} must have a default value")
optional_fields.add(k)
if (
CspTypingUtils.is_generic_container(v)
or CspTypingUtils.is_union_type(v)
Expand Down Expand Up @@ -72,6 +77,8 @@ def __new__(cls, name, bases, dct):
dct["__full_metadata_typed__"] = full_metadata_typed
dct["__metadata__"] = metadata
dct["__defaults__"] = defaults
dct["__optional_fields__"] = optional_fields
dct["__strict_enabled__"] = not allow_unset

res = super().__new__(cls, name, bases, dct)
# This is how we make sure we construct the pydantic schema from the new class
Expand Down Expand Up @@ -174,6 +181,14 @@ def metadata(cls, typed=False):
else:
return cls.__full_metadata__

@classmethod
def optional_fields(cls):
    """Return the class's ``__optional_fields__`` attribute unchanged."""
    fields = cls.__optional_fields__
    return fields

@classmethod
def is_strict(cls):
    """Return the class's ``__strict_enabled__`` attribute unchanged."""
    strict = cls.__strict_enabled__
    return strict

@classmethod
def fromts(cls, trigger=None, /, **kwargs):
"""convert valid inputs into ts[ struct ]
Expand Down Expand Up @@ -237,12 +252,13 @@ def _obj_from_python(cls, json, obj_type):
elif issubclass(obj_type, Struct):
if not isinstance(json, dict):
raise TypeError("Representation of struct as json is expected to be of dict type")
res = obj_type()
obj_args = {}
for k, v in json.items():
expected_type = obj_type.__full_metadata_typed__.get(k, None)
if expected_type is None:
raise KeyError(f"Unexpected key {k} for type {obj_type}")
setattr(res, k, cls._obj_from_python(v, expected_type))
obj_args[k] = cls._obj_from_python(v, expected_type)
res = obj_type(**obj_args)
return res
else:
if isinstance(json, obj_type):
Expand Down
7 changes: 7 additions & 0 deletions csp/impl/types/typing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ def is_numpy_nd_array_type(cls, typ):
def is_union_type(cls, typ):
return isinstance(typ, typing._GenericAlias) and typ.__origin__ is typing.Union

@classmethod
def is_optional_type(cls, typ):
    """Return True when ``typ`` is a union type whose arguments include ``NoneType``.

    Anything that ``cls.is_union_type`` does not recognise as a union yields False.
    """
    if not cls.is_union_type(typ):
        return False
    return type(None) in typing.get_args(typ)

@classmethod
def is_literal_type(cls, typ):
return isinstance(typ, typing._GenericAlias) and typ.__origin__ is typing.Literal
Expand Down
Loading
Loading