worker

package
v1.7.2 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 26, 2026 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Overview

Package worker defines interfaces and implementations for managed units of work.

It provides a uniform `Worker` interface to manage the lifecycle (Start, Stop, Wait) of various execution units such as OS processes, Goroutines, or Containers.

Key features:

  • **Worker Interface**: A standard contract for starting, stopping, and waiting for work.
  • **Functional Worker**: Simple adapter `FromFunc` to turn any function into a Worker.
  • **Process Worker**: A wrapper around `os/exec` that ensures process hygiene (Fail-Closed) using `pkg/proc` logic (Job Objects on Windows, PDeathSig on Linux).
  • **Container Worker**: A bridge to manage containerized workloads via `pkg/container`.
  • **Handover Protocol**: Standard environment variables (`LIFECYCLE_RESUME_ID`, `LIFECYCLE_PREV_EXIT`) to pass context across worker restarts.

This package is foundational for the Supervisor pattern (v1.3+).

Constants

const (
	// EnvResumeID is the unique session identifier for a worker.
	// It remains constant across restarts within the same supervisor lifecycle.
	EnvResumeID = "LIFECYCLE_RESUME_ID"

	// EnvPrevExit is the exit code of the previous execution of this worker.
	// It is injected by the supervisor upon restart.
	EnvPrevExit = "LIFECYCLE_PREV_EXIT"

	// EnvResumeToken is the opaque token used to resume a worker session.
	// It is injected by the supervisor upon restart/handover.
	EnvResumeToken = "LIFECYCLE_RESUME_TOKEN"
)

Standard environment variables for the Handover Protocol.
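
As an illustration of the worker side of the Handover Protocol, the sketch below reads the three variables at startup. The `handover` struct and `readHandover` helper are hypothetical names introduced here, and the sketch assumes `LIFECYCLE_PREV_EXIT` carries a decimal integer, which the documentation above does not state explicitly.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Names mirror the constants documented above.
const (
	envResumeID    = "LIFECYCLE_RESUME_ID"
	envPrevExit    = "LIFECYCLE_PREV_EXIT"
	envResumeToken = "LIFECYCLE_RESUME_TOKEN"
)

// handover is a hypothetical container for the values a restarted worker may receive.
type handover struct {
	ResumeID    string
	PrevExit    int // only meaningful when HasPrevExit is true
	HasPrevExit bool
	ResumeToken string
}

// readHandover takes a getenv function so it can be exercised without
// touching the real process environment.
func readHandover(getenv func(string) string) handover {
	h := handover{
		ResumeID:    getenv(envResumeID),
		ResumeToken: getenv(envResumeToken),
	}
	// Assumption: the previous exit code is encoded as a decimal integer.
	if raw := getenv(envPrevExit); raw != "" {
		if code, err := strconv.Atoi(raw); err == nil {
			h.PrevExit, h.HasPrevExit = code, true
		}
	}
	return h
}

func main() {
	h := readHandover(os.Getenv)
	if h.HasPrevExit {
		fmt.Printf("restart of session %q, previous exit code %d\n", h.ResumeID, h.PrevExit)
		return
	}
	fmt.Println("first run: no previous exit code injected")
}
```

Passing `os.Getenv` as a parameter keeps the parsing logic testable with a plain map lookup.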

const (
	// MetadataType identifies the kind of worker (supervisor, process, etc.)
	MetadataType = "type"
	// MetadataRestarts tracks the number of times a worker was restarted.
	MetadataRestarts = "restarts"
	// MetadataWindowStart marks the start of the current reliability monitoring window.
	MetadataWindowStart = "window_start"
	// MetadataCircuitBreaker indicates if the circuit breaker has been triggered.
	MetadataCircuitBreaker = "circuit_breaker"
)

Variables

This section is empty.

Functions

func MermaidState

func MermaidState(s State) string

MermaidState returns a simple Mermaid state diagram (FSM) for a single worker. It visualizes the lifecycle transitions: Pending -> Running -> Stopped/Failed. This is useful for understanding the internal behavior of a worker type.

func MermaidTree

func MermaidTree(s State) string

MermaidTree returns a Mermaid diagram string representing the worker structure (Tree). It renders a hierarchical tree (graph TD) showing parent-child relationships.
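
To make the idea of a `graph TD` tree concrete, here is a minimal sketch that walks a nested state and emits one Mermaid edge per parent-child pair. The `node` type, the `renderTree` function, and the exact label format are assumptions made for illustration; the real output of `MermaidTree` is not shown on this page and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// node is a trimmed-down, hypothetical version of State.
type node struct {
	Name     string
	Status   string
	Children []node
}

// renderTree emits a Mermaid "graph TD" with one edge per parent-child pair.
// Node IDs (n0, n1, ...) are assigned in depth-first order.
func renderTree(root node) string {
	var b strings.Builder
	b.WriteString("graph TD\n")
	next := 0
	var walk func(n node) string
	walk = func(n node) string {
		id := fmt.Sprintf("n%d", next)
		next++
		fmt.Fprintf(&b, "    %s[\"%s (%s)\"]\n", id, n.Name, n.Status)
		for _, c := range n.Children {
			childID := walk(c)
			fmt.Fprintf(&b, "    %s --> %s\n", id, childID)
		}
		return id
	}
	walk(root)
	return b.String()
}

// demoTree renders a supervisor with two children.
func demoTree() string {
	return renderTree(node{
		Name:   "sup",
		Status: "Running",
		Children: []node{
			{Name: "api", Status: "Running"},
			{Name: "db", Status: "Failed"},
		},
	})
}

func main() {
	fmt.Print(demoTree())
}
```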

func NodeLabeler

func NodeLabeler(name, status string, pid int, metadata map[string]string, icon string) string

NodeLabeler provides worker-specific node label formatting.

func NodeStyler

func NodeStyler(metadata map[string]string) (icon, shapeStart, shapeEnd, cssClass string)

NodeStyler provides worker-specific node styling based on metadata type field.

func StopAndWait added in v1.6.5

func StopAndWait(ctx context.Context, w Worker) error

StopAndWait is a utility that requests a worker to stop and blocks until it fully terminates (resolving race conditions on shutdown), returning any combined errors.
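
The stop-then-wait pattern described here can be sketched as follows. This is an illustrative stand-in built on a local `stopWaiter` interface, not the package's implementation; `stopAndWait` and `fakeWorker` are hypothetical names, and combining errors with `errors.Join` is an assumption based on the phrase "combined errors".

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// stopWaiter is the subset of the Worker interface this sketch needs.
type stopWaiter interface {
	Stop(context.Context) error
	Wait() <-chan error
}

// stopAndWait requests a stop, then blocks until the worker exits or ctx ends.
func stopAndWait(ctx context.Context, w stopWaiter) error {
	stopErr := w.Stop(ctx)
	select {
	case exitErr := <-w.Wait():
		// A closed channel yields nil, so an already-exited worker is fine.
		return errors.Join(stopErr, exitErr)
	case <-ctx.Done():
		return errors.Join(stopErr, ctx.Err())
	}
}

// fakeWorker is a hypothetical worker that exits as soon as Stop is called.
type fakeWorker struct {
	done chan error
}

func (f *fakeWorker) Stop(context.Context) error {
	f.done <- errors.New("exit status 1") // buffered, so this does not block
	close(f.done)
	return nil
}

func (f *fakeWorker) Wait() <-chan error { return f.done }

// demo returns the combined error text for a worker that fails on exit.
func demo() string {
	w := &fakeWorker{done: make(chan error, 1)}
	if err := stopAndWait(context.Background(), w); err != nil {
		return err.Error()
	}
	return ""
}

func main() {
	fmt.Println(demo())
}
```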

Types

type BaseWorker

type BaseWorker struct {

	// Standardized State Fields (for centralized logic)
	StopRequested bool
	Killed        bool
	ExitCode      int
	Err           error
	// contains filtered or unexported fields
}

BaseWorker provides default implementations for common Worker interface methods. It is designed to be embedded in custom worker types to reduce boilerplate and enforce safe concurrency.

**Concurrency Pattern:** All critical state changes use the internal mutex (`mu sync.RWMutex`). For custom state manipulations, always use the exposed `mu` field with generic locking helpers (`withLockAny`, `withLockResultAny`).

Example of safe custom state mutation:

type MyWorker struct {
    lifecycle.BaseWorker
    myField int
}

func (w *MyWorker) SetMyField(val int) {
    withLockAny(&w.mu, func() { w.myField = val })
}

func (w *MyWorker) GetMyField() int {
    return withLockResultAny(&w.mu, func() int { return w.myField })
}

The embedding pattern provides default implementations for:

  • Stop(ctx) — strict wait for quiescence (context cancellation handles cleanup)
  • Wait() — returns done channel
  • String() — returns worker name
  • State() — returns minimal state with name
  • Watch(ctx) — returns state change events (StateWatcher)

These can be overridden if custom behavior is needed, but always follow the locking pattern for state safety.

func NewBaseWorker

func NewBaseWorker(name string) *BaseWorker

NewBaseWorker creates a new BaseWorker with the given name. The name is immutable after creation (construct a new worker to change it).

func (*BaseWorker) ComponentType

func (b *BaseWorker) ComponentType() string

ComponentType returns the component type for introspection.

func (*BaseWorker) DeriveFinalStatus

func (b *BaseWorker) DeriveFinalStatus() Status

DeriveFinalStatus determines the final status based on the strict Intent vs Outcome logic. This centralizes the state machine rules:

  • Killed -> StatusKilled
  • Err != nil -> StatusFailed
  • StopRequested -> StatusStopped
  • Default -> StatusFinished
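
The rules above can be read as a precedence chain. The sketch below assumes they are evaluated in the order listed (Killed first, Finished last), which the documentation implies but does not state; `outcome`, `deriveFinalStatus`, and `classify` are hypothetical local names.

```go
package main

import (
	"errors"
	"fmt"
)

type status string

const (
	statusKilled   status = "Killed"
	statusFailed   status = "Failed"
	statusStopped  status = "Stopped"
	statusFinished status = "Finished"
)

// outcome mirrors the exported BaseWorker state fields used by the rules.
type outcome struct {
	StopRequested bool
	Killed        bool
	Err           error
}

// deriveFinalStatus applies the rules top to bottom (assumed precedence).
func deriveFinalStatus(o outcome) status {
	switch {
	case o.Killed:
		return statusKilled
	case o.Err != nil:
		return statusFailed
	case o.StopRequested:
		return statusStopped
	default:
		return statusFinished
	}
}

// classify is a convenience wrapper taking plain booleans.
func classify(stopRequested, killed, failed bool) string {
	o := outcome{StopRequested: stopRequested, Killed: killed}
	if failed {
		o.Err = errors.New("exit status 1")
	}
	return string(deriveFinalStatus(o))
}

func main() {
	fmt.Println(classify(true, true, true))    // kill wins over everything
	fmt.Println(classify(true, false, true))   // error wins over stop intent
	fmt.Println(classify(true, false, false))  // clean requested stop
	fmt.Println(classify(false, false, false)) // natural completion
}
```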

func (*BaseWorker) ExportState

func (b *BaseWorker) ExportState(fn func(*State)) State

ExportState allows embedding workers to safely construct their state while holding the lock. It builds the base state and passes it to the optional extension function 'fn'.

func (*BaseWorker) Finish

func (b *BaseWorker) Finish(err error)

Finish is the terminal checkpoint for a worker. It centralizes the final state transition logic, metrics, and signaling.

func (*BaseWorker) Lock added in v1.7.2

func (b *BaseWorker) Lock()

Lock satisfies the sync.Locker interface.

func (*BaseWorker) SetStatus

func (b *BaseWorker) SetStatus(new Status)

SetStatus updates the worker's status and emits a state change event. This should be called by worker implementations when status changes.

func (*BaseWorker) StartFunc

func (b *BaseWorker) StartFunc(ctx context.Context, fn func(context.Context) error) error

StartFunc is a helper that runs fn in a goroutine and manages the done channel. It's a common pattern for Start() implementations:

func (w *MyWorker) Start(ctx context.Context) error {
    return w.StartFunc(ctx, w.Run)
}

The function result is sent to the done channel, then the channel is closed.
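
A minimal sketch of the "send the result, then close" pattern is shown below. `miniWorker` and `startFunc` are hypothetical stand-ins; the buffered channel of size one is an assumption chosen so the goroutine can exit even when nobody reads the result.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// miniWorker is a hypothetical stand-in for a type embedding BaseWorker.
type miniWorker struct {
	done chan error
}

func newMiniWorker() *miniWorker {
	// Buffer of one so the goroutine can exit even if nobody reads the result.
	return &miniWorker{done: make(chan error, 1)}
}

// startFunc runs fn in a goroutine, sends its result, then closes the channel.
func (w *miniWorker) startFunc(ctx context.Context, fn func(context.Context) error) error {
	go func() {
		defer close(w.done)
		w.done <- fn(ctx)
	}()
	return nil
}

func (w *miniWorker) Wait() <-chan error { return w.done }

// runOnce starts a failing function and reports what a waiter observes:
// the error text from the first receive, and whether the channel is closed after it.
func runOnce() (first string, closedAfter bool) {
	w := newMiniWorker()
	_ = w.startFunc(context.Background(), func(context.Context) error {
		return errors.New("boom")
	})
	err := <-w.Wait()
	_, open := <-w.Wait()
	return err.Error(), !open
}

func main() {
	first, closed := runOnce()
	fmt.Println(first, closed)
}
```

Only the first receiver sees the error value; later receivers get nil from the closed channel.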

func (*BaseWorker) State

func (b *BaseWorker) State() State

State returns the current worker state (base fields only).

func (*BaseWorker) Stop

func (b *BaseWorker) Stop(ctx context.Context) error

Stop satisfies the Worker interface. In BaseWorker, it only handles the "Strict Wait" protocol using the provided context. Embedding types should override this to trigger their specific cleanup (e.g. canceling a context or signaling a process) but should call this base implementation if they want to wait for quiescence.

func (*BaseWorker) String

func (b *BaseWorker) String() string

String returns the worker name. This is used for logging and debugging.

func (*BaseWorker) Unlock added in v1.7.2

func (b *BaseWorker) Unlock()

Unlock satisfies the sync.Locker interface.

func (*BaseWorker) Wait

func (b *BaseWorker) Wait() <-chan error

Wait returns the done channel. The channel is populated by StartFunc and closed when the worker exits.

func (*BaseWorker) Watch

func (b *BaseWorker) Watch(ctx context.Context) <-chan introspection.StateChange[State]

Watch implements introspection.TypedWatcher[State] interface. Returns a type-safe channel that emits StateChange[State] events. The channel is closed when the provided context is cancelled.

type ContainerWorker

type ContainerWorker struct {
	*BaseWorker
	// contains filtered or unexported fields
}

ContainerWorker is a Worker that manages a container via the container.Container interface.

func NewContainerWorker

func NewContainerWorker(name string, c container.Container) *ContainerWorker

NewContainerWorker creates a new Worker for a given container.

func (*ContainerWorker) Start

func (cw *ContainerWorker) Start(ctx context.Context) error

func (*ContainerWorker) State

func (cw *ContainerWorker) State() State

State returns the current worker state.

func (*ContainerWorker) Stop

func (cw *ContainerWorker) Stop(ctx context.Context) error

func (*ContainerWorker) String

func (cw *ContainerWorker) String() string

type EnvInjector

type EnvInjector interface {
	SetEnv(key, value string)
}

EnvInjector is an optional interface for workers that support environment variable injection.
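
Because the interface is optional, a caller would typically probe for it with a type assertion. The sketch below uses a local copy of the interface; `envWorker`, `plainWorker`, and `injectResumeID` are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// envInjector is a local copy of the documented interface.
type envInjector interface {
	SetEnv(key, value string)
}

// envWorker is a hypothetical worker that records injected variables.
type envWorker struct{ env map[string]string }

func (w *envWorker) SetEnv(key, value string) { w.env[key] = value }

// plainWorker is a hypothetical worker without injection support.
type plainWorker struct{}

// injectResumeID sets LIFECYCLE_RESUME_ID when the worker supports injection
// and reports whether it did.
func injectResumeID(w any, id string) bool {
	inj, ok := w.(envInjector)
	if !ok {
		return false
	}
	inj.SetEnv("LIFECYCLE_RESUME_ID", id)
	return true
}

// demoInject tries both worker kinds and returns what happened.
func demoInject() (supported, unsupported bool, value string) {
	ew := &envWorker{env: map[string]string{}}
	supported = injectResumeID(ew, "session-1")
	unsupported = injectResumeID(plainWorker{}, "session-1")
	return supported, unsupported, ew.env["LIFECYCLE_RESUME_ID"]
}

func main() {
	fmt.Println(demoInject())
}
```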

type ProcessWorker

type ProcessWorker struct {
	*BaseWorker
	// contains filtered or unexported fields
}

ProcessWorker is a Worker that manages an OS process.

Concurrency: All methods that change internal state use a mutex. Once the process has started, SetEnv and SetOutput have no effect and return an error. The process lifecycle is Start -> (Stop|Finish) -> State. The Stop method may return multiple errors combined via errors.Join.

State.Metadata includes the "startedAt" and "stoppedAt" fields (RFC3339Nano) for traceability.

func NewProcessWorker

func NewProcessWorker(name string, nameCmd string, args ...string) *ProcessWorker

NewProcessWorker creates a new ProcessWorker for the given command.

func (*ProcessWorker) SetEnv

func (p *ProcessWorker) SetEnv(key, value string) error

SetEnv adds an environment variable to the process. It can only be called before Start. Once the process has started, it returns an error.

func (*ProcessWorker) SetOutput added in v1.6.3

func (p *ProcessWorker) SetOutput(stdout, stderr io.Writer) error

SetOutput configures the standard output and error writers for the process. It must be called before Start. Once the process has started, it returns an error.

func (*ProcessWorker) Start

func (p *ProcessWorker) Start(ctx context.Context) error

Start initiates the OS process.

func (*ProcessWorker) State

func (p *ProcessWorker) State() State

State returns a snapshot of the worker's status, including timestamps.

func (*ProcessWorker) Stop

func (p *ProcessWorker) Stop(ctx context.Context) error

Stop sends a signal to the process to terminate. It may return a composite error (errors.Join) containing signaling and kill errors. Use errors.Is/As to inspect individual causes.
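
As a rough illustration of inspecting an error built with `errors.Join`, the sketch below joins two causes and checks for each with `errors.Is`. The `errSignal` sentinel and `stopResult` function are hypothetical; the actual error values returned by Stop are not listed on this page. `os.ErrProcessDone` is used only as a convenient standard-library sentinel.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// errSignal is a hypothetical sentinel; the package's real error values
// are not documented here.
var errSignal = errors.New("signal delivery failed")

// stopResult imitates a Stop call that hit both a signaling error and a kill error.
func stopResult() error {
	killErr := fmt.Errorf("kill: %w", os.ErrProcessDone)
	return errors.Join(errSignal, killErr)
}

// inspect reports which individual causes are present in the joined error.
func inspect() (sawSignal, sawDone bool) {
	err := stopResult()
	return errors.Is(err, errSignal), errors.Is(err, os.ErrProcessDone)
}

func main() {
	sawSignal, sawDone := inspect()
	fmt.Println("signal error present:", sawSignal)
	fmt.Println("process-done error present:", sawDone)
}
```

`errors.Is` walks every branch of a joined error, including causes wrapped with `%w` inside one of the branches.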

func (*ProcessWorker) String

func (p *ProcessWorker) String() string

String returns the worker name.

type Resumable

type Resumable interface {
	Worker
	// Pause requests the worker to stop and return a resume token.
	// This token can be passed to a new worker instance via LIFECYCLE_RESUME_TOKEN.
	Pause(context.Context) (string, error)
}

Resumable is an optional interface for workers that support pausing and resuming.

type State

type State struct {
	Name        string
	Status      Status
	PID         int
	ExitCode    int
	Error       error
	ResumeToken string
	Metadata    map[string]string
	Children    []State
}

State represents a snapshot of the worker's status.

type Status

type Status string

Status represents the lifecycle state of a worker.

const (
	// StatusCreated indicates the worker is instantiated and ready to start immediately.
	// The worker has all resources allocated and no blocking conditions.
	// Transition: NewWorker() → StatusCreated
	StatusCreated Status = "Created"

	// StatusPending indicates the worker is instantiated but blocked from starting.
	// Common blocking conditions:
	//   - Backoff/retry delay (supervisor restart policy)
	//   - Circuit breaker open
	//   - Resource pool exhausted
	//   - Scheduled start time not reached
	// Transition: Supervisor backoff → StatusPending, Circuit breaker → StatusPending
	StatusPending Status = "Pending"

	// StatusStarting indicates Start() was called and initialization is in progress.
	// Transition: Start() called → StatusStarting
	StatusStarting Status = "Starting"

	// StatusRunning indicates the worker is actively executing its workload.
	// Transition: Start() completed → StatusRunning
	StatusRunning Status = "Running"

	// StatusSuspended indicates the worker is paused (implements Suspendable interface).
	// Transition: Suspend() called → StatusSuspended
	StatusSuspended Status = "Suspended"

	// StatusStopping indicates Stop() was called and graceful shutdown is in progress.
	// Transition: Stop() called → StatusStopping
	StatusStopping Status = "Stopping"

	// StatusStopped indicates the worker was explicitly requested to stop (Manual/API).
	// Transition: Stop() called → StatusStopped (if exit code 0)
	StatusStopped Status = "Stopped"

	// StatusFinished indicates the worker has cleanly terminated execution naturally (exit code 0).
	// Transition: Work completed without error → StatusFinished
	StatusFinished Status = "Finished"

	// StatusKilled indicates the worker needed to be forcefully terminated (SIGKILL/Kill).
	// Transition: Stop() timed out → Kill() → StatusKilled
	StatusKilled Status = "Killed"

	// StatusFailed indicates the worker terminated with an error (exit code != 0).
	// Transition: Work completed with error → StatusFailed
	StatusFailed Status = "Failed"
)

func (Status) Key

func (s Status) Key() string

Key returns the normalized lowercase representation of the status.

type SuspendGate

type SuspendGate struct {
	// contains filtered or unexported fields
}

SuspendGate orchestrates safe suspension (pause) for a worker. It ensures the worker finishes its current unit of work before pausing, satisfying the strict quiescence requirement of the Suspendable interface.

func NewSuspendGate

func NewSuspendGate() *SuspendGate

NewSuspendGate creates a new gate ready for use.

func (*SuspendGate) Check

func (g *SuspendGate) Check(ctx context.Context) error

Check should be called by the Worker loop before starting a new unit of work. If a pause was requested, this method blocks until resumed or context is cancelled.

func (*SuspendGate) IsPaused

func (g *SuspendGate) IsPaused() bool

IsPaused returns true if the worker is currently suspended.

func (*SuspendGate) RequestPause

func (g *SuspendGate) RequestPause()

RequestPause signals the worker to pause at the next safe opportunity (the next Check call). Unlike Suspend, it does not block for confirmation.

func (*SuspendGate) Resume

func (g *SuspendGate) Resume()

Resume wakes up the worker and allows it to proceed past the Check call.

func (*SuspendGate) Suspend

func (g *SuspendGate) Suspend(ctx context.Context) error

Suspend signals the worker to pause at the next safe opportunity (the next Check call). It blocks until the worker loop confirms it has reached the paused state.
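
The cooperative pattern behind Check, RequestPause, and Resume can be sketched with a mutex and a channel. The `gate` type below is a simplified, hypothetical stand-in and not the package's SuspendGate: in particular it omits the confirmation step that the blocking Suspend method provides.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// gate is a simplified, hypothetical stand-in for SuspendGate.
type gate struct {
	mu     sync.Mutex
	paused bool
	resume chan struct{} // closed by Resume to release a blocked Check
}

func (g *gate) RequestPause() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !g.paused {
		g.paused = true
		g.resume = make(chan struct{})
	}
}

func (g *gate) Resume() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.paused {
		g.paused = false
		close(g.resume)
	}
}

// Check blocks while paused, until Resume is called or ctx is cancelled.
func (g *gate) Check(ctx context.Context) error {
	g.mu.Lock()
	if !g.paused {
		g.mu.Unlock()
		return nil
	}
	ch := g.resume
	g.mu.Unlock()
	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// processAll is a worker loop that calls Check before each unit of work.
func processAll(ctx context.Context, g *gate, items []string) (int, error) {
	done := 0
	for range items {
		if err := g.Check(ctx); err != nil {
			return done, err
		}
		done++ // stand-in for real work
	}
	return done, nil
}

// demo pauses the gate, runs with a cancelled context, then resumes and runs again.
func demo() (pausedCount int, pausedErr bool, resumedCount int) {
	g := &gate{}
	g.RequestPause()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	n1, err := processAll(ctx, g, []string{"a", "b"})
	g.Resume()
	n2, _ := processAll(context.Background(), g, []string{"a", "b", "c"})
	return n1, err != nil, n2
}

func main() {
	fmt.Println(demo())
}
```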

type Suspendable

type Suspendable interface {
	Worker
	// Suspend pauses the worker's processing. It must be non-blocking.
	Suspend(context.Context) error
	// Resume restarts the worker's processing.
	Resume(context.Context) error
}

Suspendable defines a worker that can pause its execution in-place without exiting. Unlike Resumable (which implies a restart/handover), Suspendable implies freezing state.

type Type

type Type string

Type represents the kind of worker.

const (
	TypeProcess    Type = "process"
	TypeContainer  Type = "container"
	TypeFunc       Type = "func"
	TypeSupervisor Type = "supervisor"
	TypeGoroutine  Type = "goroutine"
)

func (Type) String

func (t Type) String() string

String returns the capitalized representation of the type for logs.

type Worker

type Worker interface {
	// Start initiates the worker. It must be non-blocking.
	// The context can be used to control the startup phase.
	Start(context.Context) error

	// Stop requests the worker to stop.
	// It should respect the provided context for timeout/cancellation of the stop request.
	Stop(context.Context) error

	// Wait returns a channel that is closed when the worker has exited.
	// The error associated with the exit (if any) is sent on the channel.
	Wait() <-chan error

	// String returns a human-readable description/ID of the worker.
	String() string

	// State returns the current state of the worker for introspection.
	// Note: This returns a snapshot; some fields might be empty if not applicable.
	State() State
}

Worker defines the interface for a managed unit of work (process, goroutine, container).

The lifecycle is:

  1. Start(ctx) -> Non-blocking.
  2. Wait() -> Returns channel that closes when work finishes.
  3. Stop(ctx) -> Graceful termination request.

func FromFunc

func FromFunc(name string, fn func(context.Context) error) Worker

FromFunc creates a Worker from a simple function.
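
To show how a function-backed worker might follow the Start, Stop, Wait lifecycle, here is a minimal sketch. `funcWorker` and `fromFunc` are hypothetical and only approximate what FromFunc could do; the State method is omitted for brevity, so this type does not satisfy the full Worker interface.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// funcWorker is a hypothetical adapter in the spirit of FromFunc.
type funcWorker struct {
	name   string
	fn     func(context.Context) error
	cancel context.CancelFunc
	done   chan error
}

func fromFunc(name string, fn func(context.Context) error) *funcWorker {
	return &funcWorker{name: name, fn: fn, done: make(chan error, 1)}
}

// Start is non-blocking: the function runs in its own goroutine.
func (w *funcWorker) Start(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	w.cancel = cancel
	go func() {
		defer close(w.done)
		w.done <- w.fn(runCtx)
	}()
	return nil
}

// Stop requests termination by cancelling the function's context.
func (w *funcWorker) Stop(context.Context) error {
	if w.cancel != nil {
		w.cancel()
	}
	return nil
}

func (w *funcWorker) Wait() <-chan error { return w.done }
func (w *funcWorker) String() string     { return w.name }

// lifecycle walks through Start, Stop, and Wait and describes the exit.
func lifecycle() string {
	w := fromFunc("ticker", func(ctx context.Context) error {
		<-ctx.Done() // run until asked to stop
		return ctx.Err()
	})
	if err := w.Start(context.Background()); err != nil {
		return err.Error()
	}
	_ = w.Stop(context.Background())
	err := <-w.Wait()
	if errors.Is(err, context.Canceled) {
		return "stopped: " + w.String()
	}
	return "unexpected"
}

func main() {
	fmt.Println(lifecycle())
}
```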
