From a2beaa729315499427ad3a81197c180e3fa8c39d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 6 Mar 2025 00:21:08 +0000 Subject: [PATCH 1/3] First POC for integration with ReadySet Monitor is able to detect if a backend is a ReadySet server, and it enables special monitoring. The special monitoring hacks replication lag checks: a ReadySet server is "monitored for replication lag" , but it has a special query and a special handler. The query for check is "SHOW READYSET STATUS" , and the `Status` line is processed: * Online: the backend is configured as ONLINE * Maintenance* : the backend is configured as OFFLINE_SOFT * anything else, or failed check: SHUNNED A new monitor table is also added: `readyset_status_log` . It has a similar structure of `mysql_server_replication_lag_log` , but instead of storing `repl_lag` (replication lag) the full output of `SHOW READYSET STATUS` is saved as a JSON (that can be queried using `JSON_EXTRACT()` --- include/MySQL_HostGroups_Manager.h | 1 + include/MySQL_Monitor.hpp | 2 + lib/MySQL_HostGroups_Manager.cpp | 19 ++ lib/MySQL_Monitor.cpp | 334 ++++++++++++++++++++++------- 4 files changed, 281 insertions(+), 75 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index ecb641cc9..a3ffe9f71 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -1074,6 +1074,7 @@ class MySQL_HostGroups_Manager : public Base_HostGroups_Manager { void wait_servers_table_version(unsigned, unsigned); bool shun_and_killall(char *hostname, int port); void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); + void set_ReadySet_status(char *hostname, int port, enum MySerStatus status); unsigned long long Get_Memory_Stats(); void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector>& new_servers); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index f85aa5804..6ce9b83ff 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -22,6 +22,8 @@ #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" +#define MONITOR_SQLITE_TABLE_READYSET_STATUS_LOG "CREATE TABLE readyset_status_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , status VARCHAR , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" + #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG "CREATE TABLE mysql_server_group_replication_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , viable_candidate VARCHAR NOT NULL DEFAULT 'NO' , read_only VARCHAR NOT NULL DEFAULT 'YES' , transactions_behind INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" //#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_GALERA_LOG "CREATE TABLE mysql_server_galera_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , viable_candidate VARCHAR NOT NULL DEFAULT 'NO' , read_only VARCHAR NOT NULL DEFAULT 'YES' , transactions_behind INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 7e657b4ca..533d5bdf3 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -4093,6 +4093,25 @@ void MySQL_HostGroups_Manager::set_server_current_latency_us(char *hostname, int wrunlock(); } +void MySQL_HostGroups_Manager::set_ReadySet_status(char *hostname, int port, enum MySerStatus status) { + wrlock(); + MySrvC *mysrvc=NULL; + for (unsigned int i=0; ilen; i++) { + MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); + unsigned int j; + unsigned int l=myhgc->mysrvs->cnt(); + if (l) { + for (j=0; jmysrvs->idx(j); + if (mysrvc->port==port && strcmp(mysrvc->address,hostname)==0) { + mysrvc->set_status(status); + } + } + } + } + wrunlock(); +} + void MySQL_HostGroups_Manager::p_update_metrics() { p_update_counter(status.p_counter_array[p_hg_counter::servers_table_version], status.servers_table_version); // Update *server_connections* related metrics diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 4b8b83144..22742eb2e 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -40,6 +40,8 @@ using json = nlohmann::json; #endif /* DEBUG */ #define MYSQL_MONITOR_VERSION "2.0.1226" DEB +#define SERVER_VERSION_READYSET "readyset" + #ifdef DEBUG //#define VALGRIND_ENABLE_ERROR_REPORTING //#define VALGRIND_DISABLE_ERROR_REPORTING @@ -55,6 +57,29 @@ extern ProxySQL_Cluster* GloProxyCluster; static MySQL_Monitor *GloMyMon; +struct ServerInfo { + std::string ipAddress; + int port; + + // Default constructor (important for some operations) + ServerInfo() : ipAddress(""), port(0) {} + + ServerInfo(const std::string& ip, int p) : ipAddress(ip), port(p) {} + + // Overload the < operator for std::set to compare ServerInfo objects + bool operator<(const ServerInfo& other) const { + if (ipAddress < other.ipAddress) { + return true; + } else if (ipAddress == other.ipAddress) { + return port < other.port; + } else { + return false; + } + } +}; + +static std::set ReadySet_Servers; + #define SAFE_SQLITE3_STEP(_stmt) do {\ do {\ rc=(*proxy_sqlite3_step)(_stmt);\ @@ -694,6 +719,9 @@ void MySQL_Monitor_State_Data::init_async() { } else { query_ = "SHOW SLAVE STATUS"; } + if (strcasestr(mysql->server_version, (const char *)SERVER_VERSION_READYSET) != NULL) { + query_ = "SHOW READYSET STATUS"; + } #endif task_timeout_ = mysql_thread___monitor_replication_lag_timeout; task_handler_ = &MySQL_Monitor_State_Data::replication_lag_handler; @@ -1058,6 +1086,7 @@ MySQL_Monitor::MySQL_Monitor() { insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_read_only_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_replication_lag_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG); + insert_into_tables_defs(tables_defs_monitor,"readyset_status_log", MONITOR_SQLITE_TABLE_READYSET_STATUS_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_group_replication_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_galera_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_GALERA_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_aws_aurora_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_AWS_AURORA_LOG); @@ -1072,6 +1101,7 @@ MySQL_Monitor::MySQL_Monitor() { monitordb->execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON mysql_server_ping_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_read_only_log_time_start ON mysql_server_read_only_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_replication_lag_log_time_start ON mysql_server_replication_lag_log (time_start_us)"); + monitordb->execute("CREATE INDEX IF NOT EXISTS readyset_status_log_time_start ON readyset_status_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_group_replication_log_time_start ON mysql_server_group_replication_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_galera_log_time_start ON mysql_server_galera_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_aws_aurora_log_time_start ON mysql_server_aws_aurora_log (time_start_us)"); @@ -2670,6 +2700,9 @@ void * monitor_replication_lag_thread(void *arg) { mmsd->t1=start_time; + string server_version = ""; + string query = "SHOW SLAVE STATUS"; + bool crc=false; if (mmsd->mysql==NULL) { // we don't have a connection, let's create it bool rc; @@ -2692,6 +2725,8 @@ void * monitor_replication_lag_thread(void *arg) { mmsd->t1=monotonic_time(); mmsd->interr=0; // reset the value + + if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version); #ifdef TEST_REPLICATIONLAG { std::string s = "SELECT SLAVE STATUS "; // replaced SHOW with SELECT to avoid breaking simulator logic @@ -2706,13 +2741,17 @@ void * monitor_replication_lag_thread(void *arg) { char *base_query = (char *)"SELECT MAX(ROUND(TIMESTAMPDIFF(MICROSECOND, ts, SYSDATE(6))/1000000)) AS Seconds_Behind_Master FROM %s"; char *replication_query = (char *)malloc(strlen(base_query)+l); sprintf(replication_query,base_query,percona_heartbeat_table); - mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,replication_query); + query = string(replication_query); free(replication_query); } } if (use_percona_heartbeat == false) { - mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SHOW SLAVE STATUS"); + query = "SHOW SLAVE STATUS"; } + if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) != NULL) { + query = "SHOW READYSET STATUS"; + } + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,query.c_str()); #endif // TEST_REPLICATIONLAG while (mmsd->async_exit_status) { mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); @@ -2785,6 +2824,7 @@ void * monitor_replication_lag_thread(void *arg) { int rc; char *query=NULL; + if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) == NULL) { query=(char *)"INSERT OR REPLACE INTO mysql_server_replication_lag_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; //rc=(*proxy_sqlite3_prepare_v2)(mondb, query, -1, &statement, 0); rc = mmsd->mondb->prepare_v2(query, &statement); @@ -2864,7 +2904,65 @@ void * monitor_replication_lag_thread(void *arg) { if (mmsd->mysql_error_msg == NULL) { replication_lag_success = true; } + } else { // readyset + query=(char *)"INSERT OR REPLACE INTO readyset_status_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; + rc = mmsd->mondb->prepare_v2(query, &statement); + ASSERT_SQLITE_OK(rc, mmsd->mondb); + unordered_map status_output = {}; + enum MySerStatus status = MYSQL_SERVER_STATUS_SHUNNED; // default status + rc=(*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc=(*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); + unsigned long long time_now=realtime_time(); + time_now=time_now-(mmsd->t2 - start_time); + rc=(*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc=(*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb); + if (mmsd->interr == 0 && mmsd->result) { + int num_fields=0; + int k=0; + MYSQL_FIELD * fields=NULL; + int j=-1; + num_fields = mysql_num_fields(mmsd->result); + fields = mysql_fetch_fields(mmsd->result); + if ( fields && (num_fields == 2) ) { + int num_rows = mysql_num_rows(mmsd->result); + for (int i = 0; i < num_rows; i++) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + string Variable_name = string(row[0]); + string Value = (row[1] == NULL ? string("") : string(row[1]) ); + status_output[Variable_name] = Value; + if (Variable_name == "Status") { + if (strcasecmp(Value.c_str(), (const char *)"Online") == 0) { + status = MYSQL_SERVER_STATUS_ONLINE; // set to ONLINE + } else if (strncasecmp(Value.c_str(), (const char *)"Maintenance", strlen("Maintenance")) == 0) { + status = MYSQL_SERVER_STATUS_OFFLINE_SOFT; // set to OFFLINE_SOFT + } else { + status = MYSQL_SERVER_STATUS_SHUNNED; // set to SHUNNED + } + } + } + nlohmann::json json_output = status_output; // directly assign the map to the json object + std::string json_string = json_output.dump(); + rc=(*proxy_sqlite3_bind_text)(statement, 5, json_string.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } else { + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); + rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } + mysql_free_result(mmsd->result); + mmsd->result=NULL; + } else { + rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } + rc=(*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + SAFE_SQLITE3_STEP2(statement); + rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); + MyHGM->set_ReadySet_status(mmsd->hostname, mmsd->port, status); + (*proxy_sqlite3_finalize)(statement); + if (mmsd->mysql_error_msg == NULL) { + replication_lag_success = true; + } + } } if (mmsd->interr || mmsd->mysql_error_msg) { // check failed if (mmsd->mysql) { @@ -4487,13 +4585,17 @@ void * MySQL_Monitor::monitor_replication_lag() { unsigned int glover; char *error=NULL; SQLite3_result *resultset=NULL; - // add support for SSL - char *query= NULL; + string ReadySetServers_query = ""; + for (const auto& server : ReadySet_Servers) { + ReadySetServers_query += " OR (hostname = '" + server.ipAddress + "' AND port = " + to_string(server.port) + ")"; + } + string queryS = ""; if (mysql_thread___monitor_replication_lag_group_by_host==true) { - query = (char *)"SELECT MIN(hostgroup_id), hostname, port, MIN(max_replication_lag), MAX(use_ssl) FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT IN (2,3) GROUP BY hostname, port"; + queryS = "SELECT MIN(hostgroup_id), hostname, port, MIN(max_replication_lag), MAX(use_ssl) FROM mysql_servers WHERE (max_replication_lag > 0 AND status NOT IN (2,3)) " + ReadySetServers_query + " GROUP BY hostname, port"; } else { - query=(char *)"SELECT hostgroup_id, hostname, port, max_replication_lag, use_ssl FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT IN (2,3)"; + queryS=(char *)"SELECT hostgroup_id, hostname, port, max_replication_lag, use_ssl FROM mysql_servers WHERE (max_replication_lag > 0 AND status NOT IN (2,3))" + ReadySetServers_query; } + char *query= (char *)queryS.c_str(); t1=monotonic_time(); if (!GloMTH) return NULL; // quick exit during shutdown/restart @@ -4527,24 +4629,32 @@ void * MySQL_Monitor::monitor_replication_lag() { __end_monitor_replication_lag_loop: if (mysql_thread___monitor_enabled==true) { - sqlite3_stmt *statement=NULL; + sqlite3_stmt *statement1=NULL; + sqlite3_stmt *statement2=NULL; //sqlite3 *mondb=monitordb->get_db(); int rc; char *query=NULL; query=(char *)"DELETE FROM mysql_server_replication_lag_log WHERE time_start_us < ?1"; - //rc=(*proxy_sqlite3_prepare_v2)(mondb, query, -1, &statement, 0); - rc = monitordb->prepare_v2(query, &statement); + rc = monitordb->prepare_v2(query, &statement1); + ASSERT_SQLITE_OK(rc, monitordb); + query=(char *)"DELETE FROM readyset_status_log WHERE time_start_us < ?1"; + rc = monitordb->prepare_v2(query, &statement2); ASSERT_SQLITE_OK(rc, monitordb); if (mysql_thread___monitor_history < mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 )) { // issue #626 if (mysql_thread___monitor_ping_interval < 3600000) mysql_thread___monitor_history = mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 ); } unsigned long long time_now=realtime_time(); - rc=(*proxy_sqlite3_bind_int64)(statement, 1, time_now-(unsigned long long)mysql_thread___monitor_history*1000); ASSERT_SQLITE_OK(rc, monitordb); - SAFE_SQLITE3_STEP2(statement); - rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, monitordb); - rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, monitordb); - (*proxy_sqlite3_finalize)(statement); + rc=(*proxy_sqlite3_bind_int64)(statement1, 1, time_now-(unsigned long long)mysql_thread___monitor_history*1000); ASSERT_SQLITE_OK(rc, monitordb); + SAFE_SQLITE3_STEP2(statement1); + rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, monitordb); + rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, monitordb); + (*proxy_sqlite3_finalize)(statement1); + rc=(*proxy_sqlite3_bind_int64)(statement2, 1, time_now-(unsigned long long)mysql_thread___monitor_history*1000); ASSERT_SQLITE_OK(rc, monitordb); + SAFE_SQLITE3_STEP2(statement2); + rc=(*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, monitordb); + rc=(*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, monitordb); + (*proxy_sqlite3_finalize)(statement2); } if (resultset) @@ -7182,6 +7292,9 @@ bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vectormysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version); + if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) { __sync_fetch_and_add(&ping_check_OK, 1); My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); @@ -7225,6 +7338,16 @@ bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vectormondb); rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); (*proxy_sqlite3_finalize)(statement); + if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) != NULL) { + ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port)); + } else { + if (ReadySet_Servers.size() > 0) { // optimization . The following section is skipped if there are no servers + ServerInfo searchServer(mmsd->hostname, mmsd->port); + if (ReadySet_Servers.count(searchServer) > 0) { + ReadySet_Servers.erase(searchServer); + } + } + } } return true; @@ -7847,6 +7970,9 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING); + string server_version = ""; + if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version); + if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) { __sync_fetch_and_add(&replication_lag_check_OK, 1); My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); @@ -7875,81 +8001,139 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto return false; } - sqlite3_stmt* statement = NULL; - const char* query = (char*)"INSERT OR REPLACE INTO mysql_server_replication_lag_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; - int rc = mmsd->mondb->prepare_v2(query, &statement); - ASSERT_SQLITE_OK(rc, mmsd->mondb); - // 'replication_lag' to be feed to 'replication_lag_action' - int repl_lag = -2; - bool override_repl_lag = true; - rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); - unsigned long long time_now = realtime_time(); - time_now = time_now - (mmsd->t2 - mmsd->t1); - rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb); - if (mmsd->interr == 0 && mmsd->result) { - int num_fields = 0; - int k = 0; - MYSQL_FIELD* fields = NULL; - int j = -1; - num_fields = mysql_num_fields(mmsd->result); - fields = mysql_fetch_fields(mmsd->result); + if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) == NULL) { + sqlite3_stmt* statement = NULL; + const char* query = (char*)"INSERT OR REPLACE INTO mysql_server_replication_lag_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; + int rc = mmsd->mondb->prepare_v2(query, &statement); + ASSERT_SQLITE_OK(rc, mmsd->mondb); + // 'replication_lag' to be feed to 'replication_lag_action' + int repl_lag = -2; + bool override_repl_lag = true; + rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); + unsigned long long time_now = realtime_time(); + time_now = time_now - (mmsd->t2 - mmsd->t1); + rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb); + if (mmsd->interr == 0 && mmsd->result) { + int num_fields = 0; + int k = 0; + MYSQL_FIELD* fields = NULL; + int j = -1; + num_fields = mysql_num_fields(mmsd->result); + fields = mysql_fetch_fields(mmsd->result); #ifdef TEST_REPLICATIONLAG - if (fields && num_fields == 1) + if (fields && num_fields == 1) #else - if ( - fields && ( - (num_fields == 1 && mmsd->use_percona_heartbeat == true) - || - (num_fields > 30 && mmsd->use_percona_heartbeat == false) - ) - ) + if ( + fields && ( + (num_fields == 1 && mmsd->use_percona_heartbeat == true) + || + (num_fields > 30 && mmsd->use_percona_heartbeat == false) + ) + ) #endif - { - for (k = 0; k < num_fields; k++) { - if (fields[k].name) { - if (strcmp("Seconds_Behind_Master", fields[k].name) == 0) { - j = k; + { + for (k = 0; k < num_fields; k++) { + if (fields[k].name) { + if (strcmp("Seconds_Behind_Master", fields[k].name) == 0) { + j = k; + } } } - } - if (j > -1) { - MYSQL_ROW row = mysql_fetch_row(mmsd->result); - if (row) { - repl_lag = -1; // this is old behavior - override_repl_lag = true; - if (row[j]) { // if Seconds_Behind_Master is not NULL - repl_lag = atoi(row[j]); - override_repl_lag = false; - } else { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG); + if (j > -1) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + if (row) { + repl_lag = -1; // this is old behavior + override_repl_lag = true; + if (row[j]) { // if Seconds_Behind_Master is not NULL + repl_lag = atoi(row[j]); + override_repl_lag = false; + } else { + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG); + } } } - } - if (/*repl_lag >= 0 ||*/ override_repl_lag == false) { - rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb); + if (/*repl_lag >= 0 ||*/ override_repl_lag == false) { + rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } else { + rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } } else { + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); } + mysql_free_result(mmsd->result); + mmsd->result = NULL; } else { - proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); + // 'replication_lag_check' timed out, we set 'repl_lag' to '-3' to avoid server to be 're-enabled'. + repl_lag = -3; } - mysql_free_result(mmsd->result); - mmsd->result = NULL; - } else { - rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); - // 'replication_lag_check' timed out, we set 'repl_lag' to '-3' to avoid server to be 're-enabled'. - repl_lag = -3; + rc = (*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + SAFE_SQLITE3_STEP2(statement); + rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); + //MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag); + (*proxy_sqlite3_finalize)(statement); + mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag }); + } else { // readyset + sqlite3_stmt* statement = NULL; + const char* query = (char*)"INSERT OR REPLACE INTO readyset_status_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; + int rc = mmsd->mondb->prepare_v2(query, &statement); + ASSERT_SQLITE_OK(rc, mmsd->mondb); + unordered_map status_output = {}; + enum MySerStatus status = MYSQL_SERVER_STATUS_SHUNNED; // default status + rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); + unsigned long long time_now = realtime_time(); + time_now = time_now - (mmsd->t2 - mmsd->t1); + rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb); + if (mmsd->interr == 0 && mmsd->result) { + int num_fields=0; + int k=0; + MYSQL_FIELD * fields=NULL; + int j=-1; + num_fields = mysql_num_fields(mmsd->result); + fields = mysql_fetch_fields(mmsd->result); + if ( fields && (num_fields == 2) ) { + int num_rows = mysql_num_rows(mmsd->result); + for (int i = 0; i < num_rows; i++) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + string Variable_name = string(row[0]); + string Value = (row[1] == NULL ? string("") : string(row[1]) ); + status_output[Variable_name] = Value; + if (Variable_name == "Status") { + if (strcasecmp(Value.c_str(), (const char *)"Online") == 0) { + status = MYSQL_SERVER_STATUS_ONLINE; // set to ONLINE + } else if (strncasecmp(Value.c_str(), (const char *)"Maintenance", strlen("Maintenance")) == 0) { + status = MYSQL_SERVER_STATUS_OFFLINE_SOFT; // set to OFFLINE_SOFT + } else { + status = MYSQL_SERVER_STATUS_SHUNNED; // set to SHUNNED + } + } + } + nlohmann::json json_output = status_output; // directly assign the map to the json object + std::string json_string = json_output.dump(); + rc=(*proxy_sqlite3_bind_text)(statement, 5, json_string.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } else { + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); + rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } + + mysql_free_result(mmsd->result); + mmsd->result=NULL; + } else { + rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } + rc=(*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); + SAFE_SQLITE3_STEP2(statement); + rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); + rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); + MyHGM->set_ReadySet_status(mmsd->hostname, mmsd->port, status); + (*proxy_sqlite3_finalize)(statement); } - rc = (*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); - SAFE_SQLITE3_STEP2(statement); - rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - //MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag); - (*proxy_sqlite3_finalize)(statement); - mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag }); } //executing replication lag action From 2fa1a208bbefe8f067e2904c24fb7099fdcdd0ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sun, 9 Mar 2025 12:45:44 +0000 Subject: [PATCH 2/3] Adding info/warning message when ReadySet server change status INFO: if status is changed to ONLINE WARNING : if status is changed to anything else --- lib/MySQL_HostGroups_Manager.cpp | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 533d5bdf3..b90cc18bc 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -4104,7 +4104,23 @@ void MySQL_HostGroups_Manager::set_ReadySet_status(char *hostname, int port, enu for (j=0; jmysrvs->idx(j); if (mysrvc->port==port && strcmp(mysrvc->address,hostname)==0) { - mysrvc->set_status(status); + enum MySerStatus prev_status = mysrvc->get_status(); + if (prev_status != status) { + char *src_status = "?"; // this shouldn't display + char *dst_status = "?"; // this shouldn't display + if (prev_status == MYSQL_SERVER_STATUS_ONLINE) { src_status = "ONLINE"; } + else if (prev_status == MYSQL_SERVER_STATUS_OFFLINE_SOFT) { src_status = "OFFLINE_SOFT"; } + else if (prev_status == MYSQL_SERVER_STATUS_SHUNNED) { src_status = "SHUNNED"; }; + if (status == MYSQL_SERVER_STATUS_ONLINE) { src_status = "ONLINE"; } + else if (status == MYSQL_SERVER_STATUS_OFFLINE_SOFT) { dst_status = "OFFLINE_SOFT"; } + else if (status == MYSQL_SERVER_STATUS_SHUNNED) { dst_status = "SHUNNED"; }; + if (status == MYSQL_SERVER_STATUS_ONLINE) { + proxy_info("Changing ReadySet status for server %s:%d from HG %u from %s to %s", hostname, port, myhgc->hid, src_status, dst_status); + } else { + proxy_warning("Changing ReadySet status for server %s:%d from HG %u from %s to %s", hostname, port, myhgc->hid, src_status, dst_status); + } + mysrvc->set_status(status); + } } } } From f71549d7b9453e1485194dc14381a8ccc5718217 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Wed, 12 Mar 2025 15:53:49 +0000 Subject: [PATCH 3/3] Addressing minor issue on Readyset Monitoring - Renamed ReadySet to Readyset - fixed incorrect status variable - added missing newlines --- include/MySQL_HostGroups_Manager.h | 2 +- lib/MySQL_HostGroups_Manager.cpp | 8 ++++---- lib/MySQL_Monitor.cpp | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index a3ffe9f71..98bad91af 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -1074,7 +1074,7 @@ class MySQL_HostGroups_Manager : public Base_HostGroups_Manager { void wait_servers_table_version(unsigned, unsigned); bool shun_and_killall(char *hostname, int port); void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); - void set_ReadySet_status(char *hostname, int port, enum MySerStatus status); + void set_Readyset_status(char *hostname, int port, enum MySerStatus status); unsigned long long Get_Memory_Stats(); void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector>& new_servers); diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index b90cc18bc..df2b47f1f 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -4093,7 +4093,7 @@ void MySQL_HostGroups_Manager::set_server_current_latency_us(char *hostname, int wrunlock(); } -void MySQL_HostGroups_Manager::set_ReadySet_status(char *hostname, int port, enum MySerStatus status) { +void MySQL_HostGroups_Manager::set_Readyset_status(char *hostname, int port, enum MySerStatus status) { wrlock(); MySrvC *mysrvc=NULL; for (unsigned int i=0; ilen; i++) { @@ -4111,13 +4111,13 @@ void MySQL_HostGroups_Manager::set_ReadySet_status(char *hostname, int port, enu if (prev_status == MYSQL_SERVER_STATUS_ONLINE) { src_status = "ONLINE"; } else if (prev_status == MYSQL_SERVER_STATUS_OFFLINE_SOFT) { src_status = "OFFLINE_SOFT"; } else if (prev_status == MYSQL_SERVER_STATUS_SHUNNED) { src_status = "SHUNNED"; }; - if (status == MYSQL_SERVER_STATUS_ONLINE) { src_status = "ONLINE"; } + if (status == MYSQL_SERVER_STATUS_ONLINE) { dst_status = "ONLINE"; } else if (status == MYSQL_SERVER_STATUS_OFFLINE_SOFT) { dst_status = "OFFLINE_SOFT"; } else if (status == MYSQL_SERVER_STATUS_SHUNNED) { dst_status = "SHUNNED"; }; if (status == MYSQL_SERVER_STATUS_ONLINE) { - proxy_info("Changing ReadySet status for server %s:%d from HG %u from %s to %s", hostname, port, myhgc->hid, src_status, dst_status); + proxy_info("Changing Readyset status for server %s:%d from HG %u from %s to %s\n", hostname, port, myhgc->hid, src_status, dst_status); } else { - proxy_warning("Changing ReadySet status for server %s:%d from HG %u from %s to %s", hostname, port, myhgc->hid, src_status, dst_status); + proxy_warning("Changing Readyset status for server %s:%d from HG %u from %s to %s\n", hostname, port, myhgc->hid, src_status, dst_status); } mysrvc->set_status(status); } diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 22742eb2e..8090163bc 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -2957,7 +2957,7 @@ void * monitor_replication_lag_thread(void *arg) { SAFE_SQLITE3_STEP2(statement); rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - MyHGM->set_ReadySet_status(mmsd->hostname, mmsd->port, status); + MyHGM->set_Readyset_status(mmsd->hostname, mmsd->port, status); (*proxy_sqlite3_finalize)(statement); if (mmsd->mysql_error_msg == NULL) { replication_lag_success = true; @@ -8131,7 +8131,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto SAFE_SQLITE3_STEP2(statement); rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - MyHGM->set_ReadySet_status(mmsd->hostname, mmsd->port, status); + MyHGM->set_Readyset_status(mmsd->hostname, mmsd->port, status); (*proxy_sqlite3_finalize)(statement); } }