Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.37.0
github.com/rs/zerolog v1.28.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75
go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.4.3
go.buf.build/protocolbuffers/go/grpc-ecosystem/grpc-gateway v1.3.50
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91
Expand Down Expand Up @@ -83,6 +84,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.13.0 // indirect
Expand All @@ -107,6 +109,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/segmentio/kafka-go v0.4.35 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/xitongsys/parquet-go v1.6.2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 h1:t7uX3JBHdVwAi3G7sSSdbsk8NfgA+LnUS88V/2EKaA0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0/go.mod h1:4OGVnY4qf2+gw+ssiHbW+pq4mo2yko94YxxMmXZ7jCA=
Expand Down Expand Up @@ -529,6 +531,8 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
Expand All @@ -554,6 +558,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand Down Expand Up @@ -703,6 +709,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
Expand Down
17 changes: 13 additions & 4 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/stats"
"gopkg.in/tomb.v2"

Expand Down Expand Up @@ -320,6 +321,10 @@ func (r *Runtime) serveGRPCAPI(ctx context.Context, t *tomb.Tomb) (net.Addr, err

info := api.NewInformation(Version(false))
info.Register(grpcServer)
// Makes it easier to use command line tools to interact
// with the gRPC API.
// https://github.com/grpc/grpc/blob/master/doc/server-reflection.md
reflection.Register(grpcServer)

healthService := api.NewHealthChecker()
grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
Expand Down Expand Up @@ -436,14 +441,18 @@ func (r *Runtime) serveHTTPAPI(
return nil, cerrors.Errorf("failed to register metrics handler: %w", err)
}

handler := grpcutil.WithWebsockets(
grpcutil.WithDefaultGatewayMiddleware(
r.logger, allowCORS(gwmux, "http://localhost:4200"),
),
)

return r.serveHTTP(
ctx,
t,
&http.Server{
Addr: r.Config.HTTP.Address,
Handler: grpcutil.WithDefaultGatewayMiddleware(
r.logger, allowCORS(gwmux, "http://localhost:4200"),
),
Addr: r.Config.HTTP.Address,
Handler: handler,
ReadHeaderTimeout: 10 * time.Second,
},
)
Expand Down
4 changes: 4 additions & 0 deletions pkg/connector/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/inspector"
"github.com/conduitio/conduit/pkg/plugin"
)

const inspectorBufferSize = 1000

// Builder represents an object that can build a connector.
// The main use of this interface is to be able to switch out the connector
// implementations for mocks in tests.
Expand Down Expand Up @@ -99,6 +102,7 @@ func (b *DefaultBuilder) Init(c Connector, id string, config Config) error {
v.persister = b.persister
v.pluginDispenser = p
v.errs = make(chan error)
v.inspector = inspector.New(v.logger, inspectorBufferSize)
default:
return ErrInvalidConnectorType
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"time"

"github.com/conduitio/conduit/pkg/inspector"
"github.com/conduitio/conduit/pkg/record"
)

Expand Down Expand Up @@ -65,6 +66,10 @@ type Connector interface {
// asynchronously (e.g. persisting state).
Errors() <-chan error

// Inspect returns an inspector.Session which exposes the records
// coming into or out of this connector (depending on the connector type).
Inspect(context.Context) *inspector.Session

// Open will start the plugin process and call the Open method on the
// plugin. After the connector has been successfully opened it is considered
// as running (IsRunning returns true) and can be stopped again with
Expand Down
8 changes: 8 additions & 0 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/inspector"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/record"
)
Expand Down Expand Up @@ -57,6 +58,8 @@ type destination struct {
// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

inspector *inspector.Inspector

// m can lock a destination from concurrent access (e.g. in connector persister).
m sync.Mutex
// wg tracks the number of in flight calls to the plugin.
Expand Down Expand Up @@ -115,6 +118,10 @@ func (d *destination) Errors() <-chan error {
return d.errs
}

func (d *destination) Inspect(ctx context.Context) *inspector.Session {
return d.inspector.NewSession(ctx)
}

func (d *destination) Validate(ctx context.Context, settings map[string]string) (err error) {
dest, err := d.pluginDispenser.DispenseDestination()
if err != nil {
Expand Down Expand Up @@ -227,6 +234,7 @@ func (d *destination) Write(ctx context.Context, r record.Record) error {
return err
}

d.inspector.Send(ctx, r)
err = d.plugin.Write(ctx, r)
if err != nil {
return cerrors.Errorf("error writing record: %w", err)
Expand Down
29 changes: 29 additions & 0 deletions pkg/connector/mock/connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/inspector"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/record"
)
Expand Down Expand Up @@ -114,6 +115,11 @@ func (s *source) Errors() <-chan error {
return s.errs
}

func (s *source) Inspect(_ context.Context) *inspector.Session {
// TODO implement me
panic("implement me")
}

func (s *source) Validate(ctx context.Context, settings map[string]string) (err error) {
src, err := s.pluginDispenser.DispenseSource()
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/foundation/grpcutil/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/tmc/grpc-websocket-proxy/wsproxy"
"google.golang.org/protobuf/encoding/protojson"
)

Expand Down Expand Up @@ -109,6 +110,10 @@ func WithHTTPEndpointHeader(h http.Handler) http.Handler {
})
}

func WithWebsockets(h http.Handler) http.Handler {
return wsproxy.WebsocketProxy(h)
}

func extractEndpoint(r *http.Request) string {
return r.Method + " " + r.URL.Path
}
2 changes: 2 additions & 0 deletions pkg/foundation/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ const (
PluginTypeField = "plugin_type"
PluginNameField = "plugin_name"
PluginPathField = "plugin_path"

InspectorSessionID = "inspector_session_id"
)
Loading