From e8571a4ceb650c07981d4d426393301d2964624b Mon Sep 17 00:00:00 2001 From: mkaruza Date: Wed, 10 Sep 2025 14:13:31 +0200 Subject: [PATCH] feat: Introduce debug sync point for testing It can be useful that we can control execution of process by adding specific debug sync point which will be executed when debug sync name is set. For now this is only basic functionality, but in future we can add functionality for example - WAIT FOR until sync point is added. Debug sync point works with different fibers so we can synchronize execution. Added simple example how this can be done. Signed-off-by: mkaruza --- src/server/debug_sync_point.h | 44 ++++++++++++++++++++++++++++ src/server/debugcmd.cc | 51 +++++++++++++++++++++++++++++++++ src/server/debugcmd.h | 10 +++++++ src/server/string_family.cc | 4 +++ tests/dragonfly/generic_test.py | 25 ++++++++++++++++ 5 files changed, 134 insertions(+) create mode 100644 src/server/debug_sync_point.h diff --git a/src/server/debug_sync_point.h b/src/server/debug_sync_point.h new file mode 100644 index 000000000000..91cdf56b9e45 --- /dev/null +++ b/src/server/debug_sync_point.h @@ -0,0 +1,44 @@ +// Copyright 205, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include + +#include "util/fibers/synchronization.h" + +namespace dfly { + +#ifdef NDEBUG +#define DEBUG_SYNC(n, f) +#else +#define DEBUG_SYNC(n, f) \ + { \ + if (debug_sync_point.Find(n)) \ + f(); \ + } +#endif + +class DebugSyncPoint { + public: + void Add(std::string_view name) { + util::fb2::LockGuard lk(mu_); + sync_points_.emplace(name); + } + + void Del(std::string_view name) { + util::fb2::LockGuard lk(mu_); + sync_points_.erase(name); + } + + bool Find(std::string_view name) { + return sync_points_.find(name) != sync_points_.end(); + } + + private: + absl::flat_hash_set sync_points_; + util::fb2::Mutex mu_; +}; + +inline DebugSyncPoint debug_sync_point; + +} // namespace dfly diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 8fede1703665..68447b2e36b3 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -33,6 +33,7 @@ extern "C" { #include "facade/cmd_arg_parser.h" #include "server/blocking_controller.h" #include "server/container_utils.h" +#include "server/debug_sync_point.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/main_service.h" @@ -41,6 +42,7 @@ extern "C" { #include "server/server_state.h" #include "server/string_family.h" #include "server/transaction.h" + using namespace std; ABSL_DECLARE_FLAG(string, dir); @@ -657,6 +659,8 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) { " per second.", "SEGMENTS", " Prints segment info for the current database.", + "SYNC ADD [sync_name] | DEL [sync_name]", + " Enable or disable debug sync point.", "HELP", " Prints this help.", }; @@ -741,6 +745,11 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) { if (subcmd == "SEGMENTS") { return Segments(args.subspan(1), builder); } + + if (subcmd == "SYNC") { + return Sync(args, builder); + } + string reply = UnknownSubCmd(subcmd, "DEBUG"); return builder->SendError(reply, kSyntaxErrType); } @@ -984,6 +993,48 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys, }); } +// SYNC arguments format: +// [ADD sync_point_name | DEL sync_point_name] +std::optional DebugCmd::ParseSyncArgs(CmdArgList args, + facade::SinkReplyBuilder* builder) { + CmdArgParser parser(args.subspan(1)); + SyncOptions options; + + while (parser.HasNext()) { + SyncFlags flag = parser.MapNext("ADD", SYNC_ADD, "DEL", SYNC_DEL); + switch (flag) { + case SYNC_ADD: + case SYNC_DEL: + options.sync_point_name = parser.Next(); + options.sync_action = flag; + break; + default: + LOG(FATAL) << "Unexpected flag in PopulateArgs. Args: " << args; + break; + } + } + if (parser.HasError()) { + builder->SendError(parser.TakeError().MakeReply()); + return nullopt; + } + return options; +} + +void DebugCmd::Sync(CmdArgList args, facade::SinkReplyBuilder* builder) { + optional options = ParseSyncArgs(args, builder); + if (!options.has_value()) { + return; + } + + if (options->sync_action == SYNC_ADD) { + debug_sync_point.Add(options->sync_point_name); + } else { + debug_sync_point.Del(options->sync_point_name); + } + + builder->SendOk(); +} + void DebugCmd::Exec(facade::SinkReplyBuilder* builder) { EngineShardSet& ess = *shard_set; fb2::Mutex mu; diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index 141fe3643357..cc63b719ae39 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -30,6 +30,12 @@ class DebugCmd { std::optional> expire_ttl_range; }; + enum SyncFlags { SYNC_ADD, SYNC_DEL }; + struct SyncOptions { + std::string sync_point_name; + SyncFlags sync_action; + }; + public: DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx); @@ -43,6 +49,10 @@ class DebugCmd { facade::SinkReplyBuilder* builder); void PopulateRangeFiber(uint64_t from, uint64_t count, const PopulateOptions& opts); + static std::optional ParseSyncArgs(CmdArgList args, + facade::SinkReplyBuilder* builder); + void Sync(CmdArgList args, facade::SinkReplyBuilder* builder); + void Reload(CmdArgList args, facade::SinkReplyBuilder* builder); void Replica(CmdArgList args, facade::SinkReplyBuilder* builder); void Migration(CmdArgList args, facade::SinkReplyBuilder* builder); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index cedca8c3d315..e4769df23098 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -26,6 +26,7 @@ #include "server/command_registry.h" #include "server/common.h" #include "server/conn_context.h" +#include "server/debug_sync_point.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/family_utils.h" @@ -1178,6 +1179,9 @@ void StringFamily::SetNx(CmdArgList args, const CommandContext& cmnd_cntx) { void StringFamily::Get(CmdArgList args, const CommandContext& cmnd_cntx) { auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult { auto it_res = tx->GetDbSlice(es->shard_id()).FindReadOnly(tx->GetDbContext(), key, OBJ_STRING); + + DEBUG_SYNC("return_get_key_not_found", [&]() { it_res = OpStatus::KEY_NOTFOUND; }) + if (!it_res.ok()) return it_res.status(); diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py index e6204dfc9821..a1eb7db757b8 100644 --- a/tests/dragonfly/generic_test.py +++ b/tests/dragonfly/generic_test.py @@ -342,3 +342,28 @@ async def test_key_bump_ups(df_factory): new_slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"]) assert new_slot_id + 1 == slot_id break + + +@pytest.mark.debug_only +@pytest.mark.asyncio +async def test_debug_sync(df_factory): + df_server = df_factory.create() + df_server.start() + client = df_server.client() + + value = "123" + + # Should return key + await client.execute_command(f"SET key {value}") + result = await client.get("key") + assert result == value + + # Enable debug sync point to return KEY_NOTFOUND + await client.execute_command("DEBUG SYNC ADD return_get_key_not_found") + result = await client.get("key") + assert result == None + + # Disable - should return key + await client.execute_command("DEBUG SYNC DEL return_get_key_not_found") + result = await client.get("key") + assert result == value