Skip to content

Commit

Permalink
grpc plugin support (#91)
Browse files Browse the repository at this point in the history
* grpc filter plugin support
  • Loading branch information
ming535 authored Feb 26, 2019
1 parent 4c03a5b commit d3382da
Show file tree
Hide file tree
Showing 249 changed files with 36,687 additions and 13,850 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ default.etcd

gravity_mongo.meta.toml

private_plugins
private_plugins
grpc-sidecar
46 changes: 43 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,15 @@
[[constraint]]
branch = "master"
name = "github.com/pingcap/tidb"

[[constraint]]
branch = "master"
name = "github.com/hashicorp/go-plugin"

[[constraint]]
name = "github.com/gogo/protobuf"
version = "0.5.0"

[[constraint]]
name = "github.com/golang/protobuf"
version = "1.2.0"
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ LDFLAGS += -X "$(GRAVITY_PKG)/pkg/utils.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M
LDFLAGS += -X "$(GRAVITY_PKG)/pkg/utils.GitHash=$(shell git rev-parse HEAD)"
LDFLAGS += -X "$(GRAVITY_PKG)/pkg/utils.GitBranch=$(shell git rev-parse --abbrev-ref HEAD)"


GO := go
GOBUILD := $(GO) build
GOTEST := $(GO) test
Expand Down Expand Up @@ -72,8 +73,15 @@ lint:
proto:
@ which protoc >/dev/null || brew install protobuf
@ which protoc-gen-gofast >/dev/null || go get github.com/gogo/protobuf/protoc-gen-gofast
protoc --gofast_out=Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types:./pkg protocol/msgpb/message.proto
protoc --gofast_out=plugins=grpc:./pkg protocol/dcp/message.proto

protoc -I=protocol/msgpb -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gofast_out=\
plugins=grpc,\
Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types:./pkg/protocol/msgpb \
protocol/msgpb/message.proto


mock:
mockgen -destination ./mock/binlog_checker/mock.go github.com/moiot/gravity/pkg/inputs/helper/binlog_checker BinlogChecker
Expand Down
7 changes: 6 additions & 1 deletion cmd/gravity/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
"syscall"

"github.com/fsnotify/fsnotify"
jsoniter "github.com/json-iterator/go"
"github.com/json-iterator/go"
"github.com/juju/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"

hplugin "github.com/hashicorp/go-plugin"
"github.com/moiot/gravity/pkg/app"
"github.com/moiot/gravity/pkg/config"
"github.com/moiot/gravity/pkg/core"
Expand Down Expand Up @@ -65,6 +66,10 @@ func main() {

logutil.PipelineName = cfg.PipelineConfig.PipelineName

log.RegisterExitHandler(func() {
hplugin.CleanupClients()
})

server, err := app.NewServer(cfg.PipelineConfig)
if err != nil {
log.Fatal(errors.ErrorStack(err))
Expand Down
19 changes: 18 additions & 1 deletion docs/2.0/05-filters-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Currently, DRC supports the following Filter plugins:
- `accept`: A whitelist filter that only accept specific messages.
- `delete-dml-column`: Deletes specific columns in the source DML messages.
- `rename-dml-column`: Renames specific columns in the source DML messages.
- `grpc-sidecar`: Filters that deployed as a grpc server that can do anything you like.

## `reject` configuration

Expand Down Expand Up @@ -81,4 +82,20 @@ from = ["a", "b"]
to = ["c", "d"]
```

For the above configuration, the "a" column is renamed to "c" and the "b" column is renamed to "d".
## `grpc-sidecar`

The `grpc-sidecar` Filter will download the binary you specified and run the binary. This binary
should start a GRPC server that talks with filter plugin's protocol. An example of the filter
implemented in Golang can be found [here](https://github.com/moiot/gravity-grpc-sidecar-filter-example)


The protocol of the GRPC plugin can be found [here](https://github.com/moiot/gravity/blob/master/protocol/msgpb/message.proto)
```toml
[[filters]]
type = "grpc-sidecar"
[filters.config]
match-schema = "test"
match-table = "test_table"
binary-url = "binary url that stores the binary"
name = "unique name of this plugin"
```
15 changes: 15 additions & 0 deletions docs/2.0/05-filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,18 @@ from = ["a", "b"]
to = ["c", "d"]
```

## `grpc-sidecar`

`grpc-sidecar` Filter 会下载一个你指定的二进制文件并启动一个进程。你的这个程序需要实现一个 GRPC 的服务。一个 Golang 的例子在[这里](https://github.com/moiot/gravity-grpc-sidecar-filter-example)


GRPC 协议的定义在[这里](https://github.com/moiot/gravity/blob/master/protocol/msgpb/message.proto)
```toml
[[filters]]
type = "grp-sidecar"
[filters.config]
match-schema = "test"
match-table = "test_table"
binary-url = "binary url that stores the binary"
name = "unique name of this plugin"
```
3 changes: 3 additions & 0 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/juju/errors"
log "github.com/sirupsen/logrus"

rpcplugin "github.com/hashicorp/go-plugin"
"github.com/moiot/gravity/pkg/config"
"github.com/moiot/gravity/pkg/core"
"github.com/moiot/gravity/pkg/emitter"
Expand All @@ -29,6 +30,8 @@ type Server struct {
// add a lock here to prevent race condition
isClosed bool
sync.Mutex

rpcClient *rpcplugin.Client
}

func ParsePlugins(pipelineConfig config.PipelineConfigV3) (*Server, error) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/core/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ type Emitter interface {
// Emit use fs to modify messages and submit job to scheduler
// msg is the message to send
//
// TODO better interface for Emit, so that we don't need InputStreamKey...
//
Emit(msg *Msg) error
Close() error
}
80 changes: 80 additions & 0 deletions pkg/core/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"fmt"
"time"

"github.com/gogo/protobuf/types"
"github.com/juju/errors"
"github.com/moiot/gravity/pkg/protocol/msgpb"

"github.com/moiot/gravity/pkg/core"
)

Expand Down Expand Up @@ -36,3 +40,79 @@ func NewEncoder(input string, format string) Encoder {

panic(fmt.Sprintf("no serde find for input %s, format %s", input, format))
}

func EncodeMsgHeaderToPB(msg *core.Msg) (*msgpb.Msg, error) {
timestamp, err := types.TimestampProto(msg.Timestamp)
if err != nil {
return nil, errors.Trace(err)
}

pb := msgpb.Msg{
Database: msg.Database,
Table: msg.Table,
Timestamp: timestamp,
MsgType: string(msg.Type),
}

return &pb, nil
}

func DecodeMsgHeaderFromPB(pbmsg *msgpb.Msg) (*core.Msg, error) {
msg := core.Msg{
Database: pbmsg.Database,
Table: pbmsg.Table,
Type: core.MsgType(pbmsg.MsgType),
}
t, err := types.TimestampFromProto(pbmsg.Timestamp)
if err != nil {
return nil, errors.Trace(err)
}

msg.Timestamp = t
return &msg, nil
}

func EncodeMsgToPB(msg *core.Msg) (*msgpb.Msg, error) {

pb, err := EncodeMsgHeaderToPB(msg)
if err != nil {
return nil, errors.Trace(err)
}

// only supports dml message right now
if msg.DmlMsg == nil {
return nil, errors.Errorf("dml is nil")
}

data, err := DataMapToPB(msg.DmlMsg.Data)
if err != nil {
return nil, errors.Trace(err)
}

pb.DmlMsg = &msgpb.DMLMsg{
Op: string(msg.DmlMsg.Operation),
Data: data,
}

return pb, nil
}

func DecodeMsgFromPB(pbmsg *msgpb.Msg) (*core.Msg, error) {
msg, err := DecodeMsgHeaderFromPB(pbmsg)
if err != nil {
return nil, errors.Trace(err)
}

msg.DmlMsg = &core.DMLMsg{
Operation: core.DMLOp(pbmsg.DmlMsg.Op),
Data: make(map[string]interface{}),
}

data, err := PBToDataMap(pbmsg.DmlMsg.Data)
if err != nil {
return nil, errors.Trace(err)
}

msg.DmlMsg.Data = data
return msg, nil
}
Loading

0 comments on commit d3382da

Please sign in to comment.