@@ -621,16 +621,30 @@ func mergeErrors(a, b error) error {
621621 return b
622622}
623623
624+ func extractIncrementalNumber (binlogName string ) (string , error ) {
625+ parts := strings .Split (binlogName , "." )
626+ if len (parts ) < 2 {
627+ return "" , fmt .Errorf ("invalid binlog name format: %s" , binlogName )
628+ }
629+ return parts [len (parts )- 1 ], nil
630+ }
631+
624632func (c * Collector ) manageBinlog (ctx context.Context , binlog pxc.Binlog ) (err error ) {
625633 binlogTmstmp , err := c .db .GetBinLogFirstTimestamp (ctx , binlog .Name )
626634 if err != nil {
627635 return errors .Wrapf (err , "get first timestamp for %s" , binlog .Name )
628636 }
629637
630- binlogName := fmt .Sprintf ("binlog_%s_%x" , binlogTmstmp , md5 .Sum ([]byte (binlog .GTIDSet .Raw ())))
638+ incrementalNum , err := extractIncrementalNumber (binlog .Name ) // extracts e.g. "000011"
639+ if err != nil {
640+ return errors .Wrapf (err , "extract incremental number from %s" , binlog .Name )
641+ }
642+
643+ // Construct internal storage filename with timestamp, incremental number, and GTID md5 hash
644+ binlogName := fmt .Sprintf ("binlog_%s_%s_%x" , binlogTmstmp , incrementalNum , md5 .Sum ([]byte (binlog .GTIDSet .Raw ())))
631645
632646 var setBuffer bytes.Buffer
633- // no error handling because WriteString() always return nil error
647+ // no error handling because WriteString() always returns nil error
634648 // nolint:errcheck
635649 setBuffer .WriteString (binlog .GTIDSet .Raw ())
636650
@@ -641,6 +655,7 @@ func (c *Collector) manageBinlog(ctx context.Context, binlog pxc.Binlog) (err er
641655 return errors .Wrap (err , "remove temp file" )
642656 }
643657
658+ // Create named pipe with original binlog.Name
644659 err = syscall .Mkfifo (tmpDir + binlog .Name , 0o666 )
645660 if err != nil {
646661 return errors .Wrap (err , "make named pipe file error" )
@@ -686,6 +701,7 @@ func (c *Collector) manageBinlog(ctx context.Context, binlog pxc.Binlog) (err er
686701
687702 go readBinlog (ctx , file , pw , errBuf , binlog .Name )
688703
704+ // Use constructed binlogName for storage keys
689705 err = c .storage .PutObject (ctx , binlogName , pr , - 1 )
690706 if err != nil {
691707 return errors .Wrapf (err , "put %s object" , binlog .Name )
@@ -703,7 +719,7 @@ func (c *Collector) manageBinlog(ctx context.Context, binlog pxc.Binlog) (err er
703719 return errors .Wrap (err , "put gtid-set object" )
704720 }
705721 for _ , gtidSet := range binlog .GTIDSet .List () {
706- // no error handling because WriteString() always return nil error
722+ // no error handling because WriteString() always returns nil error
707723 // nolint:errcheck
708724 setBuffer .WriteString (binlog .GTIDSet .Raw ())
709725
0 commit comments