Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support except table #74

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 103 additions & 31 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,17 @@ func (j JobState) String() string {
}

type Job struct {
SyncType SyncType `json:"sync_type"`
Name string `json:"name"`
Src base.Spec `json:"src"`
ISrc base.Specer `json:"-"`
srcMeta Metaer `json:"-"`
Dest base.Spec `json:"dest"`
IDest base.Specer `json:"-"`
destMeta Metaer `json:"-"`
SkipError bool `json:"skip_error"`
State JobState `json:"state"`
SyncType SyncType `json:"sync_type"`
Name string `json:"name"`
Src base.Spec `json:"src"`
ISrc base.Specer `json:"-"`
srcMeta Metaer `json:"-"`
Dest base.Spec `json:"dest"`
IDest base.Specer `json:"-"`
destMeta Metaer `json:"-"`
SkipError bool `json:"skip_error"`
ExceptTable string `json:"except_table"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ExceptTable string `json:"except_table"`
ExceptTables []string `json:"except_tables"`

State JobState `json:"state"`

factory *Factory `json:"-"`

Expand All @@ -96,24 +97,37 @@ type Job struct {

type jobContext struct {
context.Context
src base.Spec
dest base.Spec
db storage.DB
skipError bool
factory *Factory
src base.Spec
dest base.Spec
db storage.DB
skipError bool
exceptTable string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
exceptTable string
exceptTables []string

factory *Factory
}

func NewJobContext(src, dest base.Spec, skipError bool, db storage.DB, factory *Factory) *jobContext {
func NewJobContext(src, dest base.Spec, skipError bool, exceptTable string, db storage.DB, factory *Factory) *jobContext {
return &jobContext{
Context: context.Background(),
src: src,
dest: dest,
skipError: skipError,
db: db,
factory: factory,
Context: context.Background(),
src: src,
dest: dest,
skipError: skipError,
exceptTable: exceptTable,
db: db,
factory: factory,
}
}

func IsExceptTable(exceptTables string, targetTable string) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define it as a method of Job.

It likes:

func (j *Job) isExceptTable(targetTable string) bool {

exceptTableList := strings.Split(exceptTables, ",")
for _, exceptTable := range exceptTableList {
if targetTable == strings.TrimSpace(exceptTable) {
return true
}
}

return false
}

// new job
func NewJobFromService(name string, ctx context.Context) (*Job, error) {
jobContext, ok := ctx.(*jobContext)
Expand All @@ -125,15 +139,16 @@ func NewJobFromService(name string, ctx context.Context) (*Job, error) {
src := jobContext.src
dest := jobContext.dest
job := &Job{
Name: name,
Src: src,
ISrc: factory.NewSpecer(&src),
srcMeta: factory.NewMeta(&jobContext.src),
Dest: dest,
IDest: factory.NewSpecer(&dest),
destMeta: factory.NewMeta(&jobContext.dest),
SkipError: jobContext.skipError,
State: JobRunning,
Name: name,
Src: src,
ISrc: factory.NewSpecer(&src),
srcMeta: factory.NewMeta(&jobContext.src),
Dest: dest,
IDest: factory.NewSpecer(&dest),
destMeta: factory.NewMeta(&jobContext.dest),
SkipError: jobContext.skipError,
State: JobRunning,
ExceptTable: jobContext.exceptTable,

factory: factory,

Expand Down Expand Up @@ -203,6 +218,9 @@ func (j *Job) valid() error {
return xerror.New(xerror.Normal, "src/dest are not both db or table sync")
}

if j.Src.Table != "" && j.ExceptTable != "" {
return xerror.New(xerror.Normal, "source table and except table caonnot exist at the same time")
}
return nil
}

Expand Down Expand Up @@ -292,9 +310,17 @@ func (j *Job) fullSync() error {
if err != nil {
return err
}

// skip if table is except
log.Infof("fullsync except table: %s", j.ExceptTable)
for _, table := range tables {
if IsExceptTable(j.ExceptTable, table.Name) {
continue
}

backupTableList = append(backupTableList, table.Name)
}
log.Infof("fullsync table is : %s", backupTableList)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be thousands of tables here, so it is not suitable for printing logs.

case TableSync:
backupTableList = append(backupTableList, j.Src.Table)
default:
Expand Down Expand Up @@ -730,7 +756,14 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {
log.Debugf("tableRecords: %v", tableRecords)
destTableIds := make([]int64, 0, len(tableRecords))
if j.SyncType == DBSync {
var sourceTableName string
for _, tableRecord := range tableRecords {
sourceTableName, err = j.srcMeta.GetTableNameById(tableRecord.Id)
if IsExceptTable(j.ExceptTable, sourceTableName) {
log.Infof("db sync upsert, but table is except, just return")
return nil
}

if destTableId, err := j.getDestTableIdBySrc(tableRecord.Id); err != nil {
return err
} else {
Expand Down Expand Up @@ -886,6 +919,14 @@ func (j *Job) handleAddPartition(binlog *festruct.TBinlog) error {
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else if j.SyncType == DBSync {
// use sourceTableName to judge if table is except
var sourceTableName string
sourceTableName, err = j.srcMeta.GetTableNameById(addPartition.TableId)
if IsExceptTable(j.ExceptTable, sourceTableName) {
log.Infof("db sync add partition, but table is except, just return")
return nil
}

destTableId, err := j.getDestTableIdBySrc(addPartition.TableId)
if err != nil {
return err
Expand Down Expand Up @@ -918,6 +959,14 @@ func (j *Job) handleDropPartition(binlog *festruct.TBinlog) error {
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else if j.SyncType == DBSync {
// use sourceTableName to judge if table is except
var sourceTableName string
sourceTableName, err = j.srcMeta.GetTableNameById(dropPartition.TableId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetTableNameById is a high-cost operation, it needs to connect doris FE and execute 'SHOW TABLE'.

One possible solution is to query the table ID of the except tables before adding the job to JobManager, see Job::FirstRun for details.

if IsExceptTable(j.ExceptTable, sourceTableName) {
log.Infof("db sync drop partition, but table is except, just return")
return nil
}

destTableId, err := j.getDestTableIdBySrc(dropPartition.TableId)
if err != nil {
return err
Expand Down Expand Up @@ -1004,6 +1053,11 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error {
tableName = srcTable.Name
}

if IsExceptTable(j.ExceptTable, tableName) {
log.Infof("db sync drop table, but table is except, just return")
return nil
}

sql := fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(tableName))
log.Infof("dropTableSql: %s", sql)
if err = j.IDest.DbExec(sql); err != nil {
Expand Down Expand Up @@ -1049,6 +1103,12 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error {
if j.SyncType == TableSync {
dropTableSql = fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(j.Dest.Table))
} else {
// if table is except, just break
if IsExceptTable(j.ExceptTable, alterJob.TableName) {
log.Infof("db sync alter job, but table is except, just break")
break
}

dropTableSql = fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(alterJob.TableName))
}
log.Infof("dropTableSql: %s", dropTableSql)
Expand All @@ -1074,6 +1134,13 @@ func (j *Job) handleLightningSchemaChange(binlog *festruct.TBinlog) error {
log.Debugf("lightningSchemaChange %v", lightningSchemaChange)

rawSql := lightningSchemaChange.RawSql
var sourceTableName string
sourceTableName, err = j.srcMeta.GetTableNameById(lightningSchemaChange.TableId)
if IsExceptTable(j.ExceptTable, sourceTableName) {
log.Infof("db sync lightning schema change, but table is except, just return")
return nil
}

// "rawSql": "ALTER TABLE `default_cluster:ccr`.`test_ddl` ADD COLUMN `nid1` int(11) NULL COMMENT \"\""
// replace `default_cluster:${Src.Database}`.`test_ddl` to `test_ddl`
var sql string
Expand All @@ -1098,6 +1165,11 @@ func (j *Job) handleTruncateTable(binlog *festruct.TBinlog) error {
var destTableName string
switch j.SyncType {
case DBSync:
if IsExceptTable(j.ExceptTable, truncateTable.TableName) {
log.Infof("db sync truncate table, but table is except, just return")
return nil
}

destTableName = truncateTable.TableName
case TableSync:
destTableName = j.Dest.Table
Expand Down
13 changes: 7 additions & 6 deletions pkg/service/http_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,16 @@ func NewHttpServer(host string, port int, db storage.DB, jobManager *ccr.JobMana

type CreateCcrRequest struct {
// must need all fields required
Name string `json:"name,required"`
Src base.Spec `json:"src,required"`
Dest base.Spec `json:"dest,required"`
SkipError bool `json:"skip_error"`
Name string `json:"name,required"`
Src base.Spec `json:"src,required"`
Dest base.Spec `json:"dest,required"`
SkipError bool `json:"skip_error"`
ExceptTable string `json:"except_table"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define it to an array might be better.

Suggested change
ExceptTable string `json:"except_table"`
ExceptTables []string `json:"except_tables"`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then run strings.TrimSpace(exceptTable) before converting jobContext to Job.

}

// Stringer
func (r *CreateCcrRequest) String() string {
return fmt.Sprintf("name: %s, src: %v, dest: %v", r.Name, r.Src, r.Dest)
return fmt.Sprintf("name: %s, src: %v, dest: %v, except_table: %s", r.Name, r.Src, r.Dest, r.ExceptTable)
}

// version Handler
Expand All @@ -106,7 +107,7 @@ func (s *HttpService) versionHandler(w http.ResponseWriter, r *http.Request) {
func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) error {
log.Infof("create ccr %s", request)

ctx := ccr.NewJobContext(request.Src, request.Dest, request.SkipError, db, jobManager.GetFactory())
ctx := ccr.NewJobContext(request.Src, request.Dest, request.SkipError, request.ExceptTable, db, jobManager.GetFactory())
job, err := ccr.NewJobFromService(request.Name, ctx)
if err != nil {
return err
Expand Down
Loading