Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build/pxc-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,12 @@ if [ -z "$CLUSTER_JOIN" ] && [ "$1" = 'mysqld' ] && [ -z "$wantHelp" ]; then
fi
set -x

if [ "$MYSQL_VERSION" == '8.0' ]; then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's check for 8.4 as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

echo "CREATE FUNCTION IF NOT EXISTS get_last_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'" | "${mysql[@]}"
echo "CREATE FUNCTION IF NOT EXISTS get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'" | "${mysql[@]}"
echo "CREATE FUNCTION IF NOT EXISTS get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'" | "${mysql[@]}"
fi

echo
ls /docker-entrypoint-initdb.d/ >/dev/null
for f in /docker-entrypoint-initdb.d/*; do
Expand Down
33 changes: 32 additions & 1 deletion cmd/pitr/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (c *Collector) Init(ctx context.Context) error {
switch {
case strings.HasPrefix(version, "8.0"):
log.Println("creating collector functions")
if err := db.CreateCollectorFunctions(ctx); err != nil {
if err := c.CreateCollectorFunctions(ctx); err != nil {
return errors.Wrap(err, "init 8.0: create collector functions")
}
case strings.HasPrefix(version, "8.4"):
Expand All @@ -221,6 +221,37 @@ func (c *Collector) Init(ctx context.Context) error {
return nil
}

func (c *Collector) CreateCollectorFunctions(ctx context.Context) error {
nodes, err := pxc.GetNodesByServiceName(ctx, c.pxcServiceName)
if err != nil {
return errors.Wrap(err, "get nodes by service name")
}

create := func(node string) error {
nodeArr := strings.Split(node, ":")
host := nodeArr[0]
db, err := pxc.NewPXC(host, c.pxcUser, c.pxcPass)
if err != nil {
return errors.Errorf("creating connection for host %s: %v", host, err)
}
defer db.Close()
if err := db.CreateCollectorFunctions(ctx); err != nil {
return errors.Wrap(err, "create collector functions")
}
return nil
}

for _, node := range nodes {
if strings.Contains(node, "wsrep_ready:ON:wsrep_connected:ON:wsrep_local_state_comment:Synced:wsrep_cluster_status:Primary") {
if err := create(node); err != nil {
return err
}
}
}

return nil
}

func (c *Collector) Run(ctx context.Context) error {
err := c.newDB(ctx)
if err != nil {
Expand Down
35 changes: 22 additions & 13 deletions cmd/pitr/pxc/pxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewPXC(addr string, user, pass string) (*PXC, error) {
config.Params = map[string]string{
"interpolateParams": "true",
"tls": "preferred",
"multiStatements": "true",
}
config.DBName = "mysql"

Expand Down Expand Up @@ -205,7 +206,7 @@ func (p *PXC) SubtractGTIDSet(ctx context.Context, set, subSet string) (string,
return result, nil
}

func getNodesByServiceName(ctx context.Context, pxcServiceName string) ([]string, error) {
func GetNodesByServiceName(ctx context.Context, pxcServiceName string) ([]string, error) {
cmd := exec.CommandContext(ctx, "/opt/percona/peer-list", "-on-start=/opt/percona/get-pxc-state.sh", "-service="+pxcServiceName)
out, err := cmd.CombinedOutput()
if err != nil {
Expand All @@ -215,7 +216,7 @@ func getNodesByServiceName(ctx context.Context, pxcServiceName string) ([]string
}

func GetPXCFirstHost(ctx context.Context, pxcServiceName string) (string, error) {
nodes, err := getNodesByServiceName(ctx, pxcServiceName)
nodes, err := GetNodesByServiceName(ctx, pxcServiceName)
if err != nil {
return "", errors.Wrap(err, "get nodes by service name")
}
Expand All @@ -236,7 +237,7 @@ func GetPXCFirstHost(ctx context.Context, pxcServiceName string) (string, error)
}

func GetPXCOldestBinlogHost(ctx context.Context, pxcServiceName, user, pass string) (string, error) {
nodes, err := getNodesByServiceName(ctx, pxcServiceName)
nodes, err := GetNodesByServiceName(ctx, pxcServiceName)
if err != nil {
return "", errors.Wrap(err, "get nodes by service name")
}
Expand Down Expand Up @@ -351,19 +352,27 @@ func (p *PXC) UninstallBinlogUDFComponent(ctx context.Context) error {
}

func (p *PXC) CreateCollectorFunctions(ctx context.Context) error {
_, err := p.db.ExecContext(ctx, "CREATE FUNCTION IF NOT EXISTS get_last_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'")
if err != nil {
return errors.Wrap(err, "create function get_first_record_timestamp_by_binlog")
m := map[string]string{
"get_last_record_timestamp_by_binlog": "INTEGER",
"get_gtid_set_by_binlog": "STRING",
"get_first_record_timestamp_by_binlog": "INTEGER",
}

_, err = p.db.ExecContext(ctx, "CREATE FUNCTION IF NOT EXISTS get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'")
if err != nil {
return errors.Wrap(err, "create function get_gtid_set_by_binlog")
}
for functionName, returnType := range m {
var x int
err := p.db.QueryRowContext(ctx, `SELECT 1 FROM mysql.func WHERE name = ? LIMIT 1`, functionName).Scan(&x)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return errors.Wrapf(err, "check if function %s exists", functionName)
}
if err == nil {
log.Printf("function %s already exists", functionName)
continue
}
Comment on lines +363 to +370
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we are using the IF NOT EXISTS on the subsequent query, is this check regarding the existence of the function necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's the whole point. CREATE IF NOT EXISTS still counts as DDL


_, err = p.db.ExecContext(ctx, "CREATE FUNCTION IF NOT EXISTS get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'")
if err != nil {
return errors.Wrap(err, "create function get_first_record_timestamp_by_binlog")
createQ := fmt.Sprintf("SET SESSION wsrep_on = OFF; CREATE FUNCTION IF NOT EXISTS %s RETURNS %s SONAME 'binlog_utils_udf.so'; SET SESSION wsrep_on = ON", functionName, returnType)
if _, err := p.db.ExecContext(ctx, createQ); err != nil {
return errors.Wrapf(err, "create function %s", functionName)
}
}

return nil
Expand Down
12 changes: 12 additions & 0 deletions e2e-tests/pitr/run
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ main() {
cluster="pitr"
spinup_pxc "$cluster" "$test_dir/conf/$cluster.yml"

pitr_pod=$(
kubectl_bin get pods \
--selector="app.kubernetes.io/component=pitr" \
-o 'jsonpath={.items[].metadata.name}'
)
wait_pod "$pitr_pod"
function_exists_count=$(kubectl_bin logs -l "app.kubernetes.io/component=pitr" --tail=-1 2>/dev/null | grep -c 'already exists')
if [ "$function_exists_count" -eq 0 ]; then
echo "There are no 'function ... already exists' logs"
exit 1
fi

run_backup "$cluster" "on-pitr-minio"

# Temporarily skipping this check
Expand Down
Loading