Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 292 additions & 37 deletions .gen/proto/sharddistributor/v1/service.pb.go

Large diffs are not rendered by default.

65 changes: 34 additions & 31 deletions .gen/proto/sharddistributor/v1/service.pb.yarpc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions common/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,22 @@ func (c *Collection) GetStringPropertyFilteredByNamespace(key dynamicproperties.
}
}

// GetIntPropertyFilteredByNamespace gets property with domain filter and asserts that it's a string
func (c *Collection) GetIntPropertyFilteredByNamespace(key dynamicproperties.IntKey) dynamicproperties.IntPropertyFnWithNamespaceFilters {
return func(namespace string) int {
filters := c.toFilterMap(dynamicproperties.NamespaceFilter(namespace))
val, err := c.client.GetIntValue(
key,
filters,
)
if err != nil {
c.logError(key, filters, err)
return key.DefaultInt()
}
return val
}
}

// GetBoolPropertyFilteredByDomain gets property with domain filter and asserts that it's a bool
func (c *Collection) GetBoolPropertyFilteredByDomain(key dynamicproperties.BoolKey) dynamicproperties.BoolPropertyFnWithDomainFilter {
return func(domain string) bool {
Expand Down
13 changes: 13 additions & 0 deletions common/dynamicconfig/dynamicproperties/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,13 @@ const (

QueueMaxVirtualQueueCount

// ShardDistributorMaxEphemeralShards is the maximum number of ephemeral shards a shard distributor can create
// KeyName: shardDistributor.maxEphemeralShards
// Value type: Int
// Default value: 8192
// Allowed filters: namespace
ShardDistributorMaxEphemeralShards

// LastIntKey must be the last one in this const group
LastIntKey
)
Expand Down Expand Up @@ -4119,6 +4126,12 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "QueueMaxVirtualQueueCount is the max number of virtual queues",
DefaultValue: 2,
},
ShardDistributorMaxEphemeralShards: {
KeyName: "shardDistributor.maxEphemeralShards",
Description: "ShardDistributorMaxEphemeralShards is the maximum number of ephemeral shards a shard distributor can create",
DefaultValue: 8192,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this value need to map to anything like history shards?

Filters: []Filter{Namespace},
},
}

var BoolKeys = map[BoolKey]DynamicBool{
Expand Down
3 changes: 3 additions & 0 deletions common/dynamicconfig/dynamicproperties/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type StringPropertyFnWithTaskListInfoFilters func(domain string, taskList string
// StringPropertyFnWithNamespaceFilters is a wrapper to get string property from dynamic config with namespace as filter
type StringPropertyFnWithNamespaceFilters func(namespace string) string

// IntPropertyFnWithNamespaceFilters is a wrapper to get int property from dynamic config with namespace as filter
type IntPropertyFnWithNamespaceFilters func(namespace string) int

// BoolPropertyFnWithDomainFilter is a wrapper to get bool property from dynamic config with domain as filter
type BoolPropertyFnWithDomainFilter func(domain string) bool

Expand Down
10 changes: 10 additions & 0 deletions common/types/mapper/proto/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func FromError(err error) error {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromShardNotFoundErr); ok {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromEphemeralShardLimitExceededErr); ok {
return typedErr
}

return protobuf.NewError(yarpcerrors.CodeUnknown, err.Error())
Expand Down Expand Up @@ -417,3 +419,11 @@ func fromShardNotFoundErr(e *types.ShardNotFoundError) error {
ShardKey: e.ShardKey,
}))
}

func fromEphemeralShardLimitExceededErr(e *types.EphemeralShardLimitExceededError) error {
return protobuf.NewError(yarpcerrors.CodeResourceExhausted, e.Error(), protobuf.WithErrorDetails(&sharddistributorv1.EphemeralShardLimitExceededError{
Namespace: e.Namespace,
MaxLimit: int64(e.MaxLimit),
CurrentValue: int64(e.CurrentValue),
}))
}
13 changes: 13 additions & 0 deletions common/types/sharddistributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ func (n *ShardNotFoundError) Error() (o string) {
return
}

