Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The RecordIntegrationTestFixture.can_record_again_after_stop test is tend to be flaky #1914

Open
MichaelOrlov opened this issue Feb 19, 2025 · 1 comment
Assignees
Labels
bug Something isn't working

Comments

@MichaelOrlov
Copy link
Contributor

MichaelOrlov commented Feb 19, 2025

Description

Sometimes, on CI the RecordIntegrationTestFixture.can_record_again_after_stop test fails because recorder receiving 5 messages while expected only 4.
It seems that the subscription from the first run hasn't been deleted and triggered callback twice.
It was added debug information in the #1871 PR.

Expected Behavior

The test shall pass without errors and the Recorder shall not receive extra unexpected messages

Actual Behavior

Sometimes Rosbag2 recorder receives one extra message after calling recorder::stop() and then recorder::record() again.

To Reproduce

The issue reproduces on CI only.
Link to the failed CI job https://build.ros2.org/job/Rpr__rosbag2__ubuntu_noble_amd64/528/testReport/junit/rosbag2_transport/RecordIntegrationTestFixture/can_record_again_after_stop/
Full log with debug info available here https://build.ros2.org/job/Rpr__rosbag2__ubuntu_noble_amd64/528/consoleText

Log for failing test: Click to expand
5: [ RUN      ] RecordIntegrationTestFixture.can_record_again_after_stop
15: stdin is not a terminal device. Keyboard handling disabled.[INFO] [1739949727.541278352] [rosbag2_recorder]: Press SPACE for pausing/resuming
15: [INFO] [1739949727.543589196] [rosbag2_recorder]: Listening for topics...
15: [INFO] [1739949727.544010616] [rosbag2_recorder]: Event publisher thread: Starting
15: [INFO] [1739949727.545092141] [rosbag2_recorder]: Subscribed to topic '/can_record_again_after_stop_topic'
15: [INFO] [1739949727.545195423] [rosbag2_recorder]: Recording...
15: [INFO] [1739949727.545510770] [rosbag2_recorder]: All requested topics are subscribed. Stopping discovery...
15: [INFO] [1739949727.665805436] [rosbag2_recorder]: Pausing recording.
15: [INFO] [1739949727.666431920] [rosbag2_recorder]: Event publisher thread: Exiting
15: [INFO] [1739949727.666767578] [rosbag2_recorder]: Recording stopped
15: [INFO] [1739949727.668759854] [rosbag2_recorder]: Listening for topics...
15: [INFO] [1739949727.668783415] [rosbag2_recorder]: Event publisher thread: Starting
15: [INFO] [1739949727.669554603] [rosbag2_recorder]: Subscribed to topic '/can_record_again_after_stop_topic'
15: [INFO] [1739949727.669638865] [rosbag2_recorder]: Recording...
15: [INFO] [1739949727.669854790] [rosbag2_recorder]: All requested topics are subscribed. Stopping discovery...
15: /tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_record.cpp:171: Failure
15: Value of: recorded_messages
15: Expected: has a size that is equal to 4
15:   Actual: { (ptr = 0x7ff7200023a0, value = 64-byte object <20-23 00-20 F7-7F 00-00 B0-22 00-20 F7-7F 00-00 D6-53 93-FE 0E-8B 25-18 CD-C0 92-FE 0E-8B 25-18 A0-17 00-20 F7-7F 00-00 22-00 00-00 00-00 00-00 22-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00>), (ptr = 0x7ff7200034b0, value = 64-byte object <B0-1E 00-20 F7-7F 00-00 10-34 00-20 F7-7F 00-00 E8-3D 91-01 0F-8B 25-18 5F-67 90-01 0F-8B 25-18 80-1E 00-20 F7-7F 00-00 22-00 00-00 00-00 00-00 22-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00>), (ptr = 0x7ff720003620, value = 64-byte object <C0-35 00-20 F7-7F 00-00 50-32 00-20 F7-7F 00-00 2B-CF C9-04 0F-8B 25-18 41-AA C9-04 0F-8B 25-18 20-32 00-20 F7-7F 00-00 22-00 00-00 00-00 00-00 22-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00>), (ptr = 0x557aa0467840, value = 64-byte object <70-0D 46-A0 7A-55 00-00 80-32 00-20 F7-7F 00-00 68-E8 C9-04 0F-8B 25-18 41-AA C9-04 0F-8B 25-18 50-90 48-A0 7A-55 00-00 22-00 00-00 00-00 00-00 22-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00>), (ptr = 0x557aa046ac00, value = 64-byte object <10-BF 3A-A0 7A-55 00-00 D0-36 00-20 F7-7F 00-00 7B-5D C7-07 0F-8B 25-18 98-B3 C6-07 0F-8B 25-18 30-35 00-20 F7-7F 00-00 22-00 00-00 00-00 00-00 22-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00>) }, whose size 5 doesn't match
15: 
15: => recorded_messages[0].send_timestamp = 1739949727565398221
15: recorded_messages[0].recv_timestamp = 1739949727565435862
15: => recorded_messages[1].send_timestamp = 1739949727615575903
15: recorded_messages[1].recv_timestamp = 1739949727615630824
15: => recorded_messages[2].send_timestamp = 1739949727669660225
15: recorded_messages[2].recv_timestamp = 1739949727669669675
15: => recorded_messages[3].send_timestamp = 1739949727669660225
15: recorded_messages[3].recv_timestamp = 1739949727669676136
15: => recorded_messages[4].send_timestamp = 1739949727719797656
15: recorded_messages[4].recv_timestamp = 1739949727719841147
15: /tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_record.cpp:194: Failure
15: Value of: basic_type_messages
15: Expected: has a size that is equal to 4
15:   Actual: { (ptr = 0x557aa0307d60, value = 48-byte object <00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 54-2C 31-0B DC-1E 01-00 00-00 00-00 00-00 00-00>), (ptr = 0x557aa0749ee0, value = 48-byte object <00-00 00-2E 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 65-20 00-00 00-00 00-00 00-00 72-C9 2E-0E DC-1E 01-00 01-00 00-00 00-00 00-00>), (ptr = 0x557aa0749fc0, value = 48-byte object <00-00 00-2E 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 65-20 00-00 00-00 00-00 00-00 E2-1E 68-11 DC-1E 01-00 00-00 00-00 00-00 00-00>), (ptr = 0x557aa0575490, value = 48-byte object <00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 E2-1E 68-11 DC-1E 01-00 00-00 00-00 00-00 00-00>), (ptr = 0x557aa0592510, value = 48-byte object <00-00 00-2E 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 00-00 65-20 00-00 00-00 00-00 00-00 1D-12 65-14 DC-1E 01-00 01-00 00-00 00-00 00-00>) }, whose size 5 doesn't match
15: 
15: => basic_type_messages[0].uint64_value = 0
15: basic_type_messages[0].int64_value = 315405406121044
15: => basic_type_messages[1].uint64_value = 1
15: basic_type_messages[1].int64_value = 315405456296306
15: => basic_type_messages[2].uint64_value = 0
15: basic_type_messages[2].int64_value = 315405510385378
15: => basic_type_messages[3].uint64_value = 0
15: basic_type_messages[3].int64_value = 315405510385378
15: => basic_type_messages[4].uint64_value = 1
15: basic_type_messages[4].int64_value = 315405560517149
15: /tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_record.cpp:208: Failure
15: Value of: basic_type_messages[i]->uint64_value
15: Expected: is equal to 1
15:   Actual: 0 (of type unsigned long)
15: 
15: /tmp/ws/src/rosbag2/rosbag2_transport/test/rosbag2_transport/test_record.cpp:208: Failure
15: Value of: basic_type_messages[i]->uint64_value
15: Expected: is equal to 0
15:   Actual: 1 (of type unsigned long)
15: 
15: [INFO] [1739949727.772790963] [rosbag2_recorder]: Pausing recording.
15: [INFO] [1739949727.773680534] [rosbag2_recorder]: Event publisher thread: Exiting
15: [INFO] [1739949727.773726605] [rosbag2_recorder]: Recording stopped
15: [  FAILED  ] RecordIntegrationTestFixture.can_record_again_after_stop (253 ms)

