11. Examples and Tutorials
Welcome to the Cortex Examples and Tutorials page! This guide walks you through examples that range from basic data transformations to advanced stream processing scenarios. It covers the main operator types, including TumblingWindow, SlidingWindow, and SessionWindow, and shows where each one fits in a stream processing pipeline.
Objective: Transform a stream of integers by doubling each number using the Map operator.
using Cortex.Streams;
var streamBuilder = StreamBuilder<int, int>.CreateNewStream("DoubleNumbersStream");
streamBuilder
.Stream()
.Map(x => x * 2)
.Sink(x => Console.WriteLine($"Doubled number: {x}"));
var stream = streamBuilder.Build();
stream.Start();
// Emitting numbers into the stream
stream.Emit(1); // Output: Doubled number: 2
stream.Emit(2); // Output: Doubled number: 4
stream.Emit(3); // Output: Doubled number: 6
stream.Stop();
Explanation:
- Stream Operator: Initiates the stream so that it accepts values emitted by the user.
- Map Operator: Transforms each input number by multiplying it by 2.
- Sink Operator: Outputs the transformed number to the console.
Objective: Filter out negative numbers from a stream using the Filter operator.
using Cortex.Streams;
var streamBuilder = StreamBuilder<int, int>.CreateNewStream("PositiveNumbersStream");
streamBuilder
.Stream()
.Filter(x => x >= 0)
.Sink(x => Console.WriteLine($"Positive number: {x}"));
var stream = streamBuilder.Build();
stream.Start();
// Emitting numbers into the stream
stream.Emit(-1); // No output
stream.Emit(0); // Output: Positive number: 0
stream.Emit(5); // Output: Positive number: 5
stream.Stop();
Explanation:
- Filter Operator: Allows only numbers greater than or equal to zero to pass through.
- Sink Operator: Outputs the non-negative numbers (zero included) to the console.
Objective: Count occurrences of words in a stream using the Aggregate operator.
using Cortex.Streams;
using Cortex.States;
var streamBuilder = StreamBuilder<string, string>.CreateNewStream("WordCountStream");
streamBuilder
.Stream()
.AggregateSilently<string, int>(
keySelector: word => word,
aggregateFunction: (count, _) => count + 1
)
.Sink(word => Console.WriteLine($"Word: {word}"));
var stream = streamBuilder.Build();
stream.Start();
// Emitting words into the stream
stream.Emit("apple");
stream.Emit("banana");
stream.Emit("apple");
stream.Emit("orange");
stream.Emit("banana");
stream.Emit("apple");
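// Note: the Sink above prints only the word (for example "Word: apple").
// The counts are kept in the aggregate state store. Reading the records
// from WordCountStream after each emit would show the count for that word:
// Word: apple, Count: 1
// Word: banana, Count: 1
// Word: apple, Count: 2
// Word: orange, Count: 1
// Word: banana, Count: 2
// Word: apple, Count: 3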
stream.Stop();
Explanation:
- Aggregate Operator: Maintains a count of each unique word using an in-memory state store.
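The per-key counting described above can be sketched in plain C#, without Cortex. This is only an illustration of the idea: `WordCountSketch` and `RunningCounts` are hypothetical names, the dictionary merely stands in for the in-memory state store, and none of this is Cortex's actual implementation.

```csharp
using System.Collections.Generic;

public static class WordCountSketch
{
    // Hypothetical helper, not part of Cortex: folds each word into a
    // per-key counter and records the count reached after every event.
    public static List<(string Word, int Count)> RunningCounts(IEnumerable<string> words)
    {
        var counts = new Dictionary<string, int>(); // stands in for the state store
        var history = new List<(string Word, int Count)>();

        foreach (var word in words)
        {
            counts.TryGetValue(word, out var current); // current is 0 for a new key
            counts[word] = current + 1;                // same shape as (count, _) => count + 1
            history.Add((word, current + 1));
        }

        return history;
    }
}
```

Calling `RunningCounts` with the six words emitted in the example yields the same word and count pairs listed in the example's comments.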
Objective: Compute the sum of numbers over a tumbling window of 5 seconds using the TumblingWindow operator.
using Cortex.Streams;
using System.Timers;
var stream = StreamBuilder<int, int>.CreateNewStream("SumTumblingWindowStream")
.Stream()
.TumblingWindow<string, int>(
keySelector: _ => "Sum", // Single window
windowDuration: TimeSpan.FromSeconds(5),
windowFunction: numbers => numbers.Sum()
)
.Sink(sum => Console.WriteLine($"Sum over window: {sum}"))
.Build();
stream.Start();
// Emitting numbers into the stream every second
var timer = new System.Timers.Timer(1000); // fully qualified to avoid ambiguity with System.Threading.Timer
int counter = 1;
timer.Elapsed += (sender, args) =>
{
if (counter <= 10)
{
stream.Emit(counter);
counter++;
}
else
{
timer.Stop();
stream.Stop();
}
};
timer.Start();
// Expected Output (one line roughly every 5 seconds; the exact grouping depends on how the emits align with the window boundaries):
// Sum over window: 15 (1+2+3+4+5)
// Sum over window: 40 (6+7+8+9+10)
Explanation:
- TumblingWindow Operator: Groups numbers into fixed, non-overlapping windows of 5 seconds.
- Window Function: Calculates the sum of numbers in each window.
- Sink Operator: Outputs the sum after each window closes.
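To make the "fixed, non-overlapping" behaviour concrete, here is a minimal sketch in plain C# that assigns each event to exactly one window by dividing its time offset by the window length. It does not use Cortex, and `TumblingWindowSketch`, `SumPerWindow`, and the offset-based input are assumptions made purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TumblingWindowSketch
{
    // Hypothetical helper, not part of Cortex: an event at offset t (seconds
    // since the first window opened) falls into window floor(t / windowSeconds),
    // so every event belongs to exactly one window.
    public static List<(long Window, int Sum)> SumPerWindow(
        IEnumerable<(double OffsetSeconds, int Value)> events,
        double windowSeconds)
    {
        return events
            .GroupBy(e => (long)Math.Floor(e.OffsetSeconds / windowSeconds))
            .OrderBy(g => g.Key)
            .Select(g => (Window: g.Key, Sum: g.Sum(e => e.Value)))
            .ToList();
    }
}
```

With values 1 through 10 arriving at offsets 0.5, 1.5, ..., 9.5 seconds and a 5-second window, the sketch produces two windows with sums 15 and 40. If the same values arrived at offsets 1, 2, ..., 10 instead, the grouping would shift, which is why results in a live run depend on timer alignment.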
Objective: Calculate the moving average of stock prices over a sliding window using the SlidingWindow operator.
using Cortex.Streams;
using Cortex.States;
public class StockPrice
{
public string Symbol { get; set; }
public double Price { get; set; }
}
var stream = StreamBuilder<StockPrice, StockPrice>.CreateNewStream("StockPriceStream")
.Stream()
.SlidingWindow<string, double>(
keySelector: stock => stock.Symbol,
windowSize: TimeSpan.FromSeconds(10),
advanceBy: TimeSpan.FromSeconds(5),
windowFunction: prices => prices.Average(p => p.Price)
)
.Sink(avgPrice => Console.WriteLine($"Moving average price: {avgPrice}"))
.Build();
stream.Start();
// Emitting stock prices every 2 seconds
var timer = new System.Timers.Timer(2000);
int tick = 0;
timer.Elapsed += (sender, args) =>
{
if (tick < 10)
{
var price = new StockPrice { Symbol = "MSFT", Price = 150 + tick };
stream.Emit(price);
tick++;
}
else
{
timer.Stop();
stream.Stop();
}
};
timer.Start();
// Expected Output:
// Every 5 seconds, the moving average over the last 10 seconds is printed.
Explanation:
- SlidingWindow Operator: Creates overlapping windows of 10 seconds, advancing every 5 seconds.
- Window Function: Calculates the average stock price within each window.
- Sink Operator: Outputs the moving average price.
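The overlap between sliding windows can be illustrated with a small plain C# sketch. It assumes windows start at offsets 0, advance, 2 × advance, and so on, and cover the half-open interval from start to start + size. `SlidingWindowSketch` and `AveragePerWindow` are hypothetical names, and Cortex's own window alignment may differ.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class SlidingWindowSketch
{
    // Hypothetical helper, not part of Cortex: evaluates every window
    // [start, start + size) with start = 0, advance, 2 * advance, ...
    // An event can fall into several windows when size > advance.
    public static List<(double Start, double Average)> AveragePerWindow(
        IReadOnlyList<(double OffsetSeconds, double Price)> events,
        double sizeSeconds,
        double advanceSeconds)
    {
        var results = new List<(double Start, double Average)>();
        if (events.Count == 0) return results;

        double lastOffset = events.Max(e => e.OffsetSeconds);
        for (double start = 0; start <= lastOffset; start += advanceSeconds)
        {
            var inWindow = events
                .Where(e => e.OffsetSeconds >= start && e.OffsetSeconds < start + sizeSeconds)
                .ToList();

            // Skip empty windows so Average never sees an empty sequence.
            if (inWindow.Count > 0)
                results.Add((start, inWindow.Average(e => e.Price)));
        }

        return results;
    }
}
```

For prices 10, 20, 30, 40, 50 at offsets 1, 3, 6, 8, 12 seconds with a 10-second size and a 5-second advance, the windows starting at 0, 5, and 10 average to 25, 40, and 50. The events at offsets 6 and 8 contribute to two windows each.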
Objective: Group user actions into sessions separated by periods of inactivity, and compute the duration of each session, using the SessionWindow operator.
using Cortex.Streams;
using Cortex.States;
public class UserAction
{
public string UserId { get; set; }
public DateTime Timestamp { get; set; }
}
var stream = StreamBuilder<UserAction, UserAction>.CreateNewStream("UserSessionStream")
.Stream()
.SessionWindow<string, TimeSpan>(
keySelector: action => action.UserId,
inactivityGap: TimeSpan.FromSeconds(5),
windowFunction: actions =>
{
var sessionDuration = actions.Last().Timestamp - actions.First().Timestamp;
return sessionDuration;
}
)
.Sink(duration => Console.WriteLine($"Session duration: {duration.TotalSeconds} seconds"))
.Build();
stream.Start();
// Simulating user actions with inactivity gaps
stream.Emit(new UserAction { UserId = "User1", Timestamp = DateTime.UtcNow });
System.Threading.Thread.Sleep(2000); // Active within 5 seconds
stream.Emit(new UserAction { UserId = "User1", Timestamp = DateTime.UtcNow });
System.Threading.Thread.Sleep(6000); // Inactivity gap greater than 5 seconds
stream.Emit(new UserAction { UserId = "User1", Timestamp = DateTime.UtcNow });
// Expected Output:
// Session duration: 2 seconds (First session)
// Session duration: 0 seconds (Second session)
Explanation:
- SessionWindow Operator: Groups events into sessions based on user activity, closing a session if there's inactivity for more than 5 seconds.
- Window Function: Calculates the duration of each session.
- Sink Operator: Outputs the session duration.
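The session-splitting rule can be sketched in plain C# as follows. This is an illustration only: `SessionWindowSketch` and `SessionDurations` are hypothetical names, the input is a finished list of timestamps for a single user, and a real streaming engine would instead close sessions as time passes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SessionWindowSketch
{
    // Hypothetical helper, not part of Cortex: starts a new session whenever
    // the gap to the previous event exceeds inactivityGap, and reports each
    // session's duration as (last timestamp - first timestamp).
    public static List<TimeSpan> SessionDurations(
        IEnumerable<DateTime> timestamps,
        TimeSpan inactivityGap)
    {
        var durations = new List<TimeSpan>();
        DateTime? sessionStart = null;
        DateTime previous = default;

        foreach (var t in timestamps.OrderBy(x => x))
        {
            if (!sessionStart.HasValue)
            {
                sessionStart = t; // first event opens the first session
            }
            else if (t - previous > inactivityGap)
            {
                durations.Add(previous - sessionStart.Value); // close the old session
                sessionStart = t;                             // open a new one
            }
            previous = t;
        }

        if (sessionStart.HasValue)
            durations.Add(previous - sessionStart.Value); // close the final session

        return durations;
    }
}
```

For events at t, t + 2 s, and t + 8 s with a 5-second gap, the sketch returns durations of 2 seconds and 0 seconds, matching the two sessions in the example above.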
Objective: Process IoT sensor data in multiple branches, each applying different processing logic. No windowing operators are used inside the branches, because windowing inside branches is not supported in v1.0; support will arrive with v1.1.
using Cortex.Streams;
using Cortex.States;
public class SensorData
{
public string SensorId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
var stream = StreamBuilder<SensorData, SensorData>.CreateNewStream("IoTDataStream")
.Stream()
.AddBranch("TemperatureBranch", branch =>
{
// Filter temperature sensors and process
branch
.Filter(data => data.SensorId.StartsWith("Temp"))
.Map(data => data.Value)
.Sink(tempValue => Console.WriteLine($"Temperature reading: {tempValue}°C"));
})
.AddBranch("HumidityBranch", branch =>
{
// Filter humidity sensors and process
branch
.Filter(data => data.SensorId.StartsWith("Humid"))
.Map(data => data.Value)
.Sink(humidValue => Console.WriteLine($"Humidity reading: {humidValue}%"));
})
.AddBranch("AnomalyDetection", branch =>
{
// Detect anomalies
branch
.Filter(data => data.Value > 90) // Arbitrary anomaly condition
.Sink(data => Console.WriteLine($"Anomaly detected on sensor {data.SensorId} with value {data.Value}!"));
})
.Build();
stream.Start();
// Simulating sensor data
var sensors = new[] { "Temp1", "Humid1", "Temp2", "Humid2" };
var random = new Random();
var timer = new System.Timers.Timer(1000);
int ticks = 0;
timer.Elapsed += (sender, args) =>
{
if (ticks < 30)
{
foreach (var sensorId in sensors)
{
var data = new SensorData
{
SensorId = sensorId,
Value = random.NextDouble() * 100,
Timestamp = DateTime.UtcNow
};
stream.Emit(data);
}
ticks++;
}
else
{
timer.Stop();
stream.Stop();
}
};
timer.Start();
Explanation:
- Branching:
  - TemperatureBranch:
    - Filters temperature sensor data.
    - Maps to the sensor value.
    - Outputs the temperature readings.
  - HumidityBranch:
    - Filters humidity sensor data.
    - Maps to the sensor value.
    - Outputs the humidity readings.
  - AnomalyDetection:
    - Filters data where value exceeds 90.
    - Outputs an anomaly alert.
Note: Because windowing operators are not currently supported inside branches, the branches in this example hold no state and no windows. If you need windowing, you can add it before the Sink in the main pipeline.
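The branch filters in this example are independent of each other, so a single reading can satisfy more than one of them. The plain C# sketch below shows which branch filters a given reading would pass, assuming every branch is offered every event. That fan-out behaviour is an assumption about how branches work, and `BranchSketch` and `Route` are hypothetical names, not part of Cortex.

```csharp
using System;
using System.Collections.Generic;

public static class BranchSketch
{
    // Hypothetical helper, not part of Cortex: applies the three filter
    // conditions from the example and returns the names of the branches
    // whose filter the reading passes.
    public static List<string> Route(string sensorId, double value)
    {
        var matched = new List<string>();

        if (sensorId.StartsWith("Temp", StringComparison.Ordinal))
            matched.Add("TemperatureBranch");

        if (sensorId.StartsWith("Humid", StringComparison.Ordinal))
            matched.Add("HumidityBranch");

        if (value > 90) // same arbitrary anomaly condition as the example
            matched.Add("AnomalyDetection");

        return matched;
    }
}
```

Under that assumption, `Route("Temp1", 95.0)` returns both `TemperatureBranch` and `AnomalyDetection`, while `Route("Humid2", 40.0)` returns only `HumidityBranch`.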
Objective: Build a real-time analytics pipeline that reads log messages from Kafka, processes them, and writes the results back to Kafka when the log level is ERROR or WARNING.
using Cortex.Streams;
using Cortex.Streams.Kafka;
public class LogMessage
{
public string Level { get; set; } // INFO, WARNING, ERROR
public string Message { get; set; }
public DateTime Timestamp { get; set; }
}
var kafkaSource = new KafkaSourceOperator<LogMessage>(
bootstrapServers: "localhost:9092",
topic: "raw-logs"
);
var kafkaSink = new KafkaSinkOperator<string>(
bootstrapServers: "localhost:9092",
topic: "processed-logs"
);
var stream = StreamBuilder<LogMessage, LogMessage>.CreateNewStream("LogProcessingStream")
.Stream(kafkaSource)
.Map(log => new { LogLevel = log.Level, FormattedMessage = $"[{log.Level}] {log.Timestamp}: {log.Message}" })
.AddBranch("ErrorAndWarningLogs", branch =>
{
branch
.Filter(log => log.LogLevel == "ERROR" || log.LogLevel == "WARNING")
.Map(log => log.FormattedMessage)
.Sink(kafkaSink);
})
.AddBranch("InfoLogs", branch =>
{
branch
.Filter(log => log.LogLevel == "INFO")
.Map(log => log.FormattedMessage)
.Sink(log => Console.WriteLine(log));
})
.Build();
stream.Start();
// The stream will now process logs from the "raw-logs" Kafka topic and output results.
Console.WriteLine("Press Enter to stop the stream.");
Console.ReadLine();
stream.Stop();
Explanation:
- KafkaSourceOperator: Reads log messages from a Kafka topic.
- Map Operator: Formats the log messages.
- Branching:
  - ErrorAndWarningLogs:
    - Filters logs by level (ERROR or WARNING).
    - Sends the formatted message to another Kafka topic using KafkaSinkOperator.
  - InfoLogs:
    - Filters logs with level INFO.
    - Outputs the formatted message to the console.
Note: Ensure Kafka is running and the topics are properly configured.