Skip to content

Commit 3533329

Browse files
committed
Store wsrep transaction seqno and UUID into Xid_log_event
Prior to this change, wsrep overwrote the xid used to prepare a transaction with an xid containing the wsrep seqno and UUID. This, however, is not fully compatible with the partition engine and atomic DDL recovery. In order to avoid overwriting the xid during prepare, store the wsrep transaction seqno and UUID in Xid_log_event if the transaction is a wsrep transaction. This way the seqno and UUID will be available to reconstruct the wsrep XID for commit during binlog-coordinated recovery. This changes the on-disk format of Xid_log_event to contain an additional 24 bytes for the wsrep seqno and UUID if the transaction is a wsrep transaction. As Xid_log_event has no meaning in replication and is used only in recovery, the change is backwards compatible. The Xid_log_event output in mariadb-binlog is extended to also print wsrep_seqno and wsrep_uuid if they are set.
1 parent c92add2 commit 3533329

File tree

14 files changed

+207
-26
lines changed

14 files changed

+207
-26
lines changed

include/mysql/service_wsrep.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ extern struct wsrep_service_st {
5454
int (*wsrep_is_wsrep_xid_func)(const void *xid);
5555
long long (*wsrep_xid_seqno_func)(const struct xid_t *xid);
5656
const unsigned char* (*wsrep_xid_uuid_func)(const struct xid_t *xid);
57+
const struct xid_t* (*wsrep_commit_xid_func)(const MYSQL_THD thd);
5758
my_bool (*wsrep_on_func)(const MYSQL_THD thd);
5859
bool (*wsrep_prepare_key_for_innodb_func)(MYSQL_THD thd, const unsigned char*, size_t, const unsigned char*, size_t, struct wsrep_buf*, size_t*);
5960
void (*wsrep_thd_LOCK_func)(const MYSQL_THD thd);
@@ -63,6 +64,7 @@ extern struct wsrep_service_st {
6364
int (*wsrep_thd_retry_counter_func)(const MYSQL_THD thd);
6465
bool (*wsrep_thd_ignore_table_func)(MYSQL_THD thd);
6566
long long (*wsrep_thd_trx_seqno_func)(const MYSQL_THD thd);
67+
const unsigned char* (*wsrep_thd_trx_uuid_func)(const MYSQL_THD thd);
6668
my_bool (*wsrep_thd_is_aborting_func)(const MYSQL_THD thd);
6769
void (*wsrep_set_data_home_dir_func)(const char *data_dir);
6870
my_bool (*wsrep_thd_is_BF_func)(const MYSQL_THD thd, my_bool sync);
@@ -109,6 +111,7 @@ extern struct wsrep_service_st {
109111
#define wsrep_is_wsrep_xid(X) wsrep_service->wsrep_is_wsrep_xid_func(X)
110112
#define wsrep_xid_seqno(X) wsrep_service->wsrep_xid_seqno_func(X)
111113
#define wsrep_xid_uuid(X) wsrep_service->wsrep_xid_uuid_func(X)
114+
#define wsrep_commit_xid(thd) wsrep_service->wsrep_commit_xid_func(thd)
112115
#define wsrep_on(thd) (thd) && WSREP_ON && wsrep_service->wsrep_on_func(thd)
113116
#define wsrep_prepare_key_for_innodb(A,B,C,D,E,F,G) wsrep_service->wsrep_prepare_key_for_innodb_func(A,B,C,D,E,F,G)
114117
#define wsrep_thd_LOCK(T) wsrep_service->wsrep_thd_LOCK_func(T)
@@ -120,6 +123,7 @@ extern struct wsrep_service_st {
120123
#define wsrep_thd_retry_counter(T) wsrep_service->wsrep_thd_retry_counter_func(T)
121124
#define wsrep_thd_ignore_table(T) wsrep_service->wsrep_thd_ignore_table_func(T)
122125
#define wsrep_thd_trx_seqno(T) wsrep_service->wsrep_thd_trx_seqno_func(T)
126+
#define wsrep_thd_trx_uuid(T) wsrep_service->wsrep_thd_trx_uuid_func(T)
123127
#define wsrep_set_data_home_dir(A) wsrep_service->wsrep_set_data_home_dir_func(A)
124128
#define wsrep_thd_is_BF(T,S) wsrep_service->wsrep_thd_is_BF_func(T,S)
125129
#define wsrep_thd_is_aborting(T) wsrep_service->wsrep_thd_is_aborting_func(T)
@@ -163,7 +167,9 @@ extern "C" const char *wsrep_thd_query(const MYSQL_THD thd);
163167
extern "C" int wsrep_is_wsrep_xid(const void* xid);
164168
extern "C" long long wsrep_xid_seqno(const struct xid_t* xid);
165169
const unsigned char* wsrep_xid_uuid(const struct xid_t* xid);
170+
extern "C" const struct xid_t* wsrep_commit_xid(const MYSQL_THD thd);
166171
extern "C" long long wsrep_thd_trx_seqno(const MYSQL_THD thd);
172+
extern "C" const unsigned char* wsrep_thd_trx_uuid(const MYSQL_THD thd);
167173
my_bool get_wsrep_recovery();
168174
bool wsrep_thd_ignore_table(MYSQL_THD thd);
169175
void wsrep_set_data_home_dir(const char *data_dir);

sql/handler.cc

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1999,15 +1999,6 @@ int ha_commit_trans(THD *thd, bool all)
19991999
xid= thd->transaction->implicit_xid.quick_get_my_xid();
20002000

20012001
#ifdef WITH_WSREP
2002-
if (run_wsrep_hooks && !error)
2003-
{
2004-
wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid);
2005-
if (!s.is_undefined())
2006-
{
2007-
// xid was rewritten by wsrep
2008-
xid= s.get();
2009-
}
2010-
}
20112002
if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all)))
20122003
goto wsrep_err;
20132004
#endif /* WITH_WSREP */
@@ -2692,7 +2683,26 @@ static void xarecover_do_commit_or_rollback(transaction_participant *hton,
26922683
x= *member->full_xid;
26932684

26942685
if (xarecover_decide_to_commit(member, ptr_commit_max))
2686+
{
2687+
#ifdef WITH_WSREP
2688+
XID wsrep_commit_xid;
2689+
if (!member->wsrep_seqno.is_undefined())
2690+
{
2691+
wsrep::gtid wsrep_gtid{
2692+
member->wsrep_uuid,
2693+
member->wsrep_seqno};
2694+
wsrep_server_gtid_t server_gtid{member->wsrep_gtid_domain_id,
2695+
member->wsrep_gtid_server_id,
2696+
member->wsrep_gtid_seq_no};
2697+
wsrep_xid_init(&wsrep_commit_xid, wsrep_gtid, server_gtid);
2698+
wsrep_recovery_commit_xid= &wsrep_commit_xid;
2699+
}
2700+
#endif /* WITH_WSREP */
26952701
rc= hton->commit_by_xid(&x);
2702+
#ifdef WITH_WSREP
2703+
wsrep_recovery_commit_xid = nullptr;
2704+
#endif /* WITH_WSREP */
2705+
}
26962706
else if (hton->recover_rollback_by_xid &&
26972707
IF_WSREP(!(WSREP_ON || wsrep_recovery), true))
26982708
rc= hton->recover_rollback_by_xid(&x);

sql/handler.h

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@
4444
#include "mem_root_array.h"
4545
#include <utility> // pair
4646
#include <my_attribute.h> /* __attribute__ */
47+
#ifdef WITH_WSREP
48+
#include "wsrep/id.hpp"
49+
#include "wsrep/seqno.hpp"
50+
#endif /* WITH_WSREP */
4751

4852
class Alter_info;
4953
class Virtual_column_info;
@@ -1028,12 +1032,29 @@ struct xid_recovery_member
10281032
XID *full_xid; // needed by wsrep or past it recovery
10291033
decltype(::server_id) server_id; // server id of orginal server
10301034

1035+
#ifdef WITH_WSREP
1036+
/* wsrep specific fields to reconstruct wsrep_xid for commit.
1037+
If wsrep_seqno is undefined, the transaction is not a wsrep transaction. */
1038+
wsrep::seqno wsrep_seqno;
1039+
wsrep::id wsrep_uuid;
1040+
uint32 wsrep_gtid_domain_id;
1041+
uint32 wsrep_gtid_server_id;
1042+
uint64 wsrep_gtid_seq_no;
1043+
#endif /* WITH_WSREP */
1044+
10311045
xid_recovery_member(my_xid xid_arg, uint prepare_arg, bool decided_arg,
10321046
XID *full_xid_arg, decltype(::server_id) server_id_arg)
10331047
: xid(xid_arg), in_engine_prepare(prepare_arg),
10341048
decided_to_commit(decided_arg),
10351049
binlog_coord(Binlog_offset(MAX_binlog_id, MAX_off_t)),
1036-
full_xid(full_xid_arg), server_id(server_id_arg) {};
1050+
full_xid(full_xid_arg), server_id(server_id_arg)
1051+
#ifdef WITH_WSREP
1052+
,
1053+
wsrep_seqno(wsrep::seqno::undefined()),
1054+
wsrep_uuid(wsrep::id::undefined()), wsrep_gtid_domain_id(0),
1055+
wsrep_gtid_server_id(0), wsrep_gtid_seq_no(0)
1056+
#endif /* WITH_WSREP */
1057+
{}
10371058
};
10381059

