Skip to content

Commit a94af57

Browse files
cheating
chore(autofmt): Automated formatting
1 parent b86c4cd commit a94af57

File tree

13 files changed

+211
-621
lines changed

13 files changed

+211
-621
lines changed

backend/protos/xyz/block/ftl/provisioner/v1beta1/plugin.pb.go

+106-276
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/protos/xyz/block/ftl/provisioner/v1beta1/plugin.proto

+4-19
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,16 @@ message ProvisionRequest {
1616
repeated string kinds = 5;
1717
}
1818

19-
enum ProvisionResponseStatus {
20-
PROVISION_RESPONSE_STATUS_UNSPECIFIED = 0;
21-
PROVISION_RESPONSE_STATUS_SUBMITTED = 1;
22-
}
23-
2419
message ProvisionResponse {
20+
enum ProvisionResponseStatus {
21+
PROVISION_RESPONSE_STATUS_UNSPECIFIED = 0;
22+
PROVISION_RESPONSE_STATUS_SUBMITTED = 1;
23+
}
2524

2625
string provisioning_token = 1;
2726
ProvisionResponseStatus status = 2;
2827
}
2928

30-
message DeProvisionRequest {
31-
string ftl_cluster_id = 1;
32-
xyz.block.ftl.schema.v1.Module module = 2;
33-
optional xyz.block.ftl.schema.v1.Module replacement_module = 3;
34-
string changeset = 4;
35-
repeated string kinds = 5;
36-
}
37-
38-
message DeProvisionResponse {
39-
string provisioning_token = 1;
40-
ProvisionResponseStatus status = 2;
41-
}
42-
4329
message StatusRequest {
4430
string provisioning_token = 1;
4531
// The outputs of this module are updated if the the status is a success
@@ -67,6 +53,5 @@ service ProvisionerPluginService {
6753
rpc Ping(xyz.block.ftl.v1.PingRequest) returns (xyz.block.ftl.v1.PingResponse);
6854

6955
rpc Provision(ProvisionRequest) returns (ProvisionResponse);
70-
rpc DeProvision(DeProvisionRequest) returns (DeProvisionResponse);
7156
rpc Status(StatusRequest) returns (StatusResponse);
7257
}

backend/protos/xyz/block/ftl/provisioner/v1beta1/provisionerpbconnect/plugin.connect.go

+3-30
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/provisioner/inmem_provisioner.go

+17-90
Original file line numberDiff line numberDiff line change
@@ -57,89 +57,6 @@ type InMemProvisioner struct {
5757
removeHandlers map[schema.ResourceType]InMemResourceProvisionerFn
5858
}
5959

60-
func (d *InMemProvisioner) DeProvision(ctx context.Context, req *connect.Request[provisioner.DeProvisionRequest]) (*connect.Response[provisioner.DeProvisionResponse], error) {
61-
logger := log.FromContext(ctx)
62-
parsed, err := key.ParseChangesetKey(req.Msg.Changeset)
63-
if err != nil {
64-
err = fmt.Errorf("invalid changeset: %w", err)
65-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
66-
}
67-
68-
var replacementModule *schema.Module
69-
if req.Msg.ReplacementModule != nil {
70-
pm, err := schema.ValidatedModuleFromProto(req.Msg.ReplacementModule)
71-
if err != nil {
72-
err = fmt.Errorf("invalid replacment module: %w", err)
73-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
74-
}
75-
replacementModule = pm
76-
}
77-
removingModule, err := schema.ValidatedModuleFromProto(req.Msg.Module)
78-
if err != nil {
79-
err = fmt.Errorf("invalid removing module: %w", err)
80-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
81-
}
82-
kinds := slices.Map(req.Msg.Kinds, func(k string) schema.ResourceType { return schema.ResourceType(k) })
83-
currentNodes := schema.GetProvisioned(replacementModule)
84-
removingNodes := schema.GetProvisioned(removingModule)
85-
86-
task := &inMemProvisioningTask{}
87-
// use chans to safely collect all events before completing each task
88-
completions := make(chan stepCompletedEvent, 16)
89-
90-
for id, toRemove := range removingNodes {
91-
inUse, ok := currentNodes[id]
92-
for _, resource := range toRemove.GetProvisioned() {
93-
if !ok || !resource.IsEqual(inUse.GetProvisioned().Get(resource.Kind)) {
94-
if slices.Contains(kinds, resource.Kind) {
95-
handler, ok := d.removeHandlers[resource.Kind]
96-
if !ok {
97-
err := fmt.Errorf("unsupported resource type: %s", resource.Kind)
98-
return nil, connect.NewError(connect.CodeInvalidArgument, err)
99-
}
100-
step := &inMemProvisioningStep{Done: atomic.New(false)}
101-
task.steps = append(task.steps, step)
102-
go func() {
103-
event, err := handler(ctx, parsed, removingModule.Runtime.Deployment.DeploymentKey, toRemove)
104-
if err != nil {
105-
step.Err = err
106-
logger.Errorf(err, "failed to de-provision resource %s:%s", resource.Kind, toRemove.ResourceID())
107-
completions <- stepCompletedEvent{step: step}
108-
return
109-
}
110-
completions <- stepCompletedEvent{
111-
step: step,
112-
event: optional.Ptr(event),
113-
}
114-
}()
115-
}
116-
}
117-
}
118-
}
119-
120-
go func() {
121-
for c := range channels.IterContext(ctx, completions) {
122-
if e, ok := c.event.Get(); ok {
123-
task.events = append(task.events, &e)
124-
}
125-
c.step.Done.Store(true)
126-
done, err := task.Done()
127-
if done || err != nil {
128-
return
129-
}
130-
}
131-
}()
132-
133-
token := uuid.New().String()
134-
logger.Debugf("started a task with token %s", token)
135-
d.running.Store(token, task)
136-
137-
return connect.NewResponse(&provisioner.DeProvisionResponse{
138-
ProvisioningToken: token,
139-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
140-
}), nil
141-
}
142-
14360
func NewEmbeddedProvisioner(handlers map[schema.ResourceType]InMemResourceProvisionerFn, deProvisionHandlers map[schema.ResourceType]InMemResourceProvisionerFn) *InMemProvisioner {
14461
return &InMemProvisioner{
14562
running: xsync.NewMapOf[string, *inMemProvisioningTask](),
@@ -190,15 +107,25 @@ func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[p
190107
completions := make(chan stepCompletedEvent, 16)
191108

192109
for id, desired := range desiredNodes {
193-
previous, ok := previousNodes[id]
110+
previous, prevOk := previousNodes[id]
194111

195112
for _, resource := range desired.GetProvisioned() {
196-
if !ok || !resource.IsEqual(previous.GetProvisioned().Get(resource.Kind)) {
113+
if !prevOk || !resource.IsEqual(previous.GetProvisioned().Get(resource.Kind)) {
197114
if slices.Contains(kinds, resource.Kind) {
198-
handler, ok := d.handlers[resource.Kind]
199-
if !ok {
200-
// TODO: should a missing de-provisioner handler be an error?
201-
continue
115+
var handler InMemResourceProvisionerFn
116+
var ok bool
117+
if desiredModule.Runtime.Deployment.State == schema.DeploymentStateDeProvisioning {
118+
handler, ok = d.removeHandlers[resource.Kind]
119+
if !ok {
120+
// TODO: should a missing de-provisioner handler be an error?
121+
continue
122+
}
123+
} else {
124+
handler, ok = d.handlers[resource.Kind]
125+
if !ok {
126+
err := fmt.Errorf("unsupported resource type: %s", resource.Kind)
127+
return nil, connect.NewError(connect.CodeInvalidArgument, err)
128+
}
202129
}
203130
step := &inMemProvisioningStep{Done: atomic.New(false)}
204131
task.steps = append(task.steps, step)
@@ -239,7 +166,7 @@ func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[p
239166

240167
return connect.NewResponse(&provisioner.ProvisionResponse{
241168
ProvisioningToken: token,
242-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
169+
Status: provisioner.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED,
243170
}), nil
244171
}
245172

backend/provisioner/noop_provisioner.go

+1-8
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,6 @@ import (
1313
// NoopProvisioner is a provisioner that does nothing
1414
type NoopProvisioner struct{}
1515

16-
func (d *NoopProvisioner) DeProvision(ctx context.Context, c *connect.Request[provisioner.DeProvisionRequest]) (*connect.Response[provisioner.DeProvisionResponse], error) {
17-
return connect.NewResponse(&provisioner.DeProvisionResponse{
18-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
19-
ProvisioningToken: "token",
20-
}), nil
21-
}
22-
2316
var _ provisionerconnect.ProvisionerPluginServiceClient = (*NoopProvisioner)(nil)
2417

2518
func (d *NoopProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
@@ -28,7 +21,7 @@ func (d *NoopProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingReque
2821

2922
func (d *NoopProvisioner) Provision(ctx context.Context, req *connect.Request[provisioner.ProvisionRequest]) (*connect.Response[provisioner.ProvisionResponse], error) {
3023
return connect.NewResponse(&provisioner.ProvisionResponse{
31-
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
24+
Status: provisioner.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED,
3225
ProvisioningToken: "token",
3326
}), nil
3427
}

backend/provisioner/scaling/localscaling/local_scaling.go

+2-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package localscaling
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"net/url"
87
"os"
@@ -288,23 +287,15 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey key.Deploy
288287
exit := make(chan struct{})
289288
go func() {
290289
err := runner.Start(runnerCtx, config, l.storage)
290+
close(exit)
291+
cancel(fmt.Errorf("runner exited %w", err))
291292
l.lock.Lock()
292293
defer l.lock.Unlock()
293294
if devEndpoint != nil {
294295
// Runner is complete, clear the deployment key
295296
devEndpoint.deploymentKey = optional.None[key.Deployment]()
296297
}
297-
close(exit)
298-
// Don't count context.Canceled as an a restart error
299-
if err != nil && !errors.Is(err, context.Canceled) {
300-
logger.Errorf(err, "Runner failed: %s", err)
301-
}
302-
logger.Errorf(fmt.Errorf("too many restarts"), "Runner failed too many times, not restarting")
303-
304298
info.runner = optional.None[runnerInfo]()
305-
if err != nil {
306-
logger.Errorf(err, "Failed to reconcile runners")
307-
}
308299
}()
309300
client := rpc.Dial(ftlv1connect.NewVerbServiceClient, bindURL.String(), log.Error)
310301
timeout := time.After(1 * time.Minute)

0 commit comments

Comments
 (0)