System (please complete the following information)

  • OS: Ubuntu 24.04
  • ROS 2 Distro: Rolling, Jazzy
  • Install Method: source
  • Version: N/A

Additional context

** Add any other context about the problem here **

@MichaelOrlov MichaelOrlov added the bug Something isn't working label Feb 19, 2025
@MichaelOrlov
Copy link
Contributor Author

Preliminary RCA (Root Cause Analysis)

From the log it is clear that we are getting two identical messages, and one of them is extra and unexpected.

15: => basic_type_messages[2].uint64_value = 0
15: basic_type_messages[2].int64_value = 315405510385378
15: => basic_type_messages[3].uint64_value = 0
15: basic_type_messages[3].int64_value = 315405510385378

However, while the received timestamp for them is different, the sent timestamps are the same

15: => recorded_messages[2].send_timestamp = 1739949727669660225
15: recorded_messages[2].recv_timestamp = 1739949727669669675
15: => recorded_messages[3].send_timestamp = 1739949727669660225
15: recorded_messages[3].recv_timestamp = 1739949727669676136

It does mean that the message was sent only once, but received twice!

In the Rosbag2 recorder, we clear the inner subscriptions list once when we are doing Recorder::stop() and the second time just before the start of the recording.
It seems there are no race conditions in the Recorder class or the possibility of creating the same subscription twice.
The inner subscription_ list is an unordered_map with shared pointers to the generic subscriptions, and in theory when we do
subscriptions_.clear(); the subscriptions should be gone.