type EphemeralShardLimitExceededError struct {
Namespace string
MaxLimit int
CurrentValue int
}

func (n *EphemeralShardLimitExceededError) Error() (o string) {
if n != nil {
return fmt.Sprintf("ephemeral shard limit exceeded for namespace %v: current %v, max %v", n.Namespace, n.CurrentValue, n.MaxLimit)
}
return
}

type ExecutorHeartbeatRequest struct {
Namespace string
ExecutorID string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ message ShardNotFoundError {
string shard_key = 2;
}

message EphemeralShardLimitExceededError {
string namespace = 1;
int64 max_limit = 2;
int64 current_value = 3;
}

message WatchNamespaceStateRequest {
string namespace = 1;
}
Expand Down
10 changes: 6 additions & 4 deletions service/sharddistributor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
type (
// Config represents configuration for shard manager service
Config struct {
LoadBalancingMode dynamicproperties.StringPropertyFnWithNamespaceFilters
MigrationMode dynamicproperties.StringPropertyFnWithNamespaceFilters
LoadBalancingMode dynamicproperties.StringPropertyFnWithNamespaceFilters
MigrationMode dynamicproperties.StringPropertyFnWithNamespaceFilters
MaxEphemeralShards dynamicproperties.IntPropertyFnWithNamespaceFilters
}

StaticConfig struct {
Expand Down Expand Up @@ -119,8 +120,9 @@ var MigrationMode = map[string]types.MigrationMode{
// NewConfig returns a new instance of Config
func NewConfig(dc *dynamicconfig.Collection) *Config {
return &Config{
LoadBalancingMode: dc.GetStringPropertyFilteredByNamespace(dynamicproperties.ShardDistributorLoadBalancingMode),
MigrationMode: dc.GetStringPropertyFilteredByNamespace(dynamicproperties.ShardDistributorMigrationMode),
LoadBalancingMode: dc.GetStringPropertyFilteredByNamespace(dynamicproperties.ShardDistributorLoadBalancingMode),
MigrationMode: dc.GetStringPropertyFilteredByNamespace(dynamicproperties.ShardDistributorMigrationMode),
MaxEphemeralShards: dc.GetIntPropertyFilteredByNamespace(dynamicproperties.ShardDistributorMaxEphemeralShards),
}
}

Expand Down
17 changes: 17 additions & 0 deletions service/sharddistributor/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ import (
func NewHandler(
logger log.Logger,
shardDistributionCfg config.ShardDistribution,
cfg *config.Config,
storage store.Store,
) Handler {
handler := &handlerImpl{
logger: logger,
shardDistributionCfg: shardDistributionCfg,
cfg: cfg,
storage: storage,
}

Expand All @@ -59,6 +61,7 @@ type handlerImpl struct {

storage store.Store
shardDistributionCfg config.ShardDistribution
cfg *config.Config
}

func (h *handlerImpl) Start() {
Expand Down Expand Up @@ -121,12 +124,26 @@ func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string

var executorID string
minAssignedShards := math.MaxInt
totalAssignedShards := 0

for assignedExecutor, assignment := range state.ShardAssignments {
if len(assignment.AssignedShards) < minAssignedShards {
minAssignedShards = len(assignment.AssignedShards)
executorID = assignedExecutor
}
totalAssignedShards += len(assignment.AssignedShards)
}

// check against max ephemeral shards limit
maxEphemeralShardsLimit := h.cfg.MaxEphemeralShards(namespace)

// we should consider the new shard to be assigned here
if totalAssignedShards+1 > maxEphemeralShardsLimit {
return nil, &types.EphemeralShardLimitExceededError{
Namespace: namespace,
MaxLimit: maxEphemeralShardsLimit,
CurrentValue: totalAssignedShards,
}
}

// Assign the shard to the executor with the least assigned shards
Expand Down
48 changes: 45 additions & 3 deletions service/sharddistributor/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
Expand Down Expand Up @@ -59,9 +61,11 @@ func TestGetShardOwner(t *testing.T) {
}

tests := []struct {
name string
request *types.GetShardOwnerRequest
setupMocks func(mockStore *store.MockStore)
name string
request *types.GetShardOwnerRequest
setupMocks func(mockStore *store.MockStore)
setupDynamicConfig func(t *testing.T, client dynamicconfig.Client)

expectedOwner string
expectedError bool
expectedErrMsg string
Expand Down Expand Up @@ -163,6 +167,38 @@ func TestGetShardOwner(t *testing.T) {
expectedOwner: "owner2",
expectedError: false,
},
{
name: "ShardNotFound_Ephemeral_LimitExceeded",
request: &types.GetShardOwnerRequest{
Namespace: _testNamespaceEphemeral,
ShardKey: "NON-EXISTING-SHARD",
},
setupMocks: func(mockStore *store.MockStore) {
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceEphemeral, "NON-EXISTING-SHARD").Return(nil, store.ErrShardNotFound)
mockStore.EXPECT().GetState(gomock.Any(), _testNamespaceEphemeral).Return(&store.NamespaceState{
ShardAssignments: map[string]store.AssignedState{
"owner1": {
AssignedShards: map[string]*types.ShardAssignment{
"shard1": {Status: types.AssignmentStatusREADY},
"shard2": {Status: types.AssignmentStatusREADY},
"shard3": {Status: types.AssignmentStatusREADY},
},
},
"owner2": {
AssignedShards: map[string]*types.ShardAssignment{
"shard4": {Status: types.AssignmentStatusREADY},
},
},
},
}, nil)
},
// set max ephemeral shards to 4 to cause an limit exceeded error
setupDynamicConfig: func(t *testing.T, client dynamicconfig.Client) {
client.UpdateValue(dynamicproperties.ShardDistributorMaxEphemeralShards, 4)
},
expectedError: true,
expectedErrMsg: "ephemeral shard limit exceeded for namespace test-ephemeral: current 4, max 4",
},
{
name: "ShardNotFound_Ephemeral_GetStateFailure",
request: &types.GetShardOwnerRequest{
Expand Down Expand Up @@ -199,15 +235,21 @@ func TestGetShardOwner(t *testing.T) {

logger := testlogger.New(t)
mockStorage := store.NewMockStore(ctrl)
dynamicClient := dynamicconfig.NewInMemoryClient()

handler := &handlerImpl{
logger: logger,
shardDistributionCfg: cfg,
storage: mockStorage,
cfg: config.NewConfig(dynamicconfig.NewCollection(dynamicClient, testlogger.New(t))),
}
if tt.setupMocks != nil {
tt.setupMocks(mockStorage)
}
if tt.setupDynamicConfig != nil {
tt.setupDynamicConfig(t, dynamicClient)
}

resp, err := handler.GetShardOwner(context.Background(), tt.request)
if tt.expectedError {
require.Error(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ type registerHandlersParams struct {

func registerHandlers(params registerHandlersParams) error {
dispatcher := params.RPCFactory.GetDispatcher()
cfg := config.NewConfig(params.DynamicCollection)

rawHandler := handler.NewHandler(params.Logger, params.ShardDistributionCfg, params.Store)
rawHandler := handler.NewHandler(params.Logger, params.ShardDistributionCfg, cfg, params.Store)
wrappedHandler := metered.NewMetricsHandler(rawHandler, params.Logger, params.MetricsClient)

config := config.NewConfig(params.DynamicCollection)
executorHandler := handler.NewExecutorHandler(params.Logger, params.Store, params.TimeSource, params.ShardDistributionCfg, config, params.MetricsClient)
executorHandler := handler.NewExecutorHandler(params.Logger, params.Store, params.TimeSource, params.ShardDistributionCfg, cfg, params.MetricsClient)
wrappedExecutor := metered.NewExecutorMetricsExecutor(executorHandler, params.Logger, params.MetricsClient)

grpcHandler := grpc.NewGRPCHandler(wrappedHandler)
Expand Down
Loading