4. Operators
Operators are the building blocks of the Cortex Data Framework's processing pipeline. They define how data flows through the stream, transforming, filtering, aggregating, and ultimately directing the data to its destination. This section explores the different types of operators available, their use cases, implementation details, and code examples to demonstrate their practical application.
Operators in the Cortex Data Framework are components that process data as it flows through a stream. They can perform various functions such as transforming data, filtering out unwanted data, aggregating information, managing state, and sending data to external systems. Operators can be chained together to form complex processing pipelines tailored to specific application needs.
Key Features:
- Modularity: Operators can be combined in flexible ways to create diverse data processing workflows.
- Stateful Processing: Some operators maintain state across multiple data items, enabling advanced processing like aggregations and windowing.
- Telemetry Integration: Operators can integrate with telemetry providers to monitor performance and track processing metrics.
- Extensibility: Developers can create custom operators to extend the platform's capabilities.
Types of Operators:
- Transform Operators:
  - Map Operator: Transforms each data item.
  - FlatMap Operator: Transforms each data item into zero or more items (a sketch follows the Map example below).
- Filter Operators:
  - Filter Operator: Filters data items based on a predicate.
- Aggregate Operators:
  - Aggregate Operator: Aggregates data items based on keys and aggregation functions.
- Window Operators:
  - Tumbling Window Operator: Processes data in fixed-size, non-overlapping windows.
  - Sliding Window Operator: Processes data in fixed-size, overlapping windows.
  - Session Window Operator: Processes data based on sessions with inactivity gaps.
- Sink Operators:
  - Console Sink: Outputs data to the console.
  - AWS SQS Sink: Sends data to an AWS SQS queue.
  - Kafka Sink: Sends data to a Kafka topic.
  - Apache Pulsar Sink: Sends data to an Apache Pulsar topic.
- Source Operators:
  - AWS SQS Source: Receives data from an AWS SQS queue.
  - Kafka Source: Receives data from a Kafka topic.
  - Apache Pulsar Source: Receives data from an Apache Pulsar topic.
  - Azure Service Bus Source: Receives data from an Azure Service Bus queue.
Operator Interfaces:
- IOperator: The base interface for all operators.
- IStatefulOperator: Interface for operators that maintain state.
- ITelemetryEnabled: Interface for operators that support telemetry integration.
- ISinkOperator: Interface for sink operators that consume data.
- ISourceOperator: Interface for source operators that emit data.
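To make the base contract concrete, the sketch below shows a minimal pass-through operator built against the two IOperator members used by the custom LoggingOperator later in this section (Process and SetNext). Treat it as an illustration of the interface's shape, not its full surface.
using Cortex.Streams.Operators;
// Minimal sketch: an operator that forwards every item unchanged.
public class PassThroughOperator : IOperator
{
    private IOperator _nextOperator;
    public void Process(object input)
    {
        // No transformation; hand the item to the next operator in the chain.
        _nextOperator?.Process(input);
    }
    public void SetNext(IOperator nextOperator)
    {
        _nextOperator = nextOperator;
    }
}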
Operator Pipeline Flow:
- Source Operator: Emits data into the stream.
- Transform/Filter/Aggregate Operators: Process the data as it flows through the pipeline.
- Sink Operator: Consumes and handles the processed data.
Architecture Diagram:
flowchart TB
subgraph BusinessApp["Business Application / Use Case"]
A["Data Streams - Sales Orders, Device Events"]
end
subgraph Operators["Operators & Logic"]
B1["Aggregation Operator - Totals, Running Averages"]
B2["Windowing Operator - Time or Session Windows"]
B3["Join Operator - Enrichment, Lookups"]
B4["Custom Logic - Alerts, Fraud Detection"]
end
subgraph DataStores["Pluggable Data Stores"]
C1["In-Memory - Ephemeral"]
C2["SQL/NoSQL - Cassandra, Postgres, MongoDB"]
C3["Analytics DB - ClickHouse, etc."]
C4["Embedded Stores - RocksDB, SQLite"]
end
A --> B1 --> B2 --> B3 --> B4 --> D["Stream Output / Next Step"]
B1 -->|State Access| C1
B2 -->|State Access| C2
B3 -->|State Access| C3
B4 -->|State Access| C4
Figure 4.1: Operator Pipeline Architecture
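Before examining each operator in detail, the short sketch below mirrors this source-to-sink flow using only builder methods that appear in the examples later in this section (Stream, Filter, Map, Sink); treat it as an orientation sample rather than a canonical pipeline.
using Cortex.Streams;
using System;
class PipelineFlowExample
{
    static void Main(string[] args)
    {
        // Source -> Filter -> Map -> Sink, chained via the StreamBuilder
        var stream = StreamBuilder<int, int>.CreateNewStream("FlowExample")
            .Stream()                        // entry point: data is emitted here
            .Filter(x => x > 0)              // drop non-positive values
            .Map(x => x * 10)                // transform each surviving item
            .Sink(x => Console.WriteLine(x)) // terminal step: consume the result
            .Build();
        stream.Start();
        stream.Emit(-1); // filtered out, never reaches the sink
        stream.Emit(3);  // prints 30
        stream.Stop();
    }
}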
The Map Operator is a fundamental transform operator that applies a specified transformation function to each data item in the stream. It transforms data from one type to another, enabling developers to modify, enrich, or reformat data as it flows through the pipeline.
Use Cases:
- Data Transformation: Converting data from one format to another (e.g., integers to strings).
- Enrichment: Adding additional information to data items (e.g., appending metadata).
- Computation: Performing calculations on data items (e.g., multiplying numbers).
To implement the Map Operator, follow these steps:
- Define the Transformation Function: Specify how each data item should be transformed.
- Integrate the Operator into the Stream: Use the Map method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional): Configure telemetry to monitor the operator's performance.
Below is a code example demonstrating the usage of the Map Operator within a stream that processes integer values by doubling them and then outputs the results to the console.
using Cortex.Streams;
using System;
class Program
{
static void Main(string[] args)
{
// Create and configure the stream with a Map operator
var stream = StreamBuilder<int, int>.CreateNewStream("DoubleStream")
.Stream()
.Map(x => x * 2) // Transform each integer by doubling it
.Sink(x => Console.WriteLine(x)) // Output the transformed data to the console
.Build();
// Start the stream
stream.Start();
// Emit data into the stream
for (int i = 1; i <= 5; i++)
{
stream.Emit(i); // Outputs: 2, 4, 6, 8, 10
}
// Stop the stream after processing
stream.Stop();
}
}
Output:
2
4
6
8
10
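The FlatMap Operator listed earlier follows the same pattern as Map but may emit zero or more items per input. The sketch below assumes a FlatMap builder method analogous to Map that accepts a function returning a sequence; check the StreamBuilder API for the exact signature.
using Cortex.Streams;
using System;
class FlatMapExample
{
    static void Main(string[] args)
    {
        // Hypothetical sketch: split each sentence into words,
        // so one input item can yield several output items.
        var stream = StreamBuilder<string, string>.CreateNewStream("WordSplitStream")
            .Stream()
            .FlatMap(sentence => sentence.Split(' ')) // zero or more items per input (assumed method)
            .Sink(word => Console.WriteLine(word))
            .Build();
        stream.Start();
        stream.Emit("hello cortex streams"); // Outputs: hello, cortex, streams (one per line)
        stream.Stop();
    }
}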
The Filter Operator is used to selectively allow data items to pass through the stream based on a specified condition or predicate. It evaluates each data item and only forwards those that meet the criteria defined by the predicate function.
Use Cases:
- Data Validation: Excluding invalid or malformed data items.
- Conditional Processing: Processing only data items that meet certain conditions.
- Reducing Noise: Filtering out irrelevant or unnecessary data to focus on meaningful information.
To implement the Filter Operator, follow these steps:
- Define the Predicate Function: Specify the condition that determines whether a data item should pass through.
- Integrate the Operator into the Stream: Use the Filter method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional): Configure telemetry to monitor the operator's performance and filter outcomes.
The following example demonstrates the Filter Operator in action. It filters out even numbers from a stream of integers, allowing only odd numbers to pass through and be printed to the console.
using Cortex.Streams;
using System;
class Program
{
static void Main(string[] args)
{
// Create and configure the stream with a Filter operator
var stream = StreamBuilder<int, int>.CreateNewStream("OddNumberStream")
.Stream()
.Filter(x => x % 2 != 0) // Allow only odd numbers
.Sink(x => Console.WriteLine(x)) // Output the filtered data to the console
.Build();
// Start the stream
stream.Start();
// Emit data into the stream
for (int i = 1; i <= 10; i++)
{
stream.Emit(i); // Outputs: 1, 3, 5, 7, 9
}
// Stop the stream after processing
stream.Stop();
}
}
Output:
1
3
5
7
9
The Aggregate Operator performs aggregation operations on data items grouped by a key. It maintains and updates an aggregate value for each key based on incoming data, enabling cumulative computations such as sums, averages, or custom aggregations.
Use Cases:
- Counting: Tracking the number of occurrences of each key.
- Summation: Calculating the total sum of values per key.
- Averaging: Computing the average value per key.
- Custom Aggregations: Implementing complex aggregation logic tailored to specific requirements.
To implement the Aggregate Operator, follow these steps:
- Define the Key Selector and Aggregation Function:
  - Key Selector: Determines how to group data items.
  - Aggregation Function: Defines how to update the aggregate value based on incoming data.
- Configure the State Store: Use a state store (e.g., RocksDbStateStore) to maintain aggregate states.
- Integrate the Operator into the Stream: Use the Aggregate method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional): Configure telemetry to monitor aggregation metrics and performance.
The following example demonstrates the Aggregate Operator by counting the number of occurrences of each word in a stream of strings.
using Cortex.States.RocksDb;
using Cortex.Streams;
using System;
class Program
{
static void Main(string[] args)
{
// Initialize a RocksDbStateStore for word counts
var wordCountStore = new RocksDbStateStore<string, int>("WordCountStore", "/path/to/rocksdb");
// Create and configure the stream with an Aggregate operator
var stream = StreamBuilder<string, string>.CreateNewStream("WordCountStream")
.Stream()
.AggregateSilently(
keySelector: word => word, // Group by the word itself
aggregateFunction: (currentCount, word) => currentCount + 1, // Increment count
stateStoreName: "WordCountStore",
stateStore: wordCountStore
)
.Sink(msg => Console.WriteLine($"Word: {msg}, processed")) // Acknowledge each processed word (running counts are held in the state store)
.Build();
// Start the stream
stream.Start();
// Emit data into the stream
var words = new[] { "apple", "banana", "apple", "orange", "banana", "apple" };
foreach (var word in words)
{
stream.Emit(word);
}
// Stop the stream after processing
stream.Stop();
}
}
Explanation:
- State Store Initialization: A RocksDbStateStore named "WordCountStore" is initialized to persist word counts.
- Stream Configuration:
  - Aggregate Operator: Groups incoming words and increments their counts.
- Data Emission: The stream processes the words, updating counts accordingly.
- Stream Lifecycle: The stream is started, data is emitted, and then the stream is stopped.
Window operators process data within defined time frames or sessions, enabling temporal aggregations and analyses. Cortex Data Framework provides three primary window operators: Tumbling Window, Sliding Window, and Session Window. Each serves different use cases based on the nature of data processing required.
Description and Use Cases
The Tumbling Window Operator divides the data stream into fixed-size, non-overlapping time windows. Each window processes the data that arrives within its duration, and windows do not overlap or skip time intervals.
Use Cases:
- Fixed Interval Aggregations: Calculating metrics like counts or sums over consistent time periods (e.g., hourly sales totals).
- Batch Processing: Grouping data into batches for processing at regular intervals.
- Periodic Reporting: Generating reports based on fixed time frames.
Implementation Guide
To implement the Tumbling Window Operator, follow these steps:
- Define the Key Selector and Window Function:
  - Key Selector: Determines how to group data items.
  - Window Function: Defines the aggregation or processing to perform on each window.
- Configure the Window State Stores: Use state stores to maintain window states and store window results.
- Integrate the Operator into the Stream: Use the TumblingWindow method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional): Configure telemetry to monitor window processing metrics and performance.
Code Example
The following example demonstrates the Tumbling Window Operator by calculating the total number of transactions every minute.
using Cortex.Streams;
using System;
using System.Linq; // required for Count() in the window function
class Program
{
static void Main(string[] args)
{
// Create and configure the stream with a Tumbling Window operator
var stream = StreamBuilder<string, string>.CreateNewStream("TransactionStream")
.Stream()
.TumblingWindow(
keySelector: transaction => "TotalTransactions", // Single key for all transactions
windowDuration: TimeSpan.FromMinutes(1), // 1-minute window
windowFunction: transactions => transactions.Count(), // Count transactions in the window
stateStoreName: "TransactionResultsStore"
)
.Sink(v => Console.WriteLine($"Window Start: TotalTransactions, Transactions: {v}")) // Output window counts
.Build();
// Start the stream
stream.Start();
// Simulate emitting transactions over time
var transactions = new[] { "txn1", "txn2", "txn3", "txn4", "txn5" };
foreach (var txn in transactions)
{
stream.Emit(txn);
System.Threading.Thread.Sleep(1000); // Wait for 1 second between transactions
}
// Wait for window to close
System.Threading.Thread.Sleep(TimeSpan.FromMinutes(1));
// Stop the stream after processing
stream.Stop();
}
}
Output:
Window Start: TotalTransactions, Transactions: 5
Explanation:
- State Store Configuration: Window results are kept in a state store named "TransactionResultsStore", supplied via the stateStoreName parameter.
- Stream Configuration:
  - Tumbling Window Operator: Groups transactions into 1-minute windows and counts them.
  - Sink Operator: Outputs the window start key and the count of transactions.
- Data Emission: Simulates emitting five transactions, one every second.
- Window Processing: After 1 minute, the window closes, and the total number of transactions is output.
- Stream Lifecycle: The stream is started, data is emitted, the window is processed, and then the stream is stopped.
Description and Use Cases
The Sliding Window Operator divides the data stream into fixed-size windows that overlap based on a specified advance interval. Unlike tumbling windows, sliding windows allow for continuous and overlapping data processing, enabling more granular and real-time analyses.
Use Cases:
- Moving Averages: Calculating rolling averages over recent data points.
- Trend Detection: Identifying trends within overlapping time frames.
- Real-Time Monitoring: Continuously monitoring metrics with overlapping windows for immediate insights.
Implementation Guide
To implement the Sliding Window Operator, follow these steps:
- Define the Key Selector and Window Function:
  - Key Selector: Determines how to group data items.
  - Window Function: Defines the aggregation or processing to perform on each window.
- Configure the Sliding Window State Stores: Use state stores to maintain window states and store window results.
- Integrate the Operator into the Stream: Use the SlidingWindow method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional): Configure telemetry to monitor window processing metrics and performance.
Code Example
The following example demonstrates the Sliding Window Operator by calculating a moving average of sensor readings over a 5-minute window, advancing every minute.
using Cortex.Streams;
using System;
using System.Linq; // required for Count() in the window function
class Program
{
static void Main(string[] args)
{
// Create and configure the stream with a Sliding Window operator
var stream = StreamBuilder<double, double>.CreateNewStream("SensorStream")
.Stream()
.SlidingWindow(
keySelector: value => "Sensor1", // Single sensor key
windowSize: TimeSpan.FromMinutes(5), // 5-minute window size
advanceBy: TimeSpan.FromMinutes(1), // Advance interval of 1 minute
windowFunction: values =>
{
double sum = 0;
foreach (var val in values)
sum += val;
return sum / values.Count(); // Calculate average
},
windowStateStoreName: "SensorDataStore",
windowResultsStateStoreName: "SensorResultsStore"
)
.Sink(average => Console.WriteLine($"Moving Average: {average:F2}")) // Output moving average
.Build();
// Start the stream
stream.Start();
// Simulate emitting sensor readings every 30 seconds
for (int i = 1; i <= 10; i++)
{
double sensorValue = 20.0 + i; // Example sensor value
stream.Emit(sensorValue);
Console.WriteLine($"Emitted Sensor Value: {sensorValue}");
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(30));
}
// Wait for sliding windows to process
System.Threading.Thread.Sleep(TimeSpan.FromMinutes(6));
// Stop the stream after processing
stream.Stop();
}
}
Output:
Emitted Sensor Value: 21
Emitted Sensor Value: 22
Emitted Sensor Value: 23
Emitted Sensor Value: 24
Emitted Sensor Value: 25
Emitted Sensor Value: 26
Emitted Sensor Value: 27
Emitted Sensor Value: 28
Emitted Sensor Value: 29
Emitted Sensor Value: 30
Moving Average: 23.00
Moving Average: 24.00
Moving Average: 25.00
Moving Average: 26.00
Moving Average: 27.00
Explanation:
- Stream Configuration:
  - Sliding Window Operator: Groups sensor readings into overlapping 5-minute windows, advancing every minute, and calculates the average.
  - Sink Operator: Outputs the moving average to the console.
- Data Emission: Simulates emitting ten sensor readings, one every 30 seconds.
- Window Processing: As readings are emitted, the sliding window calculates and outputs the moving average every minute.
- Stream Lifecycle: The stream is started, data is emitted, moving averages are calculated and output, and then the stream is stopped.
Description and Use Cases
The Session Window Operator groups data items into sessions based on activity gaps. A new session is started when data arrives after a period of inactivity defined by the inactivity gap. This operator is ideal for scenarios where data is naturally segmented by periods of activity and inactivity.
Use Cases:
- User Activity Tracking: Grouping user actions into sessions based on inactivity.
- Event Correlation: Correlating events that occur within active periods.
- Transaction Sessions: Grouping transactions that belong to the same session.
Implementation Guide
To implement the Session Window Operator, follow these steps:
- Define the Key Selector and Window Function:
  - Key Selector: Determines how to group data items.
  - Window Function: Defines the aggregation or processing to perform on each session.
- Configure the Session Window State Stores: Use state stores to maintain session states and store session results.
- Integrate the Operator into the Stream: Use the SessionWindow method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional): Configure telemetry to monitor session processing metrics and performance.
Code Example
The following example demonstrates the Session Window Operator by tracking user sessions based on inactivity gaps. A new session is initiated if there's no activity for 2 minutes.
using Cortex.States.RocksDb;
using Cortex.Streams;
using System;
using System.Collections.Generic;
class Program
{
static void Main(string[] args)
{
// Initialize a RocksDbStateStore for session states
var sessionStateStore = new RocksDbStateStore<string, SessionWindowState<string>>("UserSessionStore", "/path/to/rocksdb");
var sessionResultsStore = new RocksDbStateStore<(string, DateTime), string>("SessionResultsStore", "/path/to/rocksdb");
// Create and configure the stream with a Session Window operator
var stream = StreamBuilder<string, string>.CreateNewStream("UserActivityStream")
.Stream()
.SessionWindow(
keySelector: activity => activity, // Group by user ID or activity type
inactivityGap: TimeSpan.FromMinutes(2), // 2-minute inactivity gap
windowFunction: activities =>
{
// Example: Concatenate all activities in the session
return string.Join(", ", activities);
},
sessionStateStoreName: "UserSessionStore",
windowResultsStateStoreName: "SessionResultsStore",
sessionStateStore: sessionStateStore,
windowResultsStateStore: sessionResultsStore
)
.Sink(sessionSummary => Console.WriteLine($"Session Activities: {sessionSummary}")) // Output session summaries
.Build();
// Start the stream
stream.Start();
// Simulate emitting user activities with varying delays
var activities = new List<string>
{
"Login",
"ViewDashboard",
"ClickButton",
"Logout",
"Login",
"UploadFile",
"Logout"
};
foreach (var activity in activities)
{
stream.Emit(activity);
Console.WriteLine($"Emitted Activity: {activity}");
System.Threading.Thread.Sleep(TimeSpan.FromMinutes(1)); // Wait for 1 minute between activities
}
// Wait for sessions to close
System.Threading.Thread.Sleep(TimeSpan.FromMinutes(3));
// Stop the stream after processing
stream.Stop();
}
}
Output:
Emitted Activity: Login
Emitted Activity: ViewDashboard
Emitted Activity: ClickButton
Emitted Activity: Logout
Emitted Activity: Login
Emitted Activity: UploadFile
Emitted Activity: Logout
Session Activities: Login, ViewDashboard, ClickButton, Logout
Session Activities: Login, UploadFile, Logout
Explanation:
- State Store Initialization: Two RocksDbStateStore instances are initialized:
  - UserSessionStore: Maintains the state of active user sessions.
  - SessionResultsStore: Stores the results of processed sessions.
- Stream Configuration:
  - Session Window Operator: Groups user activities into sessions based on a 2-minute inactivity gap and concatenates activities within each session.
  - Sink Operator: Outputs the concatenated session activities to the console.
- Data Emission: Simulates emitting seven user activities, with a 1-minute interval between each. Given the 2-minute inactivity gap, activities are grouped into two sessions.
- Session Processing: After the inactivity gap, the sessions are processed and the concatenated activities are output.
- Stream Lifecycle: The stream is started, data is emitted, sessions are processed and output, and then the stream is stopped.
Sink operators are terminal points in the stream processing pipeline that consume and handle the processed data. They can output data to various destinations such as the console, external messaging systems, databases, or other storage solutions.
Description and Use Cases
The Console Sink Operator is the simplest sink operator that outputs data directly to the console. It's primarily used for debugging, logging, or simple monitoring of stream outputs during development.
Use Cases:
- Debugging: Inspecting data as it flows through the stream.
- Monitoring: Viewing real-time outputs for quick insights.
- Testing: Verifying the behavior of stream operators without external dependencies.
Implementation Guide
To implement the Console Sink Operator, follow these steps:
- Define the Sink Action: Specify the action to perform on each data item (e.g., writing to the console).
- Integrate the Operator into the Stream: Use the Sink method provided by the StreamBuilder to add the operator to the pipeline.
- Handle Telemetry (Optional): Configure telemetry to monitor sink processing metrics and performance.
Code Example
The following example demonstrates the Console Sink Operator by outputting transformed data to the console.
using Cortex.Streams;
using System;
class Program
{
static void Main(string[] args)
{
// Create and configure the stream with a Map operator and Console Sink
var stream = StreamBuilder<string, string>.CreateNewStream("ConsoleSinkStream")
.Stream()
.Map(message => $"Processed Message: {message.ToUpper()}") // Transform message to uppercase
.Sink(Console.WriteLine) // Output to console
.Build();
// Start the stream
stream.Start();
// Emit data into the stream
stream.Emit("hello");
stream.Emit("world");
stream.Emit("cortex streaming");
// Stop the stream after processing
stream.Stop();
}
}
Output:
Processed Message: HELLO
Processed Message: WORLD
Processed Message: CORTEX STREAMING
While Cortex provides a variety of built-in operators, developers can create custom operators to extend the platform's functionality and cater to specific processing needs.
To create a custom operator, follow these steps:
- Implement the IOperator Interface: Define the processing logic by implementing the Process and SetNext methods.
- Optionally Implement IStatefulOperator: If the operator needs to maintain state, implement the IStatefulOperator interface.
- Optionally Implement ITelemetryEnabled: For telemetry integration, implement the ITelemetryEnabled interface.
- Integrate the Custom Operator into the Stream: Use the Map, Filter, or other relevant methods to add the custom operator to the pipeline.
The following example demonstrates creating a custom operator that logs each data item processed.
using Cortex.Streams.Operators;
using Cortex.Telemetry;
using System;
public class LoggingOperator<T> : IOperator, ITelemetryEnabled
{
private IOperator _nextOperator;
private ITelemetryProvider _telemetryProvider;
private ICounter _logCounter;
public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
{
_telemetryProvider = telemetryProvider;
if (_telemetryProvider != null)
{
var metrics = _telemetryProvider.GetMetricsProvider();
_logCounter = metrics.CreateCounter($"logging_operator_processed_{typeof(T).Name}", "Number of items processed by LoggingOperator");
}
}
public void Process(object input)
{
T data = (T)input;
Console.WriteLine($"LoggingOperator: Processing data - {data}");
_logCounter?.Increment();
_nextOperator?.Process(input);
}
public void SetNext(IOperator nextOperator)
{
_nextOperator = nextOperator;
if (_nextOperator is ITelemetryEnabled telemetryEnabled)
{
telemetryEnabled.SetTelemetryProvider(_telemetryProvider);
}
}
}
Integrating the Custom Operator:
using Cortex.Streams;
using Cortex.Streams.Extensions; // Namespace where StreamBuilderExtensions is defined
using System;
class Program
{
static void Main(string[] args)
{
// Initialize the custom logging operator
var loggingOperator = new LoggingOperator<string>();
// Create and configure the stream with Map, LoggingOperator, and Sink using the extension method
var stream = StreamBuilder<string, string>.CreateNewStream("CustomOperatorStream")
.Stream()
.Map(message => $"Transformed: {message}") // Example transformation
.UseOperator<string, string, string>(loggingOperator) // Add custom LoggingOperator
.Sink(x => Console.WriteLine(x)) // Sink to console
.Build();
// Start the stream
stream.Start();
// Emit data into the stream
stream.Emit("CustomEvent1");
stream.Emit("CustomEvent2");
// Stop the stream after processing
stream.Stop();
}
}
Output:
LoggingOperator: Processing data - Transformed: CustomEvent1
Transformed: CustomEvent1
LoggingOperator: Processing data - Transformed: CustomEvent2
Transformed: CustomEvent2
Explanation:
- Custom Operator Definition: The LoggingOperator logs each data item it processes and increments a telemetry counter.
- Stream Configuration:
  - Map Operator: Transforms incoming messages.
  - Custom Logging Operator: Logs the transformed messages.
  - Sink Operator: Outputs the final data to the console.
- Data Emission: Emits two custom events that pass through the transformation, logging, and sink stages.
- Stream Lifecycle: The stream is started, data is emitted and processed, and then the stream is stopped.