diff --git a/go.mod b/go.mod index a6be2926b..a9fae9359 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.37.0 github.com/rs/zerolog v1.28.0 + github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.4.3 go.buf.build/protocolbuffers/go/grpc-ecosystem/grpc-gateway v1.3.50 golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 @@ -83,6 +84,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.0+incompatible // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.13.0 // indirect @@ -107,6 +109,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/segmentio/kafka-go v0.4.35 // indirect + github.com/sirupsen/logrus v1.8.1 // indirect github.com/xdg/scram v1.0.5 // indirect github.com/xdg/stringprep v1.0.3 // indirect github.com/xitongsys/parquet-go v1.6.2 // indirect diff --git a/go.sum b/go.sum index b8070a437..7e10d50b4 100644 --- a/go.sum +++ b/go.sum @@ -308,6 +308,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 h1:t7uX3JBHdVwAi3G7sSSdbsk8NfgA+LnUS88V/2EKaA0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0/go.mod h1:4OGVnY4qf2+gw+ssiHbW+pq4mo2yko94YxxMmXZ7jCA= @@ -530,6 +532,8 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -555,6 +559,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE= +github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw= github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= @@ -704,6 +710,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index ff8dff79d..3de42a7c8 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -57,6 +57,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" "google.golang.org/grpc/stats" "gopkg.in/tomb.v2" @@ -320,6 +321,10 @@ func (r *Runtime) serveGRPCAPI(ctx context.Context, t *tomb.Tomb) (net.Addr, err info := api.NewInformation(Version(false)) info.Register(grpcServer) + // Makes it easier to use command line tools to interact + // with the gRPC API. + // https://github.com/grpc/grpc/blob/master/doc/server-reflection.md + reflection.Register(grpcServer) healthService := api.NewHealthChecker() grpc_health_v1.RegisterHealthServer(grpcServer, healthService) @@ -436,14 +441,18 @@ func (r *Runtime) serveHTTPAPI( return nil, cerrors.Errorf("failed to register metrics handler: %w", err) } + handler := grpcutil.WithWebsockets( + grpcutil.WithDefaultGatewayMiddleware( + r.logger, allowCORS(gwmux, "http://localhost:4200"), + ), + ) + return r.serveHTTP( ctx, t, &http.Server{ - Addr: r.Config.HTTP.Address, - Handler: grpcutil.WithDefaultGatewayMiddleware( - r.logger, allowCORS(gwmux, "http://localhost:4200"), - ), + Addr: r.Config.HTTP.Address, + Handler: handler, ReadHeaderTimeout: 10 * time.Second, }, ) diff --git a/pkg/connector/builder.go b/pkg/connector/builder.go index 9d8aa8c43..1d8c9f4c0 100644 --- a/pkg/connector/builder.go +++ b/pkg/connector/builder.go @@ -20,9 +20,12 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/plugin" ) +const inspectorBufferSize = 1000 + // Builder represents an object that can build a connector. // The main use of this interface is to be able to switch out the connector // implementations for mocks in tests. @@ -99,6 +102,7 @@ func (b *DefaultBuilder) Init(c Connector, id string, config Config) error { v.persister = b.persister v.pluginDispenser = p v.errs = make(chan error) + v.inspector = inspector.New(v.logger, inspectorBufferSize) default: return ErrInvalidConnectorType } diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index c56037f54..0bd9bb61a 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -21,6 +21,7 @@ import ( "context" "time" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/record" ) @@ -65,6 +66,10 @@ type Connector interface { // asynchronously (e.g. persisting state). Errors() <-chan error + // Inspect returns an inspector.Session which exposes the records + // coming into or out of this connector (depending on the connector type). + Inspect(context.Context) *inspector.Session + // Open will start the plugin process and call the Open method on the // plugin. After the connector has been successfully opened it is considered // as running (IsRunning returns true) and can be stopped again with diff --git a/pkg/connector/destination.go b/pkg/connector/destination.go index c62163839..0393f965a 100644 --- a/pkg/connector/destination.go +++ b/pkg/connector/destination.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/plugin" "github.com/conduitio/conduit/pkg/record" ) @@ -57,6 +58,8 @@ type destination struct { // stopStream is a function that closes the context of the stream stopStream context.CancelFunc + inspector *inspector.Inspector + // m can lock a destination from concurrent access (e.g. in connector persister). m sync.Mutex // wg tracks the number of in flight calls to the plugin. @@ -115,6 +118,10 @@ func (d *destination) Errors() <-chan error { return d.errs } +func (d *destination) Inspect(ctx context.Context) *inspector.Session { + return d.inspector.NewSession(ctx) +} + func (d *destination) Validate(ctx context.Context, settings map[string]string) (err error) { dest, err := d.pluginDispenser.DispenseDestination() if err != nil { @@ -227,6 +234,7 @@ func (d *destination) Write(ctx context.Context, r record.Record) error { return err } + d.inspector.Send(ctx, r) err = d.plugin.Write(ctx, r) if err != nil { return cerrors.Errorf("error writing record: %w", err) diff --git a/pkg/connector/mock/connector.go b/pkg/connector/mock/connector.go index 06a0b2a4e..00fdb6633 100644 --- a/pkg/connector/mock/connector.go +++ b/pkg/connector/mock/connector.go @@ -10,6 +10,7 @@ import ( time "time" connector "github.com/conduitio/conduit/pkg/connector" + inspector "github.com/conduitio/conduit/pkg/inspector" record "github.com/conduitio/conduit/pkg/record" gomock "github.com/golang/mock/gomock" ) @@ -107,6 +108,20 @@ func (mr *SourceMockRecorder) ID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*Source)(nil).ID)) } +// Inspect mocks base method. +func (m *Source) Inspect(arg0 context.Context) *inspector.Session { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Inspect", arg0) + ret0, _ := ret[0].(*inspector.Session) + return ret0 +} + +// Inspect indicates an expected call of Inspect. +func (mr *SourceMockRecorder) Inspect(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Inspect", reflect.TypeOf((*Source)(nil).Inspect), arg0) +} + // IsRunning mocks base method. func (m *Source) IsRunning() bool { m.ctrl.T.Helper() @@ -379,6 +394,20 @@ func (mr *DestinationMockRecorder) ID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*Destination)(nil).ID)) } +// Inspect mocks base method. +func (m *Destination) Inspect(arg0 context.Context) *inspector.Session { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Inspect", arg0) + ret0, _ := ret[0].(*inspector.Session) + return ret0 +} + +// Inspect indicates an expected call of Inspect. +func (mr *DestinationMockRecorder) Inspect(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Inspect", reflect.TypeOf((*Destination)(nil).Inspect), arg0) +} + // IsRunning mocks base method. func (m *Destination) IsRunning() bool { m.ctrl.T.Helper() diff --git a/pkg/connector/source.go b/pkg/connector/source.go index 7e1bacfaa..da4c51bdf 100644 --- a/pkg/connector/source.go +++ b/pkg/connector/source.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/plugin" "github.com/conduitio/conduit/pkg/record" ) @@ -114,6 +115,11 @@ func (s *source) Errors() <-chan error { return s.errs } +func (s *source) Inspect(_ context.Context) *inspector.Session { + // TODO implement me + panic("implement me") +} + func (s *source) Validate(ctx context.Context, settings map[string]string) (err error) { src, err := s.pluginDispenser.DispenseSource() if err != nil { diff --git a/pkg/foundation/grpcutil/gateway.go b/pkg/foundation/grpcutil/gateway.go index 9d08220a5..77c9c6275 100644 --- a/pkg/foundation/grpcutil/gateway.go +++ b/pkg/foundation/grpcutil/gateway.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/log" "github.com/google/uuid" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/tmc/grpc-websocket-proxy/wsproxy" "google.golang.org/protobuf/encoding/protojson" ) @@ -109,6 +110,10 @@ func WithHTTPEndpointHeader(h http.Handler) http.Handler { }) } +func WithWebsockets(h http.Handler) http.Handler { + return wsproxy.WebsocketProxy(h) +} + func extractEndpoint(r *http.Request) string { return r.Method + " " + r.URL.Path } diff --git a/pkg/foundation/log/fields.go b/pkg/foundation/log/fields.go index 432406381..e70c7fc96 100644 --- a/pkg/foundation/log/fields.go +++ b/pkg/foundation/log/fields.go @@ -32,4 +32,6 @@ const ( PluginTypeField = "plugin_type" PluginNameField = "plugin_name" PluginPathField = "plugin_path" + + InspectorSessionID = "inspector_session_id" ) diff --git a/pkg/inspector/inspector.go b/pkg/inspector/inspector.go new file mode 100644 index 000000000..5b729b193 --- /dev/null +++ b/pkg/inspector/inspector.go @@ -0,0 +1,145 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inspector + +import ( + "context" + "sync" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" + "github.com/google/uuid" +) + +// Session wraps a channel of records and provides: +// 1. a way to send records to it asynchronously +// 2. a way to know if it's closed or not +type Session struct { + C chan record.Record + + id string + logger log.CtxLogger + onClose func() +} + +func (s *Session) close() { + s.onClose() + close(s.C) +} + +// send a record to the session's channel. +// If the channel has already reached its capacity, +// the record will be ignored. +func (s *Session) send(ctx context.Context, r record.Record) { + select { + case s.C <- r: + default: + s.logger. + Warn(ctx). + Str(log.InspectorSessionID, s.id). + Msg("session buffer full, record will be dropped") + } +} + +// Inspector is attached to an inspectable pipeline component +// and makes returns records coming in or out of the component. +// An Inspector is a "proxy" between the pipeline component being +// inspected and the API, which broadcasts records to all clients. +type Inspector struct { + // sessions is a map of sessions. + // keys are sessions IDs. + sessions map[string]*Session + // guards access to sessions + lock sync.Mutex + logger log.CtxLogger + bufferSize int +} + +func New(logger log.CtxLogger, bufferSize int) *Inspector { + return &Inspector{ + sessions: make(map[string]*Session), + logger: logger.WithComponent("inspector.Inspector"), + bufferSize: bufferSize, + } +} + +// Send the given record to all registered sessions. +// The method does not wait for consumers to get the records. +func (i *Inspector) Send(ctx context.Context, r record.Record) { + // copy metadata, to prevent issues when concurrently accessing the metadata + var meta record.Metadata + if len(r.Metadata) != 0 { + meta = make(record.Metadata, len(r.Metadata)) + for k, v := range r.Metadata { + meta[k] = v + } + } + + // todo optimize this, as we have locks for every record. + // locks are needed to make sure the `sessions` slice + // is not modified as we're iterating over it + i.lock.Lock() + defer i.lock.Unlock() + for _, s := range i.sessions { + s.send(ctx, record.Record{ + Position: r.Position, + Operation: r.Operation, + Metadata: meta, + Key: r.Key, + Payload: r.Payload, + }) + } +} + +func (i *Inspector) NewSession(ctx context.Context) *Session { + id := uuid.NewString() + s := &Session{ + C: make(chan record.Record, i.bufferSize), + id: id, + logger: i.logger.WithComponent("inspector.Session"), + onClose: func() { + i.remove(id) + }, + } + go func() { + <-ctx.Done() + s.logger. + Info(context.Background()). + Msgf("context canceled: %v", ctx.Err()) + s.close() + }() + + i.lock.Lock() + defer i.lock.Unlock() + + i.sessions[id] = s + i.logger. + Info(context.Background()). + Str(log.InspectorSessionID, id). + Msg("session created") + return s +} + +// remove a session with given ID from this Inspector. +func (i *Inspector) remove(id string) { + i.lock.Lock() + defer i.lock.Unlock() + + delete(i.sessions, id) + i.logger. + Info(context.Background()). + Str(log.InspectorSessionID, id). + Msg("session removed") +} diff --git a/pkg/inspector/inspector_benchmark_test.go b/pkg/inspector/inspector_benchmark_test.go new file mode 100644 index 000000000..78c41cb5f --- /dev/null +++ b/pkg/inspector/inspector_benchmark_test.go @@ -0,0 +1,32 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inspector + +import ( + "context" + "testing" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" +) + +func BenchmarkInspector_SingleSession_Send(b *testing.B) { + ins := New(log.Nop(), 10) + ins.NewSession(context.Background()) + + for i := 0; i < b.N; i++ { + ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")}) + } +} diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go new file mode 100644 index 000000000..328bbb104 --- /dev/null +++ b/pkg/inspector/inspector_test.go @@ -0,0 +1,143 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inspector + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/conduitio/conduit/pkg/foundation/cchan" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" + "github.com/matryer/is" +) + +func TestInspector_Send_NoSessions(t *testing.T) { + underTest := New(log.Nop(), 10) + underTest.Send(context.Background(), record.Record{}) +} + +func TestInspector_Send_SingleSession(t *testing.T) { + underTest := New(log.Nop(), 10) + s := underTest.NewSession(context.Background()) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is.New(t), s, r) +} + +func TestInspector_Send_MultipleSessions(t *testing.T) { + is := is.New(t) + + underTest := New(log.Nop(), 10) + s1 := underTest.NewSession(context.Background()) + s2 := underTest.NewSession(context.Background()) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is, s1, r) + assertGotRecord(is, s2, r) +} + +func TestInspector_Send_SessionClosed(t *testing.T) { + is := is.New(t) + + underTest := New(log.Nop(), 10) + s := underTest.NewSession(context.Background()) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is, s, r) + + s.close() + underTest.Send( + context.Background(), + record.Record{ + Position: record.Position("test-pos-2"), + }, + ) +} + +func TestInspector_Send_SessionCtxCanceled(t *testing.T) { + is := is.New(t) + + underTest := New(log.Nop(), 10) + ctx, cancel := context.WithCancel(context.Background()) + s := underTest.NewSession(ctx) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is, s, r) + + cancel() + + _, got, err := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + is.NoErr(err) + is.True(!got) // expected no record +} + +func TestInspector_Send_SlowConsumer(t *testing.T) { + // When a session's buffer is full, then further records will be dropped. + // In this test we set up an inspector with a buffer size of 10. + // Then we send 11 records (without reading them immediately). + // The expected behavior is that we will be able to get the first 10, + // and that the last record never arrives. + is := is.New(t) + + bufferSize := 10 + underTest := New(log.Nop(), bufferSize) + s := underTest.NewSession(context.Background()) + + for i := 0; i < bufferSize+1; i++ { + s.send( + context.Background(), + record.Record{ + Position: record.Position(fmt.Sprintf("test-pos-%v", i)), + }, + ) + } + + for i := 0; i < bufferSize; i++ { + assertGotRecord( + is, + s, + record.Record{ + Position: record.Position(fmt.Sprintf("test-pos-%v", i)), + }, + ) + } + + _, got, err := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + is.True(cerrors.Is(err, context.DeadlineExceeded)) + is.True(!got) +} + +func assertGotRecord(is *is.I, s *Session, recWant record.Record) { + recGot, got, err := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + is.NoErr(err) + is.True(got) + is.Equal(recWant, recGot) +} diff --git a/pkg/orchestrator/connectors.go b/pkg/orchestrator/connectors.go index 265711b05..e2cdb1c1c 100644 --- a/pkg/orchestrator/connectors.go +++ b/pkg/orchestrator/connectors.go @@ -20,12 +20,22 @@ import ( "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/rollback" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/pipeline" "github.com/google/uuid" ) type ConnectorOrchestrator base +func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (*inspector.Session, error) { + conn, err := c.Get(ctx, id) + if err != nil { + return nil, err + } + + return conn.Inspect(ctx), nil +} + func (c *ConnectorOrchestrator) Create( ctx context.Context, t connector.Type, diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index d63b1cd08..68d3c1be3 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -13,6 +13,7 @@ // limitations under the License. //go:generate mockgen -destination=mock/connector.go -package=mock -mock_names=ConnectorOrchestrator=ConnectorOrchestrator . ConnectorOrchestrator +//go:generate mockgen -destination=mock/connector_service.go -package=mock -mock_names=ConnectorService_InspectConnectorServer=ConnectorService_InspectConnectorServer github.com/conduitio/conduit/proto/gen/api/v1 ConnectorService_InspectConnectorServer package api import ( @@ -20,6 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/web/api/fromproto" "github.com/conduitio/conduit/pkg/web/api/status" "github.com/conduitio/conduit/pkg/web/api/toproto" @@ -34,6 +36,7 @@ type ConnectorOrchestrator interface { Delete(ctx context.Context, id string) error Update(ctx context.Context, id string, config connector.Config) (connector.Connector, error) Validate(ctx context.Context, t connector.Type, config connector.Config) error + Inspect(ctx context.Context, id string) (*inspector.Session, error) } type ConnectorAPIv1 struct { @@ -65,6 +68,33 @@ func (c *ConnectorAPIv1) ListConnectors( return &apiv1.ListConnectorsResponse{Connectors: clist}, nil } +func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, server apiv1.ConnectorService_InspectConnectorServer) error { + if req.Id == "" { + return status.ConnectorError(cerrors.ErrEmptyID) + } + + session, err := c.cs.Inspect(server.Context(), req.Id) + if err != nil { + return status.ConnectorError(cerrors.Errorf("failed to get connector: %w", err)) + } + + for rec := range session.C { + recProto, err2 := toproto.Record(rec) + if err2 != nil { + return cerrors.Errorf("failed converting record: %w", err2) + } + + err2 = server.Send(&apiv1.InspectConnectorResponse{ + Record: recProto, + }) + if err2 != nil { + return cerrors.Errorf("failed sending record: %w", err2) + } + } + + return cerrors.New("inspector session closed") +} + // GetConnector returns a single Connector proto response or an error. func (c *ConnectorAPIv1) GetConnector( ctx context.Context, diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index 64ed8543e..4e7ebeb5c 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -23,9 +23,13 @@ import ( "github.com/conduitio/conduit/pkg/connector" connmock "github.com/conduitio/conduit/pkg/connector/mock" "github.com/conduitio/conduit/pkg/foundation/assert" + "github.com/conduitio/conduit/pkg/foundation/cchan" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/record" apimock "github.com/conduitio/conduit/pkg/web/api/mock" + "github.com/conduitio/conduit/pkg/web/api/toproto" apiv1 "github.com/conduitio/conduit/proto/gen/api/v1" "github.com/golang/mock/gomock" "github.com/google/uuid" @@ -203,6 +207,119 @@ func TestConnectorAPIv1_CreateConnector(t *testing.T) { assert.Equal(t, want, got) } +func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + csMock := apimock.NewConnectorOrchestrator(ctrl) + api := NewConnectorAPIv1(csMock) + + id := uuid.NewString() + rec := generateTestRecord() + recProto, err := toproto.Record(rec) + assert.Ok(t, err) + + ins := inspector.New(log.Nop(), 10) + session := ins.NewSession(ctx) + + csMock.EXPECT(). + Inspect(ctx, id). + Return(session, nil). + Times(1) + + inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) + inspectServer.EXPECT().Send(gomock.Eq(&apiv1.InspectConnectorResponse{Record: recProto})) + inspectServer.EXPECT().Context().Return(ctx).AnyTimes() + + go func() { + _ = api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) + }() + ins.Send(ctx, rec) + + time.Sleep(100 * time.Millisecond) +} + +func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + csMock := apimock.NewConnectorOrchestrator(ctrl) + api := NewConnectorAPIv1(csMock) + id := uuid.NewString() + + ins := inspector.New(log.Nop(), 10) + session := ins.NewSession(ctx) + + csMock.EXPECT(). + Inspect(ctx, id). + Return(session, nil). + Times(1) + + inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) + inspectServer.EXPECT().Context().Return(ctx).AnyTimes() + errSend := cerrors.New("I'm sorry, but no.") + inspectServer.EXPECT().Send(gomock.Any()).Return(errSend) + + errC := make(chan error) + go func() { + err := api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) + errC <- err + }() + ins.Send(ctx, generateTestRecord()) + + err, b, err2 := cchan.Chan[error](errC).RecvTimeout(context.Background(), 100*time.Millisecond) + assert.Ok(t, err2) + assert.True(t, b, "expected to receive an error") + assert.True(t, cerrors.Is(err, errSend), "expected 'I'm sorry, but no.' error") +} + +func TestConnectorAPIv1_InspectConnector_Err(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + csMock := apimock.NewConnectorOrchestrator(ctrl) + api := NewConnectorAPIv1(csMock) + id := uuid.NewString() + err := cerrors.New("not found, sorry") + + csMock.EXPECT(). + Inspect(ctx, gomock.Any()). + Return(nil, err). + Times(1) + + inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) + inspectServer.EXPECT().Context().Return(ctx).AnyTimes() + + errAPI := api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) + assert.NotNil(t, errAPI) + assert.Equal( + t, + "rpc error: code = Internal desc = failed to get connector: not found, sorry", + errAPI.Error(), + ) +} + +func generateTestRecord() record.Record { + return record.Record{ + Position: record.Position("test-position"), + Operation: record.OperationCreate, + Metadata: record.Metadata{"metadata-key": "metadata-value"}, + Key: record.RawData{Raw: []byte("test-key")}, + Payload: record.Change{ + After: record.RawData{Raw: []byte("test-payload")}, + }, + } +} + func TestConnectorAPIv1_GetConnector(t *testing.T) { ctx := context.Background() ctrl := gomock.NewController(t) diff --git a/pkg/web/api/mock/connector.go b/pkg/web/api/mock/connector.go index fade1a74e..5b0dee76c 100644 --- a/pkg/web/api/mock/connector.go +++ b/pkg/web/api/mock/connector.go @@ -9,6 +9,7 @@ import ( reflect "reflect" connector "github.com/conduitio/conduit/pkg/connector" + inspector "github.com/conduitio/conduit/pkg/inspector" gomock "github.com/golang/mock/gomock" ) @@ -79,6 +80,21 @@ func (mr *ConnectorOrchestratorMockRecorder) Get(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*ConnectorOrchestrator)(nil).Get), arg0, arg1) } +// Inspect mocks base method. +func (m *ConnectorOrchestrator) Inspect(arg0 context.Context, arg1 string) (*inspector.Session, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Inspect", arg0, arg1) + ret0, _ := ret[0].(*inspector.Session) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Inspect indicates an expected call of Inspect. +func (mr *ConnectorOrchestratorMockRecorder) Inspect(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Inspect", reflect.TypeOf((*ConnectorOrchestrator)(nil).Inspect), arg0, arg1) +} + // List mocks base method. func (m *ConnectorOrchestrator) List(arg0 context.Context) map[string]connector.Connector { m.ctrl.T.Helper() diff --git a/pkg/web/api/mock/connector_service.go b/pkg/web/api/mock/connector_service.go new file mode 100644 index 000000000..fc296e0e5 --- /dev/null +++ b/pkg/web/api/mock/connector_service.go @@ -0,0 +1,133 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/proto/gen/api/v1 (interfaces: ConnectorService_InspectConnectorServer) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + apiv1 "github.com/conduitio/conduit/proto/gen/api/v1" + gomock "github.com/golang/mock/gomock" + metadata "google.golang.org/grpc/metadata" +) + +// ConnectorService_InspectConnectorServer is a mock of ConnectorService_InspectConnectorServer interface. +type ConnectorService_InspectConnectorServer struct { + ctrl *gomock.Controller + recorder *ConnectorService_InspectConnectorServerMockRecorder +} + +// ConnectorService_InspectConnectorServerMockRecorder is the mock recorder for ConnectorService_InspectConnectorServer. +type ConnectorService_InspectConnectorServerMockRecorder struct { + mock *ConnectorService_InspectConnectorServer +} + +// NewConnectorService_InspectConnectorServer creates a new mock instance. +func NewConnectorService_InspectConnectorServer(ctrl *gomock.Controller) *ConnectorService_InspectConnectorServer { + mock := &ConnectorService_InspectConnectorServer{ctrl: ctrl} + mock.recorder = &ConnectorService_InspectConnectorServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *ConnectorService_InspectConnectorServer) EXPECT() *ConnectorService_InspectConnectorServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *ConnectorService_InspectConnectorServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).Context)) +} + +// RecvMsg mocks base method. +func (m *ConnectorService_InspectConnectorServer) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *ConnectorService_InspectConnectorServer) Send(arg0 *apiv1.InspectConnectorResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).Send), arg0) +} + +// SendHeader mocks base method. +func (m *ConnectorService_InspectConnectorServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m *ConnectorService_InspectConnectorServer) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SendMsg), arg0) +} + +// SetHeader mocks base method. +func (m *ConnectorService_InspectConnectorServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *ConnectorService_InspectConnectorServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SetTrailer), arg0) +} diff --git a/pkg/web/api/toproto/record.go b/pkg/web/api/toproto/record.go new file mode 100644 index 000000000..e1a2b4a15 --- /dev/null +++ b/pkg/web/api/toproto/record.go @@ -0,0 +1,98 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toproto + +import ( + "fmt" + + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/record" + opencdcv1 "go.buf.build/grpc/go/conduitio/conduit-connector-protocol/opencdc/v1" + "google.golang.org/protobuf/types/known/structpb" +) + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + var cTypes [1]struct{} + _ = cTypes[int(record.OperationCreate)-int(opencdcv1.Operation_OPERATION_CREATE)] + _ = cTypes[int(record.OperationUpdate)-int(opencdcv1.Operation_OPERATION_UPDATE)] + _ = cTypes[int(record.OperationDelete)-int(opencdcv1.Operation_OPERATION_DELETE)] + _ = cTypes[int(record.OperationSnapshot)-int(opencdcv1.Operation_OPERATION_SNAPSHOT)] +} + +func Record(in record.Record) (*opencdcv1.Record, error) { + key, err := Data(in.Key) + if err != nil { + return nil, err + } + payload, err := Change(in.Payload) + if err != nil { + return nil, err + } + + out := &opencdcv1.Record{ + Position: in.Position, + Operation: opencdcv1.Operation(in.Operation), + Metadata: in.Metadata, + Key: key, + Payload: payload, + } + return out, nil +} + +func Change(in record.Change) (*opencdcv1.Change, error) { + before, err := Data(in.Before) + if err != nil { + return nil, fmt.Errorf("error converting before: %w", err) + } + + after, err := Data(in.After) + if err != nil { + return nil, fmt.Errorf("error converting after: %w", err) + } + + out := opencdcv1.Change{ + Before: before, + After: after, + } + return &out, nil +} + +func Data(in record.Data) (*opencdcv1.Data, error) { + if in == nil { + return nil, nil + } + + switch v := in.(type) { + case record.RawData: + return &opencdcv1.Data{ + Data: &opencdcv1.Data_RawData{ + RawData: v.Raw, + }, + }, nil + case record.StructuredData: + data, err := structpb.NewStruct(v) + if err != nil { + return nil, err + } + return &opencdcv1.Data{ + Data: &opencdcv1.Data_StructuredData{ + StructuredData: data, + }, + }, nil + default: + return nil, cerrors.Errorf("invalid Data type '%T'", in) + } +}