Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ They have almost same semantics with [Spanner JDBC properties](https://cloud.goo
| MAX_COMMIT_DELAY | READ_WRITE | `"500ms"` |
| AUTOCOMMIT_DML_MODE | READ_WRITE | `"PARTITIONED_NON_ATOMIC"` |
| MAX_PARTITIONED_PARALLELISM | READ_WRITE | `4` |
| DEFAULT_ISOLATION_LEVEL | READ_WRITE | `REPEATABLE_READ` |

#### spanner-mycli original variables

Expand Down
1 change: 1 addition & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority sppb.R
CommitPriority: priority,
TransactionTag: tag,
ExcludeTxnFromChangeStreams: s.systemVariables.ExcludeTxnFromChangeStreams,
IsolationLevel: s.systemVariables.DefaultIsolationLevel,
}

txn, err := spanner.NewReadWriteStmtBasedTransactionWithOptions(ctx, s.client, opts)
Expand Down
91 changes: 91 additions & 0 deletions session_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,97 @@ func TestRequestPriority(t *testing.T) {
}
}

func TestIsolationLevel(t *testing.T) {
ctx := t.Context()

emulator, teardown, err := spanemuboost.NewEmulator(ctx,
spanemuboost.WithProjectID(project),
spanemuboost.WithInstanceID(instance),
spanemuboost.WithDatabaseID(database),
spanemuboost.WithSetupDDLs(sliceOf("CREATE TABLE t1 (Id INT64) PRIMARY KEY (Id)")),
)
if err != nil {
t.Fatalf("failed to start emulator: %v", err)
}
defer teardown()

var recorder requestRecorder
unaryInterceptor, streamInterceptor := recordRequestsInterceptors(&recorder)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
}

conn, err := grpc.NewClient(emulator.URI, opts...)
if err != nil {
t.Fatalf("failed to dial: %v", err)
}

for _, test := range []struct {
desc string
defaultIsolationLevel sppb.TransactionOptions_IsolationLevel
want sppb.TransactionOptions_IsolationLevel
}{
{
desc: "use default isolation level",
defaultIsolationLevel: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
want: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
},
{
desc: "use serializable isolation level",
defaultIsolationLevel: sppb.TransactionOptions_SERIALIZABLE,
want: sppb.TransactionOptions_SERIALIZABLE,
},
{
desc: "use repeatable_read isolation level",
defaultIsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
want: sppb.TransactionOptions_REPEATABLE_READ,
},
} {
t.Run(test.desc, func(t *testing.T) {
defer recorder.flush()

session, err := NewSession(ctx, &systemVariables{
Project: project,
Instance: instance,
Database: database,
DefaultIsolationLevel: test.defaultIsolationLevel,
}, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("failed to create spanner-cli session: %v", err)
}

// Read-Write Transaction.
if err := session.BeginReadWriteTransaction(ctx, sppb.RequestOptions_PRIORITY_UNSPECIFIED); err != nil {
t.Fatalf("failed to begin read write transaction: %v", err)
}
iter, _ := session.RunQuery(ctx, spanner.NewStatement("SELECT * FROM t1"))
if err := iter.Do(func(r *spanner.Row) error {
return nil
}); err != nil {
t.Fatalf("failed to run query: %v", err)
}
if _, _, _, _, err := session.RunUpdate(ctx, spanner.NewStatement("DELETE FROM t1 WHERE Id = 1"), true); err != nil {
t.Fatalf("failed to run update: %v", err)
}
if _, err := session.CommitReadWriteTransaction(ctx); err != nil {
t.Fatalf("failed to commit: %v", err)
}

// Check request priority.
for _, r := range recorder.requests {
switch v := r.(type) {
case *sppb.BeginTransactionRequest:
if got := v.GetOptions().GetIsolationLevel(); got != test.want {
t.Errorf("transaction level mismatch: got = %v, want = %v", got, test.want)
}
}
}
})
}
}

// requestRecorder is a recorder to retain gRPC requests for spannertest.Server.
type requestRecorder struct {
requests []interface{}
Expand Down
20 changes: 20 additions & 0 deletions system_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type systemVariables struct {
MaxPartitionedParallelism int64 // MAX_PARTITIONED_PARALLELISM
AutocommitDMLMode AutocommitDMLMode // AUTOCOMMIT_DML_MODE

DefaultIsolationLevel sppb.TransactionOptions_IsolationLevel // DEFAULT_ISOLATION_LEVEL

// CLI_* variables

CLIFormat DisplayMode // CLI_FORMAT
Expand Down Expand Up @@ -215,6 +217,24 @@ var systemVariableDefMap = map[string]systemVariableDef{
Getter: boolGetter(func(sysVars *systemVariables) *bool { return &sysVars.ReadOnly }),
},
},
"DEFAULT_ISOLATION_LEVEL": {
Description: "The transaction isolation level that is used by default for read/write transactions.",
Accessor: accessor{
Setter: func(this *systemVariables, name, value string) error {
v := strings.Join(strings.Fields(strings.ToUpper(unquoteString(value))), "_")
isolation, ok := sppb.TransactionOptions_IsolationLevel_value[v]
if ok {
this.DefaultIsolationLevel = sppb.TransactionOptions_IsolationLevel(isolation)
} else {
return fmt.Errorf("invalid isolation level: %v", v)
}
return nil
},
Getter: func(this *systemVariables, name string) (map[string]string, error) {
return singletonMap(name, this.DefaultIsolationLevel.String()), nil
},
},
},
"AUTO_PARTITION_MODE": {
Description: "A property of type BOOL indicating whether the connection automatically uses partitioned queries for all queries that are executed.",
Accessor: boolAccessor(func(variables *systemVariables) *bool {
Expand Down
33 changes: 32 additions & 1 deletion system_variables_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package main

import "testing"
import (
"testing"

sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
)

func TestSystemVariables_AddCLIProtoDescriptorFile(t *testing.T) {
// TODO: More test
Expand All @@ -23,3 +27,30 @@ func TestSystemVariables_AddCLIProtoDescriptorFile(t *testing.T) {
})
}
}

func TestSystemVariables_DefaultIsolationLevel(t *testing.T) {
// TODO: More test
tests := []struct {
value string
want sppb.TransactionOptions_IsolationLevel
}{
{"REPEATABLE READ", sppb.TransactionOptions_REPEATABLE_READ},
{"repeatable read", sppb.TransactionOptions_REPEATABLE_READ},
{"REPEATABLE_READ", sppb.TransactionOptions_REPEATABLE_READ},
{"repeatable_read", sppb.TransactionOptions_REPEATABLE_READ},
{"serializable", sppb.TransactionOptions_SERIALIZABLE},
{"SERIALIZABLE", sppb.TransactionOptions_SERIALIZABLE},
}
for _, test := range tests {
t.Run(test.value, func(t *testing.T) {
var sysVars systemVariables
if err := sysVars.Set("DEFAULT_ISOLATION_LEVEL", test.value); err != nil {
t.Errorf("should success, but failed, value: %v, err: %v", test.value, err)
}

if sysVars.DefaultIsolationLevel != test.want {
t.Errorf("DefaultIsolationLevel should be %v, but %v", test.want, sysVars.DefaultIsolationLevel)
}
})
}
}