However, in practice, things are not so simple.

  1. When we are creating GenericPoilingSubscription from node before returning shared pointer to the subscription we also adding it to the topics_interface_
    https://github.com/ros2/rclcpp/blob/34cc960124af07be717b77e0c6781ef57308aebe/rclcpp/include/rclcpp/create_generic_subscription.hpp#L72-L81
  auto subscription = std::make_shared<GenericSubscription>(
    topics_interface->get_node_base_interface(),
    std::move(ts_lib),
    topic_name,
    topic_type,
    qos,
    any_subscription_callback,
    options);

  topics_interface->add_subscription(subscription, options.callback_group);
  1. The NodeTopicsInterface doesn't keep a shared pointer on the subscription itself. It keeps weak pointer. However, it is adding subscription callback to the default node's callback group. Which is further on uses by the executor.
  2. When the executor gathers executable items, it goes through the internal node's subscription items, locks the weak pointer, and increases the counter for the shared pointer. See the void CallbackGroup::collect_all_ptrs(..) for details.
  3. The problem is that we have API to add subscription to the node. More correctly would be to say that the subscription automatically adding to the node during creation. Although there is no "way" API to explicitly remove or delete subscriptions from node or executor. Therefore, it could potentially be a situation in which even if we deleted the instance of the shared pointer on the Rosbag2 recorder side, it is still alive, and at least its callback persists in the executor callback group.
  4. I found only one place where we are removing "expired" subscriptions from the CallbackGroup and ironically we do this only after we are adding some new subscription https://github.com/ros2/rclcpp/blob/34cc960124af07be717b77e0c6781ef57308aebe/rclcpp/src/rclcpp/callback_group.cpp#L151-L162
CallbackGroup::add_subscription(
  const rclcpp::SubscriptionBase::SharedPtr subscription_ptr)
{
  std::lock_guard<std::mutex> lock(mutex_);
  subscription_ptrs_.push_back(subscription_ptr);
  subscription_ptrs_.erase(
    std::remove_if(
      subscription_ptrs_.begin(),
      subscription_ptrs_.end(),
      [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
    subscription_ptrs_.end());
}

Potentially it could be a race condition that when we reached out subscription cleanup code it is weak pointer is not expired because the executor transformed it to the shared pointer in the void CallbackGroup::collect_all_ptrs(..) and hasn't been released it yet.

Proposed solution

The straightforward solution would be to redesign the Rosbag2 recorder and eliminate using SingleThreadedExecutor inside it; instead, start using rclcpp::ThreadSafeWaitSet directly. This would give us more granular control over execution items and callbacks.
Another option would be to try to figure out how to add API to the rclcpp::node for explicitly removing/deleting previously created subscriptions. And also, figure out how to automatically trigger deletion of the subscription from the already added executor callback group. But it looks like a dark rabbit hole.

cc: @emersonknapp @mjcarroll, @wjwwood

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants