Async Tasks in Anclax

📚 Looking for technical details? Check out the Technical Reference for architecture, lifecycle, and internals.

⚖️ Need scheduling controls? See Scheduling & Runtime Config Guide for WithPriority, WithWeight, strict-cap behavior, and live worker config updates.

Anclax lets you run background tasks that don't block your web requests. For example, you can send emails, process images, or generate reports without making users wait.

What Are Async Tasks?

Think of async tasks like hiring someone to do work for you later. Instead of doing everything right away when a user makes a request, you can:

  1. Create a task - Tell Anclax what work needs to be done
  2. Queue it up - Put the task in a to-do list
  3. Let workers handle it - Background workers pick up tasks and do the work
  4. Get guarantees - Tasks will run at least once, even if something goes wrong
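
The four steps above can be pictured with a small in-memory model. This is only a sketch of the at-least-once idea, not how Anclax is implemented: Anclax keeps tasks in the database, and the `task` type, `runUntilDone`, and `flakyHandler` here are hypothetical names made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical unit of work; real Anclax tasks are stored in the database.
type task struct {
	name     string
	attempts int
}

// runUntilDone keeps re-queuing a task until its handler succeeds and
// returns how many attempts that took. This models at-least-once delivery.
func runUntilDone(t *task, handler func(*task) error) int {
	queue := []*task{t}
	for len(queue) > 0 {
		next := queue[0]
		queue = queue[1:]
		next.attempts++
		if err := handler(next); err != nil {
			queue = append(queue, next) // failed: put it back on the to-do list
		}
	}
	return t.attempts
}

// flakyHandler fails on the first two attempts and succeeds on the third.
func flakyHandler(t *task) error {
	if t.attempts < 3 {
		return errors.New("temporary failure")
	}
	return nil
}

func main() {
	attempts := runUntilDone(&task{name: "SendEmail"}, flakyHandler)
	fmt.Println("attempts:", attempts)
}
```

Because a handler may run more than once before it succeeds, the work it does should be safe to repeat.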

Common examples:

  • Send welcome emails when users sign up
  • Resize images after upload
  • Generate monthly reports
  • Clean up old data
  • Process payments

Task Definition

Tasks are defined in api/tasks/tasks.yaml using a structured YAML format:

tasks:
  - name: TaskName
    description: "Task description"
    parameters:
      type: object
      required: [param1, param2]
      properties:
        param1:
          type: string
          description: "Parameter description"
        param2:
          type: integer
          format: int32
    retryPolicy:
      interval: 30m
      maxAttempts: -1
    cronjob:
      cronExpression: "0 0 * * * *"  # Every hour (six-field format with seconds)
    events:
      - onFailed
    timeout: 10m

Task Properties

  • name (required): Unique task identifier
  • description: Human-readable task description
  • parameters: JSON Schema defining task parameters
  • retryPolicy: Retry configuration for failed tasks
  • cronjob: Cron scheduling configuration
  • events: Array of lifecycle hooks (e.g., [onFailed])
  • timeout: Maximum execution time (default: 1 hour)

Parameter Types

Parameters follow JSON Schema format:

parameters:
  type: object
  required: [userId, amount]
  properties:
    userId:
      type: integer
      format: int32
    amount:
      type: number
      format: float
    metadata:
      type: object
    tags:
      type: array
      items:
        type: string
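
To make the mapping from schema to Go concrete, here is a hand-written sketch of a struct that could carry these parameters, with a check for the two required fields. It is an assumption for illustration only: the real types come from `anclax generate` and may differ in naming and in how required fields are represented. `PaymentParams` and `decodePaymentParams` are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// PaymentParams is a hypothetical stand-in for a generated parameter type.
// Pointers are used for the required fields so that a missing key can be detected.
type PaymentParams struct {
	UserID   *int32                 `json:"userId"`
	Amount   *float32               `json:"amount"`
	Metadata map[string]interface{} `json:"metadata,omitempty"`
	Tags     []string               `json:"tags,omitempty"`
}

// decodePaymentParams parses raw JSON and enforces the "required" list from the schema.
func decodePaymentParams(raw []byte) (*PaymentParams, error) {
	var p PaymentParams
	if err := json.Unmarshal(raw, &p); err != nil {
		return nil, err
	}
	if p.UserID == nil || p.Amount == nil {
		return nil, errors.New("userId and amount are required")
	}
	return &p, nil
}

func main() {
	p, err := decodePaymentParams([]byte(`{"userId": 7, "amount": 12.5, "tags": ["vip"]}`))
	if err != nil {
		fmt.Println("invalid:", err)
		return
	}
	fmt.Println(*p.UserID, *p.Amount, p.Tags)
}
```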

Task Implementation

After defining tasks, run code generation:

anclax generate

This generates interfaces in pkg/zgen/taskgen/:

Generated Interfaces

// ExecutorInterface - implement this to handle task execution and hooks
type ExecutorInterface interface {
    // Execute the main task
    ExecuteTaskName(ctx context.Context, params *TaskNameParameters) error
    
    // Hook called when the task fails permanently (if events: [onFailed] is configured)
    OnTaskNameFailed(ctx context.Context, taskID int32, params *TaskNameParameters, tx pgx.Tx) error
}

// TaskRunner - use this to enqueue tasks
type TaskRunner interface {
    RunTaskName(ctx context.Context, params *TaskNameParameters, overrides ...taskcore.TaskOverride) (int32, error)
    RunTaskNameWithTx(ctx context.Context, tx pgx.Tx, params *TaskNameParameters, overrides ...taskcore.TaskOverride) (int32, error)
}

// Hook - automatically generated hook dispatcher
type Hook interface {
    OnTaskFailed(ctx context.Context, tx pgx.Tx, failedTaskSpec TaskSpec, taskID int32) error
}

Implementing the Executor

Create an executor that implements the generated interface:

package asynctask

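import (
    "context"

    // Prefix these two paths with your own module path.
    "pkg/zcore/model"
    "pkg/zgen/taskgen"
)

type Executor struct {
    model model.ModelInterface
}

// NewExecutor wires the model into an executor that satisfies the generated interface.
func NewExecutor(m model.ModelInterface) taskgen.ExecutorInterface {
    return &Executor{
        model: m,
    }
}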

func (e *Executor) ExecuteTaskName(ctx context.Context, params *taskgen.TaskNameParameters) error {
    // Your task logic here
    return e.model.DoSomething(ctx, params.UserId, params.Amount)
}

Running Tasks

Enqueuing Tasks

Use the generated TaskRunner to enqueue tasks:

func (h *Handler) EnqueueTask(c *fiber.Ctx) error {
    params := &taskgen.TaskNameParameters{
        UserId: 123,
        Amount: 50.0,
    }
    
    taskID, err := h.taskRunner.RunTaskName(c.Context(), params)
    if err != nil {
        return err
    }
    
    return c.JSON(fiber.Map{"taskId": taskID})
}

Task Overrides

You can override task properties at runtime:

// Override retry policy
taskID, err := h.taskRunner.RunTaskName(ctx, params,
    taskcore.WithRetryPolicy("1h", 5),
    taskcore.WithTimeout("30m"),
    taskcore.WithUniqueTag("user-123-daily-task"),
    taskcore.WithParentTaskID(parentID),
)

Priority, Weight, and Runtime Config

Anclax supports two scheduling lanes:

  • Strict lane: tasks with priority > 0.
  • Normal lane: tasks with priority == 0, scheduled with weighted fairness by label group.

Task-level scheduling overrides

_, err := h.taskRunner.RunTaskName(ctx, params,
    taskcore.WithPriority(10), // strict lane (urgent)
    taskcore.WithWeight(3),    // higher order inside selected normal group
)

Validation rules:

  • WithPriority(priority) requires priority >= 0.
  • WithWeight(weight) requires weight >= 1.
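
The validation rules and the lane split can be sketched with the functional-options pattern. This is an illustration under stated assumptions, not the `taskcore` implementation: the `schedule` type, the lowercase `withPriority` and `withWeight` helpers, the `lane` function, and the defaults (priority 0, weight 1) are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// schedule holds the two scheduling knobs discussed above.
type schedule struct {
	priority int32
	weight   int32
}

// option mutates a schedule and may reject invalid values.
type option func(*schedule) error

func withPriority(p int32) option {
	return func(s *schedule) error {
		if p < 0 {
			return errors.New("priority must be >= 0")
		}
		s.priority = p
		return nil
	}
}

func withWeight(w int32) option {
	return func(s *schedule) error {
		if w < 1 {
			return errors.New("weight must be >= 1")
		}
		s.weight = w
		return nil
	}
}

// newSchedule applies options on top of assumed defaults (priority 0, weight 1).
func newSchedule(opts ...option) (schedule, error) {
	s := schedule{priority: 0, weight: 1}
	for _, opt := range opts {
		if err := opt(&s); err != nil {
			return schedule{}, err
		}
	}
	return s, nil
}

// lane reports which lane a schedule falls into: priority > 0 is strict, 0 is normal.
func lane(s schedule) string {
	if s.priority > 0 {
		return "strict"
	}
	return "normal"
}

func main() {
	s, err := newSchedule(withPriority(10), withWeight(3))
	fmt.Println(s, lane(s), err)
}
```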

Updating worker scheduling config at runtime

Use the worker control plane to enqueue and wait for config updates:

maxStrict := int32(20)
defaultWeight := int32(1)
labels := []string{"w1", "w2"}
weights := []int32{5, 1}

controlPlane := ctrl.NewWorkerControlPlane(h.model, h.taskRunner, h.taskStore, h.taskListener)
err := controlPlane.UpdateWorkerRuntimeConfig(ctx,
    &ctrl.UpdateWorkerRuntimeConfigRequest{
        MaxStrictPercentage: &maxStrict,
        DefaultWeight:       &defaultWeight,
        Labels:              labels,
        Weights:             weights,
    },
)

The control plane always enqueues the config-update task with reserved max strict priority and hides task-wait listener details.

For full semantics (strict cap formula, label-group mapping, LISTEN/NOTIFY propagation, ACK convergence, and supersede behavior), see the Scheduling & Runtime Config Guide.

Task hierarchy and control-plane interrupts

Tasks can optionally reference a parent task via parentTaskId. Use taskcore.WithParentTaskID when enqueueing child tasks:

childID, err := h.taskRunner.RunTaskName(ctx, params,
    taskcore.WithParentTaskID(parentID),
)

When you call PauseTask or CancelTask on a task, the control plane applies the same status change to all descendants in the hierarchy within the same transaction, then enqueues a single interrupt task for the entire set of task IDs.
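
Collecting "the task plus all descendants" is a plain graph traversal. The sketch below shows one way to compute that set from parent links, using an in-memory map instead of the database. `collectWithDescendants` and the `parentOf` map are hypothetical names; how Anclax actually queries the hierarchy is not shown in this guide.

```go
package main

import (
	"fmt"
	"sort"
)

// collectWithDescendants returns root and every task reachable from it
// through child links, sorted by ID. parentOf maps child ID -> parent ID.
func collectWithDescendants(root int32, parentOf map[int32]int32) []int32 {
	// Invert the parent links so children can be looked up by parent.
	children := make(map[int32][]int32)
	for child, parent := range parentOf {
		children[parent] = append(children[parent], child)
	}

	seen := map[int32]bool{root: true}
	result := []int32{root}
	queue := []int32{root}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		for _, c := range children[cur] {
			if !seen[c] {
				seen[c] = true
				result = append(result, c)
				queue = append(queue, c)
			}
		}
	}
	sort.Slice(result, func(i, j int) bool { return result[i] < result[j] })
	return result
}

func main() {
	// Task 1 has children 2 and 3; task 2 has child 4; task 5 belongs to another tree.
	parentOf := map[int32]int32{2: 1, 3: 1, 4: 2, 5: 9}
	fmt.Println(collectWithDescendants(1, parentOf))
}
```

In this model, pausing or cancelling task 1 would cover tasks 1, 2, 3, and 4, while task 5 is untouched.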

Transactional Tasks

Enqueue tasks within database transactions:

err := h.model.RunTransaction(ctx, func(txm model.ModelInterface) error {
    // Do some database work (here the user record is only checked for existence)
    if _, err := txm.GetUser(ctx, userID); err != nil {
        return err
    }

    // Enqueue the task within the same transaction
    _, err := h.taskRunner.RunTaskNameWithTx(ctx, txm.GetTx(), params)
    return err
})
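
The point of enqueueing inside a transaction is that the task and the data change succeed or fail together. The sketch below models that with an in-memory queue that only receives staged tasks when the callback returns nil. It is a conceptual illustration, not Anclax code: `queue`, `tx`, `runTransaction`, and `errRollback` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errRollback is a hypothetical error used to force a rollback in the demo.
var errRollback = errors.New("rollback")

// queue is an in-memory stand-in for the task table.
type queue struct {
	tasks []string
}

// tx stages tasks until the surrounding transaction commits.
type tx struct {
	staged []string
}

func (t *tx) enqueue(name string) {
	t.staged = append(t.staged, name)
}

// runTransaction publishes staged tasks only if fn returns nil.
func (q *queue) runTransaction(fn func(*tx) error) error {
	t := &tx{}
	if err := fn(t); err != nil {
		return err // rollback: staged tasks are discarded
	}
	q.tasks = append(q.tasks, t.staged...) // commit
	return nil
}

func main() {
	q := &queue{}
	_ = q.runTransaction(func(t *tx) error {
		t.enqueue("SendEmail")
		return nil
	})
	_ = q.runTransaction(func(t *tx) error {
		t.enqueue("ProcessPayment")
		return errRollback
	})
	fmt.Println(q.tasks) // only the committed task is present
}
```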

Cronjobs

Define scheduled tasks using cron expressions:

tasks:
  - name: DailyCleanup
    description: "Run daily cleanup tasks"
    cronjob:
      cronExpression: "0 0 2 * * *"  # 2 AM daily
    parameters:
      type: object
      properties:
        daysToKeep:
          type: integer
          format: int32

Cronjobs support extended cron format with seconds:

  • Format: second minute hour dayOfMonth month dayOfWeek
  • Example: "*/30 * * * * *" (every 30 seconds)
  • Example: "0 0 */6 * * *" (every 6 hours)
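
Since the extended format has six fields rather than the classic five, a quick field-count check can catch a five-field expression before it reaches the scheduler. The helper below is a hypothetical sketch (`cronFields` and `splitCron` are not part of Anclax); it only splits and names the fields and does not validate their values.

```go
package main

import (
	"fmt"
	"strings"
)

// cronFields names the six positions of the extended cron format.
type cronFields struct {
	Second, Minute, Hour, DayOfMonth, Month, DayOfWeek string
}

// splitCron splits an expression on whitespace and requires exactly six fields.
func splitCron(expr string) (cronFields, error) {
	parts := strings.Fields(expr)
	if len(parts) != 6 {
		return cronFields{}, fmt.Errorf("expected 6 fields, got %d", len(parts))
	}
	return cronFields{
		Second:     parts[0],
		Minute:     parts[1],
		Hour:       parts[2],
		DayOfMonth: parts[3],
		Month:      parts[4],
		DayOfWeek:  parts[5],
	}, nil
}

func main() {
	f, err := splitCron("0 0 */6 * * *")
	fmt.Println(f.Hour, err)

	_, err = splitCron("0 */6 * * *") // classic five-field form
	fmt.Println(err)
}
```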

Retry Policies

Configure how tasks should be retried on failure:

retryPolicy:
  interval: 30m      # Wait 30 minutes between retries
  maxAttempts: -1    # Unlimited retries (-1 means infinite, positive number limits attempts)

Retry Intervals

  • Simple duration: "30m", "1h", "5s"
  • Exponential backoff: "1m,2m,4m,8m" (comma-separated)
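
To illustrate the two interval forms, the sketch below parses a spec with `time.ParseDuration` and looks up the wait before a given retry. It rests on an assumption that this guide does not confirm: once the comma-separated list is exhausted, the last interval is reused. `parseIntervals` and `delayFor` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseIntervals accepts either a single duration ("30m") or a
// comma-separated list ("1m,2m,4m,8m").
func parseIntervals(spec string) ([]time.Duration, error) {
	var out []time.Duration
	for _, part := range strings.Split(spec, ",") {
		d, err := time.ParseDuration(strings.TrimSpace(part))
		if err != nil {
			return nil, fmt.Errorf("invalid interval %q: %w", part, err)
		}
		out = append(out, d)
	}
	return out, nil
}

// delayFor returns the wait before the given retry (1-based).
// Assumption: retries beyond the end of the list reuse the last interval.
func delayFor(intervals []time.Duration, retry int) time.Duration {
	if len(intervals) == 0 {
		return 0
	}
	if retry < 1 {
		retry = 1
	}
	if retry > len(intervals) {
		retry = len(intervals)
	}
	return intervals[retry-1]
}

func main() {
	intervals, err := parseIntervals("1m,2m,4m,8m")
	if err != nil {
		fmt.Println(err)
		return
	}
	for retry := 1; retry <= 5; retry++ {
		fmt.Println(retry, delayFor(intervals, retry))
	}
}
```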

Error Handling and Hooks

Task Failure Hooks

Tasks can automatically trigger hook methods when they fail using the events configuration:

tasks:
  - name: ProcessPayment
    description: "Process user payment"
    parameters:
      type: object
      required: [userId, amount]
      properties:
        userId:
          type: integer
          format: int32
        amount:
          type: number
    retryPolicy:
      interval: 30m
      maxAttempts: -1
    events:
      - onFailed

How Hooks Work

  1. Automatic Triggering: When a task fails permanently (after all retries), the system automatically calls the corresponding hook method
  2. Transaction Safety: Both the original task status update and the hook execution happen in the same database transaction
  3. Typed Parameters: Hook methods receive the original task parameters and task ID with full type safety
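  4. No Retry Interference: Hooks are only triggered when tasks fail permanently, not during retries. Note that with maxAttempts: -1 (unlimited retries) retries are never exhausted, so in that configuration the hook is presumably reached only when the executor returns a fatal error such as taskcore.ErrFatalTask.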

Hook Method Signatures

When you define a task with events: [onFailed], the code generator automatically creates a hook method in the ExecutorInterface:

type ExecutorInterface interface {
    // Execute the main task
    ExecuteTaskName(ctx context.Context, params *TaskNameParameters) error
    
    // Hook called when the task fails permanently
    OnTaskNameFailed(ctx context.Context, taskID int32, params *TaskNameParameters, tx pgx.Tx) error
}

Implementing Failure Hooks

func (e *Executor) ExecuteProcessPayment(ctx context.Context, params *taskgen.ProcessPaymentParameters) error {
    // Your payment processing logic
    if err := e.paymentService.ProcessPayment(params.UserId, params.Amount); err != nil {
        // This error will trigger OnProcessPaymentFailed if retries are exhausted
        return fmt.Errorf("payment processing failed: %w", err)
    }
    return nil
}

func (e *Executor) OnProcessPaymentFailed(ctx context.Context, taskID int32, params *taskgen.ProcessPaymentParameters, tx pgx.Tx) error {
    // Hook receives the original task parameters directly with full type safety
    log.Error("Payment processing failed permanently", 
        zap.Int32("taskID", taskID),
        zap.Int32("userId", params.UserId),
        zap.Float64("amount", params.Amount))
    
    // Handle the failure (notify admin, refund, etc.)
    // The transaction context allows you to make additional database operations
    return e.handlePaymentFailure(ctx, params.UserId, params.Amount, taskID)
}

Custom Error Handling

In your executor, you can control retry behavior:

func (e *Executor) ExecuteProcessPayment(ctx context.Context, params *taskgen.ProcessPaymentParameters) error {
    // Permanent failure - don't retry, immediately trigger onFailed
    if params.Amount <= 0 {
        return taskcore.ErrFatalTask
    }
    
    // Temporary failure - retry without logging error event
    if rateLimitExceeded {
        return taskcore.ErrRetryTaskWithoutErrorEvent
    }
    
    // Regular error - will retry according to policy
    return processPayment(params)
}
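
One way to picture how a worker could turn these return values into a decision is a small classifier built on `errors.Is`. This is a sketch under assumptions, not the Anclax worker: `errFatal` and `errQuietRetry` are local stand-ins for `taskcore.ErrFatalTask` and `taskcore.ErrRetryTaskWithoutErrorEvent`, and whether a quiet retry counts against `maxAttempts` is not specified here (the sketch assumes it does not).

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errFatal      = errors.New("fatal task error")         // stand-in for taskcore.ErrFatalTask
	errQuietRetry = errors.New("retry without error event") // stand-in for taskcore.ErrRetryTaskWithoutErrorEvent
	errOther      = errors.New("boom")                      // any ordinary error
)

// classify maps an executor result to "done", "retry", "retry-quiet", or "failed".
// maxAttempts < 0 means unlimited retries.
func classify(err error, attempt, maxAttempts int) string {
	switch {
	case err == nil:
		return "done"
	case errors.Is(err, errFatal):
		return "failed" // permanent: skip remaining retries
	case errors.Is(err, errQuietRetry):
		return "retry-quiet"
	case maxAttempts >= 0 && attempt >= maxAttempts:
		return "failed" // retries exhausted
	default:
		return "retry"
	}
}

func main() {
	wrapped := fmt.Errorf("invalid amount: %w", errFatal)
	fmt.Println(classify(wrapped, 1, -1))       // wrapped fatal error is still detected
	fmt.Println(classify(errOther, 1, -1))      // unlimited retries
	fmt.Println(classify(errOther, 5, 5))       // attempts exhausted
	fmt.Println(classify(errQuietRetry, 2, -1)) // retry without an error event
}
```

In this model, only the "failed" outcome would lead to the onFailed hook.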

Advanced Features

Task Timeouts

Configure maximum execution time:

tasks:
  - name: LongRunningTask
    timeout: 2h  # 2 hours maximum

Unique Tasks

Prevent duplicate tasks using unique tags:

taskID, err := h.taskRunner.RunTaskName(ctx, params, 
    taskcore.WithUniqueTag(fmt.Sprintf("user-%d-daily", userID)),
)
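
The effect of a unique tag can be modeled as a lookup before insert. The sketch below assumes that enqueueing with a tag that already exists returns the existing task's ID instead of creating a new one; this guide does not state what Anclax returns in that case, so treat that behavior, along with the `store` type and `enqueue` method, as hypothetical.

```go
package main

import "fmt"

// store is an in-memory stand-in for the task table, indexed by unique tag.
type store struct {
	nextID int32
	byTag  map[string]int32
}

func newStore() *store {
	return &store{byTag: make(map[string]int32)}
}

// enqueue creates a task unless one with the same non-empty tag already exists.
// It returns the task ID and whether a new task was created.
func (s *store) enqueue(uniqueTag string) (int32, bool) {
	if uniqueTag != "" {
		if existing, ok := s.byTag[uniqueTag]; ok {
			return existing, false // assumed behavior: reuse the existing task
		}
	}
	s.nextID++
	if uniqueTag != "" {
		s.byTag[uniqueTag] = s.nextID
	}
	return s.nextID, true
}

func main() {
	s := newStore()
	tag := fmt.Sprintf("user-%d-daily", 123)
	first, created1 := s.enqueue(tag)
	second, created2 := s.enqueue(tag)
	fmt.Println(first, created1, second, created2)
}
```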

Task Attributes

Access task metadata in your executor:

func (e *Executor) ExecuteTaskName(ctx context.Context, params *taskgen.TaskNameParameters) error {
    // Get task ID from context (if available)
    if taskID, ok := ctx.Value("taskID").(int32); ok {
        log.Info("Processing task", zap.Int32("taskID", taskID))
    }
    
    return e.processTask(params)
}
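
For readers less familiar with context values, the sketch below shows the general store-and-retrieve mechanics with `context.WithValue` and a type assertion. It uses its own key type (`ctxKey`), which is the pattern recommended by the Go `context` documentation; it is not a description of how Anclax populates the context, and a value stored under this typed key would not be found with the plain string key used in the snippet above. All helper names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type that avoids collisions with other packages.
type ctxKey string

const taskIDKey ctxKey = "taskID"

// withTaskID returns a context carrying the given task ID.
func withTaskID(ctx context.Context, id int32) context.Context {
	return context.WithValue(ctx, taskIDKey, id)
}

// taskIDFrom extracts the task ID; ok is false when none is present.
func taskIDFrom(ctx context.Context) (int32, bool) {
	id, ok := ctx.Value(taskIDKey).(int32)
	return id, ok
}

// describe formats the task ID if the context has one.
func describe(ctx context.Context) string {
	if id, ok := taskIDFrom(ctx); ok {
		return fmt.Sprintf("task %d", id)
	}
	return "no task ID"
}

// demo returns the description for a context with and without a task ID.
func demo() (string, string) {
	base := context.Background()
	return describe(withTaskID(base, 42)), describe(base)
}

func main() {
	with, without := demo()
	fmt.Println(with)
	fmt.Println(without)
}
```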

Examples

Example 1: Simple Background Task

Task Definition:

tasks:
  - name: SendEmail
    description: "Send an email to a user"
    parameters:
      type: object
      required: [userId, templateId]
      properties:
        userId:
          type: integer
          format: int32
        templateId:
          type: string
        variables:
          type: object
    retryPolicy:
      interval: 5m
      maxAttempts: -1

Implementation:

func (e *Executor) ExecuteSendEmail(ctx context.Context, params *taskgen.SendEmailParameters) error {
    user, err := e.model.GetUser(ctx, params.UserId)
    if err != nil {
        return err
    }
    
    template, err := e.emailService.GetTemplate(params.TemplateId)
    if err != nil {
        return err
    }
    
    return e.emailService.SendEmail(user.Email, template, params.Variables)
}

Usage:

func (h *Handler) RegisterUser(c *fiber.Ctx) error {
    // ... user registration logic
    
    // Send welcome email asynchronously
    _, err := h.taskRunner.RunSendEmail(c.Context(), &taskgen.SendEmailParameters{
        UserId:     user.ID,
        TemplateId: "welcome",
        Variables:  map[string]interface{}{"name": user.Name},
    })
    
    return err
}

Example 2: Scheduled Data Processing

Task Definition:

tasks:
  - name: ProcessDailyReports
    description: "Generate daily reports"
    cronjob:
      cronExpression: "0 0 1 * * *"  # 1 AM daily
    parameters:
      type: object
      required: [date]
      properties:
        date:
          type: string
          format: date
    retryPolicy:
      interval: 1h
      maxAttempts: -1

Implementation:

func (e *Executor) ExecuteProcessDailyReports(ctx context.Context, params *taskgen.ProcessDailyReportsParameters) error {
    date, err := time.Parse("2006-01-02", params.Date)
    if err != nil {
        return err
    }
    
    // Process reports for the given date
    return e.reportService.GenerateDailyReports(ctx, date)
}

Example 3: Workflow with Failure Events

Task Definition:

tasks:
  - name: ProcessOrder
    description: "Process customer order"
    parameters:
      type: object
      required: [orderId]
      properties:
        orderId:
          type: integer
          format: int32
    retryPolicy:
      interval: 30m
      maxAttempts: -1
    events:
      - onFailed
    timeout: 10m

Implementation:

func (e *Executor) ExecuteProcessOrder(ctx context.Context, params *taskgen.ProcessOrderParameters) error {
    order, err := e.model.GetOrder(ctx, params.OrderId)
    if err != nil {
        return err
    }
    
    // Process the order
    if err := e.orderService.ProcessOrder(ctx, order); err != nil {
        // This will trigger OnProcessOrderFailed if retries are exhausted
        return err
    }
    
    return nil
}

func (e *Executor) OnProcessOrderFailed(ctx context.Context, taskID int32, params *taskgen.ProcessOrderParameters, tx pgx.Tx) error {
    // Hook receives the original parameters directly with full type safety
    log.Error("Order processing failed permanently", 
        zap.Int32("taskID", taskID),
        zap.Int32("orderId", params.OrderId))
    
    // Handle the failure - notify customer service, update order status, etc.
    // Use the transaction context for additional database operations
    return e.orderService.HandleFailure(ctx, params.OrderId, taskID)
}

Example 4: Complex Failure Handling with Custom Parameters

Task Definition:

tasks:
  - name: SendNotification
    description: "Send notification to user"
    parameters:
      type: object
      required: [userId, message]
      properties:
        userId:
          type: integer
          format: int32
        message:
          type: string
        priority:
          type: string
          enum: [low, medium, high]
    retryPolicy:
      interval: 5m
      maxAttempts: -1
    events:
      - onFailed

Implementation:

func (e *Executor) ExecuteSendNotification(ctx context.Context, params *taskgen.SendNotificationParameters) error {
    return e.notificationService.Send(ctx, params.UserId, params.Message, params.Priority)
}

func (e *Executor) OnSendNotificationFailed(ctx context.Context, taskID int32, params *taskgen.SendNotificationParameters, tx pgx.Tx) error {
    // Hook receives the original parameters directly with full type safety
    log.Error("Notification sending failed permanently", 
        zap.Int32("taskID", taskID),
        zap.Int32("userId", params.UserId),
        zap.String("message", params.Message),
        zap.String("priority", params.Priority))
    
    // Escalate to admin with original context
    return e.adminService.EscalateFailedNotification(ctx, EscalationRequest{
        FailedTaskID:    taskID,
        OriginalUserId:  params.UserId,
        OriginalMessage: params.Message,
        Priority:        params.Priority,
        EscalationLevel: "admin",
    })
}

Real-World Example: Delete Operation with Failure Handling

This example from the Anclax codebase shows how to implement a task that deletes sensitive data with proper failure handling:

Task Definition (api/tasks/tasks.yaml):

tasks:
  - name: deleteOpaqueKey
    description: Delete an opaque key
    parameters:
      type: object
      required: [keyID]
      properties:
        keyID:
          type: integer
          format: int64
          description: The ID of the opaque key to delete
    retryPolicy:
      interval: 30m
      maxAttempts: -1
    events:
      - onFailed

Generated Types: After running anclax generate, you get:

type DeleteOpaqueKeyParameters struct {
    KeyID int64 `json:"keyID"`
}

type ExecutorInterface interface {
    ExecuteDeleteOpaqueKey(ctx context.Context, params *DeleteOpaqueKeyParameters) error
    OnDeleteOpaqueKeyFailed(ctx context.Context, taskID int32, params *DeleteOpaqueKeyParameters, tx pgx.Tx) error
}

Implementation:

func (e *Executor) ExecuteDeleteOpaqueKey(ctx context.Context, params *taskgen.DeleteOpaqueKeyParameters) error {
    // Attempt to delete the opaque key
    err := e.model.DeleteOpaqueKey(ctx, params.KeyID)
    if err != nil {
        // If delete fails, this will trigger OnDeleteOpaqueKeyFailed after retries
        return fmt.Errorf("failed to delete opaque key %d: %w", params.KeyID, err)
    }
    
    log.Info("Successfully deleted opaque key", zap.Int64("keyID", params.KeyID))
    return nil
}

func (e *Executor) OnDeleteOpaqueKeyFailed(ctx context.Context, taskID int32, params *taskgen.DeleteOpaqueKeyParameters, tx pgx.Tx) error {
    // Hook receives the original parameters directly with full type safety
    log.Error("Critical: Failed to delete opaque key after all retries", 
        zap.Int64("keyID", params.KeyID),
        zap.Int32("failedTaskID", taskID))
    
    // Notify security team about failed key deletion
    // Use the transaction context for additional database operations if needed
    return e.securityService.NotifyFailedKeyDeletion(ctx, params.KeyID, taskID)
}

Starting the Task:

func (h *Handler) DeleteKey(c *fiber.Ctx) error {
    keyID := c.Params("id")
    keyIDInt, err := strconv.ParseInt(keyID, 10, 64)
    if err != nil {
        return c.Status(400).JSON(fiber.Map{"error": "Invalid key ID"})
    }
    
    // Queue the deletion task
    taskID, err := h.taskRunner.RunDeleteOpaqueKey(c.Context(), &taskgen.DeleteOpaqueKeyParameters{
        KeyID: keyIDInt,
    })
    if err != nil {
        return err
    }
    
    return c.JSON(fiber.Map{
        "message": "Key deletion queued",
        "taskID": taskID,
    })
}

This example demonstrates:

  • Graceful degradation: If deletion fails, the system doesn't just give up
  • Audit trail: Failed deletions are logged and tracked
  • Administrative oversight: Critical failures are escalated to security teams
  • Transactional safety: Both task status update and hook execution are atomic
  • Type safety: Hook methods receive strongly-typed parameters instead of raw JSON

Best Practices

  1. Keep tasks idempotent - Tasks may be retried, so ensure they can be safely executed multiple times
  2. Use unique tags - Prevent duplicate tasks for critical operations
  3. Set appropriate timeouts - Don't let tasks run indefinitely
  4. Handle errors gracefully - Use specific error types to control retry behavior
  5. Design failure hooks carefully - Failure hooks should handle cleanup, notifications, or escalations
  6. Monitor task performance - Use metrics to track task execution times and failure rates
  7. Use transactions - Enqueue tasks within database transactions for consistency
  8. Test failure scenarios - Ensure your failure hooks work correctly and don't create infinite loops
  9. Use async tasks for module decoupling - Instead of calling methods directly between modules, use async tasks to keep modules loosely coupled. For example, when an order is paid, enqueue an orderFinished task rather than directly calling factory operations. This keeps code clean and maintainable. Note: Only use this for eventual consistency scenarios, not for strong consistency requirements like real-time financial transactions.
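
As an illustration of the first practice, the sketch below makes a handler idempotent by recording which task IDs have already been applied. It is a minimal in-memory model with hypothetical names (`ledger`, `applyOnce`); in a real service the "already processed" marker would typically live in the database, written in the same transaction as the change itself.

```go
package main

import "fmt"

// ledger tracks a balance and the task IDs that have already been applied.
type ledger struct {
	processed map[int32]bool
	balance   int
}

func newLedger() *ledger {
	return &ledger{processed: make(map[int32]bool)}
}

// applyOnce credits the amount only the first time a given task ID is seen.
// It returns true if the credit was applied and false if it was a repeat.
func (l *ledger) applyOnce(taskID int32, amount int) bool {
	if l.processed[taskID] {
		return false
	}
	l.processed[taskID] = true
	l.balance += amount
	return true
}

func main() {
	l := newLedger()
	fmt.Println(l.applyOnce(1, 50)) // first delivery applies the credit
	fmt.Println(l.applyOnce(1, 50)) // a retry of the same task is a no-op
	fmt.Println(l.balance)
}
```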

Worker Configuration

The worker runs automatically when you start your Anclax application. You can configure worker behavior:

// Disable worker for specific environments
cfg := &config.Config{
    Worker: config.Worker{
        Disable: true,  // Disable worker
    },
}

Workers poll the database every second for pending tasks and process them concurrently in goroutines. The concurrency limit is configurable and defaults to 10.
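
Bounded concurrency of this kind is commonly built with a buffered channel used as a semaphore. The sketch below shows that general technique for one batch of tasks and reports the peak number of tasks running at once. It is not the Anclax worker: `runBatch` is a hypothetical helper, and the one-second polling loop is left out to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// runBatch runs work(i) for i in [0, n) with at most limit goroutines active
// at a time. It returns how many items were processed and the peak concurrency.
func runBatch(n, limit int, work func(i int)) (processed, peak int) {
	sem := make(chan struct{}, limit) // one slot per allowed concurrent task
	var wg sync.WaitGroup
	var mu sync.Mutex
	running := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit tasks are already running
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			work(i)

			mu.Lock()
			running--
			processed++
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return processed, peak
}

func main() {
	processed, peak := runBatch(25, 10, func(i int) {})
	fmt.Println("processed:", processed, "peak within limit:", peak <= 10)
}
```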

By default Anclax starts the worker runtime automatically with the unified worker implementation.