@chenquan
Collaborator

@chenquan chenquan commented Nov 2, 2025

Summary by CodeRabbit

  • New Features

    • Added codec configuration for inputs, allowing optional automatic encoding of incoming messages.
    • Runtime now integrates input encoding so messages are encoded before buffering or downstream delivery.
  • Chores

    • Internal resource handling updated to support the new encoding/build flow.

@coderabbitai
Contributor

coderabbitai bot commented Nov 2, 2025

Walkthrough

Input/output/processor builder signatures were changed to accept a mutable &mut Resource; Resource.temporary gained RefCell interior mutability. A new InputEncoder wrapper and codec wiring were added to the input build/read path so inputs can optionally encode messages before downstream processing.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Core: resource & codec docs | crates/arkflow-core/src/lib.rs, crates/arkflow-core/src/codec/mod.rs | Resource.temporary switched to RefCell<HashMap<...>> for interior mutability; CodecConfig doc comment changed ("Buffer configuration" → "Codec configuration"). A commented placeholder for codec storage exists. |
| Core: input trait & builders | crates/arkflow-core/src/input/mod.rs | Introduced InputEncoder trait and InputEncode wrapper (encodes messages when codec present). InputConfig gained decode: Option<CodecConfig>. InputBuilder::build / InputConfig::build signatures now take &mut Resource and return/produce Arc<dyn InputEncoder>. |
| Core: stream runtime | crates/arkflow-core/src/stream/mod.rs | Stream now holds Arc<dyn InputEncoder>; build-time calls pass &mut Resource. Incoming messages are encoded via input.encode(...) before buffering/sending. Accesses to resource.temporary use RefCell borrow patterns. |
| Core: outputs/processors/pipeline | crates/arkflow-core/src/output/mod.rs, crates/arkflow-core/src/processor/mod.rs, crates/arkflow-core/src/pipeline/mod.rs | All build methods and builder trait signatures updated to accept &mut Resource instead of &Resource. |
| Plugin inputs: builders adjusted | crates/arkflow-plugin/src/input/*.rs (e.g. file.rs, generate.rs, http.rs, kafka.rs, memory.rs, modbus.rs, mqtt.rs, multiple_inputs.rs, nats.rs, pulsar.rs, redis.rs, sql.rs, websocket.rs) | Many input builders updated to accept &mut Resource and pass it through to input constructors; multiple_inputs now stores Vec<Arc<dyn InputEncoder>> and uses resource.input_names borrow patterns. |
| Plugin outputs: builders adjusted | crates/arkflow-plugin/src/output/*.rs (e.g. drop.rs, http.rs, kafka.rs, mqtt.rs, nats.rs, pulsar.rs, redis.rs, sql.rs, stdout.rs) | Output builder build signatures changed to take _resource: &mut Resource. |
| Plugin processors: builders adjusted | crates/arkflow-plugin/src/processor/*.rs (e.g. batch.rs, json.rs, protobuf.rs, python.rs, sql.rs, vrl.rs) | Processor builder build signatures changed to accept _resource: &mut Resource. SqlProcessor::new updated to take &mut Resource and uses resource.temporary.get_mut() for lookups; a private BallistaConfig was removed. |
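
The signature change summarized in the table can be sketched in a few lines. The types below are simplified stand-ins, not ArkFlow's real definitions: `Resource` here holds only a `RefCell<HashMap<String, String>>`, and `build_input` and `is_registered` are hypothetical helpers used to contrast `get_mut()` (needs `&mut`, no runtime check) with `borrow()` (works through `&`, checked at runtime).

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Simplified stand-in for the PR's `Resource`; the real struct stores
/// temporaries and input names rather than plain strings.
#[derive(Default)]
struct Resource {
    temporary: RefCell<HashMap<String, String>>,
}

/// Hypothetical builder that registers a value while building.
/// With `&mut Resource`, `RefCell::get_mut` gives direct access to the map
/// and performs no runtime borrow check.
fn build_input(name: &str, resource: &mut Resource) -> usize {
    resource
        .temporary
        .get_mut()
        .insert(name.to_string(), format!("input:{}", name));
    resource.temporary.get_mut().len()
}

/// Read-only helper: a shared reference is enough, but the access goes
/// through `borrow()`, which is checked at runtime.
fn is_registered(name: &str, resource: &Resource) -> bool {
    resource.temporary.borrow().contains_key(name)
}

fn main() {
    let mut resource = Resource::default();
    build_input("kafka", &mut resource);
    println!("kafka registered: {}", is_registered("kafka", &resource));
}
```

Because every builder now receives `&mut Resource`, the build path can use `get_mut()` throughout; `borrow()` and `borrow_mut()` only matter where code holds a shared reference.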

Sequence Diagram(s)

sequenceDiagram
    participant Builder as System Builder
    participant Resource as Resource (RefCell)
    participant InputB as InputBuilder
    participant Codec as Codec (optional)

    Builder->>Resource: pass &mut Resource
    Builder->>InputB: build(name, config, &mut Resource)
    activate InputB
    InputB->>Resource: borrow/insert codec via RefCell (if decode present)
    InputB->>Codec: init/lookup codec (optional)
    InputB-->>Builder: Arc<dyn InputEncoder>
    deactivate InputB
sequenceDiagram
    participant Runtime as Stream::run()
    participant Input as Arc<dyn InputEncoder>
    participant Processor as Processor
    participant Output as Buffer/Output

    loop process messages
        Runtime->>Input: read()
        Input-->>Runtime: msg
        Runtime->>Input: encode(msg)
        alt encode success
            Input-->>Runtime: encoded_msg
            Runtime->>Processor: process(encoded_msg)
            Processor->>Output: send(encoded_msg)
        else encode error
            Input-->>Runtime: Error
            Runtime->>Runtime: log and skip
        end
    end
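
The runtime loop in the second diagram (read, encode, then process or log and skip) might look roughly like the sketch below. Everything here is illustrative: `Msg`, `QueueInput`, and `run` are hypothetical names, the trait is a cut-down stand-in for the PR's `InputEncoder`, and the real stream is asynchronous and passes message batches rather than strings.

```rust
/// Hypothetical message type; the real pipeline passes message batches.
#[derive(Debug, Clone, PartialEq)]
struct Msg(String);

/// Cut-down stand-in for the PR's `InputEncoder`: read a message, then encode it.
trait InputEncoder {
    fn read(&mut self) -> Option<Msg>;
    fn encode(&self, msg: Msg) -> Result<Msg, String>;
}

/// Toy input that yields queued strings and rejects empty payloads.
struct QueueInput {
    queue: Vec<String>,
}

impl InputEncoder for QueueInput {
    fn read(&mut self) -> Option<Msg> {
        if self.queue.is_empty() {
            None
        } else {
            Some(Msg(self.queue.remove(0)))
        }
    }

    fn encode(&self, msg: Msg) -> Result<Msg, String> {
        if msg.0.is_empty() {
            Err("empty payload".to_string())
        } else {
            Ok(Msg(msg.0.to_uppercase()))
        }
    }
}

/// Read, encode, and forward; an encode failure is logged and the message skipped.
fn run(input: &mut dyn InputEncoder) -> Vec<Msg> {
    let mut delivered = Vec::new();
    while let Some(msg) = input.read() {
        match input.encode(msg) {
            Ok(encoded) => delivered.push(encoded),
            Err(e) => eprintln!("encode failed, skipping message: {}", e),
        }
    }
    delivered
}

fn main() {
    let mut input = QueueInput {
        queue: vec!["a".to_string(), String::new(), "b".to_string()],
    };
    println!("{:?}", run(&mut input));
}
```

Matching on the `Result` keeps the error branch and the success branch in one place, so no `unwrap()` is needed after the error check.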

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Pay close attention to RefCell borrow patterns in Resource (risk of runtime panics).
  • Review changes to InputEncoder/InputEncode behavior and trait object conversions across modules.
  • Verify all builder signature changes are consistently propagated and do not break external plugin/public API contracts.
  • Inspect SqlProcessor temporary lookup changes and any removed private structs.
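
To make the first review point concrete, here is a small sketch of how an overlapping `RefCell` borrow goes wrong and how scoping avoids it. The function names are hypothetical; only the `std::cell::RefCell` calls are real. `try_borrow_mut` is used so the conflict shows up as an `Err` instead of the panic that `borrow_mut` would raise.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Tries to take a mutable borrow while a shared borrow is still alive.
/// This never succeeds: `try_borrow_mut` reports the conflict as an `Err`
/// where `borrow_mut` would panic.
fn overlapping_borrow_succeeds(cell: &RefCell<HashMap<String, u32>>) -> bool {
    let _reader = cell.borrow(); // shared borrow held until the function returns
    cell.try_borrow_mut().is_ok()
}

/// Same kind of access, but the shared borrow ends before the mutation starts.
fn scoped_borrow_then_insert(cell: &RefCell<HashMap<String, u32>>, key: &str) -> bool {
    let missing = !cell.borrow().contains_key(key); // temporary `Ref` dropped here
    if missing {
        cell.borrow_mut().insert(key.to_string(), 1);
    }
    missing
}

fn main() {
    let cell = RefCell::new(HashMap::new());
    println!("overlap ok: {}", overlapping_borrow_succeeds(&cell));
    println!("inserted: {}", scoped_borrow_then_insert(&cell, "json"));
}
```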

Suggested labels

enhancement

Poem

🐰 I hopped into code with a curious glance,
Borrowed a RefCell and gave codecs a chance.
Builders now borrow me, mutably brave,
Messages get dressed before they behave.
Hop, encode, deliver — a rabbit's small dance 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 19.15% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The PR title "feat(input/decode): supports decoding the input layer" accurately reflects the primary objective of the changeset. The raw summary confirms that the pull request introduces codec-related support to the input layer, including a new decode field in InputConfig, a new InputEncoder trait with encoding functionality, and integration of encoding logic into the message processing pipeline. The title is specific, concise, and clearly communicates the main architectural addition being made to support input decoding/encoding capabilities. While there is a minor terminology distinction in the implementation (using "encode" terminology via InputEncoder and .encode() methods), this represents the internal mechanism for executing the decode configuration, and the overall intent and scope are clearly aligned with the changeset. |

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/arkflow-plugin/src/input/generate.rs (1)

255-269: Add missing codec field to Resource initialization.

The Resource initialization at lines 262-265 is missing the codec field that was added to the Resource struct. This will cause a compilation error.

Apply this diff to fix the issue:

             builder.build(
                 None,
                 &None,
-                &Resource {
+                &mut Resource {
                     temporary: Default::default(),
+                    codec: Default::default(),
                     input_names: RefCell::new(Default::default()),
                 },
             ),
crates/arkflow-plugin/src/processor/protobuf.rs (1)

415-457: Add missing codec field to Resource initializations in tests.

The Resource initializations at lines 420-423 and 430-433 are missing the codec field that was added to the Resource struct.

Apply this diff to fix both occurrences:

         let result = ProtobufToArrowProcessorBuilder.build(
             None,
             &None,
-            &Resource {
+            &mut Resource {
                 temporary: Default::default(),
+                codec: Default::default(),
                 input_names: RefCell::new(Default::default()),
             },
         );
         assert!(result.is_err());

         let result = ArrowToProtobufProcessorBuilder.build(
             None,
             &None,
-            &Resource {
+            &mut Resource {
                 temporary: Default::default(),
+                codec: Default::default(),
                 input_names: RefCell::new(Default::default()),
             },
         );
         assert!(result.is_err());

And at lines 448-454:

         let result = ProtobufToArrowProcessorBuilder.build(
             None,
             &Some(config),
-            &Resource {
+            &mut Resource {
                 temporary: Default::default(),
+                codec: Default::default(),
                 input_names: RefCell::new(Default::default()),
             },
         );
crates/arkflow-plugin/src/processor/sql.rs (1)

252-265: Add missing codec field to Resource initialization in test.

The test Resource initialization is missing the codec field that was added to the Resource struct.

Apply this diff:

         let processor = SqlProcessor::new(
             SqlProcessorConfig {
                 query: "SELECT * FROM flow".to_string(),
                 table_name: None,
                 temporary_list: None,
             },
-            &Resource {
+            &mut Resource {
                 temporary: Default::default(),
+                codec: Default::default(),
                 input_names: RefCell::new(Default::default()),
             },
         )

This same fix needs to be applied to the other test cases at lines 291-303, 316-327, and 333-345.
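
Since the same struct literal has to be patched in many tests, one option is to construct the test `Resource` in a single place. The sketch below assumes a simplified `Resource` whose fields can all be defaulted; whether the real struct can derive `Default` is not confirmed by this review, and `build` and `test_resource` are hypothetical names.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Simplified stand-in for `Resource`; the field types are placeholders.
#[allow(dead_code)]
#[derive(Default)]
struct Resource {
    temporary: RefCell<HashMap<String, String>>,
    codec: RefCell<HashMap<String, String>>,
    input_names: RefCell<Vec<String>>,
}

/// Hypothetical builder under test: fails when no config is given.
fn build(config: &Option<String>, _resource: &mut Resource) -> Result<String, String> {
    config
        .clone()
        .ok_or_else(|| "config is required".to_string())
}

/// The only place that knows how to construct a `Resource` for tests.
/// Adding a field to `Resource` then requires no change in individual tests.
fn test_resource() -> Resource {
    Resource::default()
}

fn main() {
    let mut resource = test_resource();
    println!("{:?}", build(&None, &mut resource));
    // A mutable borrow of a temporary also compiles, as in the diffs above.
    println!("{:?}", build(&Some("cfg".to_string()), &mut Resource::default()));
}
```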

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1180c91 and b0c5b1a.

📒 Files selected for processing (35)
  • crates/arkflow-core/src/codec/mod.rs (1 hunks)
  • crates/arkflow-core/src/input/mod.rs (4 hunks)
  • crates/arkflow-core/src/lib.rs (2 hunks)
  • crates/arkflow-core/src/output/mod.rs (2 hunks)
  • crates/arkflow-core/src/pipeline/mod.rs (1 hunks)
  • crates/arkflow-core/src/processor/mod.rs (2 hunks)
  • crates/arkflow-core/src/stream/mod.rs (3 hunks)
  • crates/arkflow-plugin/src/input/file.rs (1 hunks)
  • crates/arkflow-plugin/src/input/generate.rs (2 hunks)
  • crates/arkflow-plugin/src/input/http.rs (1 hunks)
  • crates/arkflow-plugin/src/input/kafka.rs (1 hunks)
  • crates/arkflow-plugin/src/input/memory.rs (1 hunks)
  • crates/arkflow-plugin/src/input/modbus.rs (1 hunks)
  • crates/arkflow-plugin/src/input/mqtt.rs (1 hunks)
  • crates/arkflow-plugin/src/input/multiple_inputs.rs (2 hunks)
  • crates/arkflow-plugin/src/input/nats.rs (1 hunks)
  • crates/arkflow-plugin/src/input/pulsar.rs (1 hunks)
  • crates/arkflow-plugin/src/input/redis.rs (1 hunks)
  • crates/arkflow-plugin/src/input/sql.rs (1 hunks)
  • crates/arkflow-plugin/src/input/websocket.rs (1 hunks)
  • crates/arkflow-plugin/src/output/drop.rs (1 hunks)
  • crates/arkflow-plugin/src/output/http.rs (1 hunks)
  • crates/arkflow-plugin/src/output/kafka.rs (1 hunks)
  • crates/arkflow-plugin/src/output/mqtt.rs (1 hunks)
  • crates/arkflow-plugin/src/output/nats.rs (1 hunks)
  • crates/arkflow-plugin/src/output/pulsar.rs (1 hunks)
  • crates/arkflow-plugin/src/output/redis.rs (1 hunks)
  • crates/arkflow-plugin/src/output/sql.rs (1 hunks)
  • crates/arkflow-plugin/src/output/stdout.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/batch.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/json.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/protobuf.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/python.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/sql.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/vrl.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
crates/arkflow-core/src/processor/mod.rs (4)
crates/arkflow-core/src/input/mod.rs (2)
  • build (32-37)
  • build (125-150)
crates/arkflow-core/src/output/mod.rs (2)
  • build (54-65)
  • build (69-74)
crates/arkflow-core/src/pipeline/mod.rs (1)
  • build (69-75)
crates/arkflow-plugin/src/processor/protobuf.rs (2)
  • build (195-209)
  • build (213-227)
crates/arkflow-plugin/src/processor/sql.rs (3)
crates/arkflow-core/src/stream/mod.rs (1)
  • new (53-73)
crates/arkflow-plugin/src/buffer/window.rs (1)
  • resource (70-75)
crates/arkflow-plugin/src/temporary/redis.rs (1)
  • Self (88-88)
crates/arkflow-core/src/input/mod.rs (4)
crates/arkflow-core/src/codec/mod.rs (1)
  • encode (24-24)
crates/arkflow-core/src/output/mod.rs (2)
  • build (54-65)
  • build (69-74)
crates/arkflow-core/src/processor/mod.rs (2)
  • build (52-63)
  • build (67-72)
crates/arkflow-core/src/stream/mod.rs (1)
  • build (450-493)
crates/arkflow-core/src/stream/mod.rs (19)
crates/arkflow-core/src/codec/mod.rs (1)
  • encode (24-24)
crates/arkflow-core/src/input/mod.rs (1)
  • encode (69-71)
crates/arkflow-plugin/src/codec/json.rs (1)
  • encode (24-37)
crates/arkflow-core/src/output/mod.rs (1)
  • write (36-36)
crates/arkflow-plugin/src/output/http.rs (1)
  • write (95-110)
crates/arkflow-plugin/src/output/kafka.rs (1)
  • write (175-241)
crates/arkflow-plugin/src/output/mqtt.rs (1)
  • write (122-176)
crates/arkflow-plugin/src/output/nats.rs (1)
  • write (126-197)
crates/arkflow-plugin/src/output/redis.rs (6)
  • write (84-159)
  • pipeline (107-107)
  • pipeline (117-117)
  • pipeline (138-138)
  • pipeline (149-149)
  • pipeline (154-155)
crates/arkflow-plugin/src/output/sql.rs (1)
  • write (259-265)
crates/arkflow-plugin/src/output/stdout.rs (2)
  • write (61-63)
  • write (138-140)
crates/arkflow-plugin/src/buffer/window.rs (2)
  • write (160-171)
  • resource (70-75)
crates/arkflow-plugin/src/buffer/session_window.rs (1)
  • write (106-111)
crates/arkflow-plugin/src/buffer/sliding_window.rs (1)
  • write (165-169)
crates/arkflow-plugin/src/buffer/tumbling_window.rs (1)
  • write (99-101)
crates/arkflow-core/src/buffer/mod.rs (1)
  • write (28-28)
crates/arkflow-plugin/src/buffer/memory.rs (1)
  • write (145-163)
crates/arkflow-plugin/src/temporary/redis.rs (1)
  • pipeline (78-79)
crates/arkflow-plugin/src/processor/vrl.rs (1)
  • output (69-72)
crates/arkflow-plugin/src/input/multiple_inputs.rs (1)
crates/arkflow-plugin/src/buffer/window.rs (1)
  • resource (70-75)
crates/arkflow-core/src/output/mod.rs (8)
crates/arkflow-core/src/input/mod.rs (2)
  • build (32-37)
  • build (125-150)
crates/arkflow-core/src/processor/mod.rs (2)
  • build (52-63)
  • build (67-72)
crates/arkflow-core/src/stream/mod.rs (1)
  • build (450-493)
crates/arkflow-plugin/src/output/drop.rs (1)
  • build (48-55)
crates/arkflow-plugin/src/output/http.rs (1)
  • build (218-232)
crates/arkflow-plugin/src/output/kafka.rs (1)
  • build (290-305)
crates/arkflow-plugin/src/output/mqtt.rs (1)
  • build (200-213)
crates/arkflow-plugin/src/output/stdout.rs (1)
  • build (99-112)
crates/arkflow-core/src/pipeline/mod.rs (4)
crates/arkflow-core/src/input/mod.rs (2)
  • build (32-37)
  • build (125-150)
crates/arkflow-core/src/output/mod.rs (2)
  • build (54-65)
  • build (69-74)
crates/arkflow-core/src/processor/mod.rs (2)
  • build (52-63)
  • build (67-72)
crates/arkflow-core/src/stream/mod.rs (1)
  • build (450-493)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (52)
crates/arkflow-core/src/codec/mod.rs (1)

45-45: LGTM! Documentation fix accurately describes the configuration.

The doc comment now correctly describes CodecConfig instead of referencing "Buffer configuration".

crates/arkflow-plugin/src/processor/vrl.rs (1)

82-87: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.

crates/arkflow-plugin/src/processor/python.rs (1)

140-145: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.

crates/arkflow-plugin/src/processor/json.rs (2)

111-116: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.


130-135: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.

crates/arkflow-plugin/src/output/drop.rs (1)

48-53: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.

crates/arkflow-plugin/src/input/websocket.rs (1)

240-245: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.

crates/arkflow-plugin/src/output/kafka.rs (1)

290-295: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.

crates/arkflow-plugin/src/input/modbus.rs (1)

215-220: LGTM! Signature updated to support mutable resource access.

The parameter type change from &Resource to &mut Resource aligns with the broader codebase shift to enable in-place resource mutations during build-time. Since the parameter is currently unused, there's no behavioral change.

crates/arkflow-plugin/src/input/mqtt.rs (1)

246-260: LGTM! Signature update aligns with interior mutability refactor.

The change from &Resource to &mut Resource is consistent with the broader PR objective to introduce interior mutability for Resource fields (codec, temporary). This mechanical update prepares the builder for future codec initialization logic.

crates/arkflow-plugin/src/input/file.rs (1)

457-474: LGTM! Signature update aligns with interior mutability refactor.

The change from &Resource to &mut Resource is consistent with the broader PR objective to introduce interior mutability for Resource fields. This mechanical update prepares the builder for future codec initialization logic.

crates/arkflow-plugin/src/output/pulsar.rs (1)

183-209: LGTM! Signature update aligns with interior mutability refactor.

The change from &Resource to &mut Resource is consistent with the broader PR objective to introduce interior mutability for Resource fields. This mechanical update prepares the builder for future codec initialization logic.

crates/arkflow-plugin/src/input/kafka.rs (1)

210-226: LGTM! Signature update aligns with interior mutability refactor.

The change from &Resource to &mut Resource is consistent with the broader PR objective to introduce interior mutability for Resource fields. This mechanical update prepares the builder for future codec initialization logic.

crates/arkflow-core/src/lib.rs (2)

17-17: LGTM! Codec import added for new field.

The import supports the new codec field in the Resource struct, enabling codec functionality as outlined in the PR objectives.


80-85: Original thread-safety concern is unfounded; actual issue is missing codec field in struct instantiations.

The review comment's concern about RefCell not being thread-safe is incorrect for this codebase. Resource is never wrapped in Arc, is always passed by &mut reference, and never crosses task or thread boundaries—the linear initialization pattern is appropriate for RefCell.

However, the struct definition now includes a codec field that was not added to 19+ instantiation sites across the codebase, causing compilation failures:

  • crates/arkflow-core/src/stream/mod.rs:451
  • crates/arkflow-plugin/src/processor/sql.rs:260, 298, 323, 340
  • crates/arkflow-plugin/src/processor/json.rs:171, 208, 236, 260, 270, 298, 308
  • crates/arkflow-plugin/src/processor/protobuf.rs:420, 430, 451
  • crates/arkflow-plugin/tests/pulsar_tests.rs:282, 304, 328, 349
  • crates/arkflow-plugin/src/input/generate.rs:262

Add codec: RefCell::default(), to all Resource {} instantiations.

Likely an incorrect or invalid review comment.

crates/arkflow-plugin/src/output/stdout.rs (1)

96-113: LGTM! Signature update aligns with interior mutability refactor.

The change from &Resource to &mut Resource is consistent with the broader PR objective to introduce interior mutability for Resource fields. This mechanical update prepares the builder for future codec initialization logic.

crates/arkflow-plugin/src/processor/batch.rs (1)

120-136: LGTM! Signature update aligns with interior mutability refactor.

The change from &Resource to &mut Resource is consistent with the broader PR objective to introduce interior mutability for Resource fields. This mechanical update prepares the builder for future codec initialization logic.

crates/arkflow-plugin/src/input/http.rs (1)

182-199: LGTM! Signature update aligns with interior mutability refactor.

The change from &Resource to &mut Resource is consistent with the broader PR objective to introduce interior mutability for Resource fields. This mechanical update prepares the builder for future codec initialization logic.

crates/arkflow-plugin/src/output/redis.rs (1)

176-181: LGTM: Signature updated to support mutable Resource.

The builder signature change from &Resource to &mut Resource aligns with the repository-wide refactoring to enable interior mutability and codec support. The parameter is currently unused, but this prepares the infrastructure for future codec initialization during the build phase.

crates/arkflow-plugin/src/output/sql.rs (1)

410-416: LGTM: Consistent signature update for mutable Resource.

The signature change aligns with the project-wide Resource mutability refactoring mentioned in the PR objectives.

crates/arkflow-plugin/src/input/redis.rs (1)

419-425: LGTM: Signature updated for mutable Resource.

Consistent with the architectural changes to support codec initialization and interior mutability of Resource.

crates/arkflow-plugin/src/input/sql.rs (1)

321-327: LGTM: Builder signature updated to accept mutable Resource.

The change is consistent with the broader refactoring to enable Resource mutation during input/output construction.

crates/arkflow-plugin/src/output/mqtt.rs (1)

199-205: LGTM: Signature updated for mutable Resource.

Consistent with the repository-wide changes to support interior mutability and codec handling.

crates/arkflow-plugin/src/output/nats.rs (1)

219-225: LGTM: Builder signature updated for mutable Resource.

The change aligns with the architectural refactoring described in the PR objectives.

crates/arkflow-plugin/src/output/http.rs (1)

217-223: LGTM: Signature updated to support mutable Resource.

Consistent with the broader Resource mutability changes across all builders.

crates/arkflow-plugin/src/input/pulsar.rs (1)

266-272: LGTM: Builder signature updated for mutable Resource.

The signature change is consistent with the project-wide refactoring to enable codec support and interior mutability.

crates/arkflow-plugin/src/input/nats.rs (1)

417-431: LGTM: Signature updated to support mutable Resource.

The builder signature change from &Resource to &mut Resource aligns with the project-wide refactoring to support interior mutability of Resource. The parameter remains unused in this builder, which is appropriate as NATS input construction doesn't require Resource mutation.

crates/arkflow-plugin/src/input/memory.rs (1)

107-123: LGTM: Signature updated for API consistency.

The signature change to accept &mut Resource maintains consistency with the updated InputBuilder trait across the codebase. The parameter remains unused here, which is correct for the memory input builder.

crates/arkflow-core/src/pipeline/mod.rs (1)

67-76: LGTM: Enables processor builders to mutate Resource.

The signature change from &Resource to &mut Resource allows processor builders to modify shared Resource state during pipeline construction. This is essential for the codec initialization and other build-time mutations introduced in this PR.

crates/arkflow-plugin/src/input/generate.rs (2)

91-108: LGTM: Signature updated for API consistency.

The builder signature correctly updated to accept &mut Resource. The parameter is unused in this builder, which is appropriate for the generate input.


228-253: LGTM: Test updated with new Resource structure.

The test correctly initializes Resource with the new codec field.

crates/arkflow-plugin/src/input/multiple_inputs.rs (2)

121-158: LGTM: Correct RefCell usage for input name collection and validation.

The implementation correctly uses borrow_mut() to collect input names during the loop (line 136) and borrow() for the subsequent validation (line 140). The borrows are properly scoped, preventing potential panics from overlapping mutable and immutable borrows.
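
The collect-then-validate pattern described here can be sketched as follows. This is an illustration only: `register_names` is a hypothetical function, `input_names` is modeled as a `RefCell<Vec<String>>`, and the duplicate-name rule is an assumed validation, not necessarily what multiple_inputs.rs checks.

```rust
use std::cell::RefCell;
use std::collections::HashSet;

/// Records each input name, then validates the collected list.
/// Returns an error on the first duplicate name (assumed rule).
fn register_names(names: &[&str], input_names: &RefCell<Vec<String>>) -> Result<usize, String> {
    for name in names {
        // The mutable borrow lives only for this statement.
        input_names.borrow_mut().push(name.to_string());
    }

    // The shared borrow starts after every mutable borrow has ended.
    let collected = input_names.borrow();
    let mut seen = HashSet::new();
    for name in collected.iter() {
        if !seen.insert(name.as_str()) {
            return Err(format!("duplicate input name: {}", name));
        }
    }
    Ok(collected.len())
}

fn main() {
    let names = RefCell::new(Vec::new());
    println!("{:?}", register_names(&["kafka", "mqtt"], &names));
    println!("{:?}", register_names(&["kafka"], &names));
}
```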


160-177: LGTM: Builder signature updated correctly.

The signature change enables the builder to pass mutable Resource to MultipleInputs::new, supporting the interior mutability pattern.

crates/arkflow-plugin/src/processor/protobuf.rs (2)

193-210: LGTM: Signature updated for API consistency.

The builder signature correctly updated to accept &mut Resource, aligning with the ProcessorBuilder trait changes.


211-228: LGTM: Signature updated for API consistency.

The builder signature correctly updated to accept &mut Resource, maintaining consistency with other processor builders.

crates/arkflow-core/src/processor/mod.rs (2)

50-64: LGTM: Core trait updated to support mutable Resource.

The signature change enables processor builders to mutate Resource during construction, aligning with the interior mutability pattern introduced across the codebase.


66-73: LGTM: Trait definition updated consistently.

The ProcessorBuilder trait signature now accepts &mut Resource, enabling implementations to modify shared Resource state during processor construction.

crates/arkflow-plugin/src/processor/sql.rs (2)

65-100: LGTM: Correct RefCell usage for temporary access.

The implementation correctly uses resource.temporary.get_mut() to access the temporary map during processor construction. This is appropriate as the Resource is being mutated during the build phase.


222-239: LGTM: Builder signature updated to enable Resource mutation.

The builder correctly passes &mut Resource to SqlProcessor::new, enabling the processor to access and potentially modify Resource state during construction.

crates/arkflow-core/src/output/mod.rs (2)

54-54: LGTM: Signature update aligns with interior mutability pattern.

The change from &Resource to &mut Resource is consistent with the broader architectural shift to enable in-place mutations during the build phase.


68-74: LGTM: Trait signature correctly updated.

The OutputBuilder trait's build method now accepts &mut Resource, maintaining consistency with other builder traits across the codebase (Input, Pipeline, Processor).

crates/arkflow-core/src/input/mod.rs (5)

19-19: LGTM: Import added for codec support.


31-37: LGTM: Signature update aligns with mutable Resource pattern.


56-71: LGTM: Default encode implementation provides sensible no-op behavior.

The documentation is clear and the default implementation allows inputs without codec support to work unchanged.
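
A default trait method is what makes the no-op behavior possible. The sketch below uses a cut-down stand-in for `InputEncoder` with hypothetical `Msg`, `PlainInput`, and `ReversingInput` types; the real trait's signature is not shown in this review.

```rust
/// Hypothetical message type.
#[derive(Debug, Clone, PartialEq)]
struct Msg(Vec<u8>);

/// Stand-in trait: `encode` has a default body that returns the message unchanged.
trait InputEncoder {
    fn encode(&self, msg: Msg) -> Result<Msg, String> {
        Ok(msg)
    }
}

/// An input without a codec relies on the default implementation.
struct PlainInput;
impl InputEncoder for PlainInput {}

/// An input with a (toy) codec overrides it.
struct ReversingInput;
impl InputEncoder for ReversingInput {
    fn encode(&self, mut msg: Msg) -> Result<Msg, String> {
        msg.0.reverse();
        Ok(msg)
    }
}

fn main() {
    println!("{:?}", PlainInput.encode(Msg(vec![1, 2, 3])));
    println!("{:?}", ReversingInput.encode(Msg(vec![1, 2, 3])));
}
```

Existing inputs need no code changes under this design: an empty `impl` block picks up the pass-through behavior.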


118-118: LGTM: Decode field enables per-input codec configuration.


125-125: LGTM: Signature update aligns with mutable Resource pattern.

crates/arkflow-core/src/stream/mod.rs (6)

83-85: LGTM: Correct RefCell access pattern for iteration.

Using .get_mut() for mutable access to the interior-mutable temporary map is appropriate here.


162-168: LGTM: Encoding errors are handled gracefully.

The error handling logs the failure and continues processing, which prevents a single encoding error from halting the entire stream. The use of unwrap() at line 168 is safe because the error case is already handled above.


170-170: LGTM: Encoded messages correctly propagated to both buffer and direct paths.

Both the buffered path (line 170) and direct send path (line 175) now use the encoded message, ensuring consistency in downstream processing.

Also applies to: 175-175


451-455: LGTM: Resource initialization with interior mutability.

Initializing all fields with RefCell::default() enables the interior mutability pattern required for in-place mutations during the build phase.


457-468: LGTM: Temporary configuration correctly uses RefCell methods.

The pattern of using .replace() to initialize the HashMap and then .get_mut() for insertions is correct for RefCell usage.
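
As an illustration of the `.replace()` then `.get_mut()` sequence, here is a minimal sketch. `init_temporaries` and the string-valued map are assumptions made for the example; only the `RefCell` methods are real.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Resets the map, then fills it from `(name, kind)` pairs.
fn init_temporaries(
    temporary: &mut RefCell<HashMap<String, String>>,
    configs: &[(&str, &str)],
) -> usize {
    // `replace` swaps in a fresh map and returns the previous one.
    let _old = temporary.replace(HashMap::with_capacity(configs.len()));
    for (name, kind) in configs {
        // `get_mut` requires `&mut RefCell` and skips the runtime borrow check.
        temporary.get_mut().insert(name.to_string(), kind.to_string());
    }
    temporary.get_mut().len()
}

fn main() {
    let mut temporary = RefCell::new(HashMap::new());
    let count = init_temporaries(&mut temporary, &[("redis_cache", "redis")]);
    println!("{} temporary registered", count);
}
```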


470-477: LGTM: Build calls correctly pass mutable Resource reference.

All build method calls now pass &mut resource, consistent with the updated trait signatures across Input, Pipeline, and Output modules.

Comment on lines 127 to 140
 if let Some(codec_config) = self.decode.as_ref() {
     if !resource
         .codec
         .clone()
         .get_mut()
         .contains_key(&codec_config.codec_type)
     {
         let codec_new = codec_config.build(resource)?;
         resource
             .codec
             .get_mut()
             .insert(codec_config.codec_type.clone(), codec_new);
     }
 }
Contributor

⚠️ Potential issue | 🔴 Critical

.clone() on RefCell makes the duplicate codec check read a copy.

Cloning the RefCell at line 130 creates a new RefCell holding a copy of the inner HashMap, so .get_mut() and contains_key inspect that copy rather than the original resource.codec. The copy has the same keys as the original at that moment, so the check still returns the right answer, but every build with a decode config copies the whole map just to test one key, and the code reads as if it were mutating resource.codec when it only touches a throwaway value.

Apply this diff to fix the logic:

 if let Some(codec_config) = self.decode.as_ref() {
     if !resource
         .codec
-        .clone()
-        .get_mut()
+        .borrow()
         .contains_key(&codec_config.codec_type)
     {
         let codec_new = codec_config.build(resource)?;
         resource
             .codec
             .get_mut()
             .insert(codec_config.codec_type.clone(), codec_new);
     }
 }

Use .borrow() for the read-only check, and reserve .get_mut() for the mutation at line 137.

🤖 Prompt for AI Agents
In crates/arkflow-core/src/input/mod.rs around lines 127 to 140, the code clones
the RefCell at line 130, so contains_key inspects a copy of the map instead of
resource.codec itself; the copy has the same keys, so the check still works, but
the whole map is copied on every build. Change the read-only check to use
resource.codec.borrow().contains_key(&codec_config.codec_type) and only call
resource.codec.get_mut() when you need to insert the new codec (i.e., build the
codec, then acquire get_mut() once and insert), so the read uses a shared borrow
and the insertion uses mutable access, without cloning the RefCell.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
crates/arkflow-plugin/src/processor/protobuf.rs (1)

417-456: Update tests to pass a mutable Resource.

ProcessorBuilder::build now requires &mut Resource, but the tests still pass immutable temporaries like &Resource { … }, which no longer compile. Please allocate a Resource variable and pass it mutably in each case.

-        let result = ProtobufToArrowProcessorBuilder.build(
-            None,
-            &None,
-            &Resource {
-                temporary: Default::default(),
-                input_names: RefCell::new(Default::default()),
-            },
-        );
+        let mut resource = Resource {
+            temporary: Default::default(),
+            input_names: RefCell::new(Default::default()),
+        };
+        let result =
+            ProtobufToArrowProcessorBuilder.build(None, &None, &mut resource);
         assert!(result.is_err());

-        let result = ArrowToProtobufProcessorBuilder.build(
-            None,
-            &None,
-            &Resource {
-                temporary: Default::default(),
-                input_names: RefCell::new(Default::default()),
-            },
-        );
+        let mut resource = Resource {
+            temporary: Default::default(),
+            input_names: RefCell::new(Default::default()),
+        };
+        let result =
+            ArrowToProtobufProcessorBuilder.build(None, &None, &mut resource);
         assert!(result.is_err());
@@
-        let result = ProtobufToArrowProcessorBuilder.build(
-            None,
-            &Some(config),
-            &Resource {
-                temporary: Default::default(),
-                input_names: RefCell::new(Default::default()),
-            },
-        );
+        let mut resource = Resource {
+            temporary: Default::default(),
+            input_names: RefCell::new(Default::default()),
+        };
+        let result =
+            ProtobufToArrowProcessorBuilder.build(None, &Some(config), &mut resource);
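
The compile failure and its fix can be reproduced in a small standalone program. This is a sketch under stated assumptions: `Resource` is a trimmed-down hypothetical struct with simplified field types, and `build` is an illustrative free function rather than the real `ProcessorBuilder::build`.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

// Hypothetical, trimmed-down stand-in for arkflow's `Resource`.
#[derive(Default)]
struct Resource {
    temporary: HashMap<String, String>,
    input_names: RefCell<Vec<String>>,
}

// Hypothetical builder using the new `&mut Resource` signature.
fn build(config: &Option<String>, resource: &mut Resource) -> Result<String, String> {
    let name = config.as_ref().ok_or_else(|| "missing config".to_string())?;
    resource.temporary.insert(name.clone(), "built".to_string());
    resource.input_names.get_mut().push(name.clone());
    Ok(name.clone())
}

fn main() {
    // `build(&None, &Resource::default())` is rejected by the compiler:
    // a shared `&Resource` does not coerce to `&mut Resource`.
    let mut resource = Resource::default();
    assert!(build(&None, &mut resource).is_err());

    // A fresh mutable binding per case keeps the tests independent.
    let mut resource = Resource::default();
    assert_eq!(build(&Some("flow".to_string()), &mut resource), Ok("flow".to_string()));
    assert_eq!(resource.temporary.len(), 1);
    assert_eq!(resource.input_names.borrow().len(), 1);
}
```

If many tests need the same setup, a small helper that returns a fresh `Resource` would cut the repetition; whether the real struct can derive `Default` is an assumption here.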
crates/arkflow-plugin/src/processor/sql.rs (1)

252-363: Adjust SQL processor tests for &mut Resource.

SqlProcessor::new now expects &mut Resource, but the tests still pass immutable temporaries, which fails to compile. Please introduce a mutable Resource variable in each test before calling SqlProcessor::new.

-        let processor = SqlProcessor::new(
-            SqlProcessorConfig {
-                query: "SELECT * FROM flow".to_string(),
-                table_name: None,
-                temporary_list: None,
-            },
-            &Resource {
-                temporary: Default::default(),
-                input_names: RefCell::new(Default::default()),
-            },
-        )
+        let mut resource = Resource {
+            temporary: Default::default(),
+            input_names: RefCell::new(Default::default()),
+        };
+        let processor = SqlProcessor::new(
+            SqlProcessorConfig {
+                query: "SELECT * FROM flow".to_string(),
+                table_name: None,
+                temporary_list: None,
+            },
+            &mut resource,
+        )
         .unwrap();
@@
-        let processor = SqlProcessor::new(
+        let mut resource = Resource {
+            temporary: Default::default(),
+            input_names: RefCell::new(Default::default()),
+        };
+        let processor = SqlProcessor::new(
             SqlProcessorConfig {
                 query: "INVALID SQL QUERY".to_string(),
                 table_name: None,
                 temporary_list: None,
             },
-            &Resource {
-                temporary: Default::default(),
-                input_names: RefCell::new(Default::default()),
-            },
-        );
+            &mut resource,
+        );
@@
-        let processor = SqlProcessor::new(
+        let mut resource = Resource {
+            temporary: Default::default(),
+            input_names: RefCell::new(Default::default()),
+        };
+        let processor = SqlProcessor::new(
             SqlProcessorConfig {
                 query: "SELECT * FROM custom_table".to_string(),
                 table_name: Some("custom_table".to_string()),
                 temporary_list: None,
             },
-            &Resource {
-                temporary: Default::default(),
-                input_names: RefCell::new(Default::default()),
-            },
-        )
+            &mut resource,
+        )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b0c5b1a and 5287593.

📒 Files selected for processing (35)
  • crates/arkflow-core/src/codec/mod.rs (1 hunks)
  • crates/arkflow-core/src/input/mod.rs (4 hunks)
  • crates/arkflow-core/src/lib.rs (2 hunks)
  • crates/arkflow-core/src/output/mod.rs (2 hunks)
  • crates/arkflow-core/src/pipeline/mod.rs (1 hunks)
  • crates/arkflow-core/src/processor/mod.rs (2 hunks)
  • crates/arkflow-core/src/stream/mod.rs (7 hunks)
  • crates/arkflow-plugin/src/input/file.rs (3 hunks)
  • crates/arkflow-plugin/src/input/generate.rs (2 hunks)
  • crates/arkflow-plugin/src/input/http.rs (1 hunks)
  • crates/arkflow-plugin/src/input/kafka.rs (1 hunks)
  • crates/arkflow-plugin/src/input/memory.rs (1 hunks)
  • crates/arkflow-plugin/src/input/modbus.rs (1 hunks)
  • crates/arkflow-plugin/src/input/mqtt.rs (1 hunks)
  • crates/arkflow-plugin/src/input/multiple_inputs.rs (4 hunks)
  • crates/arkflow-plugin/src/input/nats.rs (1 hunks)
  • crates/arkflow-plugin/src/input/pulsar.rs (1 hunks)
  • crates/arkflow-plugin/src/input/redis.rs (1 hunks)
  • crates/arkflow-plugin/src/input/sql.rs (1 hunks)
  • crates/arkflow-plugin/src/input/websocket.rs (1 hunks)
  • crates/arkflow-plugin/src/output/drop.rs (1 hunks)
  • crates/arkflow-plugin/src/output/http.rs (1 hunks)
  • crates/arkflow-plugin/src/output/kafka.rs (1 hunks)
  • crates/arkflow-plugin/src/output/mqtt.rs (1 hunks)
  • crates/arkflow-plugin/src/output/nats.rs (1 hunks)
  • crates/arkflow-plugin/src/output/pulsar.rs (1 hunks)
  • crates/arkflow-plugin/src/output/redis.rs (1 hunks)
  • crates/arkflow-plugin/src/output/sql.rs (1 hunks)
  • crates/arkflow-plugin/src/output/stdout.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/batch.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/json.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/protobuf.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/python.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/sql.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/vrl.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (12)
  • crates/arkflow-plugin/src/processor/vrl.rs
  • crates/arkflow-plugin/src/processor/batch.rs
  • crates/arkflow-core/src/output/mod.rs
  • crates/arkflow-plugin/src/input/nats.rs
  • crates/arkflow-plugin/src/input/memory.rs
  • crates/arkflow-plugin/src/output/mqtt.rs
  • crates/arkflow-core/src/codec/mod.rs
  • crates/arkflow-plugin/src/output/drop.rs
  • crates/arkflow-plugin/src/input/websocket.rs
  • crates/arkflow-core/src/lib.rs
  • crates/arkflow-plugin/src/output/sql.rs
  • crates/arkflow-plugin/src/input/sql.rs
🧰 Additional context used
🧬 Code graph analysis (6)
crates/arkflow-plugin/src/input/file.rs (2)
crates/arkflow-core/src/stream/mod.rs (1)
  • new (53-73)
crates/arkflow-plugin/src/processor/sql.rs (1)
  • new (67-100)
crates/arkflow-core/src/processor/mod.rs (4)
crates/arkflow-core/src/input/mod.rs (2)
  • build (32-37)
  • build (108-126)
crates/arkflow-core/src/output/mod.rs (2)
  • build (54-65)
  • build (69-74)
crates/arkflow-core/src/pipeline/mod.rs (1)
  • build (69-75)
crates/arkflow-plugin/src/processor/protobuf.rs (2)
  • build (195-209)
  • build (213-227)
crates/arkflow-core/src/stream/mod.rs (5)
crates/arkflow-core/src/codec/mod.rs (1)
  • encode (24-24)
crates/arkflow-core/src/input/mod.rs (2)
  • encode (135-141)
  • encode (161-161)
crates/arkflow-plugin/src/buffer/window.rs (1)
  • write (160-171)
crates/arkflow-core/src/buffer/mod.rs (1)
  • write (28-28)
crates/arkflow-plugin/src/buffer/memory.rs (1)
  • write (145-163)
crates/arkflow-plugin/src/input/multiple_inputs.rs (1)
crates/arkflow-plugin/src/buffer/window.rs (1)
  • resource (70-75)
crates/arkflow-core/src/input/mod.rs (8)
crates/arkflow-plugin/src/input/generate.rs (4)
  • serde_json (105-105)
  • build (93-107)
  • new (41-50)
  • read (59-85)
crates/arkflow-core/src/stream/mod.rs (2)
  • build (450-492)
  • new (53-73)
crates/arkflow-plugin/src/input/http.rs (3)
  • build (184-198)
  • new (76-89)
  • read (158-169)
crates/arkflow-plugin/src/input/kafka.rs (3)
  • build (212-225)
  • new (61-67)
  • read (137-176)
crates/arkflow-plugin/src/input/memory.rs (3)
  • build (109-122)
  • new (46-61)
  • read (79-98)
crates/arkflow-plugin/src/input/mqtt.rs (3)
  • build (246-260)
  • new (70-81)
  • read (172-210)
crates/arkflow-core/src/codec/mod.rs (1)
  • encode (24-24)
crates/arkflow-core/src/lib.rs (1)
  • new_binary (97-99)
crates/arkflow-core/src/pipeline/mod.rs (4)
crates/arkflow-core/src/input/mod.rs (2)
  • build (32-37)
  • build (108-126)
crates/arkflow-core/src/output/mod.rs (2)
  • build (54-65)
  • build (69-74)
crates/arkflow-core/src/processor/mod.rs (2)
  • build (52-63)
  • build (67-72)
crates/arkflow-core/src/stream/mod.rs (1)
  • build (450-492)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (17)
crates/arkflow-plugin/src/output/kafka.rs (1)

294-294: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, aligning with the repository-wide refactoring to enable resource mutation during build paths.

crates/arkflow-plugin/src/output/redis.rs (1)

180-180: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, consistent with the broader refactoring across output builders.

crates/arkflow-plugin/src/input/http.rs (1)

188-188: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, aligning with the repository-wide refactoring of input builders.

crates/arkflow-plugin/src/input/kafka.rs (1)

216-216: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, consistent with the InputBuilder trait update.

crates/arkflow-plugin/src/input/modbus.rs (1)

219-219: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, aligning with the InputBuilder trait update.

crates/arkflow-plugin/src/output/pulsar.rs (1)

188-188: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, consistent with the OutputBuilder trait update.

crates/arkflow-plugin/src/input/mqtt.rs (1)

250-250: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, aligning with the InputBuilder trait update.

crates/arkflow-plugin/src/output/stdout.rs (1)

103-103: LGTM - Signature updated to align with trait changes.

The build method signature is correctly updated to accept &mut Resource, consistent with the OutputBuilder trait update.

crates/arkflow-core/src/pipeline/mod.rs (1)

69-75: LGTM! Signature update aligns with mutable resource pattern.

The change from &Resource to &mut Resource is consistent with the broader refactoring to enable in-place resource mutations during build. The mutable reference is correctly passed to processor_config.build(resource) at line 72.
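
Passing one `&mut Resource` to several builders in turn works because each call reborrows the reference only for its own duration. A minimal sketch of that pattern follows; `Resource`, `ProcessorConfig`, and `PipelineConfig` here are simplified hypothetical types (the real ones carry more fields and return trait objects), and the `built` field exists only to make the mutation observable.

```rust
// Hypothetical resource that records which processors were built.
struct Resource {
    built: Vec<String>,
}

struct ProcessorConfig {
    name: String,
}

impl ProcessorConfig {
    // Mirrors the `&mut Resource` builder signature discussed in this review.
    fn build(&self, resource: &mut Resource) -> Result<String, String> {
        resource.built.push(self.name.clone());
        Ok(format!("processor:{}", self.name))
    }
}

struct PipelineConfig {
    processors: Vec<ProcessorConfig>,
}

impl PipelineConfig {
    fn build(&self, resource: &mut Resource) -> Result<Vec<String>, String> {
        let mut processors = Vec::with_capacity(self.processors.len());
        for processor_config in &self.processors {
            // `resource` is implicitly reborrowed for this call only, so the
            // same `&mut Resource` is available again on the next iteration.
            processors.push(processor_config.build(resource)?);
        }
        Ok(processors)
    }
}

fn main() {
    let config = PipelineConfig {
        processors: vec![
            ProcessorConfig { name: "json".to_string() },
            ProcessorConfig { name: "sql".to_string() },
        ],
    };
    let mut resource = Resource { built: Vec::new() };
    let processors = config.build(&mut resource).unwrap();
    assert_eq!(processors.len(), 2);
    assert_eq!(resource.built, vec!["json".to_string(), "sql".to_string()]);
}
```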

crates/arkflow-plugin/src/input/redis.rs (1)

420-430: LGTM! Builder signature updated to match trait requirements.

The change to accept &mut Resource aligns with the updated InputBuilder trait signature. Although the resource is unused in this builder, the signature change maintains consistency across all input builders.

crates/arkflow-plugin/src/input/generate.rs (2)

93-107: LGTM! Builder signature correctly updated.

The change to &mut Resource aligns with the InputBuilder trait requirements and the broader refactoring to support mutable resource access during build.


238-248: LGTM! Test updated to match new Resource structure.

The addition of codec: Default::default() correctly reflects the new field in the Resource struct, ensuring tests remain compatible with the updated API.

crates/arkflow-plugin/src/processor/json.rs (1)

110-145: LGTM! Processor builder signatures correctly updated.

Both JsonToArrowProcessorBuilder and ArrowToJsonProcessorBuilder now accept &mut Resource, aligning with the updated ProcessorBuilder trait signature and maintaining consistency across all processor builders in the codebase.

crates/arkflow-plugin/src/input/pulsar.rs (1)

267-294: LGTM! Builder signature updated for trait compliance.

The change to &mut Resource correctly implements the updated InputBuilder trait signature and is consistent with the broader API migration across input builders.

crates/arkflow-plugin/src/output/nats.rs (1)

220-233: LGTM! Output builder signature correctly updated.

The change to &mut Resource aligns with the updated OutputBuilder trait signature and maintains consistency with other output builders throughout the codebase.

crates/arkflow-plugin/src/output/http.rs (1)

218-232: LGTM! HTTP output builder signature correctly updated.

The change to &mut Resource is consistent with the OutputBuilder trait requirements and the coordinated API migration across all output builders.

crates/arkflow-plugin/src/processor/python.rs (1)

140-154: LGTM! Python processor builder signature correctly updated.

The change to &mut Resource aligns with the updated ProcessorBuilder trait signature and is consistent with the broader refactoring to enable mutable resource access during processor construction.

Comment on lines +135 to +141
fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, Error> {
    if let Some(e) = &self.encoder {
        MessageBatch::new_binary(e.encode(msg)?)
    } else {
        Ok(msg)
    }
}
⚠️ Potential issue | 🔴 Critical

Preserve MessageBatch metadata during encoding.

When an encoder is configured, encode rebuilds the batch with MessageBatch::new_binary(...) but never restores the original input name. Components like the window buffer rely on MessageBatch::get_input_name() for routing, so the metadata loss breaks multi-input setups.

Please capture the existing name before encoding and reapply it to the new batch.

 impl InputEncoder for InputEncode {
     fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, Error> {
         if let Some(e) = &self.encoder {
-            MessageBatch::new_binary(e.encode(msg)?)
+            let input_name = msg.get_input_name();
+            let encoded = e.encode(msg)?;
+            let mut batch = MessageBatch::new_binary(encoded)?;
+            batch.set_input_name(input_name);
+            Ok(batch)
         } else {
             Ok(msg)
         }
     }
 }
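
The ordering matters because `e.encode(msg)` takes the batch by value, so the name has to be read before the call. The standalone sketch below illustrates this with hypothetical types: this `MessageBatch` is a plain struct, the `Option<String>` shape of the input name and the `Encoder` trait are assumptions, and `Upper` is a toy encoder. None of it is arkflow's actual API.

```rust
// Hypothetical batch: a payload plus the routing metadata under discussion.
#[derive(Debug, PartialEq)]
struct MessageBatch {
    payload: Vec<Vec<u8>>,
    input_name: Option<String>,
}

impl MessageBatch {
    // A freshly built binary batch carries no input name.
    fn new_binary(payload: Vec<Vec<u8>>) -> Result<Self, String> {
        Ok(Self { payload, input_name: None })
    }
    fn get_input_name(&self) -> Option<String> {
        self.input_name.clone()
    }
    fn set_input_name(&mut self, name: Option<String>) {
        self.input_name = name;
    }
}

// Assumed encoder shape: consumes the batch, returns raw byte rows.
trait Encoder {
    fn encode(&self, msg: MessageBatch) -> Result<Vec<Vec<u8>>, String>;
}

// Toy encoder that upper-cases ASCII bytes.
struct Upper;
impl Encoder for Upper {
    fn encode(&self, msg: MessageBatch) -> Result<Vec<Vec<u8>>, String> {
        Ok(msg.payload.into_iter().map(|row| row.to_ascii_uppercase()).collect())
    }
}

struct InputEncode {
    encoder: Option<Box<dyn Encoder>>,
}

impl InputEncode {
    fn encode(&self, msg: MessageBatch) -> Result<MessageBatch, String> {
        match &self.encoder {
            Some(e) => {
                // Capture the name first: `msg` is moved into `encode` below.
                let input_name = msg.get_input_name();
                let encoded = e.encode(msg)?;
                let mut batch = MessageBatch::new_binary(encoded)?;
                batch.set_input_name(input_name);
                Ok(batch)
            }
            // No encoder configured: pass the batch through untouched.
            None => Ok(msg),
        }
    }
}

fn main() {
    let input = InputEncode { encoder: Some(Box::new(Upper)) };
    let mut msg = MessageBatch::new_binary(vec![b"ab".to_vec()]).unwrap();
    msg.set_input_name(Some("kafka_in".to_string()));
    let out = input.encode(msg).unwrap();
    assert_eq!(out.get_input_name(), Some("kafka_in".to_string()));
    assert_eq!(out.payload, vec![b"AB".to_vec()]);
}
```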
🤖 Prompt for AI Agents
In crates/arkflow-core/src/input/mod.rs around lines 135 to 141, the encode
method currently replaces the batch with MessageBatch::new_binary(...) but loses
the original input name; capture the existing batch input name (e.g., via
msg.get_input_name() or equivalent) before encoding, create the new binary batch
from the encoder output, then set or restore the captured input name on the new
MessageBatch (using the appropriate setter or constructor) before returning so
metadata like input name is preserved for routing.

2 participants