Skip to content

Commit afcb0e1

Browse files
committed
WIP
Signed-off-by: Carlosespicur <[email protected]>
1 parent e0c453b commit afcb0e1

File tree

5 files changed

+26
-0
lines changed

5 files changed

+26
-0
lines changed

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,6 +1298,7 @@ void DataWriterImpl::InnerDataWriterListener::on_writer_matched(
12981298
RTPSWriter* /*writer*/,
12991299
const MatchingInfo& info)
13001300
{
1301+
// std::lock_guard<std::mutex> guard(inner_listener_mutex_);
13011302
data_writer_->update_publication_matched_status(info);
13021303

13031304
StatusMask notify_status = StatusMask::publication_matched();
@@ -1317,6 +1318,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(
13171318
RTPSWriter* /*writer*/,
13181319
fastdds::dds::PolicyMask qos)
13191320
{
1321+
// std::lock_guard<std::mutex> guard(inner_listener_mutex_);
13201322
data_writer_->update_offered_incompatible_qos(qos);
13211323
StatusMask notify_status = StatusMask::offered_incompatible_qos();
13221324
DataWriterListener* listener = data_writer_->get_listener_for(notify_status);
@@ -1356,6 +1358,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost(
13561358
fastdds::rtps::RTPSWriter* /*writer*/,
13571359
const LivelinessLostStatus& status)
13581360
{
1361+
// std::lock_guard<std::mutex> guard(inner_listener_mutex_);
13591362
data_writer_->update_liveliness_lost_status(status);
13601363
StatusMask notify_status = StatusMask::liveliness_lost();
13611364
DataWriterListener* listener = data_writer_->get_listener_for(notify_status);

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,8 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
487487
#endif //FASTDDS_STATISTICS
488488

489489
DataWriterImpl* data_writer_;
490+
491+
std::mutex inner_listener_mutex_;
490492
}
491493
writer_listener_;
492494

src/cpp/fastdds/subscriber/DataReaderImpl.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,7 @@ void DataReaderImpl::InnerDataReaderListener::on_reader_matched(
957957
RTPSReader* /*reader*/,
958958
const MatchingInfo& info)
959959
{
960+
// std::lock_guard<std::mutex> lock(inner_listener_mutex_);
960961
data_reader_->update_subscription_matched_status(info);
961962

962963
StatusMask notify_status = StatusMask::subscription_matched();
@@ -976,6 +977,7 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed(
976977
RTPSReader* /*reader*/,
977978
const LivelinessChangedStatus& status)
978979
{
980+
// std::lock_guard<std::mutex> guard(inner_listener_mutex_);
979981
data_reader_->update_liveliness_status(status);
980982
StatusMask notify_status = StatusMask::liveliness_changed();
981983
DataReaderListener* listener = data_reader_->get_listener_for(notify_status);
@@ -999,6 +1001,7 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos(
9991001
RTPSReader* /*reader*/,
10001002
PolicyMask qos)
10011003
{
1004+
// std::lock_guard<std::mutex> guard(inner_listener_mutex_);
10021005
data_reader_->update_requested_incompatible_qos(qos);
10031006
StatusMask notify_status = StatusMask::requested_incompatible_qos();
10041007
DataReaderListener* listener = data_reader_->get_listener_for(notify_status);
@@ -1022,6 +1025,7 @@ void DataReaderImpl::InnerDataReaderListener::on_sample_lost(
10221025
RTPSReader* /*reader*/,
10231026
int32_t sample_lost_since_last_update)
10241027
{
1028+
// std::lock_guard<std::mutex> guard(inner_listener_mutex_);
10251029
data_reader_->update_sample_lost_status(sample_lost_since_last_update);
10261030
StatusMask notify_status = StatusMask::sample_lost();
10271031
DataReaderListener* listener = data_reader_->get_listener_for(notify_status);
@@ -1046,6 +1050,7 @@ void DataReaderImpl::InnerDataReaderListener::on_sample_rejected(
10461050
SampleRejectedStatusKind reason,
10471051
const CacheChange_t* const change_in)
10481052
{
1053+
// std::lock_guard<std::mutex> guard(inner_listener_mutex_);
10491054
data_reader_->update_sample_rejected_status(reason, change_in);
10501055
StatusMask notify_status = StatusMask::sample_rejected();
10511056
DataReaderListener* listener = data_reader_->get_listener_for(notify_status);

src/cpp/fastdds/subscriber/DataReaderImpl.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,7 @@ class DataReaderImpl
483483

484484
DataReaderImpl* data_reader_;
485485

486+
std::mutex inner_listener_mutex_;
486487
}
487488
reader_listener_;
488489

src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,8 @@ bool EDP::unpairWriterProxy(
524524
MatchingInfo info;
525525
info.status = REMOVED_MATCHING;
526526
info.remoteEndpointGuid = writer_guid;
527+
528+
std::lock_guard<RecursiveTimedMutex> lock(r.getMutex());
527529
listener->on_reader_matched(&r, info);
528530
}
529531
}
@@ -559,6 +561,7 @@ bool EDP::unpairReaderProxy(
559561
MatchingInfo info;
560562
info.status = REMOVED_MATCHING;
561563
info.remoteEndpointGuid = reader_guid;
564+
std::lock_guard<RecursiveTimedMutex> lock(w.getMutex());
562565
listener->on_writer_matched(&w, info);
563566
}
564567
}
@@ -868,6 +871,7 @@ bool EDP::pairingReader(
868871
MatchingInfo info;
869872
info.status = MATCHED_MATCHING;
870873
info.remoteEndpointGuid = writer_guid;
874+
std::lock_guard<RecursiveTimedMutex> lock(r.getMutex());
871875
reader->get_listener()->on_reader_matched(reader, info);
872876
}
873877
}
@@ -896,6 +900,7 @@ bool EDP::pairingReader(
896900
MatchingInfo info;
897901
info.status = REMOVED_MATCHING;
898902
info.remoteEndpointGuid = writer_guid;
903+
std::lock_guard<RecursiveTimedMutex> lock(reader->getMutex());
899904
reader->get_listener()->on_reader_matched(reader, info);
900905
}
901906
}
@@ -961,6 +966,7 @@ bool EDP::pairingWriter(
961966
MatchingInfo info;
962967
info.status = MATCHED_MATCHING;
963968
info.remoteEndpointGuid = reader_guid;
969+
std::lock_guard<RecursiveTimedMutex> lock(w.getMutex());
964970
writer->get_listener()->on_writer_matched(writer, info);
965971
}
966972
}
@@ -986,6 +992,7 @@ bool EDP::pairingWriter(
986992
MatchingInfo info;
987993
info.status = REMOVED_MATCHING;
988994
info.remoteEndpointGuid = reader_guid;
995+
std::lock_guard<RecursiveTimedMutex> lock(writer->getMutex());
989996
writer->get_listener()->on_writer_matched(writer, info);
990997
}
991998
}
@@ -1037,6 +1044,7 @@ bool EDP::pairing_reader_proxy_with_any_local_writer(
10371044
MatchingInfo info;
10381045
info.status = MATCHED_MATCHING;
10391046
info.remoteEndpointGuid = reader_guid;
1047+
std::lock_guard<RecursiveTimedMutex> lock(w.getMutex());
10401048
w.get_listener()->on_writer_matched(&w, info);
10411049
}
10421050
}
@@ -1063,6 +1071,7 @@ bool EDP::pairing_reader_proxy_with_any_local_writer(
10631071
MatchingInfo info;
10641072
info.status = REMOVED_MATCHING;
10651073
info.remoteEndpointGuid = reader_guid;
1074+
std::lock_guard<RecursiveTimedMutex> lock(w.getMutex());
10661075
w.get_listener()->on_writer_matched(&w, info);
10671076
}
10681077
}
@@ -1130,6 +1139,7 @@ bool EDP::pairing_reader_proxy_with_local_writer(
11301139
MatchingInfo info;
11311140
info.status = REMOVED_MATCHING;
11321141
info.remoteEndpointGuid = reader_guid;
1142+
std::lock_guard<RecursiveTimedMutex> lock(w.getMutex());
11331143
w.get_listener()->on_writer_matched(&w, info);
11341144
}
11351145
}
@@ -1172,6 +1182,7 @@ bool EDP::pairing_remote_reader_with_local_writer_after_security(
11721182
MatchingInfo info;
11731183
info.status = MATCHED_MATCHING;
11741184
info.remoteEndpointGuid = reader_guid;
1185+
std::lock_guard<RecursiveTimedMutex> lock(w.getMutex());
11751186
w.get_listener()->on_writer_matched(&w, info);
11761187
}
11771188
}
@@ -1230,6 +1241,7 @@ bool EDP::pairing_writer_proxy_with_any_local_reader(
12301241
MatchingInfo info;
12311242
info.status = MATCHED_MATCHING;
12321243
info.remoteEndpointGuid = writer_guid;
1244+
std::lock_guard<RecursiveTimedMutex> lock(r.getMutex());
12331245
r.get_listener()->on_reader_matched(&r, info);
12341246
}
12351247
}
@@ -1256,6 +1268,7 @@ bool EDP::pairing_writer_proxy_with_any_local_reader(
12561268
MatchingInfo info;
12571269
info.status = REMOVED_MATCHING;
12581270
info.remoteEndpointGuid = writer_guid;
1271+
std::lock_guard<RecursiveTimedMutex> lock(r.getMutex());
12591272
r.get_listener()->on_reader_matched(&r, info);
12601273
}
12611274
}
@@ -1323,6 +1336,7 @@ bool EDP::pairing_writer_proxy_with_local_reader(
13231336
MatchingInfo info;
13241337
info.status = REMOVED_MATCHING;
13251338
info.remoteEndpointGuid = writer_guid;
1339+
std::lock_guard<RecursiveTimedMutex> lock(r.getMutex());
13261340
r.get_listener()->on_reader_matched(&r, info);
13271341
}
13281342
}
@@ -1367,6 +1381,7 @@ bool EDP::pairing_remote_writer_with_local_reader_after_security(
13671381
MatchingInfo info;
13681382
info.status = MATCHED_MATCHING;
13691383
info.remoteEndpointGuid = writer_guid;
1384+
std::lock_guard<RecursiveTimedMutex> lock(r.getMutex());
13701385
r.get_listener()->on_reader_matched(&r, info);
13711386

13721387
}

0 commit comments

Comments
 (0)