
Add Kafka Producer for Analytics and User Metrics #1143

Merged (13 commits, Feb 12, 2025)

Conversation

emplam27
Contributor

@emplam27 emplam27 commented Feb 6, 2025

What this PR does / why we need it:

Add a Kafka producer for Yorkie analytics MAU (monthly active users) measurement.

yorkie server --kafka-address localhost:29092 --kafka-topic user-events

All writer options are left at their library defaults except for Async:
https://github.com/segmentio/kafka-go/blob/a558bb8629a8f5a920a8e1bb640064ad7efb7cd8/writer.go#L98-L159

Async: true
MaxAttempts: 10 // Default
BatchSize: 100 // Default
...
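For illustration, the effective writer settings can be sketched with a plain struct that mirrors the relevant kafka-go Writer field names (the `writerConfig` type and `newWriterConfig` helper below are hypothetical, not the PR's actual code):

```go
package main

import "fmt"

// writerConfig mirrors the subset of kafka-go's Writer fields
// relevant to this PR; only Async deviates from the defaults.
type writerConfig struct {
	Addr        string
	Topic       string
	Async       bool // true: fire-and-forget, writes never block the caller
	MaxAttempts int  // kafka-go default: 10
	BatchSize   int  // kafka-go default: 100
}

// newWriterConfig (hypothetical helper) applies the PR's settings.
func newWriterConfig(addr, topic string) writerConfig {
	return writerConfig{
		Addr:        addr,
		Topic:       topic,
		Async:       true, // the only non-default option
		MaxAttempts: 10,
		BatchSize:   100,
	}
}

func main() {
	cfg := newWriterConfig("localhost:29092", "user-events")
	fmt.Printf("%s %s async=%t\n", cfg.Addr, cfg.Topic, cfg.Async)
}
```

With Async enabled, writes are buffered and flushed in the background, so a slow or unavailable Kafka broker cannot stall the activation RPC path.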

Which issue(s) this PR fixes:

Address #1130

Special notes for your reviewer:

Does this PR introduce a user-facing change?:


Additional documentation:


Checklist:

  • Added relevant tests or not required
  • Addressed and resolved all CodeRabbit review comments
  • Didn't break anything

Summary by CodeRabbit

  • Documentation

    • Improved API documentation formatting for enhanced clarity and consistency.
  • New Features

    • Expanded client activation to support user identifiers and detailed metadata.
    • Integrated a Kafka-based message broker to enable robust event publishing.
    • Introduced enhanced configuration options for Kafka and updated container deployment settings, including an external Kafka listener.
  • Bug Fixes

    • Adjusted health check commands and configurations for improved service monitoring.
  • Chores

    • Updated dependencies for better performance and security.

@emplam27 emplam27 self-assigned this Feb 6, 2025

coderabbitai bot commented Feb 6, 2025


📥 Commits

Reviewing files that changed from the base of the PR and between e18d764 and cab7025.

📒 Files selected for processing (7)
  • cmd/yorkie/server.go (4 hunks)
  • server/backend/messagebroker/config.go (1 hunks)
  • server/backend/messagebroker/config_test.go (1 hunks)
  • server/backend/messagebroker/kafka.go (1 hunks)
  • server/backend/messagebroker/messagebroker.go (1 hunks)
  • server/config.go (4 hunks)
  • server/config.sample.yml (1 hunks)

Walkthrough

The pull request introduces formatting improvements across multiple OpenAPI YAML files by consolidating multi-line descriptions, standardizing quotation styles, and aligning indentations. It adds new client event types and extends the protocol definition with additional fields. Furthermore, the PR integrates Kafka support into various server components through new configuration options, dependency updates, and a dedicated message broker package. Test cases have been updated to accommodate the additional backend configuration parameter.

Changes

| File(s) | Change Summary |
| --- | --- |
| api/docs/.../admin.openapi.yaml, api/docs/.../cluster.openapi.yaml, api/docs/.../resources.openapi.yaml, api/docs/.../yorkie.openapi.yaml | Reformatted OpenAPI specs: consolidated multi-line descriptions, standardized $ref quoting to single quotes, and adjusted indentation in info, components, and tags sections. |
| api/types/events/events.go, api/yorkie/v1/yorkie.proto | Added new client event types (ClientEventType, ClientEvent) and extended ActivateClientRequest with additional fields (metadata). |
| build/docker/analytics/docker-compose.yml, build/docker/analytics/init-user-events-db.sql | Updated Docker Compose for Kafka configuration (port mapping, listeners, healthcheck revisions) and removed the user_agent and metadata columns from the SQL script. |
| cmd/yorkie/server.go, go.mod, server/config.go, server/config.sample.yml | Introduced Kafka integration: added new CLI flags for Kafka, updated dependencies (e.g., added kafka-go), and included Kafka configuration fields in server settings. |
| server/backend/backend.go | Updated backend initialization to accept a new Kafka configuration parameter and added a MsgBroker field for message production. |
| server/backend/messagebroker/* | Created a new message broker package with a Config struct, a DummyBroker fallback, a KafkaBroker implementation for real Kafka integration, and the Message interface with its concrete implementation (UserEventMessage). |
| server/rpc/yorkie_server.go, server/server.go | Updated the ActivateClient and server initialization flows to use the new message broker for publishing client activation events and to pass the Kafka configuration accordingly. |
| Test files: server/packs/packs_test.go, server/rpc/server_test.go, test/bench/push_pull_bench_test.go, test/complex/main_test.go, test/integration/housekeeping_test.go | Modified test setups to pass an additional nil parameter reflecting the updated backend.New function signature for Kafka configuration. |
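The broker fallback described above (KafkaBroker when configured, DummyBroker otherwise) can be sketched as follows. This is a minimal stdlib-only sketch modeled on the package description; the names Broker, DummyBroker, Config, and New are assumptions, not the PR's exact code:

```go
package main

import "fmt"

// Message is the unit the broker publishes (modeled on the package
// description; the real UserEventMessage carries more fields).
type Message interface {
	Marshal() []byte
}

// Broker abstracts message production so callers need not know
// whether Kafka is configured.
type Broker interface {
	Produce(m Message) error
	Close() error
}

// DummyBroker is the no-op fallback used when no Kafka address is set.
type DummyBroker struct{}

func (DummyBroker) Produce(Message) error { return nil }
func (DummyBroker) Close() error          { return nil }

// Config holds the Kafka settings taken from the CLI flags.
type Config struct {
	Addresses string
	Topic     string
}

// New returns a Kafka-backed broker when configured, else the dummy.
// The Kafka branch is elided here; in the real package it would wrap
// a kafka-go Writer.
func New(conf *Config) Broker {
	if conf == nil || conf.Addresses == "" {
		return DummyBroker{}
	}
	// return newKafkaBroker(conf) in the real package
	return DummyBroker{}
}

func main() {
	b := New(nil)
	fmt.Println(b.Produce(nil) == nil) // dummy broker silently succeeds
}
```

This shape explains why the test files above can simply pass nil for the Kafka configuration: backend.New falls back to the no-op broker and event publishing becomes invisible to the tests.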

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant YS as YorkieServer (RPC)
    participant BE as Backend
    participant MB as Message Broker
    participant KB as KafkaBroker

    C->>YS: ActivateClient request
    YS->>BE: Process activation
    BE->>MB: Produce UserEventMessage
    alt Kafka Configured
      MB->>KB: Forward message via Kafka writer
      KB-->>MB: Acknowledge message write
    else No Kafka Config
      MB-->>BE: Use DummyBroker (no-operation)
    end
    BE-->>YS: Return activation response
    YS-->>C: Send response

Suggested reviewers

  • hackerwins

@hackerwins hackerwins force-pushed the kafka-config branch 3 times, most recently from 63b0091 to 7f4fd90 Compare February 6, 2025 11:28

@github-actions github-actions bot left a comment


Go Benchmark

| Benchmark suite | Current: cab7025 (ns/op / B/op / allocs/op) | Previous: 87cce01 (ns/op / B/op / allocs/op) | Ratio |
| --- | --- | --- | --- |
| BenchmarkDocument/constructor_test | 1442 / 1385 / 24 | 1442 / 1385 / 24 | 1 |
| BenchmarkDocument/status_test | 1032 / 1353 / 22 | 1054 / 1353 / 22 | 0.98 |
| BenchmarkDocument/equals_test | 7807 / 7562 / 129 | 7916 / 7562 / 129 | 0.99 |
| BenchmarkDocument/nested_update_test | 16964 / 12307 / 258 | 16906 / 12307 / 258 | 1.00 |
| BenchmarkDocument/delete_test | 27163 / 15788 / 339 | 23166 / 15788 / 339 | 1.17 |
| BenchmarkDocument/object_test | 8583 / 7033 / 118 | 8635 / 7034 / 118 | 0.99 |
| BenchmarkDocument/array_test | 28919 / 12139 / 273 | 33323 / 12139 / 273 | 0.87 |
| BenchmarkDocument/text_test | 32554 / 15188 / 484 | 32055 / 15188 / 484 | 1.02 |
| BenchmarkDocument/text_composition_test | 31768 / 18701 / 501 | 32501 / 18701 / 501 | 0.98 |
| BenchmarkDocument/rich_text_test | 86468 / 39356 / 1146 | 90445 / 39358 / 1146 | 0.96 |
| BenchmarkDocument/counter_test | 18296 / 11810 / 253 | 18334 / 11810 / 253 | 1.00 |
| BenchmarkDocument/text_edit_gc_100 | 1400739 / 864934 / 17283 | 1412188 / 864908 / 17282 | 0.99 |
| BenchmarkDocument/text_edit_gc_1000 | 53491352 / 46838369 / 185588 | 57222048 / 46838739 / 185584 | 0.93 |
| BenchmarkDocument/text_split_gc_100 | 2153489 / 1581089 / 15950 | 2151513 / 1581001 / 15951 | 1.00 |
| BenchmarkDocument/text_split_gc_1000 | 129013930 / 137788880 / 185001 | 129782372 / 137790460 / 184996 | 0.99 |
| BenchmarkDocument/text_delete_all_10000 | 17057947 / 10575838 / 56133 | 18300206 / 10575821 / 56132 | 0.93 |
| BenchmarkDocument/text_delete_all_100000 | 256681846 / 105530652 / 566110 | 279901785 / 105513620 / 566013 | 0.92 |
| BenchmarkDocument/text_100 | 244616 / 120939 / 5181 | 244301 / 120908 / 5181 | 1.00 |
| BenchmarkDocument/text_1000 | 2588277 / 1156080 / 51084 | 2576707 / 1156087 / 51084 | 1.00 |
| BenchmarkDocument/array_1000 | 1306871 / 1088219 / 11879 | 1333395 / 1088396 / 11879 | 0.98 |
| BenchmarkDocument/array_10000 | 13582903 / 9887057 / 120727 | 13677413 / 9888732 / 120733 | 0.99 |
| BenchmarkDocument/array_gc_100 | 137922 / 99894 / 1266 | 138538 / 99883 / 1266 | 1.00 |
| BenchmarkDocument/array_gc_1000 | 1475997 / 1140987 / 12926 | 1486987 / 1140940 / 12926 | 0.99 |
| BenchmarkDocument/counter_1000 | 211983 / 178137 / 5771 | 214761 / 178135 / 5771 | 0.99 |
| BenchmarkDocument/counter_10000 | 2219103 / 2068954 / 59778 | 2239619 / 2068955 / 59778 | 0.99 |
| BenchmarkDocument/object_1000 | 1491721 / 1437158 / 9925 | 1502137 / 1437065 / 9924 | 0.99 |
| BenchmarkDocument/object_10000 | 14867919 / 12350287 / 101225 | 14970583 / 12351293 / 101231 | 0.99 |
| BenchmarkDocument/tree_100 | 1093405 / 951029 / 6102 | 1086572 / 951029 / 6102 | 1.01 |
| BenchmarkDocument/tree_1000 | 79931276 / 86582390 / 60112 | 80004380 / 86582440 / 60112 | 1.00 |
| BenchmarkDocument/tree_10000 | 9670310661 / 8581188944 / 600183 | 9735221656 / 8581190000 / 600181 | 0.99 |
| BenchmarkDocument/tree_delete_all_1000 | 84334261 / 87566249 / 75287 | 83089152 / 87566814 / 75289 | 1.01 |
| BenchmarkDocument/tree_edit_gc_100 | 4092936 / 4147839 / 15146 | 4112848 / 4147845 / 15146 | 1.00 |
| BenchmarkDocument/tree_edit_gc_1000 | 345031377 / 384045600 / 154949 | 333423655 / 384045973 / 154946 | 1.03 |
| BenchmarkDocument/tree_split_gc_100 | 2807316 / 2407264 / 11131 | 2822208 / 2407308 / 11131 | 0.99 |
| BenchmarkDocument/tree_split_gc_1000 | 207613589 / 222500587 / 122081 | 208949544 / 222501939 / 122088 | 0.99 |
| BenchmarkRPC/client_to_server | 429228964 / 16167565 / 224173 | 420342208 / 16174032 / 224153 | 1.02 |
| BenchmarkRPC/client_to_client_via_server | 797163646 / 36202048 / 481104 | 781785924 / 36425352 / 477909 | 1.02 |
| BenchmarkRPC/attach_large_document | 1438351315 / 1921173576 / 12293 | 1310347894 / 1918494488 / 12536 | 1.10 |
| BenchmarkRPC/adminCli_to_server | 546869572 / 19895616 / 291140 | 542082074 / 19959464 / 291124 | 1.01 |
| BenchmarkLocker | 86.3 / 32 / 1 | 70.82 / 16 / 1 | 1.22 |
| BenchmarkLockerParallel | 46.85 / 0 / 0 | 39.43 / 0 / 0 | 1.19 |
| BenchmarkLockerMoreKeys | 183.9 / 31 / 0 | 175.2 / 15 / 0 | 1.05 |
| BenchmarkRWLocker/RWLock_rate_2 | 50.58 / 0 / 0 | n/a | n/a |
| BenchmarkRWLocker/RWLock_rate_10 | 45.53 / 0 / 0 | n/a | n/a |
| BenchmarkRWLocker/RWLock_rate_100 | 60.91 / 2 / 0 | n/a | n/a |
| BenchmarkRWLocker/RWLock_rate_1000 | 88.14 / 8 / 0 | n/a | n/a |
| BenchmarkChange/Push_10_Changes | 4545206 / 150710 / 1626 | 4499701 / 150249 / 1617 | 1.01 |
| BenchmarkChange/Push_100_Changes | 16568308 / 783844 / 8513 | 16060622 / 778786 / 8505 | 1.03 |
| BenchmarkChange/Push_1000_Changes | 131858474 / 7091966 / 79324 | 127883322 / 7174923 / 79324 | 1.03 |
| BenchmarkChange/Pull_10_Changes | 3757720 / 123939 / 1452 | 3696210 / 124527 / 1452 | 1.02 |
| BenchmarkChange/Pull_100_Changes | 5343177 / 353012 / 5177 | 5298195 / 354620 / 5178 | 1.01 |
| BenchmarkChange/Pull_1000_Changes | 10956825 / 2195570 / 44677 | 10690325 / 2195057 / 44678 | 1.02 |
| BenchmarkSnapshot/Push_3KB_snapshot | 18737072 / 898326 / 8515 | 18424239 / 902145 / 8513 | 1.02 |
| BenchmarkSnapshot/Push_30KB_snapshot | 134770064 / 8296264 / 90440 | 131931516 / 8149350 / 87133 | 1.02 |
| BenchmarkSnapshot/Pull_3KB_snapshot | 7446876 / 1113311 / 20055 | 7518925 / 1115929 / 20060 | 0.99 |
| BenchmarkSnapshot/Pull_30KB_snapshot | 20040924 / 9304257 / 193607 | 20158323 / 9303400 / 193604 | 0.99 |
| BenchmarkSplayTree/stress_test_100000 | 0.19 / 0 / 0 | 0.1981 / 0 / 0 | 0.96 |
| BenchmarkSplayTree/stress_test_200000 | 0.3807 / 0 / 0 | 0.3762 / 0 / 0 | 1.01 |
| BenchmarkSplayTree/stress_test_300000 | 0.5704 / 0 / 0 | 0.5689 / 0 / 0 | 1.00 |
| BenchmarkSplayTree/random_access_100000 | 0.01254 / 0 / 0 | 0.01259 / 0 / 0 | 1.00 |
| BenchmarkSplayTree/random_access_200000 | 0.03244 / 0 / 0 | 0.02909 / 0 / 0 | 1.12 |
| BenchmarkSplayTree/random_access_300000 | 0.04026 / 0 / 0 | 0.04425 / 0 / 0 | 0.91 |
| BenchmarkSplayTree/editing_trace_bench | 0.002206 / 0 / 0 | 0.002184 / 0 / 0 | 1.01 |
| BenchmarkSync/memory_sync_10_test | 7097 / 1342 / 35 | 6919 / 1183 / 35 | 1.03 |
| BenchmarkSync/memory_sync_100_test | 55976 / 9507 / 268 | 53447 / 8529 / 269 | 1.05 |
| BenchmarkSync/memory_sync_1000_test | 606982 / 76978 / 2143 | 575996 / 74876 / 2146 | 1.05 |
| BenchmarkSync/memory_sync_10000_test | 7580556 / 768555 / 20640 | 7468624 / 754372 / 20505 | 1.01 |
| BenchmarkTextEditing | 5250970401 / 3922638264 / 20619668 | 5297178369 / 3922671656 / 20619745 | 0.99 |
| BenchmarkTree/10000_vertices_to_protobuf | 4221091 / 6363268 / 70025 | 4391932 / 6363240 / 70025 | 0.96 |
| BenchmarkTree/10000_vertices_from_protobuf | 221239882 / 442306043 / 290039 | 223587457 / 442304264 / 290038 | 0.99 |
| BenchmarkTree/20000_vertices_to_protobuf | 9013759 / 12890897 / 140028 | 9415715 / 12890832 / 140028 | 0.96 |
BenchmarkTree/20000_vertices_to_protobuf - allocs/op 140028 allocs/op 140028 allocs/op 1
BenchmarkTree/20000_vertices_from_protobuf 884559755 ns/op 1697470000 B/op 580043 allocs/op 889504614 ns/op 1697478812 B/op 580089 allocs/op 0.99
BenchmarkTree/20000_vertices_from_protobuf - ns/op 884559755 ns/op 889504614 ns/op 0.99
BenchmarkTree/20000_vertices_from_protobuf - B/op 1697470000 B/op 1697478812 B/op 1.00
BenchmarkTree/20000_vertices_from_protobuf - allocs/op 580043 allocs/op 580089 allocs/op 1.00
BenchmarkTree/30000_vertices_to_protobuf 13975623 ns/op 18976272 B/op 210029 allocs/op 13707963 ns/op 18976262 B/op 210029 allocs/op 1.02
BenchmarkTree/30000_vertices_to_protobuf - ns/op 13975623 ns/op 13707963 ns/op 1.02
BenchmarkTree/30000_vertices_to_protobuf - B/op 18976272 B/op 18976262 B/op 1.00
BenchmarkTree/30000_vertices_to_protobuf - allocs/op 210029 allocs/op 210029 allocs/op 1
BenchmarkTree/30000_vertices_from_protobuf 2004429453 ns/op 3751744480 B/op 870147 allocs/op 1983871431 ns/op 3751747496 B/op 870094 allocs/op 1.01
BenchmarkTree/30000_vertices_from_protobuf - ns/op 2004429453 ns/op 1983871431 ns/op 1.01
BenchmarkTree/30000_vertices_from_protobuf - B/op 3751744480 B/op 3751747496 B/op 1.00
BenchmarkTree/30000_vertices_from_protobuf - allocs/op 870147 allocs/op 870094 allocs/op 1.00

This comment was automatically generated by workflow using github-action-benchmark.


codecov bot commented Feb 6, 2025

Codecov Report

Attention: Patch coverage is 15.25424% with 150 lines in your changes missing coverage. Please review.

Project coverage is 38.47%. Comparing base (b4da816) to head (cab7025).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
server/backend/database/testcases/testcases.go 0.00% 35 Missing ⚠️
server/backend/messagebroker/kafka.go 0.00% 25 Missing ⚠️
cmd/yorkie/server.go 0.00% 24 Missing ⚠️
server/backend/messagebroker/messagebroker.go 0.00% 20 Missing ⚠️
server/backend/messagebroker/config.go 26.92% 19 Missing ⚠️
api/yorkie/v1/yorkie.pb.go 22.22% 7 Missing ⚠️
server/backend/backend.go 0.00% 7 Missing ⚠️
server/backend/messagebroker/dummy.go 0.00% 4 Missing ⚠️
server/config.go 0.00% 4 Missing ⚠️
server/rpc/yorkie_server.go 82.35% 2 Missing and 1 partial ⚠️
... and 2 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1143      +/-   ##
==========================================
- Coverage   38.54%   38.47%   -0.07%     
==========================================
  Files         165      169       +4     
  Lines       25247    25428     +181     
==========================================
+ Hits         9731     9783      +52     
- Misses      14698    14824     +126     
- Partials      818      821       +3     

☔ View full report in Codecov by Sentry.

@hackerwins (Member)

I changed it to ready temporarily to activate CodeRabbit reviews.

@hackerwins hackerwins marked this pull request as ready for review February 6, 2025 11:42

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (9)
server/backend/messagebroker/config.go (1)

20-23: Use consistent naming convention in YAML tags.

The YAML tags use PascalCase (ConnectionURL) while typical YAML conventions favor snake_case or kebab-case.

 type Config struct {
-	Address string `yaml:"ConnectionURL"`
-	Topic   string `yaml:"Topic"`
+	Address string `yaml:"connection_url"`
+	Topic   string `yaml:"topic"`
 }
server/backend/messagebroker/messagebroker.go (1)

59-72: Enhance error handling in Ensure function.

The Ensure function should:

  1. Validate the configuration before creating the broker
  2. Add more detailed logging for debugging
 func Ensure(kafkaConf *Config) Broker {
 	if kafkaConf == nil || kafkaConf.Address == "" {
+		logging.DefaultLogger().Info("no kafka configuration provided, using dummy broker")
 		return &DummyBroker{}
 	}

+	if err := kafkaConf.Validate(); err != nil {
+		logging.DefaultLogger().Warnf("invalid kafka configuration: %v, using dummy broker", err)
+		return &DummyBroker{}
+	}
+
 	logging.DefaultLogger().Infof(
 		"connecting to kafka: %s, topic: %s",
 		kafkaConf.Address,
 		kafkaConf.Topic,
 	)

 	return newKafkaBroker(kafkaConf.Address, kafkaConf.Topic)
 }
api/types/events/events.go (1)

72-84: Enhance ClientEvent type with additional metadata.

The ClientEvent struct could be more comprehensive to include:

  1. Timestamp
  2. Client metadata
  3. String() method for ClientEventType
+// String returns the string representation of ClientEventType
+func (t ClientEventType) String() string {
+	return string(t)
+}

 type ClientEvent struct {
 	// Type is the type of the event.
 	Type ClientEventType
+	// Add additional fields
+	Timestamp  time.Time
+	ClientID   string
+	Metadata   map[string]interface{}
 }
server/backend/backend.go (1)

151-153: Consider enhancing message broker initialization.

The broker initialization could benefit from additional error handling and context for graceful shutdown.

Consider modifying the initialization to:

  1. Return any initialization errors
  2. Accept a context for graceful shutdown
-// 08. Create the message broker instance.
-broker := messagebroker.Ensure(kafkaConf)
+// 08. Create the message broker instance with context for graceful shutdown.
+broker, err := messagebroker.New(context.Background(), kafkaConf)
+if err != nil {
+    return nil, fmt.Errorf("failed to initialize message broker: %w", err)
+}
server/config.go (1)

79-84: Consider adding default values for Kafka configuration.

While the Kafka configuration field is correctly added, there are no default values set in the ensureDefaultValue method. This could make it harder for users to get started with the Kafka integration.

Consider adding default values similar to other configurations:

 func (c *Config) ensureDefaultValue() {
+    if c.Kafka != nil {
+        if c.Kafka.Address == "" {
+            c.Kafka.Address = "localhost:29092"
+        }
+        if c.Kafka.Topic == "" {
+            c.Kafka.Topic = "user-events"
+        }
+    }
test/bench/push_pull_bench_test.go (1)

66-66: Consider adding Kafka performance benchmarks.

While passing nil for Kafka config is correct for existing benchmarks, consider adding performance benchmarks for the Kafka integration to measure its impact on the system.

Consider adding benchmarks that:

  1. Measure latency with and without Kafka integration
  2. Measure throughput of user event publishing
  3. Evaluate different Kafka configurations
cmd/yorkie/server.go (1)

89-94: Consider adding validation for Kafka configuration.

The configuration block is correctly implemented. However, consider adding validation for the Kafka configuration to ensure the address format is valid and the topic name meets Kafka's requirements.

 if kafkaAddress != "" && kafkaTopic != "" {
+    // Validate Kafka address format
+    if !strings.Contains(kafkaAddress, ":") {
+        return fmt.Errorf("invalid kafka address format: %s", kafkaAddress)
+    }
+    // Validate topic name (Kafka topics cannot be empty and have character limitations)
+    if len(kafkaTopic) == 0 || len(kafkaTopic) > 249 {
+        return fmt.Errorf("invalid kafka topic name: %s", kafkaTopic)
+    }
     conf.Kafka = &messagebroker.Config{
         Address: kafkaAddress,
         Topic:   kafkaTopic,
     }
 }
build/docker/analytics/init-user-events-db.sql (1)

11-11: Consider adding an index for analytics queries.

The metadata column type change to JSON is appropriate for storing structured metadata. However, for analytics queries, consider adding an index on the timestamp column to optimize MAU calculations.

CREATE INDEX idx_user_events_timestamp ON user_events (timestamp);
api/yorkie/v1/yorkie.proto (1)

44-45: Add field documentation.

The new fields are correctly added and maintain backward compatibility. However, consider adding documentation comments to describe:

  1. The purpose and format of user_id
  2. Expected key-value pairs in the metadata map
 message ActivateClientRequest {
   string client_key = 1;
+  // Unique identifier for the user. If not provided, client_id will be used.
   string user_id = 2;
+  // Additional metadata about the client activation.
+  // Keys should be lowercase and use underscores.
+  // Common keys: device_type, app_version, os_version
   map<string, string> metadata = 3;
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ccf25f0 and 252de54.

⛔ Files ignored due to path filters (2)
  • api/yorkie/v1/yorkie.pb.go is excluded by !**/*.pb.go
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (24)
  • api/docs/yorkie/v1/admin.openapi.yaml (58 hunks)
  • api/docs/yorkie/v1/cluster.openapi.yaml (7 hunks)
  • api/docs/yorkie/v1/resources.openapi.yaml (67 hunks)
  • api/docs/yorkie/v1/yorkie.openapi.yaml (53 hunks)
  • api/types/events/events.go (1 hunks)
  • api/yorkie/v1/yorkie.proto (1 hunks)
  • build/docker/analytics/docker-compose.yml (3 hunks)
  • build/docker/analytics/init-user-events-db.sql (1 hunks)
  • cmd/yorkie/server.go (4 hunks)
  • go.mod (3 hunks)
  • server/backend/backend.go (6 hunks)
  • server/backend/messagebroker/config.go (1 hunks)
  • server/backend/messagebroker/dummy.go (1 hunks)
  • server/backend/messagebroker/kafka.go (1 hunks)
  • server/backend/messagebroker/messagebroker.go (1 hunks)
  • server/config.go (3 hunks)
  • server/config.sample.yml (1 hunks)
  • server/packs/packs_test.go (1 hunks)
  • server/rpc/server_test.go (1 hunks)
  • server/rpc/yorkie_server.go (2 hunks)
  • server/server.go (1 hunks)
  • test/bench/push_pull_bench_test.go (1 hunks)
  • test/complex/main_test.go (1 hunks)
  • test/integration/housekeeping_test.go (1 hunks)
✅ Files skipped from review due to trivial changes (5)
  • api/docs/yorkie/v1/yorkie.openapi.yaml
  • server/backend/messagebroker/dummy.go
  • api/docs/yorkie/v1/admin.openapi.yaml
  • api/docs/yorkie/v1/cluster.openapi.yaml
  • api/docs/yorkie/v1/resources.openapi.yaml
🔇 Additional comments (20)
test/integration/housekeeping_test.go (1)

61-67: LGTM!

The addition of nil as the last argument to backend.New() is correct for test scenarios where Kafka functionality is not required.

server/server.go (1)

63-69: LGTM!

The addition of conf.Kafka to backend.New() correctly integrates the Kafka configuration into the server initialization process.

test/complex/main_test.go (1)

75-96: LGTM!

The addition of nil as the Kafka configuration is appropriate for these complex test scenarios where Kafka functionality is not needed.

server/backend/backend.go (2)

65-66: LGTM!

The MsgBroker field is well-documented and properly typed.


200-202: LGTM!

The error handling during shutdown is appropriate, logging errors without failing the shutdown process.

server/config.go (2)

30-30: LGTM!

The import of the messagebroker package is correctly added.


138-142: LGTM!

The validation logic for Kafka configuration is correctly implemented, ensuring that if Kafka config is provided, it is valid.

server/rpc/server_test.go (1)

93-93: Consider adding Kafka integration tests.

While passing nil for Kafka config is correct for non-Kafka test cases, consider adding test cases that verify the Kafka integration functionality.

Would you like me to help create test cases for the Kafka integration? This could include:

  1. Tests with valid Kafka configuration
  2. Tests with invalid Kafka configuration
  3. Tests verifying user events are correctly published to Kafka
server/packs/packs_test.go (1)

119-119: LGTM!

The addition of nil for Kafka config is consistent with other test files.

cmd/yorkie/server.go (2)

30-30: LGTM!

The import statement is correctly placed and follows Go's import grouping conventions.


57-58: LGTM!

The Kafka configuration variables are correctly declared and follow Go naming conventions.

server/rpc/yorkie_server.go (1)

33-33: LGTM!

The import statement is correctly placed and follows Go's import grouping conventions.

go.mod (3)

19-19: New Kafka Dependency Added.
The addition of github.com/segmentio/kafka-go v0.4.47 supports the new Kafka producer functionality for Yorkie analytics. Please ensure that the Kafka producer configuration in the code consistently uses this library's API.


35-35: New Indirect lz4 Dependency.
The indirect dependency github.com/pierrec/lz4/v4 v4.1.15 has been added. Verify that any runtime components relying on lz4 compression are compatible with this version.


73-74: Updated scram and stringprep Dependencies.
The versions for github.com/xdg-go/scram and github.com/xdg-go/stringprep have been bumped to v1.1.2 and v1.0.4, respectively. Please confirm that these updates do not introduce breaking changes for modules that utilize these libraries.

server/config.sample.yml (1)

109-116: New Kafka Message Broker Configuration Section.
A new MessageBroker section has been added with parameters ConnectionURL set to "localhost:29092" and Topic set to "user-events". This configuration is essential for Kafka integration in the backend. Please ensure that the field names and default values match the expectations in the server initialization and Kafka producer setup.

build/docker/analytics/docker-compose.yml (4)

52-52: Kafka Port Mapping Update.
The Kafka service port mapping has been updated to "29092:29092", which aligns with the new configuration. Verify that all client connections and configuration parameters correctly reference this port.


61-63: Kafka Listener Configuration Update.
The environment variables for Kafka now include:

  • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT

These changes are critical for exposing an external listener on port 29092. Please review these settings to ensure that external connectors can reliably access the Kafka broker.


67-67: Kafka Healthcheck Command Adjustment.
The healthcheck command for the Kafka service now uses kafka-topics.sh --bootstrap-server localhost:9092 --list. Confirm that using localhost in this context correctly resolves within the container network and meets the health monitoring requirements.


144-144: Routine Load Healthcheck Enhancement.
The routine load check in the init-starrocks-database service now accepts either "State: NEED_SCHEDULE" or "State: RUNNING". This flexible check improves robustness by acknowledging multiple valid states. Ensure that these conditions accurately capture the operational statuses expected during initialization.

Outdated review threads (resolved):
  • server/backend/messagebroker/config.go
  • server/backend/messagebroker/kafka.go (two threads)
  • server/backend/messagebroker/messagebroker.go
  • server/rpc/yorkie_server.go
@hackerwins hackerwins marked this pull request as draft February 6, 2025 11:48
@emplam27 emplam27 requested a review from hackerwins February 12, 2025 05:46
@emplam27 emplam27 marked this pull request as ready for review February 12, 2025 05:46

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
server/backend/messagebroker/config.go (1)

33-59: Consider enhancing topic validation.

While the current validation is good, consider adding checks for:

  • Topic name format (valid characters, length limits)
  • Topic name restrictions (e.g., '.' and '..' are not allowed)
 func (c *Config) Validate() error {
 	logging.DefaultLogger().Infof("Config: %+v", c)
 
 	if c.Addresses == "" {
 		return fmt.Errorf("address cannot be empty")
 	}
 
 	kafkaAddresses := strings.Split(c.Addresses, ",")
 	for _, addr := range kafkaAddresses {
 		if addr == "" {
 			return fmt.Errorf(`address cannot be empty: "%s" in "%s"`, addr, c.Addresses)
 		}
 
 		if _, err := url.Parse(addr); err != nil {
 			return fmt.Errorf(`invalid argument "%s" for "--kafka-addresses" flag`, c.Addresses)
 		}
 	}
 
 	if c.Topic == "" {
 		return fmt.Errorf("topic cannot not be empty")
 	}
+
+	// Validate topic name format
+	if !regexp.MustCompile(`^[a-zA-Z0-9._-]+$`).MatchString(c.Topic) {
+		return fmt.Errorf("invalid topic name format: only alphanumeric characters, dots, underscores, and hyphens are allowed")
+	}
+
+	// Check for invalid topic names
+	if c.Topic == "." || c.Topic == ".." {
+		return fmt.Errorf("invalid topic name: '.' and '..' are not allowed")
+	}
 
 	return nil
 }
server/backend/database/client_info.go (1)

79-81: Consider adding metadata validation.

To ensure data integrity, consider adding validation for metadata keys and values:

  • Maximum number of metadata entries
  • Key/value length limits
  • Required/reserved keys for analytics

Add a ValidateMetadata method:

// ValidateMetadata validates the metadata map
func (i *ClientInfo) ValidateMetadata() error {
    const (
        maxMetadataEntries = 10
        maxKeyLength = 50
        maxValueLength = 200
    )
    
    if len(i.Metadata) > maxMetadataEntries {
        return fmt.Errorf("metadata exceeds maximum entries: %d", maxMetadataEntries)
    }
    
    for key, value := range i.Metadata {
        if len(key) > maxKeyLength {
            return fmt.Errorf("metadata key exceeds maximum length: %s", key)
        }
        if len(value) > maxValueLength {
            return fmt.Errorf("metadata value exceeds maximum length for key: %s", key)
        }
    }
    
    // Ensure required keys for analytics
    if _, exists := i.Metadata["userID"]; !exists {
        return fmt.Errorf("required metadata key missing: userID")
    }
    
    return nil
}
cmd/yorkie/server.go (1)

89-94: Add validation for Kafka address format.

While the code correctly ensures both Kafka parameters are provided together, consider validating the address format to ensure it follows the host:port pattern.

 if kafkaAddresses != "" && kafkaTopic != "" {
+    // Validate Kafka addresses format
+    for _, addr := range strings.Split(kafkaAddresses, ",") {
+        if _, _, err := net.SplitHostPort(strings.TrimSpace(addr)); err != nil {
+            return fmt.Errorf("invalid Kafka address format %q: %w", addr, err)
+        }
+    }
     conf.Kafka = &messagebroker.Config{
         Addresses: kafkaAddresses,
         Topic:     kafkaTopic,
     }
 }
server/backend/database/testcases/testcases.go (1)

57-58: Add test cases for metadata validation.

While the metadata is consistently included in client activation calls, consider adding specific test cases to validate:

  1. Empty metadata handling
  2. Invalid metadata values
  3. Metadata persistence and retrieval
  4. Metadata updates
 func RunActivateClientDeactivateClientTest(t *testing.T, db database.Database, projectID types.ID) {
+    t.Run("metadata validation test", func(t *testing.T) {
+        ctx := context.Background()
+        
+        // Test empty metadata
+        clientInfo1, err := db.ActivateClient(ctx, projectID, t.Name()+"1", map[string]string{})
+        assert.NoError(t, err)
+        
+        // Test metadata with special characters
+        clientInfo2, err := db.ActivateClient(ctx, projectID, t.Name()+"2", 
+            map[string]string{"key!@#": "value!@#"})
+        assert.NoError(t, err)
+        
+        // Test metadata retrieval
+        found, err := db.FindClientInfoByRefKey(ctx, clientInfo2.RefKey())
+        assert.NoError(t, err)
+        assert.Equal(t, clientInfo2.Metadata, found.Metadata)
+    })

Also applies to: 77-78, 104-105, 132-133, 153-154, 266-267, 305-306, 357-358, 387-388, 466-467, 536-538, 608-609, 715-716, 733-734, 740-741, 1081-1082, 1125-1126, 1332-1333, 1455-1458, 1577-1578, 1580-1581, 1591-1592, 1594-1595

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 252de54 and 3cc74e2.

⛔ Files ignored due to path filters (1)
  • api/yorkie/v1/yorkie.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (18)
  • api/docs/yorkie/v1/yorkie.openapi.yaml (53 hunks)
  • api/yorkie/v1/yorkie.proto (1 hunks)
  • build/docker/analytics/init-user-events-db.sql (1 hunks)
  • cmd/yorkie/server.go (4 hunks)
  • server/backend/database/client_info.go (2 hunks)
  • server/backend/database/database.go (1 hunks)
  • server/backend/database/memory/database.go (2 hunks)
  • server/backend/database/mongo/client.go (1 hunks)
  • server/backend/database/testcases/testcases.go (30 hunks)
  • server/backend/messagebroker/config.go (1 hunks)
  • server/backend/messagebroker/kafka.go (1 hunks)
  • server/backend/messagebroker/messagebroker.go (1 hunks)
  • server/clients/clients.go (1 hunks)
  • server/config.sample.yml (1 hunks)
  • server/rpc/yorkie_server.go (3 hunks)
  • test/bench/push_pull_bench_test.go (2 hunks)
  • test/complex/main_test.go (1 hunks)
  • test/integration/housekeeping_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • api/docs/yorkie/v1/yorkie.openapi.yaml
  • server/config.sample.yml
  • test/complex/main_test.go
  • api/yorkie/v1/yorkie.proto
  • server/backend/messagebroker/kafka.go
  • test/bench/push_pull_bench_test.go
  • server/backend/messagebroker/messagebroker.go
🔇 Additional comments (14)
server/backend/messagebroker/config.go (1)

27-31: LGTM! Well-structured configuration.

The Config struct is well-designed with appropriate YAML tags and clear field names.

server/clients/clients.go (1)

38-47: LGTM! Clean metadata integration.

The addition of the metadata parameter to the Activate function is well-integrated and maintains backward compatibility.

test/integration/housekeeping_test.go (2)

119-122: LGTM! Good test coverage for metadata.

The test cases properly verify client activation with metadata, using appropriate test data.


61-67: Verify if nil Kafka config is appropriate for tests.

Consider if the test should use a mock Kafka configuration instead of nil to ensure the message broker behavior is properly tested.

✅ Verification successful

🏁 Script executed:

#!/bin/bash
# Check if other test files use mock Kafka config
rg -l "messagebroker.Config" "test/"

Length of output: 34


Let's gather some additional context regarding Kafka configuration usage across the repository to decide if using a nil value in the test is appropriate. I'll run a few more checks to see how Kafka (and any associated mocks) is used in the codebase.

Once these results are available, we can determine if passing nil for Kafka config in the integration test is acceptable or if a mock should be used to better simulate message broker behavior.


🏁 Script executed:

#!/bin/bash
# 1. Search for Kafka-related occurrences in the entire repository.
echo "[1] Searching for 'Kafka' occurrences in the repository:"
rg -n "Kafka"

# 2. Search for references to messagebroker configuration usage to see how it's set up.
echo "[2] Searching for 'messagebroker.Config' occurrences:"
rg -n "messagebroker.Config"

# 3. Look at the declaration and usage of backend.New (which accepts the Kafka config parameter)
echo "[3] Searching for the definition of backend.New in the backend package:"
rg -n "func New(" backend/

# 4. Search for any usage of a mock Kafka configuration in tests.
echo "[4] Searching for references to 'mockKafka' in tests:"
rg -n "mockKafka" test/

Length of output: 4518


Based on the additional searches:

  • There’s no evidence in tests that a mock Kafka configuration is used (e.g. no “mockKafka” appears in tests).
  • Production code uses a non-nil Kafka config in parts of the system (e.g. in cmd/yorkie/server.go), but tests seem to routinely pass nil to disable Kafka behavior.

I'll now verify how backend.New handles a nil value for the Kafka config to double-check if passing nil in tests is appropriate.

Once we have the definition, we can inspect how the nil Kafka configuration is managed internally. This will help decide if the test should supply a mock Kafka configuration or if nil is acceptable for the integration test.

If the code gracefully handles nil (skipping Kafka-related operations), then leaving it as nil in housekeeping_test.go is appropriate. Otherwise, it might indicate that a proper mock is needed for a more complete test.

Based on these results and the fact that no other tests are using a Kafka mock, it appears that passing nil is an accepted pattern in the codebase for tests where Kafka behavior isn’t the focus.

Please verify the output of the above script to ensure that backend.New properly handles a nil Kafka configuration. If it does so gracefully (i.e. by conditionally skipping Kafka integration when the config is nil), then the test is correct as written. Otherwise, consider using a mock Kafka config.

Action: Verify that backend.New handles nil Kafka config gracefully as expected in tests.


🏁 Script executed:

#!/bin/bash
# Search for the definition of backend.New while escaping parentheses properly
rg -n "func New\\(" backend/

Length of output: 78


Integration Test Kafka Config Use Verified

The investigation shows that no tests rely on a non-nil or mocked Kafka configuration. Searches across the repository found no usage of a mock Kafka config in tests, and production code validates that Kafka is an optional dependency (with nil checks in place). Given this, passing nil in housekeeping_test.go is consistent with the repository’s testing approach.

server/backend/database/client_info.go (1)

277-287: LGTM! Proper deep copy implementation.

The DeepCopy method correctly handles the new Metadata field, maintaining immutability.

cmd/yorkie/server.go (2)

57-58: LGTM!

Clear and descriptive variable names for Kafka configuration.


373-384: LGTM!

Well-documented flags with clear descriptions and examples.

server/backend/database/database.go (1)

143-143: LGTM!

Clean interface update to support client metadata.

server/rpc/yorkie_server.go (2)

67-70: LGTM!

Good fallback mechanism using clientKey when userID is not provided in metadata.


84-95: Improve error handling for Kafka message production.

The current implementation logs the error but doesn't provide enough context for debugging.

-    if err := s.backend.MsgBroker.Produce(
+    err := s.backend.MsgBroker.Produce(
         ctx,
         messagebroker.UserEventMessage{
             UserID:    userID,
             Timestamp: gotime.Now(),
             EventType: events.ClientActivatedEvent,
             ProjectID: project.ID.String(),
             UserAgent: req.Header().Get("x-yorkie-user-agent"),
         },
-    ); err != nil {
-        logging.From(ctx).Error(err)
-    }
+    )
+    if err != nil {
+        // Log the error but don't fail the activation
+        logging.From(ctx).Errorf("failed to produce user event: %v, userID: %s, projectID: %s",
+            err, userID, project.ID.String())
+    }
server/backend/database/memory/database.go (1)

511-511: LGTM!

Clean implementation of metadata support in the memory database.

Also applies to: 526-526

server/backend/database/mongo/client.go (2)

520-529: LGTM! MongoDB query correctly handles metadata.

The MongoDB query is well-structured, using metadata as a filter condition while maintaining atomicity through upsert operation.


513-518: LGTM! Verify metadata usage in client activation.

The metadata parameter is correctly integrated into the client activation process. The implementation looks good and maintains existing functionality while adding metadata support.

Run this script to verify the metadata usage in client activation:

✅ Verification successful

🏁 Script executed:

#!/bin/bash
# Description: Verify that metadata is correctly stored and retrieved during client activation.

# Search for metadata usage in client activation
ast-grep --pattern $'func \($_\) ActivateClient\($_\) {
  $$$
  metadata
  $$$
}'

# Search for metadata field in client info struct
ast-grep --pattern $'type ClientInfo struct {
  $$$
  metadata
  $$$
}'

Length of output: 155


I'm not seeing evidence of metadata usage with the initial ast-grep query. Let's run a more targeted search. I'll use rg to inspect the file for both the function signature of ActivateClient (to confirm the metadata parameter is declared) and for any usage of metadata (including in the ClientInfo struct). This should help clarify if the metadata parameter is indeed integrated.


🏁 Script executed:

#!/bin/bash
# Check the ActivateClient function signature including the metadata parameter:
rg "func \(c \*Client\) ActivateClient" server/backend/database/mongo/client.go -A 10

# Search for any usage of the keyword 'metadata' in the file:
rg "metadata" server/backend/database/mongo/client.go

# Locate the definition of ClientInfo and any metadata field inside it:
rg "type ClientInfo struct" -A 20

Length of output: 2032


LGTM! Metadata usage validated in client activation.

  • The ActivateClient function now includes the metadata map[string]string parameter.
  • The MongoDB update filter correctly incorporates the metadata.
  • The ClientInfo struct in server/backend/database/client_info.go contains a Metadata field with the proper BSON mapping.
build/docker/analytics/init-user-events-db.sql (1)

5-16: LGTM! Well-designed analytics schema.

The table schema is well-structured for analytics:

  • OLAP engine is appropriate for analytical workloads
  • Hash distribution on user_id ensures balanced data distribution
  • Proper indexing with duplicate key on user_id
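The bullet points above correspond to StarRocks-style DDL roughly like the following. This is a hypothetical reconstruction of `init-user-events-db.sql`: only the `user_id` key, the OLAP engine, and the hash distribution are confirmed by the review; the remaining columns are assumptions for illustration.

```sql
-- Hypothetical sketch; only user_id, ENGINE=OLAP, DUPLICATE KEY, and
-- HASH distribution are confirmed by the review above.
CREATE TABLE IF NOT EXISTS user_events (
    user_id    VARCHAR(64) NOT NULL,
    event_type VARCHAR(32),
    created_at DATETIME
)
ENGINE = OLAP
DUPLICATE KEY (user_id)
DISTRIBUTED BY HASH (user_id);
```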

@coderabbitai coderabbitai bot left a comment

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.

Actionable comments posted: 1

♻️ Duplicate comments (2)
server/backend/messagebroker/kafka.go (2)

33-44: 🛠️ Refactor suggestion

Add production-ready Kafka configurations.

The current writer configuration lacks important settings for production use.

Apply this diff to add production-ready configurations:

 		writer: &kafka.Writer{
 			Addr:         kafka.TCP(conf.SplitAddresses()...),
 			Topic:        conf.Topic,
 			WriteTimeout: conf.MustParseWriteTimeout(),
 			Balancer:     &kafka.LeastBytes{},
 			Async:        true,
+			// Add production-ready configurations
+			BatchTimeout: time.Second,
+			BatchSize:    100,
+			RequiredAcks: kafka.RequireOne,
+			MaxAttempts:  3,
 		},

46-61: 🛠️ Refactor suggestion

Enhance error handling and implement context timeout.

The Produce method needs improvements for production readiness.

Apply this diff to enhance error handling:

 func (mb *KafkaBroker) Produce(
 	ctx context.Context,
 	msg Message,
 ) error {
+	if mb.writer == nil {
+		return fmt.Errorf("kafka writer is not initialized")
+	}
+
 	value, err := msg.Marshal()
 	if err != nil {
 		return fmt.Errorf("marshal message: %w", err)
 	}
 
+	// Add timeout to prevent hanging
+	ctx, cancel := context.WithTimeout(ctx, mb.conf.MustParseWriteTimeout())
+	defer cancel()
+
 	if err := mb.writer.WriteMessages(ctx, kafka.Message{Value: value}); err != nil {
 		return fmt.Errorf("write message to kafka: %w", err)
 	}
 
 	return nil
 }
🧹 Nitpick comments (4)
server/config.go (1)

141-145: Add default values handling for Kafka configuration.

While validation is properly integrated, the ensureDefaultValue method should be updated to set default values for Kafka configuration when it's present.

Add this handling in the ensureDefaultValue method:

 	if c.Kafka != nil {
+		if c.Kafka.Topic == "" {
+			c.Kafka.Topic = DefaultKafkaTopic
+		}
+		if c.Kafka.WriteTimeout == "" {
+			c.Kafka.WriteTimeout = DefaultKafkaWriteTimeout.String()
+		}
 	}
server/backend/messagebroker/config.go (2)

72-74: Enhance URL validation for Kafka addresses.

The current URL validation is generic. For Kafka, we should specifically validate the host:port format.

-		if _, err := url.Parse(addr); err != nil {
+		host, port, err := net.SplitHostPort(addr)
+		if err != nil {
 			return fmt.Errorf(`parse address "%s": %w`, c.Addresses, err)
 		}
+		if host == "" || port == "" {
+			return fmt.Errorf(`invalid address format "%s": host:port required`, addr)
+		}

81-83: Add minimum write timeout validation.

The write timeout should have a minimum value to prevent performance issues.

-	if _, err := time.ParseDuration(c.WriteTimeout); err != nil {
+	d, err := time.ParseDuration(c.WriteTimeout)
+	if err != nil {
 		return fmt.Errorf(`parse write timeout "%s": %w`, c.WriteTimeout, ErrInvalidDuration)
 	}
+	if d < time.Second {
+		return fmt.Errorf(`write timeout "%s" must be at least 1 second`, c.WriteTimeout)
+	}
server/backend/messagebroker/config_test.go (1)

27-51: Add more test cases for comprehensive coverage.

The current test suite should be expanded to cover:

  1. Config validation
  2. Minimum write timeout validation
  3. Invalid Kafka addresses

Add these test cases:

 func TestConfig(t *testing.T) {
+	t.Run("test validate config", func(t *testing.T) {
+		tests := []struct {
+			name    string
+			config  *messagebroker.Config
+			wantErr bool
+		}{
+			{
+				name: "valid config",
+				config: &messagebroker.Config{
+					Addresses:    "localhost:9092",
+					Topic:       "test",
+					WriteTimeout: "1s",
+				},
+				wantErr: false,
+			},
+			{
+				name: "invalid write timeout",
+				config: &messagebroker.Config{
+					Addresses:    "localhost:9092",
+					Topic:       "test",
+					WriteTimeout: "100ms",
+				},
+				wantErr: true,
+			},
+			{
+				name: "invalid address",
+				config: &messagebroker.Config{
+					Addresses:    "localhost",
+					Topic:       "test",
+					WriteTimeout: "1s",
+				},
+				wantErr: true,
+			},
+		}
+		
+		for _, tt := range tests {
+			t.Run(tt.name, func(t *testing.T) {
+				err := tt.config.Validate()
+				if tt.wantErr {
+					assert.Error(t, err)
+				} else {
+					assert.NoError(t, err)
+				}
+			})
+		}
+	})
🛑 Comments failed to post (1)
cmd/yorkie/server.go (1)

90-95: 💡 Verification agent

🧩 Analysis chain

Add validation for Kafka configuration.

While the code checks that kafkaAddresses and kafkaTopic are non-empty, it does not validate the kafkaWriteTimeout value, which can cause runtime failures if the flag is set incorrectly.

Consider adding validation for the write timeout and verifying the Kafka connection during server startup. Here's a script to verify the Kafka configuration validation:


🏁 Script executed:

#!/bin/bash
# Description: Check if Kafka configuration validation exists in the codebase

# Search for Kafka config validation
rg -A 5 'func.*Validate.*Kafka'

# Search for Kafka connection verification
rg -A 5 'func.*Connect.*Kafka'

Length of output: 66


I'm not finding any evidence in the codebase of additional Kafka configuration validation; in particular, nothing appears to validate parameters such as the Kafka write timeout. To be sure, I'll run an additional search for any references to "kafkaWriteTimeout" in the repository.

If this search also returns no results, it would confirm that the current Kafka configuration is lacking proper validation. Please verify this output to ensure that validation logic is indeed missing.


🏁 Script executed:

#!/bin/bash
# Searching for usage or validation of kafkaWriteTimeout in the codebase
rg -A 5 'kafkaWriteTimeout'

Length of output: 512


Kafka Write Timeout and Connection Validation Missing
The repository does not include any explicit validation for the kafkaWriteTimeout value nor does it verify the Kafka connection during startup. The search confirms that only non-empty checks for kafkaAddresses and kafkaTopic are performed. It would be beneficial to add proper validation for kafkaWriteTimeout (and possibly verify the Kafka connection) to ensure robust configuration handling.

@hackerwins hackerwins (Member) left a comment

Thanks for your contribution.

@hackerwins hackerwins changed the title Add kafka producer for yorkie analytics Add Kafka Producer for Analytics and User Metrics Feb 12, 2025
@hackerwins hackerwins merged commit 1b2718b into main Feb 12, 2025
5 checks passed
@hackerwins hackerwins deleted the kafka-config branch February 12, 2025 07:13