Skip to content

Commit bc7e608

Browse files
authored
Merge pull request #351 from maxekman/feature-109/persistent-scheduled-commands
109 / Add persistance to command scheduler
2 parents c4089ff + c1fd089 commit bc7e608

16 files changed

+921
-99
lines changed

Makefile

+5-5
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,27 @@ lint:
66

77
.PHONY: test
88
test:
9-
go test -v -race -short ./...
9+
go test -race -short ./...
1010

1111
.PHONY: test_cover
1212
test_cover:
13-
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -v -race -short -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
13+
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -race -short -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
1414
go run ./hack/coverage/coverage.go . unit.coverprofile
1515
@find . -name \.coverprofile -type f -delete
1616

1717
.PHONY: test_integration
1818
test_integration:
19-
go test -v -race -run Integration ./...
19+
go test -race -run Integration ./...
2020

2121
.PHONY: test_integration_cover
2222
test_integration_cover:
23-
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -v -race -run Integration -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
23+
go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -race -run Integration -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c
2424
go run ./hack/coverage/coverage.go . integration.coverprofile
2525
@find . -name \.coverprofile -type f -delete
2626

2727
.PHONY: test_loadtest
2828
test_loadtest:
29-
go test -race -v -run Loadtest ./...
29+
go test -race -run Loadtest ./...
3030

3131
.PHONY: test_all_docker
3232
test_all_docker:

codec.go

+8
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,11 @@ type EventCodec interface {
2323
// UnmarshalEvent unmarshals an event and supported parts of context from bytes.
2424
UnmarshalEvent(context.Context, []byte) (Event, context.Context, error)
2525
}
26+
27+
// CommandCodec is a codec for marshaling and unmarshaling commands to and from bytes.
28+
type CommandCodec interface {
29+
// MarshalCommand marshals a command and the supported parts of context into bytes.
30+
MarshalCommand(context.Context, Command) ([]byte, error)
31+
// UnmarshalCommand unmarshals a command and supported parts of context from bytes.
32+
UnmarshalCommand(context.Context, []byte) (Command, context.Context, error)
33+
}

codec/acceptance_testing.go

+91-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package codec
1616

