Skip to content

Commit

Permalink
Handle unknown instance ID during a config apply operation (#960)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley authored Jan 17, 2025
1 parent 406a927 commit 2b427d0
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 24 deletions.
114 changes: 98 additions & 16 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ type (
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
instances []*mpi.Instance
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
configApplyRequestQueueMutex sync.Mutex
instancesMutex sync.Mutex
}
)

Expand All @@ -61,6 +63,7 @@ func NewCommandService(
isConnected: isConnected,
subscribeChannel: subscribeChannel,
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
instances: []*mpi.Instance{},
}

var subscribeCtx context.Context
Expand Down Expand Up @@ -127,6 +130,10 @@ func (cs *CommandService) UpdateDataPlaneStatus(
}
slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()

return err
}

Expand Down Expand Up @@ -252,6 +259,10 @@ func (cs *CommandService) CreateConnection(

cs.isConnected.Store(true)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()

return response, nil
}

Expand Down Expand Up @@ -416,30 +427,101 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
return recvError
}

switch request.GetRequest().(type) {
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()

instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
if len(cs.configApplyRequestQueue[instanceID]) == 1 {
if cs.isValidRequest(ctx, request) {
switch request.GetRequest().(type) {
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
cs.queueConfigApplyRequests(ctx, request)
default:
cs.subscribeChannel <- request
} else {
slog.DebugContext(
ctx,
"Config apply request is already in progress, queuing new config apply request",
"request", request,
)
}
default:
cs.subscribeChannel <- request
}

return nil
}
}

func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request *mpi.ManagementPlaneRequest) {
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()

instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
if len(cs.configApplyRequestQueue[instanceID]) == 1 {
cs.subscribeChannel <- request
} else {
slog.DebugContext(
ctx,
"Config apply request is already in progress, queuing new config apply request",
"request", request,
)
}
}

func (cs *CommandService) isValidRequest(ctx context.Context, request *mpi.ManagementPlaneRequest) bool {
var validRequest bool

switch request.GetRequest().(type) {
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
requestInstanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
case *mpi.ManagementPlaneRequest_ConfigUploadRequest:
requestInstanceID := request.GetConfigUploadRequest().GetOverview().GetConfigVersion().GetInstanceId()
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
case *mpi.ManagementPlaneRequest_ActionRequest:
requestInstanceID := request.GetActionRequest().GetInstanceId()
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
default:
validRequest = true
}

return validRequest
}

func (cs *CommandService) checkIfInstanceExists(
ctx context.Context,
request *mpi.ManagementPlaneRequest,
requestInstanceID string,
) bool {
instanceFound := false

cs.instancesMutex.Lock()
for _, instance := range cs.instances {
if instance.GetInstanceMeta().GetInstanceId() == requestInstanceID {
instanceFound = true
}
}
cs.instancesMutex.Unlock()

if !instanceFound {
slog.WarnContext(
ctx,
"Unable to handle request, instance not found",
"instance", requestInstanceID,
"request", request,
)

response := &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: proto.GenerateMessageID(),
CorrelationId: request.GetMessageMeta().GetCorrelationId(),
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
Message: "Unable to handle request",
Error: "Instance ID not found",
},
InstanceId: requestInstanceID,
}
err := cs.SendDataPlaneResponse(ctx, response)
if err != nil {
slog.ErrorContext(ctx, "Failed to send data plane response", "error", err)
}
}

return instanceFound
}

