Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/v1/prefectworkpool_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
)

Expand Down Expand Up @@ -52,10 +53,17 @@ type PrefectWorkPoolSpec struct {

// DeploymentLabels defines additional labels to add to the Prefect Server Deployment
DeploymentLabels map[string]string `json:"deploymentLabels,omitempty"`

// Base job template for flow runs on this Work Pool
BaseJobTemplate *runtime.RawExtension `json:"baseJobTemplate,omitempty"`
}

// PrefectWorkPoolStatus defines the observed state of PrefectWorkPool
type PrefectWorkPoolStatus struct {
// Id is the workPool ID from Prefect
// +optional
Id *string `json:"id,omitempty"`
Copy link
Collaborator

@chrisguidry chrisguidry Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 How were we getting away without the Id before? I guess the API mostly deals in work pool names rather than IDs, but this is a good addition for sure.


// Version is the version of the Prefect Worker that is currently running
Version string `json:"version"`

Expand All @@ -65,6 +73,17 @@ type PrefectWorkPoolStatus struct {
// Ready is true if the work pool is ready to accept work
Ready bool `json:"ready"`

// SpecHash tracks changes to the spec to minimize API calls
SpecHash string `json:"specHash,omitempty"`

// LastSyncTime is the last time the workPool was synced with Prefect
// +optional
LastSyncTime *metav1.Time `json:"lastSyncTime,omitempty"`

// ObservedGeneration tracks the last processed generation
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

// Conditions store the status conditions of the PrefectWorkPool instances
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
}
Expand Down
14 changes: 14 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ spec:
spec:
description: PrefectWorkPoolSpec defines the desired state of PrefectWorkPool
properties:
baseJobTemplate:
description: Base job template for flow runs on this Work Pool
type: object
x-kubernetes-preserve-unknown-fields: true
deploymentLabels:
additionalProperties:
type: string
Expand Down Expand Up @@ -2049,6 +2053,18 @@ spec:
- type
type: object
type: array
id:
description: Id is the workPool ID from Prefect
type: string
lastSyncTime:
description: LastSyncTime is the last time the workPool was synced
with Prefect
format: date-time
type: string
observedGeneration:
description: ObservedGeneration tracks the last processed generation
format: int64
type: integer
ready:
description: Ready is true if the work pool is ready to accept work
type: boolean
Expand All @@ -2057,6 +2073,9 @@ spec:
ready
format: int32
type: integer
specHash:
description: SpecHash tracks changes to the spec to minimize API calls
type: string
version:
description: Version is the version of the Prefect Worker that is
currently running
Expand Down
168 changes: 159 additions & 9 deletions internal/controller/prefectworkpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,31 @@ import (
"github.com/PrefectHQ/prefect-operator/internal/prefect"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log"

prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
"github.com/PrefectHQ/prefect-operator/internal/conditions"
"github.com/PrefectHQ/prefect-operator/internal/constants"
"github.com/PrefectHQ/prefect-operator/internal/utils"
)

const (
	// PrefectWorkPoolFinalizer is the finalizer used to ensure cleanup of Prefect work pools
	PrefectWorkPoolFinalizer = "prefect.io/work-pool-cleanup"

	// PrefectWorkPoolConditionReady indicates the work pool is ready
	PrefectWorkPoolConditionReady = "Ready"

	// PrefectWorkPoolConditionSynced indicates the work pool is synced with the Prefect API
	PrefectWorkPoolConditionSynced = "Synced"
)

// PrefectWorkPoolReconciler reconciles a PrefectWorkPool object
Expand All @@ -59,7 +67,7 @@ type PrefectWorkPoolReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile
func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrllog.FromContext(ctx)
log := log.FromContext(ctx)
log.V(1).Info("Reconciling PrefectWorkPool")

var workPool prefectiov1.PrefectWorkPool
Expand All @@ -70,6 +78,21 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}

currentStatus := workPool.Status

// Defer a final status update at the end of the reconciliation loop, so that any of the
// individual reconciliation functions can update the status as they see fit.
defer func() {
// Skip status update if nothing changed, to avoid conflicts
if equality.Semantic.DeepEqual(workPool.Status, currentStatus) {
return
}

if statusErr := r.Status().Update(ctx, &workPool); statusErr != nil {
log.Error(statusErr, "Failed to update WorkPool status")
}
}()

// Handle deletion
if workPool.DeletionTimestamp != nil {
return r.handleDeletion(ctx, &workPool)
Expand All @@ -85,13 +108,19 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{RequeueAfter: time.Second}, nil
}

// Defer a final status update at the end of the reconciliation loop, so that any of the
// individual reconciliation functions can update the status as they see fit.
defer func() {
if statusErr := r.Status().Update(ctx, &workPool); statusErr != nil {
log.Error(statusErr, "Failed to update WorkPool status")
specHash, err := utils.Hash(workPool.Spec, 16)
if err != nil {
log.Error(err, "Failed to calculate spec hash", "workPool", workPool.Name)
return ctrl.Result{}, err
}

if r.needsSync(&workPool, specHash) {
log.Info("Starting sync with Prefect API", "deployment", workPool.Name)
err := r.syncWithPrefect(ctx, &workPool)
if err != nil {
return ctrl.Result{}, err
}
}()
}

objName := constants.Deployment

Expand Down Expand Up @@ -187,12 +216,114 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
}

r.setCondition(&workPool, PrefectWorkPoolConditionReady, metav1.ConditionTrue, "WorkPoolReady", "Work pool is ready and operational")

return ctrl.Result{}, nil
}