1717
import (
1818
"context"
19+
"reflect"
1920
"testing"
2021
"time"
2122

@@ -26,21 +27,27 @@ import (
2627

2728
func init() {
2829
eh.RegisterEventData(EventType, func() eh.EventData { return &EventData{} })
30+
31+
eh.RegisterCommand(func() eh.Command { return &Command{} })
2932
}
3033

3134
const (
3235
// EventType is a the type for Event.
3336
EventType eh.EventType = "CodecEvent"
37+
// AggregateType is the type for Aggregate.
38+
AggregateType eh.AggregateType = "CodecAggregate"
39+
// CommandType is the type for Command.
40+
CommandType eh.CommandType = "CodecCommand"
3441
)
3542

3643
// EventCodecAcceptanceTest is the acceptance test that all implementations of
37-
// Codec should pass. It should manually be called from a test case in each
44+
// EventCodec should pass. It should manually be called from a test case in each
3845
// implementation:
3946
//
4047
// func TestEventCodec(t *testing.T) {
4148
// c := EventCodec{}
4249
// expectedBytes = []byte("")
43-
// eventbus.AcceptanceTest(t, c, expectedBytes)
50+
// codec.EventCodecAcceptanceTest(t, c, expectedBytes)
4451
// }
4552
//
4653
func EventCodecAcceptanceTest(t *testing.T, c eh.EventCodec, expectedBytes []byte) {
@@ -117,3 +124,85 @@ type Nested struct {
117124
String string
118125
Number float64
119126
}
127+
128+
// CommandCodecAcceptanceTest is the acceptance test that all implementations of
129+
// CommandCodec should pass. It should manually be called from a test case in each
130+
// implementation:
131+
//
132+
// func TestCommandCodec(t *testing.T) {
133+
// c := CommandCodec{}
134+
// expectedBytes = []byte("")
135+
// codec.CommandCodecAcceptanceTest(t, c, expectedBytes)
136+
// }
137+
//
138+
func CommandCodecAcceptanceTest(t *testing.T, c eh.CommandCodec, expectedBytes []byte) {
139+
// Marshaling.
140+
ctx := mocks.WithContextOne(context.Background(), "testval")
141+
id := uuid.MustParse("10a7ec0f-7f2b-46f5-bca1-877b6e33c9fd")
142+
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
143+
cmd := &Command{
144+
ID: id,
145+
Bool: true,
146+
String: "string",
147+
Number: 42.0,
148+
Slice: []string{"a", "b"},
149+
Map: map[string]interface{}{"key": "value"}, // NOTE: Just one key to avoid compare issues.
150+
Time: timestamp,
151+
TimeRef: &timestamp,
152+
Struct: Nested{
153+
Bool: true,
154+
String: "string",
155+
Number: 42.0,
156+
},
157+
StructRef: &Nested{
158+
Bool: true,
159+
String: "string",
160+
Number: 42.0,
161+
},
162+
}
163+
164+
b, err := c.MarshalCommand(ctx, cmd)
165+
if err != nil {
166+
t.Error("there should be no error:", err)
167+
}
168+
169+
if string(b) != string(expectedBytes) {
170+
t.Error("the encoded bytes should be correct:", string(b))
171+
}
172+
173+
// Unmarshaling.
174+
decodedCmd, decodedContext, err := c.UnmarshalCommand(context.Background(), b)
175+
if err != nil {
176+
t.Error("there should be no error:", err)
177+
}
178+
179+
if !reflect.DeepEqual(decodedCmd, cmd) {
180+
t.Error("the decoded command was incorrect:", err)
181+
}
182+
183+
if val, ok := mocks.ContextOne(decodedContext); !ok || val != "testval" {
184+
t.Error("the decoded context was incorrect:", decodedContext)
185+
}
186+
}
187+
188+
// Command is a mocked eventhorizon.Command, useful in testing.
189+
type Command struct {
190+
ID uuid.UUID
191+
Bool bool
192+
String string
193+
Number float64
194+
Slice []string
195+
Map map[string]interface{}
196+
Time time.Time
197+
TimeRef *time.Time
198+
NullTime *time.Time
199+
Struct Nested
200+
StructRef *Nested
201+
NullStruct *Nested
202+
}
203+
204+
var _ = eh.Command(&Command{})
205+
206+
func (t *Command) AggregateID() uuid.UUID { return t.ID }
207+
func (t *Command) AggregateType() eh.AggregateType { return AggregateType }
208+
func (t *Command) CommandType() eh.CommandType { return CommandType }

codec/bson/command.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) 2021 - The Event Horizon authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bson
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"go.mongodb.org/mongo-driver/bson"
22+
23+
eh "github.com/looplab/eventhorizon"
24+
)
25+
26+
// CommandCodec is a codec for marshaling and unmarshaling commands
27+
// to and from bytes in BSON format.
28+
type CommandCodec struct{}
29+
30+
// MarshalCommand marshals a command into bytes in BSON format.
31+
func (_ CommandCodec) MarshalCommand(ctx context.Context, cmd eh.Command) ([]byte, error) {
32+
c := command{
33+
CommandType: cmd.CommandType(),
34+
Context: eh.MarshalContext(ctx),
35+
}
36+
37+
var err error
38+
if c.Command, err = bson.Marshal(cmd); err != nil {
39+
return nil, fmt.Errorf("could not marshal command data: %w", err)
40+
}
41+
42+
b, err := bson.Marshal(c)
43+
if err != nil {
44+
return nil, fmt.Errorf("could not marshal command: %w", err)
45+
}
46+
47+
return b, nil
48+
}
49+
50+
// UnmarshalCommand unmarshals a command from bytes in BSON format.
51+
func (_ CommandCodec) UnmarshalCommand(ctx context.Context, b []byte) (eh.Command, context.Context, error) {
52+
var c command
53+
if err := bson.Unmarshal(b, &c); err != nil {
54+
return nil, nil, fmt.Errorf("could not unmarshal command: %w", err)
55+
}
56+
57+
cmd, err := eh.CreateCommand(c.CommandType)
58+
if err != nil {
59+
return nil, nil, fmt.Errorf("could not create command: %w", err)
60+
}
61+
62+
if err := bson.Unmarshal(c.Command, cmd); err != nil {
63+
return nil, nil, fmt.Errorf("could not unmarshal command data: %w", err)
64+
}
65+
66+
ctx = eh.UnmarshalContext(ctx, c.Context)
67+
68+
return cmd, ctx, nil
69+
}
70+
71+
// command is the internal structure used on the wire only.
72+
type command struct {
73+
CommandType eh.CommandType `bson:"command_type"`
74+
Command bson.Raw `bson:"command"`
75+
Context map[string]interface{} `bson:"context"`
76+
}

codec/bson/command_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2021 - The Event Horizon authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bson
16+
17+
import (
18+
"encoding/base64"
19+
"testing"
20+
21+
"github.com/looplab/eventhorizon/codec"
22+
)
23+
24+
func TestCommandCodec(t *testing.T) {
25+
c := &CommandCodec{}
26+
27+
expectedBytes, err := base64.StdEncoding.DecodeString("jQEAAAJjb21tYW5kX3R5cGUADQAAAENvZGVjQ29tbWFuZAADY29tbWFuZAA5AQAAAmlkACUAAAAxMGE3ZWMwZi03ZjJiLTQ2ZjUtYmNhMS04NzdiNmUzM2M5ZmQACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVABHNsaWNlABcAAAACMAACAAAAYQACMQACAAAAYgAAA21hcAAUAAAAAmtleQAGAAAAdmFsdWUAAAl0aW1lAIA1U+AkAQAACXRpbWVyZWYAgDVT4CQBAAAKbnVsbHRpbWUAA3N0cnVjdAAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAANzdHJ1Y3RyZWYALwAAAAhib29sAAECc3RyaW5nAAcAAABzdHJpbmcAAW51bWJlcgAAAAAAAABFQAAKbnVsbHN0cnVjdAAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==")
28+
if err != nil {
29+
t.Error("could not decode expected bytes:", err)
30+
}
31+
32+
codec.CommandCodecAcceptanceTest(t, c, expectedBytes)
33+
}
File renamed without changes.

codec/bson/codec_test.go codec/bson/event_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323

2424
func TestEventCodec(t *testing.T) {
2525
c := &EventCodec{}
26-
expectedBytes, err := base64.StdEncoding.DecodeString("4QEAAAJldmVudF90eXBlAAsAAABDb2RlY0V2ZW50AANkYXRhAAwBAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAEc2xpY2UAFwAAAAIwAAIAAABhAAIxAAIAAABiAAADbWFwABQAAAACa2V5AAYAAAB2YWx1ZQAACXRpbWUAgDVT4CQBAAAJdGltZXJlZgCANVPgJAEAAApudWxsdGltZQADc3RydWN0AC8AAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAAA3N0cnVjdHJlZgAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAApudWxsc3RydWN0AAAJdGltZXN0YW1wAIA1U+AkAQAAAmFnZ3JlZ2F0ZV90eXBlAAoAAABBZ2dyZWdhdGUAAl9pZAAlAAAAMTBhN2VjMGYtN2YyYi00NmY1LWJjYTEtODc3YjZlMzNjOWZkABB2ZXJzaW9uAAEAAAADbWV0YWRhdGEAEgAAAAFudW0AAAAAAAAARUAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==")
2726

27+
expectedBytes, err := base64.StdEncoding.DecodeString("4QEAAAJldmVudF90eXBlAAsAAABDb2RlY0V2ZW50AANkYXRhAAwBAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAEc2xpY2UAFwAAAAIwAAIAAABhAAIxAAIAAABiAAADbWFwABQAAAACa2V5AAYAAAB2YWx1ZQAACXRpbWUAgDVT4CQBAAAJdGltZXJlZgCANVPgJAEAAApudWxsdGltZQADc3RydWN0AC8AAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAAA3N0cnVjdHJlZgAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAApudWxsc3RydWN0AAAJdGltZXN0YW1wAIA1U+AkAQAAAmFnZ3JlZ2F0ZV90eXBlAAoAAABBZ2dyZWdhdGUAAl9pZAAlAAAAMTBhN2VjMGYtN2YyYi00NmY1LWJjYTEtODc3YjZlMzNjOWZkABB2ZXJzaW9uAAEAAAADbWV0YWRhdGEAEgAAAAFudW0AAAAAAAAARUAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==")
2828
if err != nil {
2929
t.Error("could not decode expected bytes:", err)
3030
}

codec/json/command.go

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright (c) 2021 - The Event Horizon authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package json
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
22+
eh "github.com/looplab/eventhorizon"
23+
)
24+
25+
// CommandCodec is a codec for marshaling and unmarshaling commands
26+
// to and from bytes in JSON format.
27+
type CommandCodec struct{}
28+
29+
// MarshalCommand marshals a command into bytes in JSON format.
30+
func (_ CommandCodec) MarshalCommand(ctx context.Context, cmd eh.Command) ([]byte, error) {
31+
c := command{
32+
CommandType: cmd.CommandType(),
33+
Context: eh.MarshalContext(ctx),
34+
}
35+
36+
var err error
37+
if c.Command, err = json.Marshal(cmd); err != nil {
38+
return nil, fmt.Errorf("could not marshal command data: %w", err)
39+
}
40+
41+
b, err := json.Marshal(c)
42+
if err != nil {
43+
return nil, fmt.Errorf("could not marshal command: %w", err)
44+
}
45+
46+
return b, nil
47+
}
48+
49+
// UnmarshalCommand unmarshals a command from bytes in JSON format.
50+
func (_ CommandCodec) UnmarshalCommand(ctx context.Context, b []byte) (eh.Command, context.Context, error) {
51+
var c command
52+
if err := json.Unmarshal(b, &c); err != nil {
53+
return nil, nil, fmt.Errorf("could not unmarshal command: %w", err)
54+
}
55+
56+
cmd, err := eh.CreateCommand(c.CommandType)
57+
if err != nil {
58+
return nil, nil, fmt.Errorf("could not create command: %w", err)
59+
}
60+
61+
if err := json.Unmarshal(c.Command, &cmd); err != nil {
62+
return nil, nil, fmt.Errorf("could not unmarshal command data: %w", err)
63+
}
64+
65+
ctx = eh.UnmarshalContext(ctx, c.Context)
66+
67+
return cmd, ctx, nil
68+
}
69+
70+
// command is the internal structure used on the wire only.
71+
type command struct {
72+
CommandType eh.CommandType `json:"command_type"`
73+
Command json.RawMessage `json:"command"`
74+
Context map[string]interface{} `json:"context"`
75+
}

0 commit comments

Comments
 (0)