Skip to content

Commit 7cfbd31

Browse files
cheating
chore(autofmt): Automated formatting
1 parent b86c4cd commit 7cfbd31

File tree

12 files changed

+199
-610
lines changed

12 files changed

+199
-610
lines changed

backend/protos/xyz/block/ftl/provisioner/v1beta1/plugin.pb.go

+106-276
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/protos/xyz/block/ftl/provisioner/v1beta1/plugin.proto

+4-19
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,16 @@ message ProvisionRequest {
1616
repeated string kinds = 5;
1717
}
1818

19-
enum ProvisionResponseStatus {
20-
PROVISION_RESPONSE_STATUS_UNSPECIFIED = 0;
21-
PROVISION_RESPONSE_STATUS_SUBMITTED = 1;
22-
}
23-
2419
message ProvisionResponse {
20+
enum ProvisionResponseStatus {
21+
PROVISION_RESPONSE_STATUS_UNSPECIFIED = 0;
22+
PROVISION_RESPONSE_STATUS_SUBMITTED = 1;
23+
}
2524

2625
string provisioning_token = 1;
2726
ProvisionResponseStatus status = 2;
2827
}
2928

30-
message DeProvisionRequest {
31-
string ftl_cluster_id = 1;
32-
xyz.block.ftl.schema.v1.Module module = 2;
33-
optional xyz.block.ftl.schema.v1.Module replacement_module = 3;
34-
string changeset = 4;
35-
repeated string kinds = 5;
36-
}
37-
38-
message DeProvisionResponse {
39-
string provisioning_token = 1;
40-
ProvisionResponseStatus status = 2;
41-
}
42-
4329
message StatusRequest {
4430
string provisioning_token = 1;
4531
// The outputs of this module are updated if the the status is a success
@@ -67,6 +53,5 @@ service ProvisionerPluginService {
6753
rpc Ping(xyz.block.ftl.v1.PingRequest) returns (xyz.block.ftl.v1.PingResponse);
6854

6955
rpc Provision(ProvisionRequest) returns (ProvisionResponse);
70-
rpc DeProvision(DeProvisionRequest) returns (DeProvisionResponse);
7156
rpc Status(StatusRequest) returns (StatusResponse);
7257
}

backend/protos/xyz/block/ftl/provisioner/v1beta1/provisionerpbconnect/plugin.connect.go

+3-30
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/provisioner/inmem_provisioner.go

+17-90
Original file line numberDiff line numberDiff line change
@@ -57,89 +57,6 @@ type InMemProvisioner struct {
5757
removeHandlers map[schema.ResourceType]InMemResourceProvisionerFn
5858
}
5959

60-
func (d *InMemProvisioner) DeProvision(ctx context.Context, req *connect.Request[provisioner.DeProvisionRequest]) (*connect.Response[provisioner.DeProvisionResponse], error) {
61-
logger := log.FromContext(ctx)
62-
parsed, err := key.ParseChangesetKey(req.Msg.Changeset)
63-
if err != nil {
64-
err = fmt.Errorf("invalid changeset: %w", err)
65-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
66-
}
67-
68-
var replacementModule *schema.Module
69-
if req.Msg.ReplacementModule != nil {
70-
pm, err := schema.ValidatedModuleFromProto(req.Msg.ReplacementModule)
71-
if err != nil {
72-
err = fmt.Errorf("invalid replacment module: %w", err)
73-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
74-
}
75-
replacementModule = pm
76-
}
77-
removingModule, err := schema.ValidatedModuleFromProto(req.Msg.Module)
78-
if err != nil {
79-
err = fmt.Errorf("invalid removing module: %w", err)
80-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
81-
}
82-
kinds := slices.Map(req.Msg.Kinds, func(k string) schema.ResourceType { return schema.ResourceType(k) })
83-
currentNodes := schema.GetProvisioned(replacementModule)
84-
removingNodes := schema.GetProvisioned(removingModule)
85-
86-
task := &inMemProvisioningTask{}
87-
// use chans to safely collect all events before completing each task
88-
completions := make(chan stepCompletedEvent, 16)
89-
90-
for id, toRemove := range removingNodes {
91-
inUse, ok := currentNodes[id]
92-
for _, resource := range toRemove.GetProvisioned() {
93-
if !ok || !resource.IsEqual(inUse.GetProvisioned().Get(resource.Kind)) {
94-
if slices.Contains(kinds, resource.Kind) {
95-
handler, ok := d.removeHandlers[resource.Kind]
96-
if !ok {
97-
err := fmt.Errorf("unsupported resource type: %s", resource.Kind)
98-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
99-
}
100-
step := &inMemProvisioningStep{Done: atomic.New(false)}
101-
task.steps = append(task.steps, step)
102-
go func() {
103-
event, err := handler(ctx, parsed, removingModule.Runtime.Deployment.DeploymentKey, toRemove)
104-
if err != nil {
105-
step.Err = err
106-
logger.Errorf(err, "failed to de-provision resource %s:%s", resource.Kind, toRemove.ResourceID())
107-
completions <- stepCompletedEvent{step: step}
108-
return
109-
}
110-
completions <- stepCompletedEvent{
111-
step: step,
112-
event: optional.Ptr(event),
113-
}
114-
}()
115-
}
116-
}
117-
}
118-
}
119-
120-
go func() {
121-
for c := range channels.IterContext(ctx, completions) {
122-
if e, ok := c.event.Get(); ok {
123-
task.events = append(task.events, &e)
124-
}
125-
c.step.Done.Store(true)
126-
done, err := task.Done()
127-
if done || err != nil {
128-
return
129-
}
130-
}
131-
}()
132-
133-
token := uuid.New().String()
134-
logger.Debugf("started a task with token %s", token)
135-
d.running.Store(token, task)
136-
137-
return connect.NewResponse(&provisioner.DeProvisionResponse{
138-
ProvisioningToken: token,
139-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
140-
}), nil
141-
}
142-
14360
func NewEmbeddedProvisioner(handlers map[schema.ResourceType]InMemResourceProvisionerFn, deProvisionHandlers map[schema.ResourceType]InMemResourceProvisionerFn) *InMemProvisioner {
14461
return &InMemProvisioner{
14562
running: xsync.NewMapOf[string, *inMemProvisioningTask](),
@@ -190,15 +107,25 @@ func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[p
190107
completions := make(chan stepCompletedEvent, 16)
191108

192109
for id, desired := range desiredNodes {
193-
previous, ok := previousNodes[id]
110+
previous, prevOk := previousNodes[id]
194111

195112
for _, resource := range desired.GetProvisioned() {
196-
if !ok || !resource.IsEqual(previous.GetProvisioned().Get(resource.Kind)) {
113+
if !prevOk || !resource.IsEqual(previous.GetProvisioned().Get(resource.Kind)) {
197114
if slices.Contains(kinds, resource.Kind) {
198-
handler, ok := d.handlers[resource.Kind]
199-
if !ok {
200-
// TODO: should a missing de-provisioner handler be an error?
201-
continue
115+
var handler InMemResourceProvisionerFn
116+
var ok bool
117+
if desiredModule.Runtime.Deployment.State == schema.DeploymentStateDeProvisioning {
118+
handler, ok = d.removeHandlers[resource.Kind]
119+
if !ok {
120+
// TODO: should a missing de-provisioner handler be an error?
121+
continue
122+
}
123+
} else {
124+
handler, ok = d.handlers[resource.Kind]
125+
if !ok {
126+
err := fmt.Errorf("unsupported resource type: %s", resource.Kind)
127+
return nil, connect.NewError(connect.CodeInvalidArgument, err)
128+
}
202129
}
203130
step := &inMemProvisioningStep{Done: atomic.New(false)}
204131
task.steps = append(task.steps, step)
@@ -239,7 +166,7 @@ func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[p
239166

240167
return connect.NewResponse(&provisioner.ProvisionResponse{
241168
ProvisioningToken: token,
242-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
169+
Status: provisioner.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED,
243170
}), nil
244171
}
245172

backend/provisioner/noop_provisioner.go

+1-8
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,6 @@ import (
1313
// NoopProvisioner is a provisioner that does nothing
1414
type NoopProvisioner struct{}
1515

16-
func (d *NoopProvisioner) DeProvision(ctx context.Context, c *connect.Request[provisioner.DeProvisionRequest]) (*connect.Response[provisioner.DeProvisionResponse], error) {
17-
return connect.NewResponse(&provisioner.DeProvisionResponse{
18-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
19-
ProvisioningToken: "token",
20-
}), nil
21-
}
22-
2316
var _ provisionerconnect.ProvisionerPluginServiceClient = (*NoopProvisioner)(nil)
2417

2518
func (d *NoopProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
@@ -28,7 +21,7 @@ func (d *NoopProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingReque
2821

2922
func (d *NoopProvisioner) Provision(ctx context.Context, req *connect.Request[provisioner.ProvisionRequest]) (*connect.Response[provisioner.ProvisionResponse], error) {
3023
return connect.NewResponse(&provisioner.ProvisionResponse{
31-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
24+
Status: provisioner.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED,
3225
ProvisioningToken: "token",
3326
}), nil
3427
}

backend/provisioner/scaling/localscaling/local_scaling.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -288,13 +288,14 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey key.Deploy
288288
exit := make(chan struct{})
289289
go func() {
290290
err := runner.Start(runnerCtx, config, l.storage)
291+
close(exit)
292+
cancel(fmt.Errorf("runner exited"))
291293
l.lock.Lock()
292294
defer l.lock.Unlock()
293295
if devEndpoint != nil {
294296
// Runner is complete, clear the deployment key
295297
devEndpoint.deploymentKey = optional.None[key.Deployment]()
296298
}
297-
close(exit)
298299
// Don't count context.Canceled as an a restart error
299300
if err != nil && !errors.Is(err, context.Canceled) {
300301
logger.Errorf(err, "Runner failed: %s", err)

backend/provisioner/service.go

+26-6
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,15 @@ func Start(
9696
err := svc.HandleChangesetPreparing(ctx, e.Changeset)
9797
if err != nil {
9898
logger.Errorf(err, "Error provisioning changeset")
99+
_, err := svc.schemaClient.FailChangeset(ctx, connect.NewRequest(&ftlv1.FailChangesetRequest{Changeset: e.Changeset.Key.String(), Error: err.Error()}))
100+
logger.Errorf(err, "Error provisioning changeset")
99101
}
100102
case *schema.ChangesetPreparedNotification:
101103
err := svc.HandleChangesetPrepared(ctx, e.Key)
102104
if err != nil {
103105
logger.Errorf(err, "Error provisioning changeset")
106+
_, err := svc.schemaClient.FailChangeset(ctx, connect.NewRequest(&ftlv1.FailChangesetRequest{Changeset: e.Key.String(), Error: err.Error()}))
107+
logger.Errorf(err, "Error provisioning changeset")
104108
}
105109
case *schema.ChangesetCommittedNotification:
106110
err := svc.HandleChangesetCommitted(ctx, e.Changeset)
@@ -110,7 +114,9 @@ func Start(
110114
case *schema.ChangesetDrainedNotification:
111115
err := svc.HandleChangesetDrained(ctx, e.Key)
112116
if err != nil {
113-
logger.Errorf(err, "Error provisioning changeset")
117+
if err != nil {
118+
return err
119+
}
114120
}
115121
case *schema.FullSchemaNotification:
116122
logger.Debugf("Provisioning changesets from full schema notification")
@@ -119,12 +125,20 @@ func Start(
119125
err := svc.HandleChangesetPreparing(ctx, cs)
120126
if err != nil {
121127
logger.Errorf(err, "Error provisioning changeset")
128+
_, err := svc.schemaClient.FailChangeset(ctx, connect.NewRequest(&ftlv1.FailChangesetRequest{Changeset: cs.Key.String(), Error: err.Error()}))
129+
if err != nil {
130+
return err
131+
}
122132
continue
123133
}
124134
} else if cs.State == schema.ChangesetStatePrepared {
125135
err := svc.HandleChangesetPrepared(ctx, cs.Key)
126136
if err != nil {
127137
logger.Errorf(err, "Error provisioning changeset")
138+
_, err := svc.schemaClient.FailChangeset(ctx, connect.NewRequest(&ftlv1.FailChangesetRequest{Changeset: cs.Key.String(), Error: err.Error()}))
139+
if err != nil {
140+
return err
141+
}
128142
continue
129143
}
130144
} else if cs.State == schema.ChangesetStateCommitted {
@@ -181,16 +195,22 @@ func (s *Service) HandleChangesetCommitted(ctx context.Context, req *schema.Chan
181195
}()
182196
return nil
183197
}
184-
func (s *Service) HandleChangesetDrained(ctx context.Context, req key.Changeset) error {
198+
func (s *Service) HandleChangesetDrained(ctx context.Context, cs key.Changeset) error {
185199
logger := log.FromContext(ctx)
186200
group := errgroup.Group{}
187201
// TODO: Block deployments to make sure only one module is modified at a time
188-
for _, module := range s.eventSource.ActiveChangeset()[req].RemovingModules {
202+
changeset := s.eventSource.ActiveChangeset()[cs]
203+
for _, module := range changeset.RemovingModules {
189204
moduleName := module.Name
190205

191206
group.Go(func() error {
192-
deployment := s.registry.CreateDeployment(ctx, req.Key, module, existingModule, func(element *schema.RuntimeElement) error {
193-
cs := req.Key.String()
207+
var current *schema.Module
208+
existing := s.eventSource.CanonicalView().Module(moduleName)
209+
if f, ok := existing.Get(); ok {
210+
current = f
211+
}
212+
deployment := s.registry.CreateDeployment(ctx, cs, module, current, func(element *schema.RuntimeElement) error {
213+
cs := cs.String()
194214
_, err := s.schemaClient.UpdateDeploymentRuntime(ctx, connect.NewRequest(&ftlv1.UpdateDeploymentRuntimeRequest{
195215
Changeset: &cs,
196216
Update: element.ToProto(),
@@ -220,7 +240,7 @@ func (s *Service) HandleChangesetDrained(ctx context.Context, req key.Changeset)
220240
if err != nil {
221241
return fmt.Errorf("error running deployments: %w", err)
222242
}
223-
_, err := s.schemaClient.FinalizeChangeset(ctx, connect.NewRequest(&ftlv1.FinalizeChangesetRequest{Changeset: req.String()}))
243+
_, err = s.schemaClient.FinalizeChangeset(ctx, connect.NewRequest(&ftlv1.FinalizeChangesetRequest{Changeset: cs.String()}))
224244
if err != nil {
225245
return fmt.Errorf("error finalizing changeset: %w", err)
226246
}

0 commit comments

Comments
 (0)