10391060
/* for recover() handlerton call */

sql/log.cc

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2071,6 +2071,14 @@ binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr,
20712071
DBUG_ASSERT(xid); // replaced former treatment of ONE-PHASE XA
20722072

20732073
Xid_log_event end_evt(thd, xid, TRUE);
2074+
#ifdef WITH_WSREP
2075+
if (WSREP(thd))
2076+
{
2077+
end_evt.wsrep_seqno= wsrep_thd_trx_seqno(thd);
2078+
memcpy(end_evt.wsrep_uuid, wsrep_thd_trx_uuid(thd),
2079+
sizeof(end_evt.wsrep_uuid));
2080+
}
2081+
#endif
20742082
if (!thd->rgi_slave && !thd->user_time.val)
20752083
{
20762084
/*
@@ -11791,6 +11799,16 @@ class Recovery_context
1179111799
bool last_gtid_no2pc; // true when the group does not end with Xid event
1179211800
uint last_gtid_engines;
1179311801
Binlog_offset last_gtid_coord; // <binlog id, binlog offset>
11802+
#ifdef WITH_WSREP
11803+
/*
11804+
Wsrep gtid specific fields are initialized from SE checkpoint and
11805+
updated when Gtid_log_event with domain id matching last_wsrep_gtid_domain_id
11806+
is processed.
11807+
*/
11808+
uint32 last_wsrep_gtid_domain_id;
11809+
uint32 last_wsrep_gtid_server_id;
11810+
uint64 last_wsrep_gtid_seq_no;
11811+
#endif /* WITH_WSREP */
1179411812
/*
1179511813
When true, it's semisync slave recovery mode
1179611814
rolls back transactions in doubt and wipes them off from binlog.
@@ -11884,6 +11902,10 @@ class Recovery_context
1188411902
*/
1188511903
void process_gtid(int round, Gtid_log_event *gev, LOG_INFO *linfo);
1188611904

11905+
#ifdef WITH_WSREP
11906+
void process_wsrep(xid_recovery_member *member, const Xid_log_event *xid_ev);
11907+
#endif /* WITH_WSREP */
11908+
1188711909
/*
1188811910
Compute next action at the end of processing of the current binlog file.
1188911911
It may increment the round.
@@ -12027,6 +12049,13 @@ Recovery_context::Recovery_context() :
1202712049
binlog_unsafe_gtid= truncate_gtid= truncate_gtid_1st_round= rpl_gtid();
1202812050
if (do_truncate)
1202912051
gtid_maybe_to_truncate= new Dynamic_array<rpl_gtid>(16, 16);
12052+
#ifdef WITH_WSREP
12053+
wsrep_server_gtid_t wsrep_server_gtid=
12054+
wsrep_get_SE_checkpoint<wsrep_server_gtid_t>();
12055+
last_wsrep_gtid_domain_id= wsrep_server_gtid.domain_id;
12056+
last_wsrep_gtid_server_id= wsrep_server_gtid.server_id;
12057+
last_wsrep_gtid_seq_no= wsrep_server_gtid.seqno;
12058+
#endif /* WITH_WSREP */
1203012059
}
1203112060

