Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(go-client): refactor admin APIs for meta by reflection #1929

Merged
merged 20 commits into from
Mar 8, 2024
2 changes: 1 addition & 1 deletion .github/workflows/lint_and_test_go-client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.55.2
version: v1.56.2
working-directory: ./go-client

build_server:
Expand Down
192 changes: 144 additions & 48 deletions go-client/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package admin
import (
"context"
"fmt"
"reflect"
"time"

"github.com/apache/incubator-pegasus/go-client/idl/admin"
Expand All @@ -33,51 +34,109 @@ import (
// Client provides the administration API to a specific cluster.
// Remember only the superusers configured to the cluster have the admin priviledges.
type Client interface {
CreateTable(ctx context.Context, tableName string, partitionCount int, successIfExist_optional ...bool) error

DropTable(ctx context.Context, tableName string) error

ListTables(ctx context.Context) ([]*TableInfo, error)
}

// TableInfo is the table information.
type TableInfo struct {
Name string

// Envs is a set of attributes binding to this table.
Envs map[string]string
Close() error

// The timeout specify the max duration that is spent on an client request. For
// example, if the client is based on RPC, it would be the timeout for the RPC
// request.
GetTimeout() time.Duration
SetTimeout(timeout time.Duration)

// `maxWaitSeconds` specify the number of seconds that is spent on waiting for
// the created table to be ready. This method would return error once the table
// is still not ready after `maxWaitSeconds`. The administrator should check if
// there is something wrong with the table.
CreateTable(tableName string, partitionCount int32, replicaCount int32, envs map[string]string, maxWaitSeconds int32, successIfExistOptional ...bool) (int32, error)
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved

// `reserveSeconds` specify the retention interval for a table before it is actually dropped.
DropTable(tableName string, reserveSeconds int64) error

// Empty `args` means "list all available tables"; Otherwise, the only parameter would
// specify the status of the returned tables.
ListTables(args ...interface{}) ([]*replication.AppInfo, error)
}

type Config struct {
MetaServers []string `json:"meta_servers"`
Timeout time.Duration
}

// NewClient returns an instance of Client.
func NewClient(cfg Config) Client {
return &rpcBasedClient{
metaManager: session.NewMetaManager(cfg.MetaServers, session.NewNodeSession),
meta: session.NewMetaManager(cfg.MetaServers, session.NewNodeSession),
rpcTimeout: cfg.Timeout,
}
}

type rpcBasedClient struct {
metaManager *session.MetaManager
meta *session.MetaManager
rpcTimeout time.Duration
}

func (c *rpcBasedClient) Close() error {
return c.meta.Close()
}

func (c *rpcBasedClient) GetTimeout() time.Duration {
return c.rpcTimeout
}

func (c *rpcBasedClient) waitTableReady(ctx context.Context, tableName string, partitionCount int) error {
const replicaCount int = 3
func (c *rpcBasedClient) SetTimeout(timeout time.Duration) {
c.rpcTimeout = timeout
}

// Call RPC methods(go-client/session/admin_rpc_types.go) of session.MetaManager by reflection.
// `req` and `resp` are the request and response structs of RPC. `callback` always accepts
// non-nil `resp`.
func (c *rpcBasedClient) callMeta(methodName string, req interface{}, callback func(resp interface{})) error {
ctx, cancel := context.WithTimeout(context.Background(), c.rpcTimeout)
defer cancel()

// There are 2 kinds of structs for the result which could be processed:
// * error
// * (response, error)
result := reflect.ValueOf(c.meta).MethodByName(methodName).Call([]reflect.Value{
reflect.ValueOf(ctx),
reflect.ValueOf(req),
})

// The last element must be error.
ierr := result[len(result)-1].Interface()

var err error
if ierr != nil {
err = ierr.(error)
}

if len(result) == 1 {
return err
}

// The struct of result must be (response, error).
if !result[0].IsNil() {
callback(result[0].Interface())
}

return err
}

for {
resp, err := c.metaManager.QueryConfig(ctx, tableName)
func (c *rpcBasedClient) waitTableReady(tableName string, partitionCount int32, replicaCount int32, maxWaitSeconds int32) error {
for ; maxWaitSeconds > 0; maxWaitSeconds-- {
var resp *replication.QueryCfgResponse
err := c.callMeta("QueryConfig", tableName, func(iresp interface{}) {
resp = iresp.(*replication.QueryCfgResponse)
})
if err != nil {
return err
}
if resp.GetErr().Errno != base.ERR_OK.String() {
return fmt.Errorf("QueryConfig failed: %s", resp.GetErr().String())
return fmt.Errorf("QueryConfig failed: %s", base.GetResponseError(resp))
}

readyCount := 0
readyCount := int32(0)
for _, part := range resp.Partitions {
if part.Primary.GetRawAddress() != 0 && len(part.Secondaries)+1 == replicaCount {
if part.Primary.GetRawAddress() != 0 && int32(len(part.Secondaries)+1) == replicaCount {
readyCount++
}
}
Expand All @@ -86,55 +145,92 @@ func (c *rpcBasedClient) waitTableReady(ctx context.Context, tableName string, p
}
time.Sleep(time.Second)
}

if maxWaitSeconds <= 0 {
return fmt.Errorf("After %d seconds, table '%s' is still not ready", maxWaitSeconds, tableName)
}

return nil
}

func (c *rpcBasedClient) CreateTable(ctx context.Context, tableName string, partitionCount int, successIfExist_optional ...bool) error {
func (c *rpcBasedClient) CreateTable(tableName string, partitionCount int32, replicaCount int32, envs map[string]string, maxWaitSeconds int32, successIfExistOptional ...bool) (int32, error) {
successIfExist := true
if len(successIfExist_optional) > 0 {
successIfExist = successIfExist_optional[0]
if len(successIfExistOptional) > 0 {
successIfExist = successIfExistOptional[0]
}
_, err := c.metaManager.CreateApp(ctx, &admin.ConfigurationCreateAppRequest{

req := &admin.ConfigurationCreateAppRequest{
AppName: tableName,
Options: &admin.CreateAppOptions{
PartitionCount: int32(partitionCount),
ReplicaCount: 3,
ReplicaCount: replicaCount,
SuccessIfExist: successIfExist,
AppType: "pegasus",
Envs: make(map[string]string),
IsStateful: true,
Envs: envs,
}}

var appID int32
var respErr error
err := c.callMeta("CreateApp", req, func(iresp interface{}) {
resp := iresp.(*admin.ConfigurationCreateAppResponse)
appID = resp.Appid
respErr = base.GetResponseError(resp)
})
if err != nil {
return appID, err
}

err = c.waitTableReady(tableName, partitionCount, replicaCount, maxWaitSeconds)
if err != nil {
return appID, err
}

return appID, respErr
}

func (c *rpcBasedClient) DropTable(tableName string, reserveSeconds int64) error {
req := &admin.ConfigurationDropAppRequest{
AppName: tableName,
Options: &admin.DropAppOptions{
SuccessIfNotExist: true,
ReserveSeconds: &reserveSeconds, // Optional for thrift
},
}

var respErr error
err := c.callMeta("DropApp", req, func(iresp interface{}) {
respErr = base.GetResponseError(iresp.(*admin.ConfigurationDropAppResponse))
})
if err != nil {
return err
}
err = c.waitTableReady(ctx, tableName, partitionCount)
return err

return respErr
}

func (c *rpcBasedClient) DropTable(ctx context.Context, tableName string) error {
req := admin.NewConfigurationDropAppRequest()
req.AppName = tableName
reserveSeconds := int64(1) // delete immediately. the caller is responsible for the soft deletion of table.
req.Options = &admin.DropAppOptions{
SuccessIfNotExist: true,
ReserveSeconds: &reserveSeconds,
func (c *rpcBasedClient) listTables(status replication.AppStatus) ([]*replication.AppInfo, error) {
req := &admin.ConfigurationListAppsRequest{
Status: status,
}
_, err := c.metaManager.DropApp(ctx, req)
return err
}

func (c *rpcBasedClient) ListTables(ctx context.Context) ([]*TableInfo, error) {
resp, err := c.metaManager.ListApps(ctx, &admin.ConfigurationListAppsRequest{
Status: replication.AppStatus_AS_AVAILABLE,
var tables []*replication.AppInfo
var respErr error
err := c.callMeta("ListApps", req, func(iresp interface{}) {
resp := iresp.(*admin.ConfigurationListAppsResponse)
tables = resp.Infos
respErr = base.GetResponseError(resp)
})
if err != nil {
return nil, err
return tables, err
}

var results []*TableInfo
for _, app := range resp.Infos {
results = append(results, &TableInfo{Name: app.AppName, Envs: app.Envs})
return tables, respErr
}

func (c *rpcBasedClient) ListTables(args ...interface{}) ([]*replication.AppInfo, error) {
if len(args) == 0 {
return c.listTables(replication.AppStatus_AS_AVAILABLE)
}
return results, nil
return c.listTables(args[0].(replication.AppStatus))
}
57 changes: 32 additions & 25 deletions go-client/admin/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,66 +25,72 @@ import (
"testing"
"time"

"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegasus"
"github.com/stretchr/testify/assert"
)

func TestAdmin_Table(t *testing.T) {
c := NewClient(Config{
const (
replicaCount = 3
maxWaitSeconds = 600
reserveSeconds = 1
)

func defaultConfig() Config {
return Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
})
Timeout: 30 * time.Second,
}
}

func TestAdmin_Table(t *testing.T) {
c := NewClient(defaultConfig())

hasTable := func(tables []*TableInfo, tableName string) bool {
hasTable := func(tables []*replication.AppInfo, tableName string) bool {
for _, tb := range tables {
if tb.Name == tableName {
if tb.AppName == tableName {
return true
}
}
return false
}

err := c.DropTable(context.Background(), "admin_table_test")
err := c.DropTable("admin_table_test", reserveSeconds)
assert.Nil(t, err)

// no such table after deletion
tables, err := c.ListTables(context.Background())
tables, err := c.ListTables()
assert.Nil(t, err)
assert.False(t, hasTable(tables, "admin_table_test"))

err = c.CreateTable(context.Background(), "admin_table_test", 16)
_, err = c.CreateTable("admin_table_test", 16, replicaCount, make(map[string]string), maxWaitSeconds)
assert.Nil(t, err)

tables, err = c.ListTables(context.Background())
tables, err = c.ListTables()
assert.Nil(t, err)
assert.True(t, hasTable(tables, "admin_table_test"))

err = c.DropTable(context.Background(), "admin_table_test")
err = c.DropTable("admin_table_test", reserveSeconds)
assert.Nil(t, err)
}

func TestAdmin_ListTablesTimeout(t *testing.T) {
c := NewClient(Config{
MetaServers: []string{"0.0.0.0:123456"},
Timeout: 500 * time.Millisecond,
})

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_, err := c.ListTables(ctx)
_, err := c.ListTables()
assert.Equal(t, err, context.DeadlineExceeded)
}

// Ensures after the call `CreateTable` ends, the table must be right available to access.
func TestAdmin_CreateTableMustAvailable(t *testing.T) {
const tableName = "admin_table_test"

c := NewClient(Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
})

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
c := NewClient(defaultConfig())

err := c.CreateTable(context.Background(), tableName, 8)
_, err := c.CreateTable(tableName, 8, replicaCount, make(map[string]string), maxWaitSeconds)
if !assert.NoError(t, err) {
assert.Fail(t, err.Error())
}
Expand Down Expand Up @@ -115,24 +121,25 @@ func TestAdmin_CreateTableMustAvailable(t *testing.T) {
}
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err = tb.Set(ctx, []byte("a"), []byte("a"), []byte("a"))
if !assert.NoError(t, err) {
assert.Fail(t, err.Error())
}

// cleanup
err = c.DropTable(context.Background(), tableName)
err = c.DropTable(tableName, reserveSeconds)
if !assert.NoError(t, err) {
assert.Fail(t, err.Error())
}
}

func TestAdmin_GetAppEnvs(t *testing.T) {
c := NewClient(Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
})
c := NewClient(defaultConfig())

tables, err := c.ListTables(context.Background())
tables, err := c.ListTables()
assert.Nil(t, err)
for _, tb := range tables {
assert.Empty(t, tb.Envs)
Expand Down
Loading
Loading