-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support except table #74
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -71,16 +71,17 @@ func (j JobState) String() string { | |||||
} | ||||||
|
||||||
type Job struct { | ||||||
SyncType SyncType `json:"sync_type"` | ||||||
Name string `json:"name"` | ||||||
Src base.Spec `json:"src"` | ||||||
ISrc base.Specer `json:"-"` | ||||||
srcMeta Metaer `json:"-"` | ||||||
Dest base.Spec `json:"dest"` | ||||||
IDest base.Specer `json:"-"` | ||||||
destMeta Metaer `json:"-"` | ||||||
SkipError bool `json:"skip_error"` | ||||||
State JobState `json:"state"` | ||||||
SyncType SyncType `json:"sync_type"` | ||||||
Name string `json:"name"` | ||||||
Src base.Spec `json:"src"` | ||||||
ISrc base.Specer `json:"-"` | ||||||
srcMeta Metaer `json:"-"` | ||||||
Dest base.Spec `json:"dest"` | ||||||
IDest base.Specer `json:"-"` | ||||||
destMeta Metaer `json:"-"` | ||||||
SkipError bool `json:"skip_error"` | ||||||
ExceptTable string `json:"except_table"` | ||||||
State JobState `json:"state"` | ||||||
|
||||||
factory *Factory `json:"-"` | ||||||
|
||||||
|
@@ -96,24 +97,37 @@ type Job struct { | |||||
|
||||||
type jobContext struct { | ||||||
context.Context | ||||||
src base.Spec | ||||||
dest base.Spec | ||||||
db storage.DB | ||||||
skipError bool | ||||||
factory *Factory | ||||||
src base.Spec | ||||||
dest base.Spec | ||||||
db storage.DB | ||||||
skipError bool | ||||||
exceptTable string | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
factory *Factory | ||||||
} | ||||||
|
||||||
func NewJobContext(src, dest base.Spec, skipError bool, db storage.DB, factory *Factory) *jobContext { | ||||||
func NewJobContext(src, dest base.Spec, skipError bool, exceptTable string, db storage.DB, factory *Factory) *jobContext { | ||||||
return &jobContext{ | ||||||
Context: context.Background(), | ||||||
src: src, | ||||||
dest: dest, | ||||||
skipError: skipError, | ||||||
db: db, | ||||||
factory: factory, | ||||||
Context: context.Background(), | ||||||
src: src, | ||||||
dest: dest, | ||||||
skipError: skipError, | ||||||
exceptTable: exceptTable, | ||||||
db: db, | ||||||
factory: factory, | ||||||
} | ||||||
} | ||||||
|
||||||
func IsExceptTable(exceptTables string, targetTable string) bool { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Define it as a method of It likes:
|
||||||
exceptTableList := strings.Split(exceptTables, ",") | ||||||
for _, exceptTable := range exceptTableList { | ||||||
if targetTable == strings.TrimSpace(exceptTable) { | ||||||
return true | ||||||
} | ||||||
} | ||||||
|
||||||
return false | ||||||
} | ||||||
|
||||||
// new job | ||||||
func NewJobFromService(name string, ctx context.Context) (*Job, error) { | ||||||
jobContext, ok := ctx.(*jobContext) | ||||||
|
@@ -125,15 +139,16 @@ func NewJobFromService(name string, ctx context.Context) (*Job, error) { | |||||
src := jobContext.src | ||||||
dest := jobContext.dest | ||||||
job := &Job{ | ||||||
Name: name, | ||||||
Src: src, | ||||||
ISrc: factory.NewSpecer(&src), | ||||||
srcMeta: factory.NewMeta(&jobContext.src), | ||||||
Dest: dest, | ||||||
IDest: factory.NewSpecer(&dest), | ||||||
destMeta: factory.NewMeta(&jobContext.dest), | ||||||
SkipError: jobContext.skipError, | ||||||
State: JobRunning, | ||||||
Name: name, | ||||||
Src: src, | ||||||
ISrc: factory.NewSpecer(&src), | ||||||
srcMeta: factory.NewMeta(&jobContext.src), | ||||||
Dest: dest, | ||||||
IDest: factory.NewSpecer(&dest), | ||||||
destMeta: factory.NewMeta(&jobContext.dest), | ||||||
SkipError: jobContext.skipError, | ||||||
State: JobRunning, | ||||||
ExceptTable: jobContext.exceptTable, | ||||||
|
||||||
factory: factory, | ||||||
|
||||||
|
@@ -203,6 +218,9 @@ func (j *Job) valid() error { | |||||
return xerror.New(xerror.Normal, "src/dest are not both db or table sync") | ||||||
} | ||||||
|
||||||
if j.Src.Table != "" && j.ExceptTable != "" { | ||||||
return xerror.New(xerror.Normal, "source table and except table caonnot exist at the same time") | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
|
@@ -292,9 +310,17 @@ func (j *Job) fullSync() error { | |||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
// skip if table is except | ||||||
log.Infof("fullsync except table: %s", j.ExceptTable) | ||||||
for _, table := range tables { | ||||||
if IsExceptTable(j.ExceptTable, table.Name) { | ||||||
continue | ||||||
} | ||||||
|
||||||
backupTableList = append(backupTableList, table.Name) | ||||||
} | ||||||
log.Infof("fullsync table is : %s", backupTableList) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There may be thousands of tables here, so it is not suitable for printing logs. |
||||||
case TableSync: | ||||||
backupTableList = append(backupTableList, j.Src.Table) | ||||||
default: | ||||||
|
@@ -730,7 +756,14 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { | |||||
log.Debugf("tableRecords: %v", tableRecords) | ||||||
destTableIds := make([]int64, 0, len(tableRecords)) | ||||||
if j.SyncType == DBSync { | ||||||
var sourceTableName string | ||||||
for _, tableRecord := range tableRecords { | ||||||
sourceTableName, err = j.srcMeta.GetTableNameById(tableRecord.Id) | ||||||
if IsExceptTable(j.ExceptTable, sourceTableName) { | ||||||
log.Infof("db sync upsert, but table is except, just return") | ||||||
return nil | ||||||
} | ||||||
|
||||||
if destTableId, err := j.getDestTableIdBySrc(tableRecord.Id); err != nil { | ||||||
return err | ||||||
} else { | ||||||
|
@@ -886,6 +919,14 @@ func (j *Job) handleAddPartition(binlog *festruct.TBinlog) error { | |||||
if j.SyncType == TableSync { | ||||||
destTableName = j.Dest.Table | ||||||
} else if j.SyncType == DBSync { | ||||||
// use sourceTableName to judge if table is except | ||||||
var sourceTableName string | ||||||
sourceTableName, err = j.srcMeta.GetTableNameById(addPartition.TableId) | ||||||
if IsExceptTable(j.ExceptTable, sourceTableName) { | ||||||
log.Infof("db sync add partition, but table is except, just return") | ||||||
return nil | ||||||
} | ||||||
|
||||||
destTableId, err := j.getDestTableIdBySrc(addPartition.TableId) | ||||||
if err != nil { | ||||||
return err | ||||||
|
@@ -918,6 +959,14 @@ func (j *Job) handleDropPartition(binlog *festruct.TBinlog) error { | |||||
if j.SyncType == TableSync { | ||||||
destTableName = j.Dest.Table | ||||||
} else if j.SyncType == DBSync { | ||||||
// use sourceTableName to judge if table is except | ||||||
var sourceTableName string | ||||||
sourceTableName, err = j.srcMeta.GetTableNameById(dropPartition.TableId) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
One possible solution is to query the table ID of the except tables before adding the job to JobManager, see |
||||||
if IsExceptTable(j.ExceptTable, sourceTableName) { | ||||||
log.Infof("db sync drop partition, but table is except, just return") | ||||||
return nil | ||||||
} | ||||||
|
||||||
destTableId, err := j.getDestTableIdBySrc(dropPartition.TableId) | ||||||
if err != nil { | ||||||
return err | ||||||
|
@@ -1004,6 +1053,11 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error { | |||||
tableName = srcTable.Name | ||||||
} | ||||||
|
||||||
if IsExceptTable(j.ExceptTable, tableName) { | ||||||
log.Infof("db sync drop table, but table is except, just return") | ||||||
return nil | ||||||
} | ||||||
|
||||||
sql := fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(tableName)) | ||||||
log.Infof("dropTableSql: %s", sql) | ||||||
if err = j.IDest.DbExec(sql); err != nil { | ||||||
|
@@ -1049,6 +1103,12 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { | |||||
if j.SyncType == TableSync { | ||||||
dropTableSql = fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(j.Dest.Table)) | ||||||
} else { | ||||||
// if table is except, just break | ||||||
if IsExceptTable(j.ExceptTable, alterJob.TableName) { | ||||||
log.Infof("db sync alter job, but table is except, just break") | ||||||
break | ||||||
} | ||||||
|
||||||
dropTableSql = fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(alterJob.TableName)) | ||||||
} | ||||||
log.Infof("dropTableSql: %s", dropTableSql) | ||||||
|
@@ -1074,6 +1134,13 @@ func (j *Job) handleLightningSchemaChange(binlog *festruct.TBinlog) error { | |||||
log.Debugf("lightningSchemaChange %v", lightningSchemaChange) | ||||||
|
||||||
rawSql := lightningSchemaChange.RawSql | ||||||
var sourceTableName string | ||||||
sourceTableName, err = j.srcMeta.GetTableNameById(lightningSchemaChange.TableId) | ||||||
if IsExceptTable(j.ExceptTable, sourceTableName) { | ||||||
log.Infof("db sync lightning schema change, but table is except, just return") | ||||||
return nil | ||||||
} | ||||||
|
||||||
// "rawSql": "ALTER TABLE `default_cluster:ccr`.`test_ddl` ADD COLUMN `nid1` int(11) NULL COMMENT \"\"" | ||||||
// replace `default_cluster:${Src.Database}`.`test_ddl` to `test_ddl` | ||||||
var sql string | ||||||
|
@@ -1098,6 +1165,11 @@ func (j *Job) handleTruncateTable(binlog *festruct.TBinlog) error { | |||||
var destTableName string | ||||||
switch j.SyncType { | ||||||
case DBSync: | ||||||
if IsExceptTable(j.ExceptTable, truncateTable.TableName) { | ||||||
log.Infof("db sync truncate table, but table is except, just return") | ||||||
return nil | ||||||
} | ||||||
|
||||||
destTableName = truncateTable.TableName | ||||||
case TableSync: | ||||||
destTableName = j.Dest.Table | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -73,15 +73,16 @@ func NewHttpServer(host string, port int, db storage.DB, jobManager *ccr.JobMana | |||||
|
||||||
type CreateCcrRequest struct { | ||||||
// must need all fields required | ||||||
Name string `json:"name,required"` | ||||||
Src base.Spec `json:"src,required"` | ||||||
Dest base.Spec `json:"dest,required"` | ||||||
SkipError bool `json:"skip_error"` | ||||||
Name string `json:"name,required"` | ||||||
Src base.Spec `json:"src,required"` | ||||||
Dest base.Spec `json:"dest,required"` | ||||||
SkipError bool `json:"skip_error"` | ||||||
ExceptTable string `json:"except_table"` | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Define it to an array might be better.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then run |
||||||
} | ||||||
|
||||||
// Stringer | ||||||
func (r *CreateCcrRequest) String() string { | ||||||
return fmt.Sprintf("name: %s, src: %v, dest: %v", r.Name, r.Src, r.Dest) | ||||||
return fmt.Sprintf("name: %s, src: %v, dest: %v, except_table: %s", r.Name, r.Src, r.Dest, r.ExceptTable) | ||||||
} | ||||||
|
||||||
// version Handler | ||||||
|
@@ -106,7 +107,7 @@ func (s *HttpService) versionHandler(w http.ResponseWriter, r *http.Request) { | |||||
func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) error { | ||||||
log.Infof("create ccr %s", request) | ||||||
|
||||||
ctx := ccr.NewJobContext(request.Src, request.Dest, request.SkipError, db, jobManager.GetFactory()) | ||||||
ctx := ccr.NewJobContext(request.Src, request.Dest, request.SkipError, request.ExceptTable, db, jobManager.GetFactory()) | ||||||
job, err := ccr.NewJobFromService(request.Name, ctx) | ||||||
if err != nil { | ||||||
return err | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.