// Retry callback for establishing the connection between the Management Plane and the Agent.
func (cs *CommandService) connectCallback(
ctx context.Context,
Expand Down
149 changes: 147 additions & 2 deletions internal/command/command_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error {

// nolint: nilnil
func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) {
protos.CreateManagementPlaneRequest()
nginxInstance := protos.GetNginxOssInstance([]string{})

return &mpi.ManagementPlaneRequest{
MessageMeta: &mpi.MessageMeta{
MessageId: "1",
Expand All @@ -67,7 +68,7 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro
ConfigApplyRequest: &mpi.ConfigApplyRequest{
Overview: &mpi.FileOverview{
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "12314",
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
Version: "4215432",
},
},
Expand Down Expand Up @@ -113,6 +114,11 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
subscribeChannel,
)

nginxInstance := protos.GetNginxOssInstance([]string{})
commandService.instancesMutex.Lock()
commandService.instances = append(commandService.instances, nginxInstance)
commandService.instancesMutex.Unlock()

defer commandService.CancelSubscription(ctx)

var wg sync.WaitGroup
Expand Down Expand Up @@ -389,3 +395,142 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
assert.Equal(t, request3, commandService.configApplyRequestQueue["12314"][0])
wg.Wait()
}

func TestCommandService_isValidRequest(t *testing.T) {
ctx := context.Background()
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
subscribeClient := &FakeSubscribeClient{}

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
)

commandService.subscribeClientMutex.Lock()
commandService.subscribeClient = subscribeClient
commandService.subscribeClientMutex.Unlock()

nginxInstance := protos.GetNginxOssInstance([]string{})

commandService.instancesMutex.Lock()
commandService.instances = append(commandService.instances, nginxInstance)
commandService.instancesMutex.Unlock()

testCases := []struct {
req *mpi.ManagementPlaneRequest
name string
result bool
}{
{
name: "Test 1: valid health request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_HealthRequest{HealthRequest: &mpi.HealthRequest{}},
},
result: true,
},
{
name: "Test 2: valid config apply request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: protos.CreateConfigApplyRequest(&mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
Version: "e23brbei3u2bru93",
},
}),
},
},
result: true,
},
{
name: "Test 3: invalid config apply request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: protos.CreateConfigApplyRequest(&mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "unknown-id",
Version: "e23brbei3u2bru93",
},
}),
},
},
result: false,
},
{
name: "Test 4: valid config upload request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
ConfigUploadRequest: &mpi.ConfigUploadRequest{
Overview: &mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
Version: "e23brbei3u2bru93",
},
},
},
},
},
result: true,
},
{
name: "Test 5: invalid config upload request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
ConfigUploadRequest: &mpi.ConfigUploadRequest{
Overview: &mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "unknown-id",
Version: "e23brbei3u2bru93",
},
},
},
},
},
result: false,
},
{
name: "Test 6: valid action request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ActionRequest{
ActionRequest: &mpi.APIActionRequest{
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
Action: nil,
},
},
},
result: true,
},
{
name: "Test 7: invalid action request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ActionRequest{
ActionRequest: &mpi.APIActionRequest{
InstanceId: "unknown-id",
Action: nil,
},
},
},
result: false,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
result := commandService.isValidRequest(ctx, testCase.req)
assert.Equal(t, testCase.result, result)
})
}
}
4 changes: 4 additions & 0 deletions internal/resource/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) er
var instance *mpi.Instance
operator := r.instanceOperators[instanceID]

if operator == nil {
return fmt.Errorf("instance %s not found", instanceID)
}

for _, resourceInstance := range r.resource.GetInstances() {
if resourceInstance.GetInstanceMeta().GetInstanceId() == instanceID {
instance = resourceInstance
Expand Down
14 changes: 12 additions & 2 deletions internal/resource/resource_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,40 @@ func TestResourceService_ApplyConfig(t *testing.T) {
ctx := context.Background()

tests := []struct {
instanceID string
reloadErr error
validateErr error
expected error
name string
}{
{
name: "Test 1: Successful reload",
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
reloadErr: nil,
validateErr: nil,
expected: nil,
},
{
name: "Test 2: Failed reload",
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
reloadErr: fmt.Errorf("something went wrong"),
validateErr: nil,
expected: fmt.Errorf("failed to reload NGINX %w", fmt.Errorf("something went wrong")),
},
{
name: "Test 3: Failed validate",
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
reloadErr: nil,
validateErr: fmt.Errorf("something went wrong"),
expected: fmt.Errorf("failed validating config %w", fmt.Errorf("something went wrong")),
},
{
name: "Test 4: Unknown instance ID",
instanceID: "unknown",
reloadErr: nil,
validateErr: nil,
expected: fmt.Errorf("instance unknown not found"),
},
}

for _, test := range tests {
Expand All @@ -283,8 +294,7 @@ func TestResourceService_ApplyConfig(t *testing.T) {
}
resourceService.resource.Instances = instances

reloadError := resourceService.ApplyConfig(ctx,
protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId())
reloadError := resourceService.ApplyConfig(ctx, test.instanceID)
assert.Equal(t, test.expected, reloadError)
})
}
Expand Down
Loading

0 comments on commit 2b427d0

Please sign in to comment.