Skip to content

Commit

Permalink
feat(duplication): make the task code for incremental loading from pr…
Browse files Browse the repository at this point in the history
…ivate logs configurable (#2184)

#2183

We can make the task code configurable, allowing the thread priority incremental
loading from private logs of to be adjusted from **LOW** to **COMMON**, thereby
enabling support for low-latency real-time duplication.
  • Loading branch information
ninsmiracle authored Mar 7, 2025
1 parent 88bd9a3 commit cb9a1d3
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions src/replica/duplication/replica_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,26 @@
#include "load_from_private_log.h"
#include "replica/mutation_log.h"
#include "replica/replica.h"
#include "task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"

METRIC_DEFINE_counter(replica,
dup_confirmed_mutations,
dsn::metric_unit::kMutations,
"The number of confirmed mutations for dup");

namespace dsn {
namespace replication {
DSN_DEFINE_string(
replication,
dup_load_plog_task,
"LPC_REPLICATION_LONG_LOW",
"The task code for incremental loading from private logs while duplicating. Tasks with "
"TASK_PRIORITY_HIGH are not recommended.");
DSN_TAG_VARIABLE(dup_load_plog_task, FT_MUTABLE);

namespace dsn::replication {

replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r)
: replica_base(r),
Expand Down Expand Up @@ -161,7 +170,13 @@ void replica_duplicator::start_dup_log()
_load = std::make_unique<load_mutation>(this, _replica, _load_private.get());

from(*_load).link(*_ship).link(*_load);
fork(*_load_private, LPC_REPLICATION_LONG_LOW, 0).link(*_ship);
auto dup_load_plog_task = dsn::task_code::try_get(FLAGS_dup_load_plog_task, TASK_CODE_INVALID);
if (dup_load_plog_task == TASK_CODE_INVALID) {
dup_load_plog_task = LPC_REPLICATION_LONG_LOW;
LOG_ERROR_PREFIX("invalid dup_load_plog_task ({}), set it to LPC_REPLICATION_LONG_LOW",
FLAGS_dup_load_plog_task);
}
fork(*_load_private, dup_load_plog_task, 0).link(*_ship);

run_pipeline();
}
Expand Down Expand Up @@ -317,5 +332,4 @@ void replica_duplicator::set_duplication_plog_checking(bool checking)
_replica->set_duplication_plog_checking(checking);
}

} // namespace replication
} // namespace dsn
} // namespace dsn::replication

0 comments on commit cb9a1d3

Please sign in to comment.