// needsSync reports whether the work pool must be (re-)synced with the
// Prefect API: it has never been synced, its spec or generation changed,
// or the periodic drift-detection window has elapsed.
func (r *PrefectWorkPoolReconciler) needsSync(workPool *prefectiov1.PrefectWorkPool, currentSpecHash string) bool {
	status := &workPool.Status

	switch {
	case status.Id == nil || *status.Id == "":
		// No Prefect-side ID recorded yet — never synced.
		return true
	case status.SpecHash != currentSpecHash:
		// The spec changed since the last recorded sync.
		return true
	case status.ObservedGeneration < workPool.Generation:
		// A newer generation of the object has not been processed.
		return true
	case status.LastSyncTime == nil:
		// No sync timestamp recorded — treat as never synced.
		return true
	default:
		// Drift detection: re-sync if the last sync was too long ago.
		return time.Since(status.LastSyncTime.Time) > 10*time.Minute
	}
}

// syncWithPrefect ensures the work pool exists in Prefect and matches the
// desired spec, creating or updating it through the Prefect API as needed.
// On success it records the spec hash, observed generation, and sync time
// on the status and sets the Synced condition to true; on failure it sets
// Synced to false with a reason describing the failing step and returns
// the error. The caller's deferred status update persists these changes.
func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) error {
	name := workPool.Name
	log := log.FromContext(ctx)

	prefectClient, err := r.getPrefectClient(ctx, workPool)
	if err != nil {
		log.Error(err, "Failed to create Prefect client", "workPool", name)
		return err
	}

	// Look up the work pool by name; a nil result with no error is taken
	// to mean the pool does not exist yet. NOTE(review): confirm GetWorkPool
	// really returns (nil, nil) for "not found" rather than an error.
	prefectWorkPool, err := prefectClient.GetWorkPool(ctx, name)
	if err != nil {
		log.Error(err, "Failed to get work pool in Prefect", "workPool", name)
		return err
	}

	if prefectWorkPool == nil {
		// Pool missing in Prefect: convert the CR spec and create it.
		workPoolSpec, err := prefect.ConvertToWorkPoolSpec(workPool)
		if err != nil {
			log.Error(err, "Failed to convert work pool spec", "workPool", name)
			r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "ConversionError", err.Error())
			return err
		}

		prefectWorkPool, err = prefectClient.CreateWorkPool(ctx, workPoolSpec)
		if err != nil {
			log.Error(err, "Failed to create work pool in Prefect", "workPool", name)
			r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "SyncError", err.Error())
			return err
		}
	} else {
		// Pool already exists: convert to the update shape and push changes.
		workPoolSpec, err := prefect.ConvertToWorkPoolUpdateSpec(workPool)
		if err != nil {
			log.Error(err, "Failed to convert work pool spec", "workPool", name)
			r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "ConversionError", err.Error())
			return err
		}

		err = prefectClient.UpdateWorkPool(ctx, workPool.Name, workPoolSpec)
		if err != nil {
			log.Error(err, "Failed to update work pool in Prefect", "workPool", name)
			r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "SyncError", err.Error())
			return err
		}
	}

	// Copy Prefect-reported fields (e.g. the pool ID) into the CR status.
	prefect.UpdateWorkPoolStatus(workPool, prefectWorkPool)

	// Record the hash of the spec we just synced so needsSync can skip
	// redundant API calls until the spec changes again.
	specHash, err := utils.Hash(workPool.Spec, 16)
	if err != nil {
		log.Error(err, "Failed to calculate spec hash", "workPool", workPool.Name)
		return err
	}

	now := metav1.Now()

	workPool.Status.SpecHash = specHash
	workPool.Status.ObservedGeneration = workPool.Generation
	workPool.Status.LastSyncTime = &now

	r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionTrue, "SyncSuccessful", "Work pool successfully synced with Prefect API")

	return nil
}

// setCondition sets or updates a status condition on the work pool.
// meta.SetStatusCondition handles de-duplication and only bumps the
// transition time when the condition's status actually changes.
func (r *PrefectWorkPoolReconciler) setCondition(workPool *prefectiov1.PrefectWorkPool, conditionType string, status metav1.ConditionStatus, reason, message string) {
	meta.SetStatusCondition(&workPool.Status.Conditions, metav1.Condition{
		Type:               conditionType,
		Status:             status,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	})
}

// handleDeletion handles the cleanup of a PrefectWorkPool that is being deleted
func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) (ctrl.Result, error) {
log := ctrllog.FromContext(ctx)
log := log.FromContext(ctx)
log.Info("Handling deletion of PrefectWorkPool", "workPool", workPool.Name)

// If the finalizer is not present, nothing to do
Expand Down Expand Up @@ -234,6 +365,25 @@ func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool
return ctrl.Result{}, nil
}

// getPrefectClient returns the Prefect API client to use for this work
// pool: the injected client when one is configured (used by tests),
// otherwise a new client built from the work pool's server reference.
func (r *PrefectWorkPoolReconciler) getPrefectClient(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) (prefect.PrefectClient, error) {
	// Prefer the injected client when available (for testing).
	if r.PrefectClient != nil {
		return r.PrefectClient, nil
	}

	log := log.FromContext(ctx)
	prefectClient, err := prefect.NewClientFromK8s(ctx, &workPool.Spec.Server, r.Client, workPool.Namespace, log)
	if err != nil {
		log.Error(err, "Failed to create Prefect client", "workPool", workPool.Name)
		return nil, err
	}

	return prefectClient, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *PrefectWorkPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Loading