1203212061
bool Recovery_context::reset_truncate_coord(my_off_t pos)
@@ -12232,8 +12261,30 @@ void Recovery_context::process_gtid(int round, Gtid_log_event *gev,
1223212261
/* Update the binlog state with any 'valid' GTID logged after Gtid_list. */
1223312262
last_gtid_valid= true; // may flip at Xid when falls to truncate
1223412263
}
12264+
#ifdef WITH_WSREP
12265+
if (last_gtid.domain_id == last_wsrep_gtid_domain_id)
12266+
{
12267+
last_wsrep_gtid_server_id= last_gtid.server_id;
12268+
last_wsrep_gtid_seq_no= last_gtid.seq_no;
12269+
}
12270+
#endif /* WITH_WSREP */
1223512271
}
1223612272

12273+
#ifdef WITH_WSREP
12274+
/*
  Copy the wsrep seqno and UUID carried by an Xid_log_event, together with
  the wsrep GTID fields currently tracked by this context, into the
  recovery member.

  @param member  recovery member to update; nothing is done when it is NULL
  @param xid_ev  Xid event being processed; nothing is done when its
                 wsrep_seqno equals Xid_log_event::wsrep_seqno_undefined
*/
void Recovery_context::process_wsrep(xid_recovery_member *member,
                                     const Xid_log_event *xid_ev)
{
  if (!member || xid_ev->wsrep_seqno == Xid_log_event::wsrep_seqno_undefined)
    return;

  member->wsrep_seqno= wsrep::seqno{xid_ev->wsrep_seqno};
  /*
    The length passed must describe the source buffer stored in the event,
    not the size of the destination wsrep::id object.
  */
  member->wsrep_uuid= wsrep::id{xid_ev->wsrep_uuid,
                                sizeof(xid_ev->wsrep_uuid)};
  member->wsrep_gtid_domain_id= last_wsrep_gtid_domain_id;
  member->wsrep_gtid_server_id= last_wsrep_gtid_server_id;
  member->wsrep_gtid_seq_no= last_wsrep_gtid_seq_no;
}
12286+
#endif /* WITH_WSREP */
12287+
1223712288
int Recovery_context::next_binlog_or_round(int& round,
1223812289
const char *last_log_name,
1223912290
const char *binlog_checkpoint_name,
@@ -12381,9 +12432,10 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
1238112432
case XID_EVENT:
1238212433
if (do_xa)
1238312434
{
12435+
const Xid_log_event *xid_ev= static_cast<const Xid_log_event*>(ev);
1238412436
xid_recovery_member *member=
1238512437
(xid_recovery_member*)
12386-
my_hash_search(&xids, (uchar*) &static_cast<Xid_log_event*>(ev)->xid,
12438+
my_hash_search(&xids, (uchar*) &xid_ev->xid,
1238712439
sizeof(my_xid));
1238812440
#ifndef HAVE_REPLICATION
1238912441
{
@@ -12393,6 +12445,9 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
1239312445
#else
1239412446
if (ctx.decide_or_assess(member, round, fdle, linfo, end_pos))
1239512447
goto err2;
12448+
#ifdef WITH_WSREP
12449+
ctx.process_wsrep(member, xid_ev);
12450+
#endif /* WITH_WSREP */
1239612451
#endif
1239712452
}
1239812453
break;

sql/log_event.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1139,7 +1139,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
11391139
ev= new Intvar_log_event(buf, fdle);
11401140
break;
11411141
case XID_EVENT:
1142-
ev= new Xid_log_event(buf, fdle);
1142+
ev= new Xid_log_event(buf, event_len, fdle);
11431143
break;
11441144
case XA_PREPARE_LOG_EVENT:
11451145
ev= new XA_prepare_log_event(buf, fdle);
@@ -2743,13 +2743,29 @@ Rand_log_event::Rand_log_event(const uchar *buf,
27432743

27442744
/*
  Construct an Xid_log_event from its serialized form.

  @param buf                start of the serialized event
  @param event_len          length of the event; used to detect whether the
                            wsrep seqno and UUID follow the xid
  @param description_event  supplies the common header and post-header
                            lengths used to locate the event body
*/
Xid_log_event::
Xid_log_event(const uchar *buf,
              uint event_len,
              const Format_description_log_event *description_event)
  :Xid_apply_log_event(buf, description_event)
{
  /* The Post-Header is empty. The Variable Data part begins immediately. */
  const uint body_offset= description_event->common_header_len +
                          description_event->post_header_len[XID_EVENT-1];
  const uchar *body= buf + body_offset;
  memcpy((char*) &xid, body, sizeof(xid));

  /* Event length when the wsrep seqno and UUID are stored after the xid. */
  const uint wsrep_event_len= body_offset + sizeof(xid) +
                              sizeof(wsrep_seqno) + sizeof(wsrep_uuid);
  if (event_len != wsrep_event_len)
  {
    /* No wsrep payload: mark the seqno undefined and zero the UUID. */
    wsrep_seqno= wsrep_seqno_undefined;
    memset(wsrep_uuid, 0, sizeof(wsrep_uuid));
    return;
  }

  const uchar *wsrep_data= body + sizeof(xid);
  wsrep_seqno= sint8korr(wsrep_data);
  memcpy(wsrep_uuid, wsrep_data + sizeof(wsrep_seqno), sizeof(wsrep_uuid));
}
27542770

27552771
/**************************************************************************

sql/log_event.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2823,6 +2823,9 @@ class Xid_apply_log_event: public Log_event
28232823
Logs xid of the transaction-to-be-committed in the 2pc protocol.
28242824
Has no meaning in replication, slaves ignore it.
28252825
2826+
Stores wsrep_seqno and wsrep_uuid in the event body if the event
2827+
was produced by server with wsrep enabled.
2828+
28262829
@section Xid_log_event_binary_format Binary Format
28272830
*/
28282831
#ifdef MYSQL_CLIENT
@@ -2833,10 +2836,16 @@ class Xid_log_event: public Xid_apply_log_event
28332836
{
28342837
public:
28352838
my_xid xid;
2839+
/* This matches the definition of WSREP_SEQNO_UNDEFINED and
2840+
wsrep::seqno::undefined() */
2841+
constexpr static int64 wsrep_seqno_undefined= -1;
2842+
int64 wsrep_seqno;
2843+
uchar wsrep_uuid[16];
28362844

28372845
#ifdef MYSQL_SERVER
28382846
Xid_log_event(THD* thd_arg, my_xid x, bool direct):
2839-
Xid_apply_log_event(thd_arg), xid(x)
2847+
Xid_apply_log_event(thd_arg), xid(x),
2848+
wsrep_seqno(wsrep_seqno_undefined), wsrep_uuid()
28402849
{
28412850
if (direct)
28422851
cache_type= Log_event::EVENT_NO_CACHE;
@@ -2853,10 +2862,16 @@ class Xid_log_event: public Xid_apply_log_event
28532862
#endif
28542863

28552864
Xid_log_event(const uchar *buf,
2865+
uint event_len,
28562866
const Format_description_log_event *description_event);
28572867
~Xid_log_event() = default;
28582868
Log_event_type get_type_code() override { return XID_EVENT;}
2859-
int get_data_size() override { return sizeof(xid); }
2869+
int get_data_size() override
2870+
{
2871+
return sizeof(xid) +
2872+
(wsrep_seqno != wsrep_seqno_undefined ?
2873+
sizeof(wsrep_seqno) + sizeof(wsrep_uuid) : 0);
2874+
}
28602875
#ifdef MYSQL_SERVER
28612876
bool write(Log_event_writer *writer) override;
28622877
#endif

sql/log_event_client.cc

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2464,9 +2464,24 @@ bool Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
24642464
char buf[64];
24652465
longlong10_to_str(xid, buf, 10);
24662466

2467-
if (print_header(&cache, print_event_info, FALSE) ||
2468-
my_b_printf(&cache, "\tXid = %s\n", buf))
2469-
goto err;
2467+
if (wsrep_seqno == wsrep_seqno_undefined)
2468+
{
2469+
if (print_header(&cache, print_event_info, FALSE) ||
2470+
my_b_printf(&cache, "\tXid = %s\n", buf))
2471+
goto err;
2472+
}
2473+
else
2474+
{
2475+
char wsrep_seqno_buf[22];
2476+
longlong10_to_str(wsrep_seqno, wsrep_seqno_buf, 10);
2477+
char wsrep_uuid_buf[MY_UUID_STRING_LENGTH + 1];
2478+
my_uuid2str(wsrep_uuid, wsrep_uuid_buf, 1);
2479+
wsrep_uuid_buf[MY_UUID_STRING_LENGTH]= '\0';
2480+
if (print_header(&cache, print_event_info, FALSE) ||
2481+
my_b_printf(&cache, "\tXid = %s, wsrep_seqno = %s, wsrep_uuid = %s\n",
2482+
buf, wsrep_seqno_buf, wsrep_uuid_buf))
2483+
goto err;
2484+
}
24702485
}
24712486
if (my_b_printf(&cache, is_flashback ? "START TRANSACTION%s\n" : "COMMIT%s\n",
24722487
print_event_info->delimiter))

sql/log_event_server.cc

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3846,9 +3846,21 @@ int Xid_log_event::do_commit()
38463846
bool Xid_log_event::write(Log_event_writer *writer)
38473847
{
38483848
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
3849-
return write_header(writer, sizeof(xid)) ||
3850-
write_data(writer, (uchar*)&xid, sizeof(xid)) ||
3851-
write_footer(writer);
3849+
if (wsrep_seqno == wsrep_seqno_undefined)
3850+
return write_header(writer, sizeof(xid)) ||
3851+
write_data(writer, (uchar*)&xid, sizeof(xid)) ||
3852+
write_footer(writer);
3853+
else
3854+
{
3855+
uchar data[sizeof(xid) + sizeof(wsrep_seqno) + sizeof(wsrep_uuid)];
3856+
memcpy(data, &xid, sizeof(xid));
3857+
int8store(data + sizeof(xid), wsrep_seqno);
3858+
memcpy(data + sizeof(xid) + sizeof(wsrep_seqno), wsrep_uuid,
3859+
sizeof(wsrep_uuid));
3860+
return write_header(writer, sizeof(xid) + sizeof(wsrep_seqno) +
3861+
sizeof(wsrep_uuid)) ||
3862+
write_data(writer, data, sizeof(data)) || write_footer(writer);
3863+
}
38523864
}
38533865

38543866
/**************************************************************************

0 commit comments

Comments
 (0)