diff --git a/database/mongodb/mongodb_test.go b/database/mongodb/mongodb_test.go index a08d094ea..fcaa3d7cf 100644 --- a/database/mongodb/mongodb_test.go +++ b/database/mongodb/mongodb_test.go @@ -7,24 +7,21 @@ import ( "log" - "github.com/golang-migrate/migrate/v4" "io" "os" "strconv" "testing" "time" -) -import ( "github.com/dhui/dktest" + "github.com/golang-migrate/migrate/v4" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" -) -import ( dt "github.com/golang-migrate/migrate/v4/database/testing" "github.com/golang-migrate/migrate/v4/dktesting" + _ "github.com/golang-migrate/migrate/v4/source/file" ) @@ -411,7 +408,7 @@ func waitForReplicaInit(client *mongo.Client) error { //during replica set initialization, the first node first becomes a secondary and then becomes the primary //should consider that initialization is completed only after the node has become the primary result := client.Database("admin").RunCommand(context.TODO(), bson.D{bson.E{Key: "isMaster", Value: 1}}) - r, err := result.DecodeBytes() + r, err := result.Raw() if err != nil { return err } diff --git a/database/mongodb/v2/README.md b/database/mongodb/v2/README.md new file mode 100644 index 000000000..b2d0d0539 --- /dev/null +++ b/database/mongodb/v2/README.md @@ -0,0 +1,25 @@ +# MongoDB + +* This package is for [mongo-go-driver/v2]. A backend for the older [monog-go-driver/v1]. is [also available](..). +* Driver work with mongo through [db.runCommands](https://docs.mongodb.com/manual/reference/command/) +* Migrations support json format. It contains array of commands for `db.runCommand`. Every command is executed in separate request to database +* All keys have to be in quotes `"` +* [Examples](./examples) + +# Usage + +`mongodb://user:password@host:port/dbname?query` (`mongodb+srv://` also works, but behaves a bit differently. See [docs](https://docs.mongodb.com/manual/reference/connection-string/#dns-seedlist-connection-format) for more information) + +| URL Query | WithInstance Config | Description | +|------------|---------------------|-------------| +| `x-migrations-collection` | `MigrationsCollection` | Name of the migrations collection | +| `x-transaction-mode` | `TransactionMode` | If set to `true` wrap commands in [transaction](https://docs.mongodb.com/manual/core/transactions). Available only for replica set. Driver is using [strconv.ParseBool](https://golang.org/pkg/strconv/#ParseBool) for parsing| +| `x-advisory-locking` | `true` | Feature flag for advisory locking, if set to false, disable advisory locking | +| `x-advisory-lock-collection` | `migrate_advisory_lock` | The name of the collection to use for advisory locking.| +| `x-advisory-lock-timeout` | `15` | The max time in seconds that migrate will wait to acquire a lock before failing. | +| `x-advisory-lock-timeout-interval` | `10` | The max time in seconds between attempts to acquire the advisory lock, the lock is attempted to be acquired using an exponential backoff algorithm. | +| `dbname` | `DatabaseName` | The name of the database to connect to | +| `user` | | The user to sign in as. Can be omitted | +| `password` | | The user's password. Can be omitted | +| `host` | | The host to connect to | +| `port` | | The port to bind to | \ No newline at end of file diff --git a/database/mongodb/v2/mongodb.go b/database/mongodb/v2/mongodb.go new file mode 100644 index 000000000..f339fc085 --- /dev/null +++ b/database/mongodb/v2/mongodb.go @@ -0,0 +1,405 @@ +package mongodb + +import ( + "context" + "fmt" + "io" + "net/url" + "os" + "strconv" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/golang-migrate/migrate/v4/database" + "github.com/hashicorp/go-multierror" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/connstring" + "go.uber.org/atomic" +) + +func init() { + db := Mongo{} + database.Register("mongodb", &db) + database.Register("mongodb+srv", &db) +} + +var DefaultMigrationsCollection = "schema_migrations" + +const DefaultLockingCollection = "migrate_advisory_lock" // the collection to use for advisory locking by default. +const lockKeyUniqueValue = 0 // the unique value to lock on. If multiple clients try to insert the same key, it will fail (locked). +const DefaultLockTimeout = 15 // the default maximum time to wait for a lock to be released. +const DefaultLockTimeoutInterval = 10 // the default maximum intervals time for the locking timout. +const DefaultAdvisoryLockingFlag = true // the default value for the advisory locking feature flag. Default is true. +const LockIndexName = "lock_unique_key" // the name of the index which adds unique constraint to the locking_key field. +const contextWaitTimeout = 5 * time.Second // how long to wait for the request to mongo to block/wait for. + +var ( + ErrNoDatabaseName = fmt.Errorf("no database name") + ErrNilConfig = fmt.Errorf("no config") + ErrLockTimeoutConfigConflict = fmt.Errorf("both x-advisory-lock-timeout-interval and x-advisory-lock-timout-interval were specified") +) + +type Mongo struct { + client *mongo.Client + db *mongo.Database + config *Config + isLocked atomic.Bool +} + +type Locking struct { + CollectionName string + Timeout int + Enabled bool + Interval int +} +type Config struct { + DatabaseName string + MigrationsCollection string + TransactionMode bool + Locking Locking +} +type versionInfo struct { + Version int `bson:"version"` + Dirty bool `bson:"dirty"` +} + +type lockObj struct { + Key int `bson:"locking_key"` + Pid int `bson:"pid"` + Hostname string `bson:"hostname"` + CreatedAt time.Time `bson:"created_at"` +} +type findFilter struct { + Key int `bson:"locking_key"` +} + +func WithInstance(instance *mongo.Client, config *Config) (database.Driver, error) { + if config == nil { + return nil, ErrNilConfig + } + if len(config.DatabaseName) == 0 { + return nil, ErrNoDatabaseName + } + if len(config.MigrationsCollection) == 0 { + config.MigrationsCollection = DefaultMigrationsCollection + } + if len(config.Locking.CollectionName) == 0 { + config.Locking.CollectionName = DefaultLockingCollection + } + if config.Locking.Timeout <= 0 { + config.Locking.Timeout = DefaultLockTimeout + } + if config.Locking.Interval <= 0 { + config.Locking.Interval = DefaultLockTimeoutInterval + } + + mc := &Mongo{ + client: instance, + db: instance.Database(config.DatabaseName), + config: config, + } + + if mc.config.Locking.Enabled { + if err := mc.ensureLockTable(); err != nil { + return nil, err + } + } + if err := mc.ensureVersionTable(); err != nil { + return nil, err + } + + return mc, nil +} + +func (m *Mongo) Open(dsn string) (database.Driver, error) { + // connstring is experimental package, but it used for parse connection string in mongo.Connect function + uri, err := connstring.Parse(dsn) + if err != nil { + return nil, err + } + if len(uri.Database) == 0 { + return nil, ErrNoDatabaseName + } + unknown := url.Values(uri.UnknownOptions) + + migrationsCollection := unknown.Get("x-migrations-collection") + lockCollection := unknown.Get("x-advisory-lock-collection") + transactionMode, err := parseBoolean(unknown.Get("x-transaction-mode"), false) + if err != nil { + return nil, err + } + advisoryLockingFlag, err := parseBoolean(unknown.Get("x-advisory-locking"), DefaultAdvisoryLockingFlag) + if err != nil { + return nil, err + } + lockingTimout, err := parseInt(unknown.Get("x-advisory-lock-timeout"), DefaultLockTimeout) + if err != nil { + return nil, err + } + + lockTimeoutIntervalValue := unknown.Get("x-advisory-lock-timeout-interval") + // The initial release had a typo for this argument but for backwards compatibility sake, we will keep supporting it + // and we will error out if both values are set. + lockTimeoutIntervalValueFromTypo := unknown.Get("x-advisory-lock-timout-interval") + + lockTimeout := lockTimeoutIntervalValue + + if lockTimeoutIntervalValue != "" && lockTimeoutIntervalValueFromTypo != "" { + return nil, ErrLockTimeoutConfigConflict + } else if lockTimeoutIntervalValueFromTypo != "" { + lockTimeout = lockTimeoutIntervalValueFromTypo + } + + maxLockCheckInterval, err := parseInt(lockTimeout, DefaultLockTimeoutInterval) + + if err != nil { + return nil, err + } + client, err := mongo.Connect(options.Client().ApplyURI(dsn)) + if err != nil { + return nil, err + } + + if err = client.Ping(context.TODO(), nil); err != nil { + return nil, err + } + mc, err := WithInstance(client, &Config{ + DatabaseName: uri.Database, + MigrationsCollection: migrationsCollection, + TransactionMode: transactionMode, + Locking: Locking{ + CollectionName: lockCollection, + Timeout: lockingTimout, + Enabled: advisoryLockingFlag, + Interval: maxLockCheckInterval, + }, + }) + if err != nil { + return nil, err + } + return mc, nil +} + +// Parse the url param, convert it to boolean +// returns error if param invalid. returns defaultValue if param not present +func parseBoolean(urlParam string, defaultValue bool) (bool, error) { + + // if parameter passed, parse it (otherwise return default value) + if urlParam != "" { + result, err := strconv.ParseBool(urlParam) + if err != nil { + return false, err + } + return result, nil + } + + // if no url Param passed, return default value + return defaultValue, nil +} + +// Parse the url param, convert it to int +// returns error if param invalid. returns defaultValue if param not present +func parseInt(urlParam string, defaultValue int) (int, error) { + + // if parameter passed, parse it (otherwise return default value) + if urlParam != "" { + result, err := strconv.Atoi(urlParam) + if err != nil { + return -1, err + } + return result, nil + } + + // if no url Param passed, return default value + return defaultValue, nil +} +func (m *Mongo) SetVersion(version int, dirty bool) error { + migrationsCollection := m.db.Collection(m.config.MigrationsCollection) + if err := migrationsCollection.Drop(context.TODO()); err != nil { + return &database.Error{OrigErr: err, Err: "drop migrations collection failed"} + } + _, err := migrationsCollection.InsertOne(context.TODO(), bson.M{"version": version, "dirty": dirty}) + if err != nil { + return &database.Error{OrigErr: err, Err: "save version failed"} + } + return nil +} + +func (m *Mongo) Version() (version int, dirty bool, err error) { + var versionInfo versionInfo + err = m.db.Collection(m.config.MigrationsCollection).FindOne(context.TODO(), bson.M{}).Decode(&versionInfo) + switch { + case err == mongo.ErrNoDocuments: + return database.NilVersion, false, nil + case err != nil: + return 0, false, &database.Error{OrigErr: err, Err: "failed to get migration version"} + default: + return versionInfo.Version, versionInfo.Dirty, nil + } +} + +func (m *Mongo) Run(migration io.Reader) error { + migr, err := io.ReadAll(migration) + if err != nil { + return err + } + var cmds []bson.D + err = bson.UnmarshalExtJSON(migr, true, &cmds) + if err != nil { + return fmt.Errorf("unmarshaling json error: %s", err) + } + if m.config.TransactionMode { + if err := m.executeCommandsWithTransaction(context.TODO(), cmds); err != nil { + return err + } + } else { + if err := m.executeCommands(context.TODO(), cmds); err != nil { + return err + } + } + return nil +} + +func (m *Mongo) executeCommandsWithTransaction(ctx context.Context, cmds []bson.D) error { + err := m.db.Client().UseSession(ctx, func(sessionContext context.Context) error { + sess := mongo.SessionFromContext(ctx) + if err := sess.StartTransaction(); err != nil { + return &database.Error{OrigErr: err, Err: "failed to start transaction"} + } + if err := m.executeCommands(sessionContext, cmds); err != nil { + // When command execution is failed, it's aborting transaction + // If you tried to call abortTransaction, it`s return error that transaction already aborted + return err + } + if err := sess.CommitTransaction(sessionContext); err != nil { + return &database.Error{OrigErr: err, Err: "failed to commit transaction"} + } + return nil + }) + if err != nil { + return err + } + return nil +} + +func (m *Mongo) executeCommands(ctx context.Context, cmds []bson.D) error { + for _, cmd := range cmds { + err := m.db.RunCommand(ctx, cmd).Err() + if err != nil { + return &database.Error{OrigErr: err, Err: fmt.Sprintf("failed to execute command:%v", cmd)} + } + } + return nil +} + +func (m *Mongo) Close() error { + return m.client.Disconnect(context.TODO()) +} + +func (m *Mongo) Drop() error { + return m.db.Drop(context.TODO()) +} + +func (m *Mongo) ensureLockTable() error { + indexes := m.db.Collection(m.config.Locking.CollectionName).Indexes() + + indexOptions := options.Index().SetUnique(true).SetName(LockIndexName) + _, err := indexes.CreateOne(context.TODO(), mongo.IndexModel{ + Options: indexOptions, + Keys: findFilter{Key: -1}, + }) + if err != nil { + return err + } + return nil +} + +// ensureVersionTable checks if versions table exists and, if not, creates it. +// Note that this function locks the database, which deviates from the usual +// convention of "caller locks" in the MongoDb type. +func (m *Mongo) ensureVersionTable() (err error) { + if err = m.Lock(); err != nil { + return err + } + + defer func() { + if e := m.Unlock(); e != nil { + if err == nil { + err = e + } else { + err = multierror.Append(err, e) + } + } + }() + + if err != nil { + return err + } + if _, _, err = m.Version(); err != nil { + return err + } + return nil +} + +// Utilizes advisory locking on the config.LockingCollection collection +// This uses a unique index on the `locking_key` field. +func (m *Mongo) Lock() error { + return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } + + pid := os.Getpid() + hostname, err := os.Hostname() + if err != nil { + hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error()) + } + + newLockObj := lockObj{ + Key: lockKeyUniqueValue, + Pid: pid, + Hostname: hostname, + CreatedAt: time.Now(), + } + operation := func() error { + timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj) + defer cancelFunc() + return err + } + exponentialBackOff := backoff.NewExponentialBackOff() + duration := time.Duration(m.config.Locking.Timeout) * time.Second + exponentialBackOff.MaxElapsedTime = duration + exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second + + err = backoff.Retry(operation, exponentialBackOff) + if err != nil { + return database.ErrLocked + } + + return nil + }) +} + +func (m *Mongo) Unlock() error { + return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } + + filter := findFilter{ + Key: lockKeyUniqueValue, + } + + ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter) + defer cancel() + + if err != nil { + return err + } + return nil + }) +} diff --git a/database/mongodb/v2/mongodb_test.go b/database/mongodb/v2/mongodb_test.go new file mode 100644 index 000000000..43c09e977 --- /dev/null +++ b/database/mongodb/v2/mongodb_test.go @@ -0,0 +1,427 @@ +package mongodb + +import ( + "bytes" + "context" + "fmt" + + "log" + + "io" + "os" + "strconv" + "testing" + "time" + + "github.com/dhui/dktest" + "github.com/golang-migrate/migrate/v4" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + + dt "github.com/golang-migrate/migrate/v4/database/testing" + "github.com/golang-migrate/migrate/v4/dktesting" + + _ "github.com/golang-migrate/migrate/v4/source/file" +) + +var ( + opts = dktest.Options{PortRequired: true, ReadyFunc: isReady} + // Supported versions: https://www.mongodb.com/support-policy + specs = []dktesting.ContainerSpec{ + {ImageName: "mongo:5.0", Options: opts}, + {ImageName: "mongo:6.0", Options: opts}, + {ImageName: "mongo:7.0", Options: opts}, + {ImageName: "mongo:8.0", Options: opts}, + } +) + +func mongoConnectionString(host, port string) string { + // there is connect option for excluding serverConnection algorithm + // it's let avoid errors with mongo replica set connection in docker container + return fmt.Sprintf("mongodb://%s:%s/testMigration?connect=direct", host, port) +} + +func isReady(ctx context.Context, c dktest.ContainerInfo) bool { + ip, port, err := c.FirstPort() + if err != nil { + return false + } + + client, err := mongo.Connect(options.Client().ApplyURI(mongoConnectionString(ip, port))) + if err != nil { + return false + } + defer func() { + if err := client.Disconnect(ctx); err != nil { + log.Println("close error:", err) + } + }() + + if err = client.Ping(ctx, nil); err != nil { + switch err { + case io.EOF: + return false + default: + log.Println(err) + } + return false + } + return true +} + +func Test(t *testing.T) { + t.Run("test", test) + t.Run("testMigrate", testMigrate) + t.Run("testWithAuth", testWithAuth) + t.Run("testLockWorks", testLockWorks) + + t.Cleanup(func() { + for _, spec := range specs { + t.Log("Cleaning up ", spec.ImageName) + if err := spec.Cleanup(); err != nil { + t.Error("Error removing ", spec.ImageName, "error:", err) + } + } + }) +} + +func test(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + addr := mongoConnectionString(ip, port) + p := &Mongo{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + dt.TestNilVersion(t, d) + dt.TestLockAndUnlock(t, d) + dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`))) + dt.TestSetVersion(t, d) + dt.TestDrop(t, d) + }) +} + +func testMigrate(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + addr := mongoConnectionString(ip, port) + p := &Mongo{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + m, err := migrate.NewWithDatabaseInstance("file://../examples/migrations", "", d) + if err != nil { + t.Fatal(err) + } + dt.TestMigrate(t, m) + }) +} + +func testWithAuth(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + addr := mongoConnectionString(ip, port) + p := &Mongo{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + createUserCMD := []byte(`[{"createUser":"deminem","pwd":"gogo","roles":[{"role":"readWrite","db":"testMigration"}]}]`) + err = d.Run(bytes.NewReader(createUserCMD)) + if err != nil { + t.Fatal(err) + } + testcases := []struct { + name string + connectUri string + isErrorExpected bool + }{ + {"right auth data", "mongodb://deminem:gogo@%s:%v/testMigration", false}, + {"wrong auth data", "mongodb://wrong:auth@%s:%v/testMigration", true}, + } + + for _, tcase := range testcases { + t.Run(tcase.name, func(t *testing.T) { + mc := &Mongo{} + d, err := mc.Open(fmt.Sprintf(tcase.connectUri, ip, port)) + if err == nil { + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + } + + switch { + case tcase.isErrorExpected && err == nil: + t.Fatalf("no error when expected") + case !tcase.isErrorExpected && err != nil: + t.Fatalf("unexpected error: %v", err) + } + }) + } + }) +} + +func testLockWorks(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + addr := mongoConnectionString(ip, port) + p := &Mongo{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`))) + + mc := d.(*Mongo) + + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + err = mc.Unlock() + if err != nil { + t.Fatal(err) + } + + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + err = mc.Unlock() + if err != nil { + t.Fatal(err) + } + + // enable locking, + //try to hit a lock conflict + mc.config.Locking.Enabled = true + mc.config.Locking.Timeout = 1 + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + err = mc.Lock() + if err == nil { + t.Fatal("should have failed, mongo should be locked already") + } + }) +} + +func TestTransaction(t *testing.T) { + transactionSpecs := []dktesting.ContainerSpec{ + {ImageName: "mongo:4", Options: dktest.Options{PortRequired: true, ReadyFunc: isReady, + Cmd: []string{"mongod", "--bind_ip_all", "--replSet", "rs0"}}}, + } + t.Cleanup(func() { + for _, spec := range transactionSpecs { + t.Log("Cleaning up ", spec.ImageName) + if err := spec.Cleanup(); err != nil { + t.Error("Error removing ", spec.ImageName, "error:", err) + } + } + }) + + dktesting.ParallelTest(t, transactionSpecs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + client, err := mongo.Connect(options.Client().ApplyURI(mongoConnectionString(ip, port))) + if err != nil { + t.Fatal(err) + } + err = client.Ping(context.TODO(), nil) + if err != nil { + t.Fatal(err) + } + //rs.initiate() + err = client.Database("admin").RunCommand(context.TODO(), bson.D{bson.E{Key: "replSetInitiate", Value: bson.D{}}}).Err() + if err != nil { + t.Fatal(err) + } + err = waitForReplicaInit(client) + if err != nil { + t.Fatal(err) + } + d, err := WithInstance(client, &Config{ + DatabaseName: "testMigration", + }) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + //We have to create collection + //transactions don't support operations with creating new dbs, collections + //Unique index need for checking transaction aborting + insertCMD := []byte(`[ + {"create":"hello"}, + {"createIndexes": "hello", + "indexes": [{ + "key": { + "wild": 1 + }, + "name": "unique_wild", + "unique": true, + "background": true + }] + }]`) + err = d.Run(bytes.NewReader(insertCMD)) + if err != nil { + t.Fatal(err) + } + testcases := []struct { + name string + cmds []byte + documentsCount int64 + isErrorExpected bool + }{ + { + name: "success transaction", + cmds: []byte(`[{"insert":"hello","documents":[ + {"wild":"world"}, + {"wild":"west"}, + {"wild":"natural"} + ] + }]`), + documentsCount: 3, + isErrorExpected: false, + }, + { + name: "failure transaction", + //transaction have to be failure - duplicate unique key wild:west + //none of the documents should be added + cmds: []byte(`[{"insert":"hello","documents":[{"wild":"flower"}]}, + {"insert":"hello","documents":[ + {"wild":"cat"}, + {"wild":"west"} + ] + }]`), + documentsCount: 3, + isErrorExpected: true, + }, + } + for _, tcase := range testcases { + t.Run(tcase.name, func(t *testing.T) { + client, err := mongo.Connect(options.Client().ApplyURI(mongoConnectionString(ip, port))) + if err != nil { + t.Fatal(err) + } + err = client.Ping(context.TODO(), nil) + if err != nil { + t.Fatal(err) + } + d, err := WithInstance(client, &Config{ + DatabaseName: "testMigration", + TransactionMode: true, + }) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + runErr := d.Run(bytes.NewReader(tcase.cmds)) + if runErr != nil { + if !tcase.isErrorExpected { + t.Fatal(runErr) + } + } + documentsCount, err := client.Database("testMigration").Collection("hello").CountDocuments(context.TODO(), bson.M{}) + if err != nil { + t.Fatal(err) + } + if tcase.documentsCount != documentsCount { + t.Fatalf("expected %d and actual %d documents count not equal. run migration error:%s", tcase.documentsCount, documentsCount, runErr) + } + }) + } + }) +} + +type isMaster struct { + IsMaster bool `bson:"ismaster"` +} + +func waitForReplicaInit(client *mongo.Client) error { + ticker := time.NewTicker(time.Second * 1) + defer ticker.Stop() + timeout, err := strconv.Atoi(os.Getenv("MIGRATE_TEST_MONGO_REPLICA_SET_INIT_TIMEOUT")) + if err != nil { + timeout = 30 + } + timeoutTimer := time.NewTimer(time.Duration(timeout) * time.Second) + defer timeoutTimer.Stop() + for { + select { + case <-ticker.C: + var status isMaster + //Check that node is primary because + //during replica set initialization, the first node first becomes a secondary and then becomes the primary + //should consider that initialization is completed only after the node has become the primary + result := client.Database("admin").RunCommand(context.TODO(), bson.D{bson.E{Key: "isMaster", Value: 1}}) + r, err := result.Raw() + if err != nil { + return err + } + err = bson.Unmarshal(r, &status) + if err != nil { + return err + } + if status.IsMaster { + return nil + } + case <-timeoutTimer.C: + return fmt.Errorf("replica init timeout") + } + } + +} diff --git a/go.mod b/go.mod index 3c20151f2..f47ecf28f 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,8 @@ require ( github.com/snowflakedb/gosnowflake v1.6.19 github.com/stretchr/testify v1.9.0 github.com/xanzy/go-gitlab v0.15.0 - go.mongodb.org/mongo-driver v1.7.5 + go.mongodb.org/mongo-driver v1.17.3 + go.mongodb.org/mongo-driver/v2 v2.1.0 go.uber.org/atomic v1.7.0 golang.org/x/oauth2 v0.18.0 golang.org/x/tools v0.24.0 @@ -54,6 +55,7 @@ require ( github.com/moby/docker-image-spec v1.3.1 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect @@ -113,7 +115,6 @@ require ( github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect github.com/gabriel-vasile/mimetype v1.4.1 // indirect - github.com/go-stack/stack v1.8.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -145,7 +146,7 @@ require ( github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.15.11 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-isatty v0.0.16 // indirect @@ -168,9 +169,9 @@ require ( github.com/shopspring/decimal v1.2.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/scram v1.1.1 // indirect - github.com/xdg-go/stringprep v1.0.3 // indirect - github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b // indirect go.opencensus.io v0.24.0 // indirect diff --git a/go.sum b/go.sum index e30a51a2a..23da4bb18 100644 --- a/go.sum +++ b/go.sum @@ -203,7 +203,6 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gobuffalo/here v0.6.0 h1:hYrd0a6gDmWxBM4TnrGw8mQg24iSVoIkHEk7FodQcBI= @@ -255,7 +254,6 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -420,9 +418,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= -github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -484,8 +481,9 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= @@ -567,20 +565,16 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/xanzy/go-gitlab v0.15.0 h1:rWtwKTgEnXyNUGrOArN7yyc3THRkpYcKXIXia9abywQ= github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= -github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= -github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= -github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -591,8 +585,10 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b h1:7gd+rd8P3bqcn/96gOZa3F5dpJr/vEiDQYlNb/y2uNs= gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE= -go.mongodb.org/mongo-driver v1.7.5 h1:ny3p0reEpgsR2cfA5cjgwFZg3Cv/ofFh/8jbhGtz9VI= -go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng= +go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeHxQ= +go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= +go.mongodb.org/mongo-driver/v2 v2.1.0 h1:/ELnVNjmfUKDsoBisXxuJL0noR9CfeUIrP7Yt3R+egg= +go.mongodb.org/mongo-driver/v2 v2.1.0/go.mod h1:AWiLRShSrk5RHQS3AEn3RL19rqOzVq49MCpWQ3x/huI= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -636,7 +632,6 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -803,7 +798,6 @@ golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBn golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=