diff --git a/CHANGELOG.md b/CHANGELOG.md index f9b6d549e..db043f583 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Implemented box.session.su request and sugar interface only for current session granting (#426). - Defined `ErrConcurrentSchemaUpdate` constant for "concurrent schema update" error. Now you can check this error with `errors.Is(err, tarantool.ErrConcurrentSchemaUpdate)`. +- Implemented support for `IPROTO_IS_SYNC` flag in stream transactions, + added `IsSync(bool)` method for `BeginRequest`/`CommitRequest` (#447). ### Changed diff --git a/example_test.go b/example_test.go index d4480cea4..463f3289c 100644 --- a/example_test.go +++ b/example_test.go @@ -998,6 +998,90 @@ func ExampleBeginRequest_TxnIsolation() { fmt.Printf("Select after Rollback: response is %#v\n", data) } +func ExampleBeginRequest_IsSync() { + conn := exampleConnect(dialer, opts) + defer conn.Close() + + // Tarantool supports IS_SYNC flag for BeginRequest since version 3.1.0. + isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0) + if err != nil || isLess { + return + } + + stream, err := conn.NewStream() + if err != nil { + fmt.Printf("error getting the stream: %s\n", err) + return + } + + // Begin transaction with synchronous mode. + req := tarantool.NewBeginRequest().IsSync(true) + resp, err := stream.Do(req).GetResponse() + switch { + case err != nil: + fmt.Printf("error getting the response: %s\n", err) + case resp.Header().Error != tarantool.ErrorNo: + fmt.Printf("response error code: %s\n", resp.Header().Error) + default: + fmt.Println("Success.") + } +} + +func ExampleCommitRequest_IsSync() { + conn := exampleConnect(dialer, opts) + defer conn.Close() + + // Tarantool supports IS_SYNC flag for CommitRequest since version 3.1.0. + isLess, err := test_helpers.IsTarantoolVersionLess(3, 1, 0) + if err != nil || isLess { + return + } + + var req tarantool.Request + + stream, err := conn.NewStream() + if err != nil { + fmt.Printf("error getting the stream: %s\n", err) + return + } + + // Begin transaction. + req = tarantool.NewBeginRequest() + resp, err := stream.Do(req).GetResponse() + switch { + case err != nil: + fmt.Printf("error getting the response: %s\n", err) + return + case resp.Header().Error != tarantool.ErrorNo: + fmt.Printf("response error code: %s\n", resp.Header().Error) + return + } + + // Insert in stream. + req = tarantool.NewReplaceRequest("test").Tuple([]interface{}{1, "test"}) + resp, err = stream.Do(req).GetResponse() + switch { + case err != nil: + fmt.Printf("error getting the response: %s\n", err) + return + case resp.Header().Error != tarantool.ErrorNo: + fmt.Printf("response error code: %s\n", resp.Header().Error) + return + } + + // Commit transaction in sync mode. + req = tarantool.NewCommitRequest().IsSync(true) + resp, err = stream.Do(req).GetResponse() + switch { + case err != nil: + fmt.Printf("error getting the response: %s\n", err) + case resp.Header().Error != tarantool.ErrorNo: + fmt.Printf("response error code: %s\n", resp.Header().Error) + default: + fmt.Println("Success.") + } +} + func ExampleErrorNo() { conn := exampleConnect(dialer, opts) defer conn.Close() diff --git a/export_test.go b/export_test.go deleted file mode 100644 index ab3784b3b..000000000 --- a/export_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package tarantool - -import ( - "time" - - "github.com/vmihailenco/msgpack/v5" -) - -// RefImplPingBody is reference implementation for filling of a ping -// request's body. -func RefImplPingBody(enc *msgpack.Encoder) error { - return fillPing(enc) -} - -// RefImplSelectBody is reference implementation for filling of a select -// request's body. -func RefImplSelectBody(enc *msgpack.Encoder, res SchemaResolver, space, index interface{}, - offset, limit uint32, iterator Iter, key, after interface{}, fetchPos bool) error { - spaceEnc, err := newSpaceEncoder(res, space) - if err != nil { - return err - } - indexEnc, err := newIndexEncoder(res, index, spaceEnc.Id) - if err != nil { - return err - } - return fillSelect(enc, spaceEnc, indexEnc, offset, limit, iterator, key, after, fetchPos) -} - -// RefImplInsertBody is reference implementation for filling of an insert -// request's body. -func RefImplInsertBody(enc *msgpack.Encoder, res SchemaResolver, space, - tuple interface{}) error { - spaceEnc, err := newSpaceEncoder(res, space) - if err != nil { - return err - } - return fillInsert(enc, spaceEnc, tuple) -} - -// RefImplReplaceBody is reference implementation for filling of a replace -// request's body. -func RefImplReplaceBody(enc *msgpack.Encoder, res SchemaResolver, space, - tuple interface{}) error { - spaceEnc, err := newSpaceEncoder(res, space) - if err != nil { - return err - } - return fillInsert(enc, spaceEnc, tuple) -} - -// RefImplDeleteBody is reference implementation for filling of a delete -// request's body. -func RefImplDeleteBody(enc *msgpack.Encoder, res SchemaResolver, space, index, - key interface{}) error { - spaceEnc, err := newSpaceEncoder(res, space) - if err != nil { - return err - } - indexEnc, err := newIndexEncoder(res, index, spaceEnc.Id) - if err != nil { - return err - } - return fillDelete(enc, spaceEnc, indexEnc, key) -} - -// RefImplUpdateBody is reference implementation for filling of an update -// request's body. -func RefImplUpdateBody(enc *msgpack.Encoder, res SchemaResolver, space, index, - key interface{}, ops *Operations) error { - spaceEnc, err := newSpaceEncoder(res, space) - if err != nil { - return err - } - indexEnc, err := newIndexEncoder(res, index, spaceEnc.Id) - if err != nil { - return err - } - return fillUpdate(enc, spaceEnc, indexEnc, key, ops) -} - -// RefImplUpsertBody is reference implementation for filling of an upsert -// request's body. -func RefImplUpsertBody(enc *msgpack.Encoder, res SchemaResolver, space, - tuple interface{}, ops *Operations) error { - spaceEnc, err := newSpaceEncoder(res, space) - if err != nil { - return err - } - return fillUpsert(enc, spaceEnc, tuple, ops) -} - -// RefImplCallBody is reference implementation for filling of a call or call17 -// request's body. -func RefImplCallBody(enc *msgpack.Encoder, function string, args interface{}) error { - return fillCall(enc, function, args) -} - -// RefImplEvalBody is reference implementation for filling of an eval -// request's body. -func RefImplEvalBody(enc *msgpack.Encoder, expr string, args interface{}) error { - return fillEval(enc, expr, args) -} - -// RefImplExecuteBody is reference implementation for filling of an execute -// request's body. -func RefImplExecuteBody(enc *msgpack.Encoder, expr string, args interface{}) error { - return fillExecute(enc, expr, args) -} - -// RefImplPrepareBody is reference implementation for filling of an prepare -// request's body. -func RefImplPrepareBody(enc *msgpack.Encoder, expr string) error { - return fillPrepare(enc, expr) -} - -// RefImplUnprepareBody is reference implementation for filling of an execute prepared -// request's body. -func RefImplExecutePreparedBody(enc *msgpack.Encoder, stmt Prepared, args interface{}) error { - return fillExecutePrepared(enc, stmt, args) -} - -// RefImplUnprepareBody is reference implementation for filling of an unprepare -// request's body. -func RefImplUnprepareBody(enc *msgpack.Encoder, stmt Prepared) error { - return fillUnprepare(enc, stmt) -} - -// RefImplBeginBody is reference implementation for filling of an begin -// request's body. -func RefImplBeginBody(enc *msgpack.Encoder, txnIsolation TxnIsolationLevel, - timeout time.Duration) error { - return fillBegin(enc, txnIsolation, timeout) -} - -// RefImplCommitBody is reference implementation for filling of an commit -// request's body. -func RefImplCommitBody(enc *msgpack.Encoder) error { - return fillCommit(enc) -} - -// RefImplRollbackBody is reference implementation for filling of an rollback -// request's body. -func RefImplRollbackBody(enc *msgpack.Encoder) error { - return fillRollback(enc) -} - -// RefImplIdBody is reference implementation for filling of an id -// request's body. -func RefImplIdBody(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error { - return fillId(enc, protocolInfo) -} - -// RefImplWatchOnceBody is reference implementation for filling of an watchOnce -// request's body. -func RefImplWatchOnceBody(enc *msgpack.Encoder, key string) error { - return fillWatchOnce(enc, key) -} diff --git a/future_test.go b/future_test.go index 0c8bb79cc..fbb30fe62 100644 --- a/future_test.go +++ b/future_test.go @@ -27,7 +27,7 @@ func (req *futureMockRequest) Async() bool { return false } -func (req *futureMockRequest) Body(resolver SchemaResolver, enc *msgpack.Encoder) error { +func (req *futureMockRequest) Body(_ SchemaResolver, _ *msgpack.Encoder) error { return nil } diff --git a/golden_test.go b/golden_test.go new file mode 100644 index 000000000..7cb616eb3 --- /dev/null +++ b/golden_test.go @@ -0,0 +1,167 @@ +package tarantool_test + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/tarantool/go-iproto" + "github.com/tarantool/go-tarantool/v2" + "os" + "path" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack/v5" +) + +var ( + DEBUG = false +) + +func setDebug() { + DEBUG = true +} + +func logMsgpackAsJsonConvert(t *testing.T, data []byte) { + var decodedMsgpack map[int]interface{} + + decoder := msgpack.NewDecoder(bytes.NewReader(data)) + require.NoError(t, decoder.Decode(&decodedMsgpack)) + + var encodedJson []byte + if DEBUG { + decodedConvertedMsgpack := map[string]interface{}{} + for k, v := range decodedMsgpack { + decodedConvertedMsgpack[fmt.Sprintf("%s[%d]", iproto.Key(k).String(), k)] = v + } + + var err error + encodedJson, err = json.MarshalIndent(decodedConvertedMsgpack, "", " ") + require.NoError(t, err, "failed to convert msgpack to json") + } else { + var err error + encodedJson, err = json.MarshalIndent(decodedMsgpack, "", " ") + require.NoError(t, err) + } + + for _, line := range bytes.Split(encodedJson, []byte("\n")) { + t.Log(string(line)) + } +} + +func compareGoldenMsgpack(t *testing.T, name string, data []byte) bool { + t.Helper() + + testContent, err := os.ReadFile(name) + require.NoError(t, err, "failed to read golden file", name) + + if assert.Equal(t, testContent, data, "golden file content is not equal to actual") { + if DEBUG { + logMsgpackAsJsonConvert(t, data) + } + return true + } + + t.Logf("expected:\n") + logMsgpackAsJsonConvert(t, testContent) + t.Logf("actual:\n") + logMsgpackAsJsonConvert(t, data) + + return false +} + +func fileExists(name string) bool { + _, err := os.Stat(name) + return !os.IsNotExist(err) +} + +func executeGoldenTest(t *testing.T, name string, f func(t *testing.T, enc *msgpack.Encoder)) { + t.Helper() + + t.Run(name, func(t *testing.T) { + var out bytes.Buffer + encoder := msgpack.NewEncoder(&out) + + switch f(t, encoder); { + case !fileExists(name): + t.Logf("writing golden file %s", name) + err := os.WriteFile(name, out.Bytes(), 0644) + assert.NoError(t, err, "failed to write golden file", name) + case !compareGoldenMsgpack(t, name, out.Bytes()): + // ??? + } + }) +} + +func TestGolden(t *testing.T) { + setDebug() + + var pathPrefix = "testdata/requests" + + var testCases = []struct { + name string + getter func(t *testing.T, enc *msgpack.Encoder) + }{ + { + name: "commit-raw.msgpack", + getter: func(t *testing.T, enc *msgpack.Encoder) { + commit := tarantool.NewCommitRequest() + require.NoError(t, commit.Body(nil, enc), "failed to encode request") + }, + }, + { + name: "commit-with-sync.msgpack", + getter: func(t *testing.T, enc *msgpack.Encoder) { + commit := tarantool.NewCommitRequest().IsSync(true) + require.NoError(t, commit.Body(nil, enc), "failed to encode request") + }, + }, + { + name: "commit-with-sync-false.msgpack", + getter: func(t *testing.T, enc *msgpack.Encoder) { + commit := tarantool.NewCommitRequest().IsSync(false) + require.NoError(t, commit.Body(nil, enc), "failed to encode request") + }, + }, + { + name: "begin.msgpack", + getter: func(t *testing.T, enc *msgpack.Encoder) { + begin := tarantool.NewBeginRequest() + require.NoError(t, begin.Body(nil, enc), "failed to encode request") + }, + }, + { + name: "begin-with-txn-isolation.msgpack", + getter: func(t *testing.T, enc *msgpack.Encoder) { + begin := tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadCommittedLevel) + require.NoError(t, begin.Body(nil, enc), "failed to encode request") + }, + }, + { + name: "begin-with-txn-isolation-is-sync.msgpack", + getter: func(t *testing.T, enc *msgpack.Encoder) { + begin := tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadCommittedLevel). + IsSync(true) + require.NoError(t, begin.Body(nil, enc), "failed to encode request") + }, + }, + { + name: "begin-with-txn-isolation-is-sync-timeout.msgpack", + getter: func(t *testing.T, enc *msgpack.Encoder) { + begin := tarantool.NewBeginRequest(). + TxnIsolation(tarantool.ReadCommittedLevel). + IsSync(true). + Timeout(2 * time.Second) + require.NoError(t, begin.Body(nil, enc), "failed to encode request") + }, + }, + } + + for _, tc := range testCases { + executeGoldenTest(t, path.Join(pathPrefix, tc.name), tc.getter) + } +} diff --git a/prepared.go b/prepared.go index 3a03d740f..6f7ace911 100644 --- a/prepared.go +++ b/prepared.go @@ -22,26 +22,6 @@ type Prepared struct { Conn *Connection } -func fillPrepare(enc *msgpack.Encoder, expr string) error { - enc.EncodeMapLen(1) - enc.EncodeUint(uint64(iproto.IPROTO_SQL_TEXT)) - return enc.EncodeString(expr) -} - -func fillUnprepare(enc *msgpack.Encoder, stmt Prepared) error { - enc.EncodeMapLen(1) - enc.EncodeUint(uint64(iproto.IPROTO_STMT_ID)) - return enc.EncodeUint(uint64(stmt.StatementID)) -} - -func fillExecutePrepared(enc *msgpack.Encoder, stmt Prepared, args interface{}) error { - enc.EncodeMapLen(2) - enc.EncodeUint(uint64(iproto.IPROTO_STMT_ID)) - enc.EncodeUint(uint64(stmt.StatementID)) - enc.EncodeUint(uint64(iproto.IPROTO_SQL_BIND)) - return encodeSQLBind(enc, args) -} - // NewPreparedFromResponse constructs a Prepared object. func NewPreparedFromResponse(conn *Connection, resp Response) (*Prepared, error) { if resp == nil { @@ -81,8 +61,16 @@ func NewPrepareRequest(expr string) *PrepareRequest { } // Body fills an msgpack.Encoder with the execute request body. -func (req *PrepareRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillPrepare(enc, req.expr) +func (req *PrepareRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(1); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_SQL_TEXT)); err != nil { + return err + } + + return enc.EncodeString(req.expr) } // Context sets a passed context to the request. @@ -126,8 +114,16 @@ func (req *UnprepareRequest) Conn() *Connection { } // Body fills an msgpack.Encoder with the execute request body. -func (req *UnprepareRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillUnprepare(enc, *req.stmt) +func (req *UnprepareRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(1); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_STMT_ID)); err != nil { + return err + } + + return enc.EncodeUint(uint64(req.stmt.StatementID)) } // Context sets a passed context to the request. @@ -171,8 +167,24 @@ func (req *ExecutePreparedRequest) Args(args interface{}) *ExecutePreparedReques } // Body fills an msgpack.Encoder with the execute request body. -func (req *ExecutePreparedRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillExecutePrepared(enc, *req.stmt, req.args) +func (req *ExecutePreparedRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(2); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_STMT_ID)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(req.stmt.StatementID)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_SQL_BIND)); err != nil { + return err + } + + return encodeSQLBind(enc, req.args) } // Context sets a passed context to the request. diff --git a/protocol.go b/protocol.go index 9296943ce..9f4d6a1b9 100644 --- a/protocol.go +++ b/protocol.go @@ -68,22 +68,37 @@ type IdRequest struct { protocolInfo ProtocolInfo } -func fillId(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error { - enc.EncodeMapLen(2) +// NewIdRequest returns a new IdRequest. +func NewIdRequest(protocolInfo ProtocolInfo) *IdRequest { + req := new(IdRequest) + req.rtype = iproto.IPROTO_ID + req.protocolInfo = protocolInfo.Clone() + return req +} - enc.EncodeUint(uint64(iproto.IPROTO_VERSION)) - if err := enc.Encode(protocolInfo.Version); err != nil { +// Body fills an msgpack.Encoder with the id request body. +func (req *IdRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(2); err != nil { return err } - enc.EncodeUint(uint64(iproto.IPROTO_FEATURES)) + if err := enc.EncodeUint(uint64(iproto.IPROTO_VERSION)); err != nil { + return err + } - t := len(protocolInfo.Features) - if err := enc.EncodeArrayLen(t); err != nil { + if err := enc.Encode(req.protocolInfo.Version); err != nil { return err } - for _, feature := range protocolInfo.Features { + if err := enc.EncodeUint(uint64(iproto.IPROTO_FEATURES)); err != nil { + return err + } + + if err := enc.EncodeArrayLen(len(req.protocolInfo.Features)); err != nil { + return err + } + + for _, feature := range req.protocolInfo.Features { if err := enc.Encode(feature); err != nil { return err } @@ -92,19 +107,6 @@ func fillId(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error { return nil } -// NewIdRequest returns a new IdRequest. -func NewIdRequest(protocolInfo ProtocolInfo) *IdRequest { - req := new(IdRequest) - req.rtype = iproto.IPROTO_ID - req.protocolInfo = protocolInfo.Clone() - return req -} - -// Body fills an msgpack.Encoder with the id request body. -func (req *IdRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillId(enc, req.protocolInfo) -} - // Context sets a passed context to the request. // // Pay attention that when using context with request objects, diff --git a/request.go b/request.go index 21ed0eba1..c18b3aeb2 100644 --- a/request.go +++ b/request.go @@ -103,160 +103,6 @@ func fillSearch(enc *msgpack.Encoder, spaceEnc spaceEncoder, indexEnc indexEncod return enc.Encode(key) } -func fillIterator(enc *msgpack.Encoder, offset, limit uint32, iterator Iter) error { - if err := enc.EncodeUint(uint64(iproto.IPROTO_ITERATOR)); err != nil { - return err - } - if err := enc.EncodeUint(uint64(iterator)); err != nil { - return err - } - if err := enc.EncodeUint(uint64(iproto.IPROTO_OFFSET)); err != nil { - return err - } - if err := enc.EncodeUint(uint64(offset)); err != nil { - return err - } - if err := enc.EncodeUint(uint64(iproto.IPROTO_LIMIT)); err != nil { - return err - } - return enc.EncodeUint(uint64(limit)) -} - -func fillInsert(enc *msgpack.Encoder, spaceEnc spaceEncoder, tuple interface{}) error { - if err := enc.EncodeMapLen(2); err != nil { - return err - } - if err := spaceEnc.Encode(enc); err != nil { - return err - } - - if err := enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)); err != nil { - return err - } - return enc.Encode(tuple) -} - -func fillSelect(enc *msgpack.Encoder, spaceEnc spaceEncoder, indexEnc indexEncoder, - offset, limit uint32, iterator Iter, key, after interface{}, fetchPos bool) error { - mapLen := 6 - if fetchPos { - mapLen += 1 - } - if after != nil { - mapLen += 1 - } - if err := enc.EncodeMapLen(mapLen); err != nil { - return err - } - if err := fillIterator(enc, offset, limit, iterator); err != nil { - return err - } - if err := fillSearch(enc, spaceEnc, indexEnc, key); err != nil { - return err - } - if fetchPos { - if err := enc.EncodeUint(uint64(iproto.IPROTO_FETCH_POSITION)); err != nil { - return err - } - if err := enc.EncodeBool(fetchPos); err != nil { - return err - } - } - if after != nil { - if pos, ok := after.([]byte); ok { - if err := enc.EncodeUint(uint64(iproto.IPROTO_AFTER_POSITION)); err != nil { - return err - } - if err := enc.EncodeString(string(pos)); err != nil { - return err - } - } else { - if err := enc.EncodeUint(uint64(iproto.IPROTO_AFTER_TUPLE)); err != nil { - return err - } - if err := enc.Encode(after); err != nil { - return err - } - } - } - return nil -} - -func fillUpdate(enc *msgpack.Encoder, spaceEnc spaceEncoder, indexEnc indexEncoder, - key interface{}, ops *Operations) error { - enc.EncodeMapLen(4) - if err := fillSearch(enc, spaceEnc, indexEnc, key); err != nil { - return err - } - enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)) - if ops == nil { - return enc.Encode([]interface{}{}) - } - return enc.Encode(ops) -} - -func fillUpsert(enc *msgpack.Encoder, spaceEnc spaceEncoder, tuple interface{}, - ops *Operations) error { - enc.EncodeMapLen(3) - if err := spaceEnc.Encode(enc); err != nil { - return err - } - - enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)) - if err := enc.Encode(tuple); err != nil { - return err - } - enc.EncodeUint(uint64(iproto.IPROTO_OPS)) - if ops == nil { - return enc.Encode([]interface{}{}) - } - return enc.Encode(ops) -} - -func fillDelete(enc *msgpack.Encoder, spaceEnc spaceEncoder, indexEnc indexEncoder, - key interface{}) error { - enc.EncodeMapLen(3) - return fillSearch(enc, spaceEnc, indexEnc, key) -} - -func fillCall(enc *msgpack.Encoder, functionName string, args interface{}) error { - enc.EncodeMapLen(2) - enc.EncodeUint(uint64(iproto.IPROTO_FUNCTION_NAME)) - enc.EncodeString(functionName) - enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)) - return enc.Encode(args) -} - -func fillEval(enc *msgpack.Encoder, expr string, args interface{}) error { - enc.EncodeMapLen(2) - enc.EncodeUint(uint64(iproto.IPROTO_EXPR)) - enc.EncodeString(expr) - enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)) - return enc.Encode(args) -} - -func fillExecute(enc *msgpack.Encoder, expr string, args interface{}) error { - enc.EncodeMapLen(2) - enc.EncodeUint(uint64(iproto.IPROTO_SQL_TEXT)) - enc.EncodeString(expr) - enc.EncodeUint(uint64(iproto.IPROTO_SQL_BIND)) - return encodeSQLBind(enc, args) -} - -func fillPing(enc *msgpack.Encoder) error { - return enc.EncodeMapLen(0) -} - -func fillWatchOnce(enc *msgpack.Encoder, key string) error { - if err := enc.EncodeMapLen(1); err != nil { - return err - } - if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil { - return err - } - return enc.EncodeString(key) -} - // Ping sends empty request to Tarantool to check connection. // // Deprecated: the method will be removed in the next major version, @@ -934,28 +780,35 @@ func (req authRequest) Ctx() context.Context { } // Body fills an encoder with the auth request body. -func (req authRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { +func (req authRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { if err := enc.EncodeMapLen(2); err != nil { return err } + if err := enc.EncodeUint32(uint32(iproto.IPROTO_USER_NAME)); err != nil { return err } + if err := enc.EncodeString(req.user); err != nil { return err } + if err := enc.EncodeUint32(uint32(iproto.IPROTO_TUPLE)); err != nil { return err } + if err := enc.EncodeArrayLen(2); err != nil { return err } + if err := enc.EncodeString(req.auth.String()); err != nil { return err } + if err := enc.EncodeString(req.pass); err != nil { return err } + return nil } @@ -978,8 +831,8 @@ func NewPingRequest() *PingRequest { } // Body fills an msgpack.Encoder with the ping request body. -func (req *PingRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillPing(enc) +func (req *PingRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + return enc.EncodeMapLen(0) } // Context sets a passed context to the request. @@ -1086,13 +939,83 @@ func (req *SelectRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { if err != nil { return err } + indexEnc, err := newIndexEncoder(res, req.index, spaceEnc.Id) if err != nil { return err } - return fillSelect(enc, spaceEnc, indexEnc, req.offset, req.limit, req.iterator, - req.key, req.after, req.fetchPos) + mapLen := 6 + if req.fetchPos { + mapLen++ + } + if req.after != nil { + mapLen++ + } + + if err := enc.EncodeMapLen(mapLen); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_ITERATOR)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(req.iterator)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_OFFSET)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(req.offset)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_LIMIT)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(req.limit)); err != nil { + return err + } + + if err := fillSearch(enc, spaceEnc, indexEnc, req.key); err != nil { + return err + } + + if req.fetchPos { + if err := enc.EncodeUint(uint64(iproto.IPROTO_FETCH_POSITION)); err != nil { + return err + } + + if err := enc.EncodeBool(req.fetchPos); err != nil { + return err + } + } + + if req.after != nil { + if pos, ok := req.after.([]byte); ok { + if err := enc.EncodeUint(uint64(iproto.IPROTO_AFTER_POSITION)); err != nil { + return err + } + + if err := enc.EncodeString(string(pos)); err != nil { + return err + } + } else { + if err := enc.EncodeUint(uint64(iproto.IPROTO_AFTER_TUPLE)); err != nil { + return err + } + + if err := enc.Encode(req.after); err != nil { + return err + } + } + } + + return nil } // Context sets a passed context to the request. @@ -1145,7 +1068,19 @@ func (req *InsertRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { return err } - return fillInsert(enc, spaceEnc, req.tuple) + if err := enc.EncodeMapLen(2); err != nil { + return err + } + + if err := spaceEnc.Encode(enc); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)); err != nil { + return err + } + + return enc.Encode(req.tuple) } // Context sets a passed context to the request. @@ -1189,7 +1124,19 @@ func (req *ReplaceRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error return err } - return fillInsert(enc, spaceEnc, req.tuple) + if err := enc.EncodeMapLen(2); err != nil { + return err + } + + if err := spaceEnc.Encode(enc); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)); err != nil { + return err + } + + return enc.Encode(req.tuple) } // Context sets a passed context to the request. @@ -1239,12 +1186,17 @@ func (req *DeleteRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { if err != nil { return err } + indexEnc, err := newIndexEncoder(res, req.index, spaceEnc.Id) if err != nil { return err } - return fillDelete(enc, spaceEnc, indexEnc, req.key) + if err := enc.EncodeMapLen(3); err != nil { + return err + } + + return fillSearch(enc, spaceEnc, indexEnc, req.key) } // Context sets a passed context to the request. @@ -1302,12 +1254,29 @@ func (req *UpdateRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { if err != nil { return err } + indexEnc, err := newIndexEncoder(res, req.index, spaceEnc.Id) if err != nil { return err } - return fillUpdate(enc, spaceEnc, indexEnc, req.key, req.ops) + if err := enc.EncodeMapLen(4); err != nil { + return err + } + + if err := fillSearch(enc, spaceEnc, indexEnc, req.key); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)); err != nil { + return err + } + + if req.ops == nil { + return enc.EncodeArrayLen(0) + } else { + return enc.Encode(req.ops) + } } // Context sets a passed context to the request. @@ -1359,7 +1328,31 @@ func (req *UpsertRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { return err } - return fillUpsert(enc, spaceEnc, req.tuple, req.ops) + if err := enc.EncodeMapLen(3); err != nil { + return err + } + + if err := spaceEnc.Encode(enc); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)); err != nil { + return err + } + + if err := enc.Encode(req.tuple); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_OPS)); err != nil { + return err + } + + if req.ops == nil { + return enc.EncodeArrayLen(0) + } else { + return enc.Encode(req.ops) + } } // Context sets a passed context to the request. @@ -1398,12 +1391,28 @@ func (req *CallRequest) Args(args interface{}) *CallRequest { } // Body fills an encoder with the call request body. -func (req *CallRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - args := req.args - if args == nil { - args = []interface{}{} +func (req *CallRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(2); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_FUNCTION_NAME)); err != nil { + return err + } + + if err := enc.EncodeString(req.function); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)); err != nil { + return err + } + + if req.args == nil { + return enc.EncodeArrayLen(0) + } else { + return enc.Encode(req.args) } - return fillCall(enc, req.function, args) } // Context sets a passed context to the request. @@ -1459,8 +1468,28 @@ func (req *EvalRequest) Args(args interface{}) *EvalRequest { } // Body fills an msgpack.Encoder with the eval request body. -func (req *EvalRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillEval(enc, req.expr, req.args) +func (req *EvalRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(2); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_EXPR)); err != nil { + return err + } + + if err := enc.EncodeString(req.expr); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_TUPLE)); err != nil { + return err + } + + if req.args == nil { + return enc.EncodeArrayLen(0) + } else { + return enc.Encode(req.args) + } } // Context sets a passed context to the request. @@ -1499,8 +1528,24 @@ func (req *ExecuteRequest) Args(args interface{}) *ExecuteRequest { } // Body fills an msgpack.Encoder with the execute request body. -func (req *ExecuteRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillExecute(enc, req.expr, req.args) +func (req *ExecuteRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(2); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_SQL_TEXT)); err != nil { + return err + } + + if err := enc.EncodeString(req.expr); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_SQL_BIND)); err != nil { + return err + } + + return encodeSQLBind(enc, req.args) } // Context sets a passed context to the request. @@ -1539,8 +1584,16 @@ func NewWatchOnceRequest(key string) *WatchOnceRequest { } // Body fills an msgpack.Encoder with the watchOnce request body. -func (req *WatchOnceRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillWatchOnce(enc, req.key) +func (req *WatchOnceRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + if err := enc.EncodeMapLen(1); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil { + return err + } + + return enc.EncodeString(req.key) } // Context sets a passed context to the request. diff --git a/request_test.go b/request_test.go index 8ced455cd..62baffb32 100644 --- a/request_test.go +++ b/request_test.go @@ -6,13 +6,13 @@ import ( "errors" "fmt" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack/v5" + "github.com/tarantool/go-iproto" . "github.com/tarantool/go-tarantool/v2" - "github.com/vmihailenco/msgpack/v5" ) const invalidSpaceMsg = "invalid space" @@ -27,17 +27,13 @@ const validKey = "foo" // Any string. const defaultSpace uint32 = 0 // And valid too. const defaultIndex uint32 = 0 // And valid too. -const defaultIsolationLevel = DefaultIsolationLevel -const defaultTimeout = 0 - -const validTimeout = 500 * time.Millisecond - -var validStmt *Prepared = &Prepared{StatementID: 1, Conn: &Connection{}} - -var validProtocolInfo ProtocolInfo = ProtocolInfo{ - Version: ProtocolVersion(3), - Features: []iproto.Feature{iproto.IPROTO_FEATURE_STREAMS}, -} +var ( + validStmt = &Prepared{StatementID: 1, Conn: &Connection{}} + validProtocolInfo = ProtocolInfo{ + Version: ProtocolVersion(3), + Features: []iproto.Feature{iproto.IPROTO_FEATURE_STREAMS}, + } +) type ValidSchemeResolver struct { nameUseSupported bool @@ -102,38 +98,6 @@ func assertBodyCall(t testing.TB, requests []Request, errorMsg string) { } } -func assertBodyEqual(t testing.TB, reference []byte, req Request) { - t.Helper() - - var reqBuf bytes.Buffer - reqEnc := msgpack.NewEncoder(&reqBuf) - - err := req.Body(&resolver, reqEnc) - if err != nil { - t.Fatalf("An unexpected Response.Body() error: %q", err.Error()) - } - - reqBody := reqBuf.Bytes() - if !bytes.Equal(reqBody, reference) { - t.Errorf("Encoded request %v != reference %v", reqBody, reference) - } -} - -func getTestOps() *Operations { - operations := NewOperations(). - Add(1, 2). - Subtract(3, 4). - BitwiseAnd(5, 6). - BitwiseOr(7, 8). - BitwiseXor(9, 1). - BitwiseXor(9, 1). // The duplication is for test purposes. - Splice(2, 3, 1, "!!"). - Insert(4, 5). - Delete(6, 7). - Assign(8, 9) - return operations -} - func TestRequestsValidSpaceAndIndex(t *testing.T) { requests := []Request{ NewSelectRequest(validSpace), @@ -318,645 +282,6 @@ func TestRequestsCtx_setter(t *testing.T) { } } -func TestPingRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplPingBody(refEnc) - if err != nil { - t.Fatalf("An unexpected RefImplPingBody() error: %q", err.Error()) - } - - req := NewPingRequest() - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestSelectRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplSelectBody(refEnc, &resolver, validSpace, defaultIndex, 0, 0xFFFFFFFF, - IterAll, []interface{}{}, nil, false) - if err != nil { - t.Fatalf("An unexpected RefImplSelectBody() error %q", err.Error()) - } - - req := NewSelectRequest(validSpace) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestSelectRequestSpaceByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplSelectBody(refEnc, &resolver, "valid", defaultIndex, 0, 0xFFFFFFFF, - IterAll, []interface{}{}, nil, false) - if err != nil { - t.Fatalf("An unexpected RefImplSelectBody() error %q", err.Error()) - } - - req := NewSelectRequest("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestSelectRequestIndexByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplSelectBody(refEnc, &resolver, defaultSpace, "valid", 0, 0xFFFFFFFF, - IterAll, []interface{}{}, nil, false) - if err != nil { - t.Fatalf("An unexpected RefImplSelectBody() error %q", err.Error()) - } - - req := NewSelectRequest(defaultSpace) - req.Index("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestSelectRequestDefaultIteratorEqIfKey(t *testing.T) { - var refBuf bytes.Buffer - key := []interface{}{uint(18)} - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplSelectBody(refEnc, &resolver, validSpace, defaultIndex, 0, 0xFFFFFFFF, - IterEq, key, nil, false) - if err != nil { - t.Fatalf("An unexpected RefImplSelectBody() error %q", err.Error()) - } - - req := NewSelectRequest(validSpace). - Key(key) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestSelectRequestIteratorNotChangedIfKey(t *testing.T) { - var refBuf bytes.Buffer - key := []interface{}{uint(678)} - const iter = IterGe - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplSelectBody(refEnc, &resolver, validSpace, defaultIndex, 0, 0xFFFFFFFF, - iter, key, nil, false) - if err != nil { - t.Fatalf("An unexpected RefImplSelectBody() error %q", err.Error()) - } - - req := NewSelectRequest(validSpace). - Iterator(iter). - Key(key) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestSelectRequestSetters(t *testing.T) { - const offset = 4 - const limit = 5 - const iter = IterLt - key := []interface{}{uint(36)} - afterBytes := []byte{0x1, 0x2, 0x3} - afterKey := []interface{}{uint(13)} - var refBufAfterBytes, refBufAfterKey bytes.Buffer - - refEncAfterBytes := msgpack.NewEncoder(&refBufAfterBytes) - err := RefImplSelectBody(refEncAfterBytes, &resolver, validSpace, validIndex, offset, - limit, iter, key, afterBytes, true) - if err != nil { - t.Fatalf("An unexpected RefImplSelectBody() error %s", err) - } - - refEncAfterKey := msgpack.NewEncoder(&refBufAfterKey) - err = RefImplSelectBody(refEncAfterKey, &resolver, validSpace, validIndex, offset, - limit, iter, key, afterKey, true) - if err != nil { - t.Fatalf("An unexpected RefImplSelectBody() error %s", err) - } - - reqAfterBytes := NewSelectRequest(validSpace). - Index(validIndex). - Offset(offset). - Limit(limit). - Iterator(iter). - Key(key). - After(afterBytes). - FetchPos(true) - reqAfterKey := NewSelectRequest(validSpace). - Index(validIndex). - Offset(offset). - Limit(limit). - Iterator(iter). - Key(key). - After(afterKey). - FetchPos(true) - - assertBodyEqual(t, refBufAfterBytes.Bytes(), reqAfterBytes) - assertBodyEqual(t, refBufAfterKey.Bytes(), reqAfterKey) -} - -func TestInsertRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplInsertBody(refEnc, &resolver, validSpace, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplInsertBody() error: %q", err.Error()) - } - - req := NewInsertRequest(validSpace) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestInsertRequestSpaceByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplInsertBody(refEnc, &resolver, "valid", []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplInsertBody() error: %q", err.Error()) - } - - req := NewInsertRequest("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestInsertRequestSetters(t *testing.T) { - tuple := []interface{}{uint(24)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplInsertBody(refEnc, &resolver, validSpace, tuple) - if err != nil { - t.Fatalf("An unexpected RefImplInsertBody() error: %q", err.Error()) - } - - req := NewInsertRequest(validSpace). - Tuple(tuple) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestReplaceRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplReplaceBody(refEnc, &resolver, validSpace, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplReplaceBody() error: %q", err.Error()) - } - - req := NewReplaceRequest(validSpace) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestReplaceRequestSpaceByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplReplaceBody(refEnc, &resolver, "valid", []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplReplaceBody() error: %q", err.Error()) - } - - req := NewReplaceRequest("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestReplaceRequestSetters(t *testing.T) { - tuple := []interface{}{uint(99)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplReplaceBody(refEnc, &resolver, validSpace, tuple) - if err != nil { - t.Fatalf("An unexpected RefImplReplaceBody() error: %q", err.Error()) - } - - req := NewReplaceRequest(validSpace). - Tuple(tuple) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestDeleteRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplDeleteBody(refEnc, &resolver, validSpace, defaultIndex, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplDeleteBody() error: %q", err.Error()) - } - - req := NewDeleteRequest(validSpace) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestDeleteRequestSpaceByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplDeleteBody(refEnc, &resolver, "valid", defaultIndex, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplDeleteBody() error: %q", err.Error()) - } - - req := NewDeleteRequest("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestDeleteRequestIndexByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplDeleteBody(refEnc, &resolver, defaultSpace, "valid", []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplDeleteBody() error: %q", err.Error()) - } - - req := NewDeleteRequest(defaultSpace) - req.Index("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestDeleteRequestSetters(t *testing.T) { - key := []interface{}{uint(923)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplDeleteBody(refEnc, &resolver, validSpace, validIndex, key) - if err != nil { - t.Fatalf("An unexpected RefImplDeleteBody() error: %q", err.Error()) - } - - req := NewDeleteRequest(validSpace). - Index(validIndex). - Key(key) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUpdateRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUpdateBody(refEnc, &resolver, validSpace, defaultIndex, - []interface{}{}, nil) - if err != nil { - t.Fatalf("An unexpected RefImplUpdateBody() error: %q", err.Error()) - } - - req := NewUpdateRequest(validSpace) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUpdateRequestSpaceByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUpdateBody(refEnc, &resolver, "valid", defaultIndex, - []interface{}{}, nil) - if err != nil { - t.Fatalf("An unexpected RefImplUpdateBody() error: %q", err.Error()) - } - - req := NewUpdateRequest("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUpdateRequestIndexByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUpdateBody(refEnc, &resolver, defaultSpace, "valid", - []interface{}{}, nil) - if err != nil { - t.Fatalf("An unexpected RefImplUpdateBody() error: %q", err.Error()) - } - - req := NewUpdateRequest(defaultSpace) - req.Index("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUpdateRequestSetters(t *testing.T) { - key := []interface{}{uint(44)} - reqOps := getTestOps() - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUpdateBody(refEnc, &resolver, validSpace, validIndex, key, reqOps) - if err != nil { - t.Fatalf("An unexpected RefImplUpdateBody() error: %q", err.Error()) - } - - req := NewUpdateRequest(validSpace). - Index(validIndex). - Key(key). - Operations(reqOps) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUpsertRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUpsertBody(refEnc, &resolver, validSpace, []interface{}{}, nil) - if err != nil { - t.Fatalf("An unexpected RefImplUpsertBody() error: %q", err.Error()) - } - - req := NewUpsertRequest(validSpace) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUpsertRequestSpaceByName(t *testing.T) { - var refBuf bytes.Buffer - - resolver.nameUseSupported = true - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUpsertBody(refEnc, &resolver, "valid", []interface{}{}, nil) - if err != nil { - t.Fatalf("An unexpected RefImplUpsertBody() error: %q", err.Error()) - } - - req := NewUpsertRequest("valid") - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUpsertRequestSetters(t *testing.T) { - tuple := []interface{}{uint(64)} - reqOps := getTestOps() - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUpsertBody(refEnc, &resolver, validSpace, tuple, reqOps) - if err != nil { - t.Fatalf("An unexpected RefImplUpsertBody() error: %q", err.Error()) - } - - req := NewUpsertRequest(validSpace). - Tuple(tuple). - Operations(reqOps) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestCallRequestsDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplCallBody(refEnc, validExpr, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplCallBody() error: %q", err.Error()) - } - - req := NewCallRequest(validExpr) - req16 := NewCall16Request(validExpr) - req17 := NewCall17Request(validExpr) - assertBodyEqual(t, refBuf.Bytes(), req) - assertBodyEqual(t, refBuf.Bytes(), req16) - assertBodyEqual(t, refBuf.Bytes(), req17) -} - -func TestCallRequestsSetters(t *testing.T) { - args := []interface{}{uint(34)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplCallBody(refEnc, validExpr, args) - if err != nil { - t.Fatalf("An unexpected RefImplCallBody() error: %q", err.Error()) - } - - req := NewCallRequest(validExpr). - Args(args) - req16 := NewCall16Request(validExpr). - Args(args) - req17 := NewCall17Request(validExpr). - Args(args) - assertBodyEqual(t, refBuf.Bytes(), req) - assertBodyEqual(t, refBuf.Bytes(), req16) - assertBodyEqual(t, refBuf.Bytes(), req17) -} - -func TestEvalRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplEvalBody(refEnc, validExpr, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplEvalBody() error: %q", err.Error()) - } - - req := NewEvalRequest(validExpr) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestEvalRequestSetters(t *testing.T) { - args := []interface{}{uint(34), int(12)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplEvalBody(refEnc, validExpr, args) - if err != nil { - t.Fatalf("An unexpected RefImplEvalBody() error: %q", err.Error()) - } - - req := NewEvalRequest(validExpr). - Args(args) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestExecuteRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplExecuteBody(refEnc, validExpr, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplExecuteBody() error: %q", err.Error()) - } - - req := NewExecuteRequest(validExpr) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestExecuteRequestSetters(t *testing.T) { - args := []interface{}{uint(11)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplExecuteBody(refEnc, validExpr, args) - if err != nil { - t.Fatalf("An unexpected RefImplExecuteBody() error: %q", err.Error()) - } - - req := NewExecuteRequest(validExpr). - Args(args) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestPrepareRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplPrepareBody(refEnc, validExpr) - if err != nil { - t.Fatalf("An unexpected RefImplPrepareBody() error: %q", err.Error()) - } - - req := NewPrepareRequest(validExpr) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestUnprepareRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplUnprepareBody(refEnc, *validStmt) - if err != nil { - t.Fatalf("An unexpected RefImplUnprepareBody() error: %q", err.Error()) - } - - req := NewUnprepareRequest(validStmt) - assert.Equal(t, req.Conn(), validStmt.Conn) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestExecutePreparedRequestSetters(t *testing.T) { - args := []interface{}{uint(11)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplExecutePreparedBody(refEnc, *validStmt, args) - if err != nil { - t.Fatalf("An unexpected RefImplExecutePreparedBody() error: %q", err.Error()) - } - - req := NewExecutePreparedRequest(validStmt). - Args(args) - assert.Equal(t, req.Conn(), validStmt.Conn) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestExecutePreparedRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplExecutePreparedBody(refEnc, *validStmt, []interface{}{}) - if err != nil { - t.Fatalf("An unexpected RefImplExecutePreparedBody() error: %q", err.Error()) - } - - req := NewExecutePreparedRequest(validStmt) - assert.Equal(t, req.Conn(), validStmt.Conn) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestBeginRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplBeginBody(refEnc, defaultIsolationLevel, defaultTimeout) - if err != nil { - t.Fatalf("An unexpected RefImplBeginBody() error: %q", err.Error()) - } - - req := NewBeginRequest() - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestBeginRequestSetters(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplBeginBody(refEnc, ReadConfirmedLevel, validTimeout) - if err != nil { - t.Fatalf("An unexpected RefImplBeginBody() error: %q", err.Error()) - } - - req := NewBeginRequest().TxnIsolation(ReadConfirmedLevel).Timeout(validTimeout) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestCommitRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplCommitBody(refEnc) - if err != nil { - t.Fatalf("An unexpected RefImplCommitBody() error: %q", err.Error()) - } - - req := NewCommitRequest() - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestRollbackRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplRollbackBody(refEnc) - if err != nil { - t.Fatalf("An unexpected RefImplRollbackBody() error: %q", err.Error()) - } - - req := NewRollbackRequest() - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestBroadcastRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - expectedArgs := []interface{}{validKey} - err := RefImplCallBody(refEnc, "box.broadcast", expectedArgs) - if err != nil { - t.Fatalf("An unexpected RefImplCallBody() error: %q", err.Error()) - } - - req := NewBroadcastRequest(validKey) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestBroadcastRequestSetters(t *testing.T) { - value := []interface{}{uint(34), int(12)} - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - expectedArgs := []interface{}{validKey, value} - err := RefImplCallBody(refEnc, "box.broadcast", expectedArgs) - if err != nil { - t.Fatalf("An unexpected RefImplCallBody() error: %q", err.Error()) - } - - req := NewBroadcastRequest(validKey).Value(value) - assertBodyEqual(t, refBuf.Bytes(), req) -} - -func TestWatchOnceRequestDefaultValues(t *testing.T) { - var refBuf bytes.Buffer - - refEnc := msgpack.NewEncoder(&refBuf) - err := RefImplWatchOnceBody(refEnc, validKey) - if err != nil { - t.Fatalf("An unexpected RefImplCallBody() error: %q", err.Error()) - } - - req := NewWatchOnceRequest(validKey) - assertBodyEqual(t, refBuf.Bytes(), req) -} - func TestResponseDecode(t *testing.T) { header := Header{} data := bytes.NewBuffer([]byte{'v', '2'}) diff --git a/stream.go b/stream.go index 43e80fc28..cdabf12fa 100644 --- a/stream.go +++ b/stream.go @@ -35,57 +35,6 @@ type Stream struct { Conn *Connection } -func fillBegin(enc *msgpack.Encoder, txnIsolation TxnIsolationLevel, timeout time.Duration) error { - hasTimeout := timeout > 0 - hasIsolationLevel := txnIsolation != DefaultIsolationLevel - mapLen := 0 - if hasTimeout { - mapLen += 1 - } - if hasIsolationLevel { - mapLen += 1 - } - - err := enc.EncodeMapLen(mapLen) - if err != nil { - return err - } - - if hasTimeout { - err = enc.EncodeUint(uint64(iproto.IPROTO_TIMEOUT)) - if err != nil { - return err - } - - err = enc.Encode(timeout.Seconds()) - if err != nil { - return err - } - } - - if hasIsolationLevel { - err = enc.EncodeUint(uint64(iproto.IPROTO_TXN_ISOLATION)) - if err != nil { - return err - } - - err = enc.EncodeUint(uint64(txnIsolation)) - if err != nil { - return err - } - } - - return err -} - -func fillCommit(enc *msgpack.Encoder) error { - return enc.EncodeMapLen(0) -} - -func fillRollback(enc *msgpack.Encoder) error { - return enc.EncodeMapLen(0) -} - // BeginRequest helps you to create a begin request object for execution // by a Stream. // Begin request can not be processed out of stream. @@ -93,6 +42,8 @@ type BeginRequest struct { baseRequest txnIsolation TxnIsolationLevel timeout time.Duration + isSync bool + isSyncSet bool } // NewBeginRequest returns a new BeginRequest. @@ -110,15 +61,74 @@ func (req *BeginRequest) TxnIsolation(txnIsolation TxnIsolationLevel) *BeginRequ return req } -// WithTimeout allows to set up a timeout for call BeginRequest. +// Timeout allows to set up a timeout for call BeginRequest. func (req *BeginRequest) Timeout(timeout time.Duration) *BeginRequest { req.timeout = timeout return req } +// IsSync allows to set up a IsSync flag for call BeginRequest. +func (req *BeginRequest) IsSync(isSync bool) *BeginRequest { + req.isSync = isSync + req.isSyncSet = true + return req +} + // Body fills an msgpack.Encoder with the begin request body. -func (req *BeginRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillBegin(enc, req.txnIsolation, req.timeout) +func (req *BeginRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + var ( + mapLen = 0 + hasTimeout = req.timeout > 0 + hasIsolationLevel = req.txnIsolation != DefaultIsolationLevel + ) + + if hasTimeout { + mapLen++ + } + + if hasIsolationLevel { + mapLen++ + } + + if req.isSyncSet { + mapLen++ + } + + if err := enc.EncodeMapLen(mapLen); err != nil { + return err + } + + if hasTimeout { + if err := enc.EncodeUint(uint64(iproto.IPROTO_TIMEOUT)); err != nil { + return err + } + + if err := enc.Encode(req.timeout.Seconds()); err != nil { + return err + } + } + + if hasIsolationLevel { + if err := enc.EncodeUint(uint64(iproto.IPROTO_TXN_ISOLATION)); err != nil { + return err + } + + if err := enc.EncodeUint(uint64(req.txnIsolation)); err != nil { + return err + } + } + + if req.isSyncSet { + if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil { + return err + } + + if err := enc.EncodeBool(req.isSync); err != nil { + return err + } + } + + return nil } // Context sets a passed context to the request. @@ -137,6 +147,9 @@ func (req *BeginRequest) Context(ctx context.Context) *BeginRequest { // Commit request can not be processed out of stream. type CommitRequest struct { baseRequest + + isSync bool + isSyncSet bool } // NewCommitRequest returns a new CommitRequest. @@ -146,9 +159,38 @@ func NewCommitRequest() *CommitRequest { return req } +// IsSync allows to set up a IsSync flag for call BeginRequest. +func (req *CommitRequest) IsSync(isSync bool) *CommitRequest { + req.isSync = isSync + req.isSyncSet = true + return req +} + // Body fills an msgpack.Encoder with the commit request body. -func (req *CommitRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillCommit(enc) +func (req *CommitRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + var ( + mapLen = 0 + ) + + if req.isSyncSet { + mapLen++ + } + + if err := enc.EncodeMapLen(mapLen); err != nil { + return err + } + + if req.isSyncSet { + if err := enc.EncodeUint(uint64(iproto.IPROTO_IS_SYNC)); err != nil { + return err + } + + if err := enc.EncodeBool(req.isSync); err != nil { + return err + } + } + + return nil } // Context sets a passed context to the request. @@ -177,8 +219,8 @@ func NewRollbackRequest() *RollbackRequest { } // Body fills an msgpack.Encoder with the rollback request body. -func (req *RollbackRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { - return fillRollback(enc) +func (req *RollbackRequest) Body(_ SchemaResolver, enc *msgpack.Encoder) error { + return enc.EncodeMapLen(0) } // Context sets a passed context to the request. diff --git a/tarantool_test.go b/tarantool_test.go index 14bb63ef7..4902e96f4 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -4199,6 +4199,75 @@ func TestFdDialer(t *testing.T) { require.Equal(t, int8(0), resp[0]) } +const ( + errNoSyncTransactionQueue = "The synchronous transaction queue doesn't belong to any instance" +) + +func TestDoBeginRequest_IsSync(t *testing.T) { + test_helpers.SkipIfIsSyncUnsupported(t) + + conn := test_helpers.ConnectWithValidation(t, dialer, opts) + defer conn.Close() + + stream, err := conn.NewStream() + require.NoError(t, err) + + _, err = stream.Do(NewBeginRequest().IsSync(true)).Get() + assert.Nil(t, err) + + _, err = stream.Do( + NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}), + ).Get() + require.Nil(t, err) + + _, err = stream.Do(NewCommitRequest()).Get() + require.NotNil(t, err) + assert.Contains(t, err.Error(), errNoSyncTransactionQueue) +} + +func TestDoCommitRequest_IsSync(t *testing.T) { + test_helpers.SkipIfIsSyncUnsupported(t) + + conn := test_helpers.ConnectWithValidation(t, dialer, opts) + defer conn.Close() + + stream, err := conn.NewStream() + require.NoError(t, err) + + _, err = stream.Do(NewBeginRequest()).Get() + require.Nil(t, err) + + _, err = stream.Do( + NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}), + ).Get() + require.Nil(t, err) + + _, err = stream.Do(NewCommitRequest().IsSync(true)).Get() + require.NotNil(t, err) + assert.Contains(t, err.Error(), errNoSyncTransactionQueue) +} + +func TestDoCommitRequest_NoSync(t *testing.T) { + test_helpers.SkipIfIsSyncUnsupported(t) + + conn := test_helpers.ConnectWithValidation(t, dialer, opts) + defer conn.Close() + + stream, err := conn.NewStream() + require.NoError(t, err) + + _, err = stream.Do(NewBeginRequest()).Get() + require.Nil(t, err) + + _, err = stream.Do( + NewReplaceRequest("test").Tuple([]interface{}{1, "foo"}), + ).Get() + require.Nil(t, err) + + _, err = stream.Do(NewCommitRequest()).Get() + assert.Nil(t, err) +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body diff --git a/test_helpers/utils.go b/test_helpers/utils.go index d5a65be10..579f507c9 100644 --- a/test_helpers/utils.go +++ b/test_helpers/utils.go @@ -217,6 +217,14 @@ func SkipIfCrudSpliceBroken(t *testing.T) { SkipIfFeatureUnsupported(t, "crud update splice", 2, 0, 0) } +// SkipIfIsSyncUnsupported skips test run if Tarantool without +// IS_SYNC support is used. +func SkipIfIsSyncUnsupported(t *testing.T) { + t.Helper() + + SkipIfFeatureUnsupported(t, "is sync", 3, 1, 0) +} + // IsTcsSupported checks if Tarantool supports centralized storage. // Tarantool supports centralized storage with Enterprise since 3.3.0 version. func IsTcsSupported() (bool, error) { diff --git a/testdata/requests/begin-with-txn-isolation-is-sync-timeout.msgpack b/testdata/requests/begin-with-txn-isolation-is-sync-timeout.msgpack new file mode 100644 index 000000000..dc04ebcd0 Binary files /dev/null and b/testdata/requests/begin-with-txn-isolation-is-sync-timeout.msgpack differ diff --git a/testdata/requests/begin-with-txn-isolation-is-sync.msgpack b/testdata/requests/begin-with-txn-isolation-is-sync.msgpack new file mode 100644 index 000000000..962d8b233 --- /dev/null +++ b/testdata/requests/begin-with-txn-isolation-is-sync.msgpack @@ -0,0 +1 @@ +Ya \ No newline at end of file diff --git a/testdata/requests/begin-with-txn-isolation.msgpack b/testdata/requests/begin-with-txn-isolation.msgpack new file mode 100644 index 000000000..bce57fe8c --- /dev/null +++ b/testdata/requests/begin-with-txn-isolation.msgpack @@ -0,0 +1 @@ +Y \ No newline at end of file diff --git a/testdata/requests/begin.msgpack b/testdata/requests/begin.msgpack new file mode 100644 index 000000000..5416677bc --- /dev/null +++ b/testdata/requests/begin.msgpack @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/testdata/requests/commit-raw.msgpack b/testdata/requests/commit-raw.msgpack new file mode 100644 index 000000000..5416677bc --- /dev/null +++ b/testdata/requests/commit-raw.msgpack @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/testdata/requests/commit-with-sync-false.msgpack b/testdata/requests/commit-with-sync-false.msgpack new file mode 100644 index 000000000..1c1f8b153 --- /dev/null +++ b/testdata/requests/commit-with-sync-false.msgpack @@ -0,0 +1 @@ +a \ No newline at end of file diff --git a/testdata/requests/commit-with-sync.msgpack b/testdata/requests/commit-with-sync.msgpack new file mode 100644 index 000000000..118311ccd --- /dev/null +++ b/testdata/requests/commit-with-sync.msgpack @@ -0,0 +1 @@ +a \ No newline at end of file diff --git a/watch.go b/watch.go index 9f1273134..9628b96ac 100644 --- a/watch.go +++ b/watch.go @@ -84,9 +84,11 @@ func (req *watchRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { if err := enc.EncodeMapLen(1); err != nil { return err } + if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil { return err } + return enc.EncodeString(req.key) } @@ -118,9 +120,11 @@ func (req *unwatchRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error if err := enc.EncodeMapLen(1); err != nil { return err } + if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil { return err } + return enc.EncodeString(req.key) }