Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/arkflow-core/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub trait CodecBuilder: Send + Sync {
) -> Result<Arc<dyn Codec>, Error>;
}

/// Buffer configuration
/// Codec configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CodecConfig {
#[serde(rename = "type")]
Expand Down
50 changes: 47 additions & 3 deletions crates/arkflow-core/src/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//!
//! The input component is responsible for receiving data from various sources such as message queues, file systems, HTTP endpoints, and so on.

use crate::codec::{Codec, CodecConfig, Encoder};
use crate::{Error, MessageBatch, Resource};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
Expand All @@ -32,7 +33,7 @@ pub trait InputBuilder: Send + Sync {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
resource: &Resource,
resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error>;
}

Expand Down Expand Up @@ -97,17 +98,25 @@ pub struct InputConfig {
#[serde(rename = "type")]
pub input_type: String,
pub name: Option<String>,
pub decode: Option<CodecConfig>,
#[serde(flatten)]
pub config: Option<serde_json::Value>,
}

impl InputConfig {
/// Building input components
pub fn build(&self, resource: &Resource) -> Result<Arc<dyn Input>, Error> {
pub fn build(&self, resource: &mut Resource) -> Result<Arc<dyn InputEncoder>, Error> {
let builders = INPUT_BUILDERS.read().unwrap();
let encoder = if let Some(codec_config) = self.decode.as_ref() {
let arc = codec_config.build(resource)?;
Some(arc)
} else {
None
};

if let Some(builder) = builders.get(&self.input_type) {
builder.build(self.name.as_ref(), &self.config, resource)
let input = builder.build(self.name.as_ref(), &self.config, resource)?;
Ok(Arc::new(InputEncode { input, encoder }))
} else {
Err(Error::Config(format!(
"Unknown input type: {}",
Expand All @@ -117,6 +126,41 @@ impl InputConfig {
}
}

pub struct InputEncode {
pub input: Arc<dyn Input>,
pub encoder: Option<Arc<dyn Codec>>,
}

impl InputEncoder for InputEncode {
fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, Error> {
if let Some(e) = &self.encoder {
MessageBatch::new_binary(e.encode(msg)?)
} else {
Ok(msg)
}
}
Comment on lines +135 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Preserve MessageBatch metadata during encoding.

When an encoder is configured, encode rebuilds the batch with MessageBatch::new_binary(...) but never restores the original input name. Components like the window buffer rely on MessageBatch::get_input_name() for routing, so the metadata loss breaks multi-input setups.

Please capture the existing name before encoding and reapply it to the new batch.

 impl InputEncoder for InputEncode {
     fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, Error> {
         if let Some(e) = &self.encoder {
-            MessageBatch::new_binary(e.encode(msg)?)
+            let input_name = msg.get_input_name();
+            let encoded = e.encode(msg)?;
+            let mut batch = MessageBatch::new_binary(encoded)?;
+            batch.set_input_name(input_name);
+            Ok(batch)
         } else {
             Ok(msg)
         }
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, Error> {
if let Some(e) = &self.encoder {
MessageBatch::new_binary(e.encode(msg)?)
} else {
Ok(msg)
}
}
fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, Error> {
if let Some(e) = &self.encoder {
let input_name = msg.get_input_name();
let encoded = e.encode(msg)?;
let mut batch = MessageBatch::new_binary(encoded)?;
batch.set_input_name(input_name);
Ok(batch)
} else {
Ok(msg)
}
}
🤖 Prompt for AI Agents
In crates/arkflow-core/src/input/mod.rs around lines 135 to 141, the encode
method currently replaces the batch with MessageBatch::new_binary(...) but loses
the original input name; capture the existing batch input name (e.g., via
msg.get_input_name() or equivalent) before encoding, create the new binary batch
from the encoder output, then set or restore the captured input name on the new
MessageBatch (using the appropriate setter or constructor) before returning so
metadata like input name is preserved for routing.

}

#[async_trait]
impl Input for InputEncode {
async fn connect(&self) -> Result<(), Error> {
self.input.connect().await
}

async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error> {
self.input.read().await
}

async fn close(&self) -> Result<(), Error> {
self.input.close().await
}
}

// #[async_trait]
pub trait InputEncoder: Input {
fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, Error>;
}

pub fn register_input_builder(
type_name: &str,
builder: Arc<dyn InputBuilder>,
Expand Down
4 changes: 3 additions & 1 deletion crates/arkflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Rust stream processing engine

use crate::codec::Codec;
use crate::temporary::Temporary;
use datafusion::arrow::array::{Array, ArrayRef, BinaryArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand Down Expand Up @@ -78,7 +79,8 @@ pub enum Error {

#[derive(Clone)]
pub struct Resource {
pub temporary: HashMap<String, Arc<dyn Temporary>>,
pub temporary: RefCell<HashMap<String, Arc<dyn Temporary>>>,
// pub codec: RefCell<HashMap<String, Arc<dyn Codec>>>,
pub input_names: RefCell<Vec<String>>,
}

Expand Down
4 changes: 2 additions & 2 deletions crates/arkflow-core/src/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct OutputConfig {

impl OutputConfig {
/// Build the output component according to the configuration
pub fn build(&self, resource: &Resource) -> Result<Arc<dyn Output>, Error> {
pub fn build(&self, resource: &mut Resource) -> Result<Arc<dyn Output>, Error> {
let builders = OUTPUT_BUILDERS.read().unwrap();

if let Some(builder) = builders.get(&self.output_type) {
Expand All @@ -70,7 +70,7 @@ pub trait OutputBuilder: Send + Sync {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
resource: &Resource,
resource: &mut Resource,
) -> Result<Arc<dyn Output>, Error>;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/arkflow-core/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct PipelineConfig {

impl PipelineConfig {
/// Build pipelines based on your configuration
pub fn build(&self, resource: &Resource) -> Result<(Pipeline, u32), Error> {
pub fn build(&self, resource: &mut Resource) -> Result<(Pipeline, u32), Error> {
let mut processors = Vec::with_capacity(self.processors.len());
for processor_config in &self.processors {
processors.push(processor_config.build(resource)?);
Expand Down
4 changes: 2 additions & 2 deletions crates/arkflow-core/src/processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct ProcessorConfig {

impl ProcessorConfig {
/// Build the processor components according to the configuration
pub fn build(&self, resource: &Resource) -> Result<Arc<dyn Processor>, Error> {
pub fn build(&self, resource: &mut Resource) -> Result<Arc<dyn Processor>, Error> {
let builders = PROCESSOR_BUILDERS.read().unwrap();

if let Some(builder) = builders.get(&self.processor_type) {
Expand All @@ -68,7 +68,7 @@ pub trait ProcessorBuilder: Send + Sync {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
resource: &Resource,
resource: &mut Resource,
) -> Result<Arc<dyn Processor>, Error>;
}

Expand Down
44 changes: 27 additions & 17 deletions crates/arkflow-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! A stream is a complete data processing unit, containing input, pipeline, and output.

use crate::buffer::Buffer;
use crate::input::Ack;
use crate::input::{Ack, InputEncoder};
use crate::{input::Input, output::Output, pipeline::Pipeline, Error, MessageBatch, Resource};
use flume::{Receiver, Sender};
use std::cell::RefCell;
Expand All @@ -32,7 +32,7 @@ const BACKPRESSURE_THRESHOLD: u64 = 1024;

/// A stream structure, containing input, pipe, output, and an optional buffer.
pub struct Stream {
input: Arc<dyn Input>,
input: Arc<dyn InputEncoder>,
pipeline: Arc<Pipeline>,
output: Arc<dyn Output>,
error_output: Option<Arc<dyn Output>>,
Expand All @@ -51,7 +51,7 @@ enum ProcessorData {
impl Stream {
/// Create a new stream.
pub fn new(
input: Arc<dyn Input>,
input: Arc<dyn InputEncoder>,
pipeline: Pipeline,
output: Arc<dyn Output>,
error_output: Option<Arc<dyn Output>>,
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Stream {
if let Some(ref error_output) = self.error_output {
error_output.connect().await?;
}
for (_, temporary) in &self.resource.temporary {
for (_, temporary) in self.resource.temporary.get_mut() {
temporary.connect().await?
}

Expand Down Expand Up @@ -147,7 +147,7 @@ impl Stream {

async fn do_input(
cancellation_token: CancellationToken,
input: Arc<dyn Input>,
input: Arc<dyn InputEncoder>,
input_sender: Sender<(MessageBatch, Arc<dyn Ack>)>,
buffer_option: Option<Arc<dyn Buffer>>,
) {
Expand All @@ -159,13 +159,20 @@ impl Stream {
result = input.read() =>{
match result {
Ok(msg) => {
let encoded_msg = input.encode(msg.0);
if let Err(e) = encoded_msg {
error!("Failed to encode message: {}", e);
continue;
}

let encoded_msg = encoded_msg.unwrap();
if let Some(buffer) = &buffer_option {
if let Err(e) = buffer.write(msg.0, msg.1).await {
if let Err(e) = buffer.write(encoded_msg, msg.1).await {
error!("Failed to send input message: {}", e);
break;
}
} else {
if let Err(e) = input_sender.send_async(msg).await {
if let Err(e) = input_sender.send_async((encoded_msg,msg.1)).await {
error!("Failed to send input message: {}", e);
break;
}
Expand Down Expand Up @@ -442,25 +449,28 @@ impl StreamConfig {
/// Build stream based on configuration
pub fn build(&self) -> Result<Stream, Error> {
let mut resource = Resource {
temporary: HashMap::new(),
temporary: RefCell::default(),
input_names: RefCell::default(),
};

if let Some(temporary_configs) = &self.temporary {
resource.temporary = HashMap::with_capacity(temporary_configs.len());
resource
.temporary
.replace(HashMap::with_capacity(temporary_configs.len()));
for temporary_config in temporary_configs {
resource.temporary.insert(
temporary_config.name.clone(),
temporary_config.build(&resource)?,
);
let temp = temporary_config.build(&resource)?;
resource
.temporary
.get_mut()
.insert(temporary_config.name.clone(), temp);
}
};

let input = self.input.build(&resource)?;
let (pipeline, thread_num) = self.pipeline.build(&resource)?;
let output = self.output.build(&resource)?;
let input = self.input.build(&mut resource)?;
let (pipeline, thread_num) = self.pipeline.build(&mut resource)?;
let output = self.output.build(&mut resource)?;
let error_output = if let Some(error_output_config) = &self.error_output {
Some(error_output_config.build(&resource)?)
Some(error_output_config.build(&mut resource)?)
} else {
None
};
Expand Down
12 changes: 8 additions & 4 deletions crates/arkflow-plugin/src/input/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

use crate::udf;
use arkflow_core::codec::Codec;
use arkflow_core::input::{Ack, Input, InputBuilder, NoopAck};
use arkflow_core::{input, Error, MessageBatch, Resource};
use async_trait::async_trait;
Expand Down Expand Up @@ -153,7 +154,11 @@ struct FileInput {
}

impl FileInput {
fn new(name: Option<&String>, config: FileInputConfig) -> Result<Self, Error> {
fn new(
name: Option<&String>,
config: FileInputConfig,
_resource: &mut Resource,
) -> Result<Self, Error> {
let cancellation_token = CancellationToken::new();
Ok(Self {
input_name: name.cloned(),
Expand Down Expand Up @@ -459,17 +464,16 @@ impl InputBuilder for FileBuilder {
&self,
name: Option<&String>,
config: &Option<Value>,
_resource: &Resource,
resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error> {
if config.is_none() {
return Err(Error::Config(
"File input configuration is missing".to_string(),
));
}

let config: FileInputConfig = serde_json::from_value(config.clone().unwrap())
.map_err(|e| Error::Config(format!("Failed to parse File input config: {}", e)))?;
Ok(Arc::new(FileInput::new(name, config)?))
Ok(Arc::new(FileInput::new(name, config, resource)?))
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/arkflow-plugin/src/input/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl InputBuilder for GenerateInputBuilder {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
_resource: &Resource,
_resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error> {
if config.is_none() {
return Err(Error::Config(
Expand Down Expand Up @@ -241,6 +241,7 @@ mod tests {
&Some(config_json),
&Resource {
temporary: Default::default(),
codec: Default::default(),
input_names: RefCell::new(Default::default()),
},
)
Expand Down
2 changes: 1 addition & 1 deletion crates/arkflow-plugin/src/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl InputBuilder for HttpInputBuilder {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
_resource: &Resource,
_resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error> {
if config.is_none() {
return Err(Error::Config(
Expand Down
2 changes: 1 addition & 1 deletion crates/arkflow-plugin/src/input/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl InputBuilder for KafkaInputBuilder {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
_resource: &Resource,
_resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error> {
if config.is_none() {
return Err(Error::Config(
Expand Down
2 changes: 1 addition & 1 deletion crates/arkflow-plugin/src/input/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl InputBuilder for MemoryInputBuilder {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
_resource: &Resource,
_resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error> {
if config.is_none() {
return Err(Error::Config(
Expand Down
2 changes: 1 addition & 1 deletion crates/arkflow-plugin/src/input/modbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl InputBuilder for ModbusInputBuilder {
&self,
name: Option<&String>,
config: &Option<Value>,
_resource: &Resource,
_resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error> {
let config = config
.as_ref()
Expand Down
2 changes: 1 addition & 1 deletion crates/arkflow-plugin/src/input/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl InputBuilder for MqttInputBuilder {
&self,
name: Option<&String>,
config: &Option<serde_json::Value>,
_resource: &Resource,
_resource: &mut Resource,
) -> Result<Arc<dyn Input>, Error> {
if config.is_none() {
return Err(Error::Config(
Expand Down
Loading
Loading