Skip to content

Commit ee5fa89

Browse files
authored
add option to ignore oplog error in mongo batch (#136)
* add option to ignore oplog error * add some error handling * add mysql router to stdout * configurable position repo
1 parent 775acc2 commit ee5fa89

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+903
-514
lines changed

docker-compose-gravity-test.yml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ services:
2525

2626
mongo:
2727
image: mongo:4.1
28+
container_name: mongo
2829
ports:
2930
- 27017
3031
logging:

integration_test/mongokafka/batch_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@ func TestChunkedBatch(t *testing.T) {
3636
InputPlugin: config.InputConfig{
3737
Type: inputs.Mongo,
3838
Mode: config.Batch,
39-
Config: utils.Struct2Map(
39+
Config: utils.MustAny2Map(
4040
mongobatch.Config{
4141
Source: &mongoCfg,
4242
ChunkThreshold: 200,
4343
BatchSize: 100,
4444
},
4545
),
4646
},
47-
OutputPlugin: config.GenericConfig{
47+
OutputPlugin: config.GenericPluginConfig{
4848
Type: outputs.AsyncKafka,
49-
Config: utils.Struct2Map(async_kafka.AsyncKafkaPluginConfig{
49+
Config: utils.MustAny2Map(async_kafka.AsyncKafkaPluginConfig{
5050
KafkaConfig: &config.KafkaGlobalConfig{
5151
BrokerAddrs: kafkaBroker,
5252
Mode: "async",
@@ -125,16 +125,16 @@ func TestNonChunkBatch(t *testing.T) {
125125
InputPlugin: config.InputConfig{
126126
Type: inputs.Mongo,
127127
Mode: config.Batch,
128-
Config: utils.Struct2Map(
128+
Config: utils.MustAny2Map(
129129
mongobatch.Config{
130130
Source: &mongoCfg,
131131
BatchSize: 100,
132132
},
133133
),
134134
},
135-
OutputPlugin: config.GenericConfig{
135+
OutputPlugin: config.GenericPluginConfig{
136136
Type: outputs.AsyncKafka,
137-
Config: utils.Struct2Map(async_kafka.AsyncKafkaPluginConfig{
137+
Config: utils.MustAny2Map(async_kafka.AsyncKafkaPluginConfig{
138138
KafkaConfig: &config.KafkaGlobalConfig{
139139
BrokerAddrs: kafkaBroker,
140140
Mode: "async",

integration_test/mongokafka/replication_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ func TestReplication(t *testing.T) {
3535
InputPlugin: config.InputConfig{
3636
Type: inputs.Mongo,
3737
Mode: config.Replication,
38-
Config: utils.Struct2Map(
38+
Config: utils.MustAny2Map(
3939
mongobatch.Config{
4040
Source: &mongoCfg,
4141
},
4242
),
4343
},
44-
OutputPlugin: config.GenericConfig{
44+
OutputPlugin: config.GenericPluginConfig{
4545
Type: outputs.AsyncKafka,
46-
Config: utils.Struct2Map(async_kafka.AsyncKafkaPluginConfig{
46+
Config: utils.MustAny2Map(async_kafka.AsyncKafkaPluginConfig{
4747
KafkaConfig: &config.KafkaGlobalConfig{
4848
BrokerAddrs: kafkaBroker,
4949
Mode: "async",

integration_test/mongokafka/stream_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestStream(t *testing.T) {
3939
},
4040
},
4141
},
42-
OutputPlugin: gravityConfig.GenericConfig{
42+
OutputPlugin: gravityConfig.GenericPluginConfig{
4343
Type: "async-kafka",
4444
Config: map[string]interface{}{
4545
"kafka-global-config": map[string]interface{}{

integration_test/mongomysql/replication_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ func TestMongo2MysqlReplication(t *testing.T) {
3939
InputPlugin: config.InputConfig{
4040
Type: inputs.Mongo,
4141
Mode: config.Replication,
42-
Config: utils.Struct2Map(
42+
Config: utils.MustAny2Map(
4343
mongobatch.Config{
4444
Source: &mongoCfg,
4545
},
4646
),
4747
},
48-
OutputPlugin: config.GenericConfig{
48+
OutputPlugin: config.GenericPluginConfig{
4949
Type: outputs.Mysql,
50-
Config: utils.Struct2Map(mysql.MySQLPluginConfig{
50+
Config: utils.MustAny2Map(mysql.MySQLPluginConfig{
5151
DBConfig: targetDBC,
5252
Routes: []map[string]interface{}{
5353
{

integration_test/mysql_mysql_test.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -514,13 +514,13 @@ func TestMySQLBatchNoTableConfig(t *testing.T) {
514514
InputPlugin: config.InputConfig{
515515
Type: "mysql",
516516
Mode: config.Batch,
517-
Config: utils.Struct2Map(mysqlstream.MySQLBinlogInputPluginConfig{
517+
Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{
518518
Source: sourceDBConfig,
519519
}),
520520
},
521-
OutputPlugin: config.GenericConfig{
521+
OutputPlugin: config.GenericPluginConfig{
522522
Type: "mysql",
523-
Config: utils.Struct2Map(mysql.MySQLPluginConfig{
523+
Config: utils.MustAny2Map(mysql.MySQLPluginConfig{
524524
DBConfig: targetDBConfig,
525525
EnableDDL: true,
526526
Routes: []map[string]interface{}{
@@ -572,13 +572,13 @@ func TestZeroTime(t *testing.T) {
572572
InputPlugin: config.InputConfig{
573573
Type: "mysql",
574574
Mode: config.Replication,
575-
Config: utils.Struct2Map(mysqlstream.MySQLBinlogInputPluginConfig{
575+
Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{
576576
Source: sourceDBConfig,
577577
}),
578578
},
579-
OutputPlugin: config.GenericConfig{
579+
OutputPlugin: config.GenericPluginConfig{
580580
Type: "mysql",
581-
Config: utils.Struct2Map(mysql.MySQLPluginConfig{
581+
Config: utils.MustAny2Map(mysql.MySQLPluginConfig{
582582
DBConfig: targetDBConfig,
583583
EnableDDL: true,
584584
Routes: []map[string]interface{}{
@@ -667,7 +667,7 @@ func TestMySQLBatchWithInsertIgnore(t *testing.T) {
667667
"mode": "batch",
668668
},
669669
},
670-
OutputPlugin: config.GenericConfig{
670+
OutputPlugin: config.GenericPluginConfig{
671671
Type: "mysql",
672672
Config: map[string]interface{}{
673673
"target": map[string]interface{}{
@@ -677,7 +677,7 @@ func TestMySQLBatchWithInsertIgnore(t *testing.T) {
677677
"port": targetDBConfig.Port,
678678
},
679679
"enable-ddl": true,
680-
"sql-engine-config": &config.GenericConfig{
680+
"sql-engine-config": &config.GenericPluginConfig{
681681
Type: "mysql-insert-ignore",
682682
},
683683
"routes": []map[string]interface{}{
@@ -960,14 +960,14 @@ func TestMySQLToMyBidirection(t *testing.T) {
960960
InputPlugin: config.InputConfig{
961961
Type: "mysql",
962962
Mode: config.Stream,
963-
Config: utils.Struct2Map(mysqlstream.MySQLBinlogInputPluginConfig{
963+
Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{
964964
IgnoreBiDirectionalData: true,
965965
Source: sourceDBConfig,
966966
}),
967967
},
968-
OutputPlugin: config.GenericConfig{
968+
OutputPlugin: config.GenericPluginConfig{
969969
Type: "mysql",
970-
Config: utils.Struct2Map(mysql.MySQLPluginConfig{
970+
Config: utils.MustAny2Map(mysql.MySQLPluginConfig{
971971
DBConfig: targetDBConfig,
972972
EnableDDL: true,
973973
Routes: []map[string]interface{}{
@@ -977,9 +977,9 @@ func TestMySQLToMyBidirection(t *testing.T) {
977977
"target-schema": targetDBName,
978978
},
979979
},
980-
EngineConfig: &config.GenericConfig{
980+
EngineConfig: &config.GenericPluginConfig{
981981
Type: sql_execution_engine.MySQLReplaceEngine,
982-
Config: utils.Struct2Map(sql_execution_engine.MysqlReplaceEngineConfig{
982+
Config: utils.MustAny2Map(sql_execution_engine.MysqlReplaceEngineConfig{
983983
TagInternalTxn: true,
984984
}),
985985
},
@@ -1050,12 +1050,12 @@ func TestMySQLTagDDL(t *testing.T) {
10501050
InputPlugin: config.InputConfig{
10511051
Type: "mysql",
10521052
Mode: config.Stream,
1053-
Config: utils.Struct2Map(mysqlstream.MySQLBinlogInputPluginConfig{
1053+
Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{
10541054
Source: sourceDBConfig,
10551055
IgnoreBiDirectionalData: true,
10561056
}),
10571057
},
1058-
OutputPlugin: config.GenericConfig{
1058+
OutputPlugin: config.GenericPluginConfig{
10591059
Type: "mysql",
10601060
Config: map[string]interface{}{
10611061
"target": map[string]interface{}{
@@ -1127,7 +1127,7 @@ func TestMySQLDDL(t *testing.T) {
11271127
},
11281128
},
11291129
},
1130-
OutputPlugin: config.GenericConfig{
1130+
OutputPlugin: config.GenericPluginConfig{
11311131
Type: "mysql",
11321132
Config: map[string]interface{}{
11331133
"target": map[string]interface{}{
@@ -1220,7 +1220,7 @@ func TestMySQLDDLNoRoute(t *testing.T) {
12201220
},
12211221
},
12221222
},
1223-
OutputPlugin: config.GenericConfig{
1223+
OutputPlugin: config.GenericPluginConfig{
12241224
Type: "mysql",
12251225
Config: map[string]interface{}{
12261226
"target": map[string]interface{}{

integration_test/mysqlelasticsearch/replication_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ func TestMySQL2ElasticsearchReplication(t *testing.T) {
5050
InputPlugin: config.InputConfig{
5151
Type: inputs.Mysql,
5252
Mode: config.Replication,
53-
Config: utils.Struct2Map(
53+
Config: utils.MustAny2Map(
5454
mysqlbatch.PluginConfig{
5555
Source: sourceDBConfig,
5656
},
5757
),
5858
},
59-
OutputPlugin: config.GenericConfig{
59+
OutputPlugin: config.GenericPluginConfig{
6060
Type: outputs.Elasticsearch,
61-
Config: utils.Struct2Map(elasticsearch.ElasticsearchPluginConfig{
61+
Config: utils.MustAny2Map(elasticsearch.ElasticsearchPluginConfig{
6262
ServerConfig: &elasticsearch.ElasticsearchServerConfig{
6363
URLs: elasticsearch_test.TestURLs(),
6464
Sniff: false,

mock/position_store/mock.go

+7-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/app/server.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/moiot/gravity/pkg/filters"
1414
_ "github.com/moiot/gravity/pkg/inputs"
1515
_ "github.com/moiot/gravity/pkg/outputs"
16-
"github.com/moiot/gravity/pkg/position_store"
16+
"github.com/moiot/gravity/pkg/position_cache"
1717
"github.com/moiot/gravity/pkg/registry"
1818
"github.com/moiot/gravity/pkg/schedulers/batch_table_scheduler"
1919
)
@@ -23,7 +23,7 @@ type Server struct {
2323
filters []core.IFilter
2424
Emitter core.Emitter
2525
Scheduler core.Scheduler
26-
PositionCache position_store.PositionCacheInterface
26+
PositionCache position_cache.PositionCacheInterface
2727
Output core.Output
2828

2929
// When Input is done, server will be closed, when config changed, server will also be closed;
@@ -58,7 +58,7 @@ func ParsePlugins(pipelineConfig config.PipelineConfigV3) (*Server, error) {
5858

5959
// scheduler
6060
if pipelineConfig.SchedulerPlugin == nil {
61-
pipelineConfig.SchedulerPlugin = &config.GenericConfig{
61+
pipelineConfig.SchedulerPlugin = &config.GenericPluginConfig{
6262
Type: "batch-table-scheduler",
6363
Config: batch_table_scheduler.DefaultConfig,
6464
}

pkg/config/config.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,11 @@ type MySQLConfig struct {
115115
}
116116

117117
type SourceTiDBConfig struct {
118-
SourceDB *utils.DBConfig `mapstructure:"source-db" toml:"source-db" json:"source-db"`
119-
SourceKafka *SourceKafkaConfig `mapstructure:"source-kafka" toml:"source-kafka" json:"source-kafka"`
120-
OffsetStoreConfig *SourceProbeCfg `mapstructure:"offset-store" toml:"offset-store" json:"offset-store"`
121-
IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
118+
SourceDB *utils.DBConfig `mapstructure:"source-db" toml:"source-db" json:"source-db"`
119+
SourceKafka *SourceKafkaConfig `mapstructure:"source-kafka" toml:"source-kafka" json:"source-kafka"`
120+
// OffsetStoreConfig *SourceProbeCfg `mapstructure:"offset-store" toml:"offset-store" json:"offset-store"`
121+
PositionRepo *GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`
122+
IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
122123
}
123124

124125
type GtmConfig struct {
@@ -134,7 +135,7 @@ type MongoConnConfig struct {
134135
Username string `mapstructure:"username" toml:"username" json:"username"`
135136
Password string `mapstructure:"password" toml:"password" json:"password"`
136137
Database string `mapstructure:"database" toml:"database" json:"database"`
137-
Direct bool `mapstructure:"-" toml:"-" json:"-"`
138+
Direct bool `mapstructure:"direct" toml:"direct" json:"direct"`
138139
}
139140

140141
type TargetMySQLWorkerConfig struct {

pkg/config/config_v2.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (c *PipelineConfigV2) ToV3() PipelineConfigV3 {
3939

4040
for _, f := range c.FilterPlugins {
4141
m := f.(map[string]interface{})
42-
ff := GenericConfig{
42+
ff := GenericPluginConfig{
4343
Type: m["type"].(string),
4444
}
4545
delete(m, "type")
@@ -53,7 +53,7 @@ func (c *PipelineConfigV2) ToV3() PipelineConfigV3 {
5353
}
5454

5555
for k, v := range c.SchedulerPlugins {
56-
ret.SchedulerPlugin = &GenericConfig{
56+
ret.SchedulerPlugin = &GenericPluginConfig{
5757
Type: k,
5858
Config: v.(map[string]interface{}),
5959
}

pkg/config/config_v3.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ import (
99
const PipelineConfigV3Version = "1.0"
1010

1111
type PipelineConfigV3 struct {
12-
PipelineName string `yaml:"name" toml:"name" json:"name"`
13-
Version string `yaml:"version" toml:"version" json:"version"`
14-
InputPlugin InputConfig `yaml:"input" toml:"input" json:"input"`
15-
FilterPlugins []GenericConfig `yaml:"filters" toml:"filters" json:"filters,omitempty"`
16-
OutputPlugin GenericConfig `yaml:"output" toml:"output" json:"output"`
17-
SchedulerPlugin *GenericConfig `yaml:"scheduler" toml:"scheduler" json:"scheduler,omitempty"`
12+
PipelineName string `yaml:"name" toml:"name" json:"name"`
13+
Version string `yaml:"version" toml:"version" json:"version"`
14+
InputPlugin InputConfig `yaml:"input" toml:"input" json:"input"`
15+
FilterPlugins []GenericPluginConfig `yaml:"filters" toml:"filters" json:"filters,omitempty"`
16+
OutputPlugin GenericPluginConfig `yaml:"output" toml:"output" json:"output"`
17+
SchedulerPlugin *GenericPluginConfig `yaml:"scheduler" toml:"scheduler" json:"scheduler,omitempty"`
1818
}
1919

2020
func (c *PipelineConfigV3) SetDefault() {
@@ -58,7 +58,7 @@ type InputConfig struct {
5858
Config map[string]interface{} `yaml:"config" json:"config" toml:"config"`
5959
}
6060

61-
type GenericConfig struct {
61+
type GenericPluginConfig struct {
6262
Type string `yaml:"type" json:"type" toml:"type"`
6363
Config map[string]interface{} `yaml:"config" json:"config,omitempty" toml:"config,omitempty"`
6464
}

pkg/core/input.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package core
22

33
import (
44
"github.com/moiot/gravity/pkg/config"
5-
"github.com/moiot/gravity/pkg/position_store"
5+
"github.com/moiot/gravity/pkg/position_cache"
6+
"github.com/moiot/gravity/pkg/position_repos"
67
)
78

89
type Input interface {
9-
Start(emitter Emitter, router Router, positionCache position_store.PositionCacheInterface) error
10+
Start(emitter Emitter, router Router, positionCache position_cache.PositionCacheInterface) error
1011
Close()
1112
Stage() config.InputMode
12-
Done() chan position_store.Position
13+
Done() chan position_repos.Position
1314
SendDeadSignal() error // for test only
1415
Wait()
1516
}

0 commit comments

Comments
 (0)