Skip to content

Commit 145bc5d

Browse files
committed
pr feedback
1 parent 8b928cc commit 145bc5d

File tree

8 files changed

+59
-50
lines changed

8 files changed

+59
-50
lines changed

pkg/connector/destination.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func (d *destination) Write(ctx context.Context, r record.Record) error {
234234
return err
235235
}
236236

237-
d.inspector.Send(r)
237+
d.inspector.Send(ctx, r)
238238
err = d.plugin.Write(ctx, r)
239239
if err != nil {
240240
return cerrors.Errorf("error writing record: %w", err)

pkg/inspector/inspector.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,26 @@ import (
2727
// 1. a way to send records to it asynchronously
2828
// 2. a way to know if it's closed or not
2929
type Session struct {
30-
C chan record.Record
31-
CloseReason error
30+
C chan record.Record
3231

3332
logger log.CtxLogger
3433
onClose func()
3534
}
3635

37-
func (s *Session) Close(err error) {
36+
func (s *Session) Close() {
3837
s.onClose()
3938
close(s.C)
40-
s.CloseReason = err
4139
}
4240

4341
// send sends a record to the session's channel.
4442
// If the channel has already reached its capacity,
4543
// the record will be ignored.
46-
func (s *Session) send(r record.Record) {
44+
func (s *Session) send(ctx context.Context, r record.Record) {
4745
select {
4846
case s.C <- r:
4947
default:
5048
s.logger.
51-
Warn(context.Background()).
49+
Warn(ctx).
5250
Msg("session buffer full, record will be dropped")
5351
}
5452
}
@@ -77,7 +75,7 @@ func New(logger log.CtxLogger, bufferSize int) *Inspector {
7775

7876
// Send sends the given record to all registered sessions.
7977
// The method does not wait for consumers to get the records.
80-
func (i *Inspector) Send(r record.Record) {
78+
func (i *Inspector) Send(ctx context.Context, r record.Record) {
8179
// copy metadata, to prevent issues when concurrently accessing the metadata
8280
var meta record.Metadata
8381
if len(r.Metadata) != 0 {
@@ -93,7 +91,7 @@ func (i *Inspector) Send(r record.Record) {
9391
i.lock.Lock()
9492
defer i.lock.Unlock()
9593
for _, s := range i.sessions {
96-
s.send(record.Record{
94+
s.send(ctx, record.Record{
9795
Position: r.Position,
9896
Operation: r.Operation,
9997
Metadata: meta,
@@ -114,7 +112,10 @@ func (i *Inspector) NewSession(ctx context.Context) *Session {
114112
}
115113
go func() {
116114
<-ctx.Done()
117-
s.Close(ctx.Err())
115+
s.logger.
116+
Info(context.Background()).
117+
Msgf("context canceled: %v", ctx.Err())
118+
s.Close()
118119
}()
119120

120121
i.lock.Lock()

pkg/inspector/inspector_benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ func BenchmarkInspector_SingleSession_Send(b *testing.B) {
2727
ins.NewSession(context.Background())
2828

2929
for i := 0; i < b.N; i++ {
30-
ins.Send(record.Record{Position: record.Position("test-pos")})
30+
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
3131
}
3232
}

pkg/inspector/inspector_test.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929

3030
func TestInspector_Send_NoSessions(t *testing.T) {
3131
underTest := New(log.Nop(), 10)
32-
underTest.Send(record.Record{})
32+
underTest.Send(context.Background(), record.Record{})
3333
}
3434

3535
func TestInspector_Send_SingleSession(t *testing.T) {
@@ -39,7 +39,7 @@ func TestInspector_Send_SingleSession(t *testing.T) {
3939
r := record.Record{
4040
Position: record.Position("test-pos"),
4141
}
42-
underTest.Send(r)
42+
underTest.Send(context.Background(), r)
4343
assertGotRecord(is.New(t), s, r)
4444
}
4545

@@ -53,7 +53,7 @@ func TestInspector_Send_MultipleSessions(t *testing.T) {
5353
r := record.Record{
5454
Position: record.Position("test-pos"),
5555
}
56-
underTest.Send(r)
56+
underTest.Send(context.Background(), r)
5757
assertGotRecord(is, s1, r)
5858
assertGotRecord(is, s2, r)
5959
}
@@ -67,12 +67,16 @@ func TestInspector_Send_SessionClosed(t *testing.T) {
6767
r := record.Record{
6868
Position: record.Position("test-pos"),
6969
}
70-
underTest.Send(r)
70+
underTest.Send(context.Background(), r)
7171
assertGotRecord(is.New(t), s, r)
7272

73-
err := cerrors.New("yet another error")
74-
s.Close(err)
75-
is.True(cerrors.Is(s.CloseReason, err))
73+
s.Close()
74+
underTest.Send(
75+
context.Background(),
76+
record.Record{
77+
Position: record.Position("test-pos-2"),
78+
},
79+
)
7680
}
7781

7882
func TestInspector_Send_SessionCtxCanceled(t *testing.T) {
@@ -85,12 +89,18 @@ func TestInspector_Send_SessionCtxCanceled(t *testing.T) {
8589
r := record.Record{
8690
Position: record.Position("test-pos"),
8791
}
88-
underTest.Send(r)
92+
underTest.Send(context.Background(), r)
8993
assertGotRecord(is.New(t), s, r)
9094

9195
cancel()
9296
time.Sleep(100 * time.Millisecond)
93-
is.True(cerrors.Is(s.CloseReason, context.Canceled))
97+
98+
select {
99+
case _, ok := <-s.C:
100+
is.True(!ok) // expected no record
101+
default:
102+
is.Fail() // expected channel to be closed
103+
}
94104
}
95105

96106
func TestInspector_Send_SlowConsumer(t *testing.T) {
@@ -106,9 +116,12 @@ func TestInspector_Send_SlowConsumer(t *testing.T) {
106116
s := underTest.NewSession(context.Background())
107117

108118
for i := 0; i < bufferSize+1; i++ {
109-
s.send(record.Record{
110-
Position: record.Position(fmt.Sprintf("test-pos-%v", i)),
111-
})
119+
s.send(
120+
context.Background(),
121+
record.Record{
122+
Position: record.Position(fmt.Sprintf("test-pos-%v", i)),
123+
},
124+
)
112125
}
113126

114127
for i := 0; i < bufferSize; i++ {

pkg/orchestrator/connectors.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,24 @@ package orchestrator
1616

1717
import (
1818
"context"
19+
1920
"github.com/conduitio/conduit/pkg/connector"
2021
"github.com/conduitio/conduit/pkg/foundation/cerrors"
2122
"github.com/conduitio/conduit/pkg/foundation/rollback"
22-
"github.com/conduitio/conduit/pkg/inspector"
2323
"github.com/conduitio/conduit/pkg/pipeline"
24+
"github.com/conduitio/conduit/pkg/record"
2425
"github.com/google/uuid"
2526
)
2627

2728
type ConnectorOrchestrator base
2829

29-
func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (*inspector.Session, error) {
30-
// fetch the connector from the ConnectorOrchestrator
30+
func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (chan record.Record, error) {
3131
conn, err := c.Get(ctx, id)
3232
if err != nil {
3333
return nil, cerrors.Errorf("failed to get connector by ID %v: %w", id, err)
3434
}
3535

36-
session := conn.Inspect(ctx)
37-
38-
return session, nil
36+
return conn.Inspect(ctx).C, nil
3937
}
4038

4139
func (c *ConnectorOrchestrator) Create(

pkg/web/api/connector_v1.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ package api
1919
import (
2020
"context"
2121
"fmt"
22+
2223
"github.com/conduitio/conduit/pkg/connector"
2324
"github.com/conduitio/conduit/pkg/foundation/cerrors"
24-
"github.com/conduitio/conduit/pkg/inspector"
25+
"github.com/conduitio/conduit/pkg/record"
2526
"github.com/conduitio/conduit/pkg/web/api/fromproto"
2627
"github.com/conduitio/conduit/pkg/web/api/status"
2728
"github.com/conduitio/conduit/pkg/web/api/toproto"
@@ -36,7 +37,7 @@ type ConnectorOrchestrator interface {
3637
Delete(ctx context.Context, id string) error
3738
Update(ctx context.Context, id string, config connector.Config) (connector.Connector, error)
3839
Validate(ctx context.Context, t connector.Type, config connector.Config) error
39-
Inspect(ctx context.Context, id string) (*inspector.Session, error)
40+
Inspect(ctx context.Context, id string) (chan record.Record, error)
4041
}
4142

4243
type ConnectorAPIv1 struct {
@@ -73,26 +74,26 @@ func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, se
7374
return status.ConnectorError(cerrors.ErrEmptyID)
7475
}
7576

76-
s, err := c.cs.Inspect(server.Context(), req.Id)
77+
records, err := c.cs.Inspect(server.Context(), req.Id)
7778
if err != nil {
7879
return status.ConnectorError(cerrors.Errorf("failed to get connector by ID %v: %w", req.Id, err))
79-
8080
}
81-
for r := range s.C {
82-
record, err2 := toproto.Record(r)
81+
82+
for rec := range records {
83+
recProto, err2 := toproto.Record(rec)
8384
if err2 != nil {
8485
return fmt.Errorf("failed converting record: %w", err2)
8586
}
8687

8788
err2 = server.Send(&apiv1.InspectConnectorResponse{
88-
Record: record,
89+
Record: recProto,
8990
})
9091
if err2 != nil {
9192
return fmt.Errorf("failed sending record: %w", err2)
9293
}
9394
}
9495

95-
return cerrors.Errorf("records channel closed: %w", s.CloseReason)
96+
return cerrors.New("records channel closed")
9697
}
9798

9899
// GetConnector returns a single Connector proto response or an error.

pkg/web/api/connector_v1_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ import (
2626
"github.com/conduitio/conduit/pkg/foundation/assert"
2727
"github.com/conduitio/conduit/pkg/foundation/cchan"
2828
"github.com/conduitio/conduit/pkg/foundation/cerrors"
29-
"github.com/conduitio/conduit/pkg/foundation/log"
30-
"github.com/conduitio/conduit/pkg/inspector"
3129
"github.com/conduitio/conduit/pkg/record"
3230
apimock "github.com/conduitio/conduit/pkg/web/api/mock"
3331
"github.com/conduitio/conduit/pkg/web/api/toproto"
@@ -220,12 +218,11 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) {
220218
recProto, err := toproto.Record(rec)
221219
assert.Ok(t, err)
222220

223-
ins := inspector.New(log.Nop(), 10)
224-
session := ins.NewSession(ctx)
221+
records := make(chan record.Record)
225222

226223
csMock.EXPECT().
227224
Inspect(ctx, id).
228-
Return(session, nil).
225+
Return(records, nil).
229226
Times(1)
230227

231228
inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl)
@@ -238,7 +235,7 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) {
238235
inspectServer,
239236
)
240237
}()
241-
ins.Send(rec)
238+
records <- rec
242239

243240
time.Sleep(100 * time.Millisecond)
244241
}
@@ -250,12 +247,11 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) {
250247
csMock := apimock.NewConnectorOrchestrator(ctrl)
251248
api := NewConnectorAPIv1(csMock)
252249
id := uuid.NewString()
253-
ins := inspector.New(log.Nop(), 10)
254-
session := ins.NewSession(ctx)
250+
records := make(chan record.Record)
255251

256252
csMock.EXPECT().
257253
Inspect(ctx, id).
258-
Return(session, nil).
254+
Return(records, nil).
259255
Times(1)
260256

261257
inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl)
@@ -271,7 +267,7 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) {
271267
)
272268
errC <- err
273269
}()
274-
ins.Send(generateTestRecord())
270+
records <- generateTestRecord()
275271

276272
err, b, err2 := cchan.Chan[error](errC).RecvTimeout(context.Background(), 100*time.Millisecond)
277273
assert.Ok(t, err2)

pkg/web/api/mock/connector.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)