Documentation ¶
Overview ¶
Package worker defines interfaces and implementations for managed units of work.
It provides a uniform `Worker` interface to manage the lifecycle (Start, Stop, Wait) of various execution units such as OS processes, Goroutines, or Containers.
Key features:
- **Worker Interface**: A standard contract for starting, stopping, and waiting for work.
- **Functional Worker**: Simple adapter `FromFunc` to turn any function into a Worker.
- **Process Worker**: A wrapper around `os/exec` that ensures process hygiene (Fail-Closed) using `pkg/proc` logic (Job Objects on Windows, PDeathSig on Linux).
- **Container Worker**: A bridge to manage containerized workloads via `pkg/container`.
- **Handover Protocol**: Standard environment variables (`LIFECYCLE_RESUME_ID`, `LIFECYCLE_PREV_EXIT`) to pass context across worker restarts.
This package is foundational for the Supervisor pattern (v1.3+).
Index ¶
- Constants
- func MermaidState(s State) string
- func MermaidTree(s State) string
- func NodeLabeler(name, status string, pid int, metadata map[string]string, icon string) string
- func NodeStyler(metadata map[string]string) (icon, shapeStart, shapeEnd, cssClass string)
- func StopAndWait(ctx context.Context, w Worker) error
- type BaseWorker
- func (b *BaseWorker) ComponentType() string
- func (b *BaseWorker) DeriveFinalStatus() Status
- func (b *BaseWorker) ExportState(fn func(*State)) State
- func (b *BaseWorker) Finish(err error)
- func (b *BaseWorker) Lock()
- func (b *BaseWorker) SetStatus(new Status)
- func (b *BaseWorker) StartFunc(ctx context.Context, fn func(context.Context) error) error
- func (b *BaseWorker) State() State
- func (b *BaseWorker) Stop(ctx context.Context) error
- func (b *BaseWorker) String() string
- func (b *BaseWorker) Unlock()
- func (b *BaseWorker) Wait() <-chan error
- func (b *BaseWorker) Watch(ctx context.Context) <-chan introspection.StateChange[State]
- type ContainerWorker
- type EnvInjector
- type ProcessWorker
- func (p *ProcessWorker) SetEnv(key, value string) error
- func (p *ProcessWorker) SetOutput(stdout, stderr io.Writer) error
- func (p *ProcessWorker) Start(ctx context.Context) error
- func (p *ProcessWorker) State() State
- func (p *ProcessWorker) Stop(ctx context.Context) error
- func (p *ProcessWorker) String() string
- type Resumable
- type State
- type Status
- type SuspendGate
- type Suspendable
- type Type
- type Worker
Constants ¶
const (
	// EnvResumeID is the unique session identifier for a worker.
	// It remains constant across restarts within the same supervisor lifecycle.
	EnvResumeID = "LIFECYCLE_RESUME_ID"
	// EnvPrevExit is the exit code of the previous execution of this worker.
	// It is injected by the supervisor upon restart.
	EnvPrevExit = "LIFECYCLE_PREV_EXIT"
	// EnvResumeToken is the opaque token used to resume a worker session.
	// It is injected by the supervisor upon restart/handover.
	EnvResumeToken = "LIFECYCLE_RESUME_TOKEN"
)
Standard environment variables for the Handover Protocol.
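A restarted worker process can read these variables at startup to decide whether it is resuming. The following is a minimal sketch, not the package's own code: the `handover` struct and `readHandover` helper are hypothetical names, and it assumes that `LIFECYCLE_PREV_EXIT` holds a decimal integer and is absent on the first run.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Variable names copied from the constants documented above.
const (
	envResumeID = "LIFECYCLE_RESUME_ID"
	envPrevExit = "LIFECYCLE_PREV_EXIT"
)

// handover is a hypothetical container for this sketch; it is not part of the package.
type handover struct {
	ResumeID    string
	PrevExit    int
	HasPrevExit bool // false on the first run (assumption: variable is unset)
}

// readHandover takes a lookup function (os.LookupEnv in production) so it is easy to test.
func readHandover(lookup func(string) (string, bool)) (handover, error) {
	var h handover
	h.ResumeID, _ = lookup(envResumeID)
	raw, ok := lookup(envPrevExit)
	if !ok || raw == "" {
		return h, nil // no previous execution recorded
	}
	code, err := strconv.Atoi(raw) // assumption: decimal exit code
	if err != nil {
		return h, fmt.Errorf("parse %s=%q: %w", envPrevExit, raw, err)
	}
	h.PrevExit, h.HasPrevExit = code, true
	return h, nil
}

func main() {
	h, err := readHandover(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("resume=%q prevExit=%d restarted=%t\n", h.ResumeID, h.PrevExit, h.HasPrevExit)
}
```

Passing the lookup function as a parameter keeps the parsing logic independent of the real process environment.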
const (
	// MetadataType identifies the kind of worker (supervisor, process, etc.)
	MetadataType = "type"
	// MetadataRestarts tracks the number of times a worker was restarted.
	MetadataRestarts = "restarts"
	// MetadataWindowStart marks the start of the current reliability monitoring window.
	MetadataWindowStart = "window_start"
	// MetadataCircuitBreaker indicates if the circuit breaker has been triggered.
	MetadataCircuitBreaker = "circuit_breaker"
)
Variables ¶
This section is empty.
Functions ¶
func MermaidState ¶
func MermaidState(s State) string
MermaidState returns a simple Mermaid state diagram (FSM) for a single worker. It visualizes the lifecycle transitions: Pending -> Running -> Stopped/Failed. This is useful for understanding the internal behavior of a worker type.
func MermaidTree ¶
func MermaidTree(s State) string
MermaidTree returns a Mermaid diagram string representing the worker structure as a hierarchical tree (graph TD) of parent-child relationships.
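To illustrate what a `graph TD` rendering of nested state can look like, here is a rough sketch. It is not the package's implementation: the local `state` type mirrors only three fields of `State`, and the node IDs (`n0`, `n1`, ...) and label format are assumptions made for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// state is a trimmed, hypothetical stand-in for the package's State type.
type state struct {
	Name     string
	Status   string
	Children []state
}

// renderTree walks the tree depth-first and emits one Mermaid node per
// worker plus one edge per parent-child relationship.
func renderTree(root state) string {
	var b strings.Builder
	b.WriteString("graph TD\n")
	next := 0
	var walk func(s state) string
	walk = func(s state) string {
		id := fmt.Sprintf("n%d", next)
		next++
		fmt.Fprintf(&b, "    %s[\"%s (%s)\"]\n", id, s.Name, s.Status)
		for _, c := range s.Children {
			childID := walk(c)
			fmt.Fprintf(&b, "    %s --> %s\n", id, childID)
		}
		return id
	}
	walk(root)
	return b.String()
}

func main() {
	root := state{Name: "sup", Status: "Running", Children: []state{
		{Name: "a", Status: "Running"},
		{Name: "b", Status: "Failed"},
	}}
	fmt.Print(renderTree(root))
}
```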
func NodeLabeler ¶
func NodeLabeler(name, status string, pid int, metadata map[string]string, icon string) string
NodeLabeler provides worker-specific node label formatting.
func NodeStyler ¶
func NodeStyler(metadata map[string]string) (icon, shapeStart, shapeEnd, cssClass string)
NodeStyler provides worker-specific node styling based on the metadata "type" field.
Types ¶
type BaseWorker ¶
type BaseWorker struct {
// Standardized State Fields (for centralized logic)
StopRequested bool
Killed bool
ExitCode int
Err error
// contains filtered or unexported fields
}
BaseWorker provides default implementations for common Worker interface methods. It is designed to be embedded in custom worker types to reduce boilerplate and enforce safe concurrency.
**Concurrency Pattern:** All critical state changes are guarded by the internal mutex (`mu sync.RWMutex`). For custom state manipulation, always go through that same mutex using the generic locking helpers (`withLockAny`, `withLockResultAny`).
Example of safe custom state mutation:
type MyWorker struct {
	lifecycle.BaseWorker
	myField int
}

func (w *MyWorker) SetMyField(val int) {
	withLockAny(&w.mu, func() { w.myField = val })
}

func (w *MyWorker) GetMyField() int {
	return withLockResultAny(&w.mu, func() int { return w.myField })
}
The embedding pattern provides default implementations for:
- Stop(ctx) — strict wait for quiescence (context cancellation handles cleanup)
- Wait() — returns done channel
- String() — returns worker name
- State() — returns minimal state with name
- Watch(ctx) — returns state change events (StateWatcher)
These can be overridden if custom behavior is needed, but always follow the locking pattern for state safety.
func NewBaseWorker ¶
func NewBaseWorker(name string) *BaseWorker
NewBaseWorker creates a new BaseWorker with the given name. The name is immutable after creation (construct a new worker to change it).
func (*BaseWorker) ComponentType ¶
func (b *BaseWorker) ComponentType() string
ComponentType returns the component type for introspection.
func (*BaseWorker) DeriveFinalStatus ¶
func (b *BaseWorker) DeriveFinalStatus() Status
DeriveFinalStatus determines the final status based on the strict Intent vs Outcome logic. This centralizes the state machine rules:
- Killed -> StatusKilled
- Err != nil -> StatusFailed
- StopRequested -> StatusStopped
- Default -> StatusFinished
func (*BaseWorker) ExportState ¶
func (b *BaseWorker) ExportState(fn func(*State)) State
ExportState allows embedding workers to safely construct their state while holding the lock. It builds the base state and passes it to the optional extension function 'fn'.
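The idea of building state under the lock and handing it to an extension callback can be sketched as follows. This is an illustration under assumptions, not the package's code: `base`, `state`, `exportState`, and `queueWorker` are hypothetical names, and the real method may use a different lock mode.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// state is a trimmed, hypothetical stand-in for the package's State type.
type state struct {
	Name     string
	Metadata map[string]string
}

// base plays the role of BaseWorker in this sketch.
type base struct {
	mu   sync.RWMutex
	name string
}

// exportState builds the base snapshot and runs fn while the lock is held,
// so fn can read the embedding type's fields without a data race.
func (b *base) exportState(fn func(*state)) state {
	b.mu.RLock()
	defer b.mu.RUnlock()
	s := state{Name: b.name, Metadata: map[string]string{}}
	if fn != nil {
		fn(&s)
	}
	return s
}

// queueWorker embeds base and adds one field of its own.
type queueWorker struct {
	base
	depth int // guarded by base.mu
}

func (q *queueWorker) State() state {
	return q.exportState(func(s *state) {
		s.Metadata["depth"] = strconv.Itoa(q.depth)
	})
}

func main() {
	q := &queueWorker{base: base{name: "queue"}, depth: 3}
	fmt.Println(q.State().Name, q.State().Metadata["depth"]) // queue 3
}
```

Because the callback runs with the lock held, it should stay short and must not call back into methods that take the same lock.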
func (*BaseWorker) Finish ¶
func (b *BaseWorker) Finish(err error)
Finish is the terminal checkpoint for a worker. It centralizes the final state transition logic, metrics, and signaling.
func (*BaseWorker) Lock ¶ added in v1.7.2
func (b *BaseWorker) Lock()
Lock satisfies the sync.Locker interface.
func (*BaseWorker) SetStatus ¶
func (b *BaseWorker) SetStatus(new Status)
SetStatus updates the worker's status and emits a state change event. This should be called by worker implementations when status changes.
func (*BaseWorker) StartFunc ¶
func (b *BaseWorker) StartFunc(ctx context.Context, fn func(context.Context) error) error
StartFunc is a helper that runs fn in a goroutine and manages the done channel. It is a common pattern for Start() implementations:
func (w *MyWorker) Start(ctx context.Context) error {
	return w.StartFunc(ctx, w.Run)
}
The function result is sent to the done channel, then the channel is closed.
func (*BaseWorker) State ¶
func (b *BaseWorker) State() State
State returns the current worker state (base fields only).
func (*BaseWorker) Stop ¶
func (b *BaseWorker) Stop(ctx context.Context) error
Stop satisfies the Worker interface. In BaseWorker, it only handles the "Strict Wait" protocol using the provided context. Embedding types should override this to trigger their specific cleanup (e.g. canceling a context or signaling a process) but should call this base implementation if they want to wait for quiescence.
func (*BaseWorker) String ¶
func (b *BaseWorker) String() string
String returns the worker name. This is used for logging and debugging.
func (*BaseWorker) Unlock ¶ added in v1.7.2
func (b *BaseWorker) Unlock()
Unlock satisfies the sync.Locker interface.
func (*BaseWorker) Wait ¶
func (b *BaseWorker) Wait() <-chan error
Wait returns the done channel. The channel is populated by StartFunc and closed when the worker exits.
func (*BaseWorker) Watch ¶
func (b *BaseWorker) Watch(ctx context.Context) <-chan introspection.StateChange[State]
Watch implements introspection.TypedWatcher[State] interface. Returns a type-safe channel that emits StateChange[State] events. The channel is closed when the provided context is cancelled.
type ContainerWorker ¶
type ContainerWorker struct {
*BaseWorker
// contains filtered or unexported fields
}
ContainerWorker is a Worker that manages a container via the container.Container interface.
func NewContainerWorker ¶
func NewContainerWorker(name string, c container.Container) *ContainerWorker
NewContainerWorker creates a new Worker for a given container.
func (*ContainerWorker) State ¶
func (cw *ContainerWorker) State() State
State returns the current worker state.
func (*ContainerWorker) String ¶
func (cw *ContainerWorker) String() string
type EnvInjector ¶
type EnvInjector interface {
SetEnv(key, value string)
}
EnvInjector is an optional interface for workers that support environment variable injection.
type ProcessWorker ¶
type ProcessWorker struct {
*BaseWorker
// contains filtered or unexported fields
}
ProcessWorker is a Worker that manages an OS process.
Concurrency: All methods that change internal state use a mutex. Once the process has started, SetEnv and SetOutput have no effect and return an error. The process lifecycle is Start -> (Stop|Finish) -> State. Stop may return multiple errors combined via errors.Join.
State.Metadata includes the "startedAt" and "stoppedAt" fields (RFC3339Nano) for traceability.
func NewProcessWorker ¶
func NewProcessWorker(name string, nameCmd string, args ...string) *ProcessWorker
NewProcessWorker creates a new ProcessWorker for the given command.
func (*ProcessWorker) SetEnv ¶
func (p *ProcessWorker) SetEnv(key, value string) error
SetEnv adds an environment variable to the process. It can only be called before Start; once the process has started, it returns an error.
func (*ProcessWorker) SetOutput ¶ added in v1.6.3
func (p *ProcessWorker) SetOutput(stdout, stderr io.Writer) error
SetOutput configures the standard output and error writers for the process. It must be called before Start; once the process has started, it returns an error.
func (*ProcessWorker) Start ¶
func (p *ProcessWorker) Start(ctx context.Context) error
Start initiates the OS process.
func (*ProcessWorker) State ¶
func (p *ProcessWorker) State() State
State returns a snapshot of the worker's status, including timestamps.
func (*ProcessWorker) Stop ¶
func (p *ProcessWorker) Stop(ctx context.Context) error
Stop sends a signal to the process to terminate. It may return a composite error (errors.Join) containing both signaling and kill errors. Use errors.Is/As to inspect the individual causes.
func (*ProcessWorker) String ¶
func (p *ProcessWorker) String() string
String returns the worker name.
type Resumable ¶
type Resumable interface {
Worker
// Pause requests the worker to stop and return a resume token.
// This token can be passed to a new worker instance via LIFECYCLE_RESUME_TOKEN.
Pause(context.Context) (string, error)
}
Resumable is an optional interface for workers that support pausing and resuming.
type State ¶
type State struct {
Name string
Status Status
PID int
ExitCode int
Error error
ResumeToken string
Metadata map[string]string
Children []State
}
State represents a snapshot of the worker's status.
type Status ¶
type Status string
Status represents the lifecycle state of a worker.
const (
	// StatusCreated indicates the worker is instantiated and ready to start immediately.
	// The worker has all resources allocated and no blocking conditions.
	// Transition: NewWorker() → StatusCreated
	StatusCreated Status = "Created"
	// StatusPending indicates the worker is instantiated but blocked from starting.
	// Common blocking conditions:
	// - Backoff/retry delay (supervisor restart policy)
	// - Circuit breaker open
	// - Resource pool exhausted
	// - Scheduled start time not reached
	// Transition: Supervisor backoff → StatusPending, Circuit breaker → StatusPending
	StatusPending Status = "Pending"
	// StatusStarting indicates Start() was called and initialization is in progress.
	// Transition: Start() called → StatusStarting
	StatusStarting Status = "Starting"
	// StatusRunning indicates the worker is actively executing its workload.
	// Transition: Start() completed → StatusRunning
	StatusRunning Status = "Running"
	// StatusSuspended indicates the worker is paused (implements Suspendable interface).
	// Transition: Suspend() called → StatusSuspended
	StatusSuspended Status = "Suspended"
	// StatusStopping indicates Stop() was called and graceful shutdown is in progress.
	// Transition: Stop() called → StatusStopping
	StatusStopping Status = "Stopping"
	// StatusStopped indicates the worker was explicitly requested to stop (Manual/API).
	// Transition: Stop() called → StatusStopped (if exit code 0)
	StatusStopped Status = "Stopped"
	// StatusFinished indicates the worker has cleanly terminated execution naturally (exit code 0).
	// Transition: Work completed without error → StatusFinished
	StatusFinished Status = "Finished"
	// StatusKilled indicates the worker needed to be forcefully terminated (SIGKILL/Kill).
	// Transition: Stop() timed out → Kill() → StatusKilled
	StatusKilled Status = "Killed"
	// StatusFailed indicates the worker terminated with an error (exit code != 0).
	// Transition: Work completed with error → StatusFailed
	StatusFailed Status = "Failed"
)
type SuspendGate ¶
type SuspendGate struct {
// contains filtered or unexported fields
}
SuspendGate orchestrates safe suspension (pause) for a worker. It ensures the worker finishes its current unit of work before pausing, satisfying the strict quiescence requirement of the Suspendable interface.
func NewSuspendGate ¶
func NewSuspendGate() *SuspendGate
NewSuspendGate creates a new gate ready for use.
func (*SuspendGate) Check ¶
func (g *SuspendGate) Check(ctx context.Context) error
Check should be called by the Worker loop before starting a new unit of work. If a pause was requested, this method blocks until resumed or context is cancelled.
func (*SuspendGate) IsPaused ¶
func (g *SuspendGate) IsPaused() bool
IsPaused returns true if the worker is currently suspended.
func (*SuspendGate) RequestPause ¶
func (g *SuspendGate) RequestPause()
RequestPause signals the worker to pause at the next safe opportunity (the next Check call). Unlike Suspend, it does not block for confirmation.
func (*SuspendGate) Resume ¶
func (g *SuspendGate) Resume()
Resume wakes up the worker and allows it to proceed past the Check call.
type Suspendable ¶
type Suspendable interface {
Worker
// Suspend pauses the worker's processing. It must be non-blocking.
Suspend(context.Context) error
// Resume restarts the worker's processing.
Resume(context.Context) error
}
Suspendable defines a worker that can pause its execution in-place without exiting. Unlike Resumable (which implies a restart/handover), Suspendable implies freezing state.
type Worker ¶
type Worker interface {
// Start initiates the worker. It must be non-blocking.
// The context can be used to control the startup phase.
Start(context.Context) error
// Stop requests the worker to stop.
// It should respect the provided context for timeout/cancellation of the stop request.
Stop(context.Context) error
// Wait returns a channel that is closed when the worker has exited.
// The error associated with the exit (if any) is sent on the channel.
Wait() <-chan error
// String returns a human-readable description/ID of the worker.
String() string
// State returns the current state of the worker for introspection.
// Note: This returns a snapshot; some fields might be empty if not applicable.
State() State
}
Worker defines the interface for a managed unit of work (process, goroutine, container).
The lifecycle is:
- Start(ctx) -> Non-blocking.
- Wait() -> Returns channel that closes when work finishes.
- Stop(ctx) -> Graceful termination request.