results

Version: v0.1.0
Warning

This package is not in the latest version of its module.

Published: Feb 24, 2026 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation

Overview

Package results provides task result publication and retrieval for agent coordination.

The ResultPublisher interface lets agents publish task outputs to a shared store so that other agents can subscribe to and retrieve them. This decouples result production from consumption and enables asynchronous task pipelines.

Two implementations are provided:

  • MemoryPublisher: In-memory storage for testing and single-process scenarios
  • BusPublisher: Bus-backed storage with pub/sub notification for distributed systems

Basic Usage

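ctx := context.Background()

// Create a memory publisher for testing
pub := results.NewMemoryPublisher()
defer pub.Close()

// Publish a result
err := pub.Publish(ctx, "task-123", results.Result{
    TaskID:   "task-123",
    Status:   results.StatusSuccess,
    Output:   []byte(`{"answer": 42}`),
    Metadata: map[string]string{"model": "gpt-4"},
})
if err != nil {
    log.Fatal(err)
}

// Retrieve a result
result, err := pub.Get(ctx, "task-123")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Stored result: %s\n", result.Status)

// Subscribe to result updates; the channel is closed once the task
// reaches a terminal state
ch, err := pub.Subscribe("task-123")
if err != nil {
    log.Fatal(err)
}
for update := range ch {
    fmt.Printf("Result received: %s\n", update.Status)
}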

Bus-based Publisher

For distributed systems, use the bus-backed publisher:

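nbus, err := bus.NewNATSBus(bus.NATSConfig{URL: "nats://localhost:4222"})
if err != nil {
    log.Fatal(err)
}
pub := results.NewBusPublisher(nbus, results.BusPublisherConfig{
    SubjectPrefix: "results",
})

// Results are stored in memory but notifications go over the bus
result := results.Result{TaskID: "task-123", Status: results.StatusSuccess}
if err := pub.Publish(ctx, "task-123", result); err != nil {
    log.Fatal(err)
}

// Remote subscribers receive notifications via bus
ch, err := pub.Subscribe("task-123")
if err != nil {
    log.Fatal(err)
}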

The bus publisher enables agents to be notified of result availability without polling, supporting efficient distributed task coordination.

Variables

var (
	ErrNotFound      = errors.New("result not found")
	ErrAlreadyExists = errors.New("result already exists")
	ErrClosed        = errors.New("publisher closed")
	ErrInvalidTaskID = errors.New("invalid task ID")
	ErrInvalidStatus = errors.New("invalid result status")
)

Common errors.
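
Because these are sentinel values created with errors.New, callers would normally match them with errors.Is rather than by comparing strings. The following is a minimal sketch under stated assumptions: ErrNotFound is a local stand-in for results.ErrNotFound, and lookup and describe are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound is a local stand-in for results.ErrNotFound so that the
// sketch compiles without the package.
var ErrNotFound = errors.New("result not found")

// lookup is a hypothetical store lookup that wraps the sentinel with context.
func lookup(store map[string]string, taskID string) (string, error) {
	out, ok := store[taskID]
	if !ok {
		return "", fmt.Errorf("lookup %q: %w", taskID, ErrNotFound)
	}
	return out, nil
}

// describe classifies an error the way a caller might branch on it.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotFound):
		return "missing"
	default:
		return "unexpected"
	}
}

func main() {
	store := map[string]string{"task-123": `{"answer": 42}`}
	_, err := lookup(store, "task-999")
	fmt.Println(describe(err))
}
```

errors.Is keeps matching even when an intermediate layer wraps the sentinel with %w, which is why it is preferred over a direct == comparison.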

Functions

func ValidateResult

func ValidateResult(r Result) error

ValidateResult checks if a result is valid for publishing.

func ValidateTaskID

func ValidateTaskID(taskID string) error

ValidateTaskID checks if a task ID is valid.

Types

type BusPublisher

type BusPublisher struct {
	// contains filtered or unexported fields
}

BusPublisher implements ResultPublisher using a message bus for notifications. Results are stored in memory, but updates are broadcast over the bus, enabling distributed subscribers to receive notifications.

func NewBusPublisher

func NewBusPublisher(mb bus.MessageBus, cfg BusPublisherConfig) *BusPublisher

NewBusPublisher creates a new bus-backed result publisher.

func (*BusPublisher) Close

func (p *BusPublisher) Close() error

Close shuts down the publisher.

func (*BusPublisher) Delete

func (p *BusPublisher) Delete(ctx context.Context, taskID string) error

Delete removes a result by task ID.

func (*BusPublisher) Get

func (p *BusPublisher) Get(ctx context.Context, taskID string) (*Result, error)

Get retrieves a result by task ID.

func (*BusPublisher) List

func (p *BusPublisher) List(filter ResultFilter) ([]*Result, error)

List returns results matching the filter criteria.

func (*BusPublisher) Publish

func (p *BusPublisher) Publish(ctx context.Context, taskID string, result Result) error

Publish stores or updates a task result and broadcasts to subscribers.

func (*BusPublisher) Subscribe

func (p *BusPublisher) Subscribe(taskID string) (<-chan *Result, error)

Subscribe returns a channel that receives updates for a task. Updates come from the message bus, enabling distributed notification.

type BusPublisherConfig

type BusPublisherConfig struct {
	// SubjectPrefix is the prefix for result subjects.
	// Default: "results"
	SubjectPrefix string

	// BufferSize for subscription channels.
	// Default: 16
	BufferSize int
}

BusPublisherConfig configures the bus-backed result publisher.

func DefaultBusPublisherConfig

func DefaultBusPublisherConfig() BusPublisherConfig

DefaultBusPublisherConfig returns configuration with sensible defaults.
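
The documented defaults are "results" for SubjectPrefix and 16 for BufferSize. As a rough illustration of how zero-valued fields could be filled in, here is a sketch that uses a local copy of the struct and a hypothetical withDefaults helper; whether the package applies defaults this way is an assumption.

```go
package main

import "fmt"

// BusPublisherConfig is a local stand-in mirroring the documented fields.
type BusPublisherConfig struct {
	SubjectPrefix string
	BufferSize    int
}

// withDefaults is a hypothetical helper: it replaces unset (zero or
// negative) fields with the defaults stated in the field comments.
func withDefaults(cfg BusPublisherConfig) BusPublisherConfig {
	if cfg.SubjectPrefix == "" {
		cfg.SubjectPrefix = "results"
	}
	if cfg.BufferSize <= 0 {
		cfg.BufferSize = 16
	}
	return cfg
}

func main() {
	cfg := withDefaults(BusPublisherConfig{SubjectPrefix: "agents.results"})
	fmt.Printf("%s %d\n", cfg.SubjectPrefix, cfg.BufferSize)
}
```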

type MemoryPublisher

type MemoryPublisher struct {
	// contains filtered or unexported fields
}

MemoryPublisher implements ResultPublisher using in-memory storage. Useful for testing and single-process scenarios.

func NewMemoryPublisher

func NewMemoryPublisher() *MemoryPublisher

NewMemoryPublisher creates a new in-memory result publisher.

func (*MemoryPublisher) Close

func (p *MemoryPublisher) Close() error

Close shuts down the publisher.

func (*MemoryPublisher) Delete

func (p *MemoryPublisher) Delete(ctx context.Context, taskID string) error

Delete removes a result by task ID.

func (*MemoryPublisher) Get

func (p *MemoryPublisher) Get(ctx context.Context, taskID string) (*Result, error)

Get retrieves a result by task ID.

func (*MemoryPublisher) List

func (p *MemoryPublisher) List(filter ResultFilter) ([]*Result, error)

List returns results matching the filter criteria.

func (*MemoryPublisher) Publish

func (p *MemoryPublisher) Publish(ctx context.Context, taskID string, result Result) error

Publish stores or updates a task result.

func (*MemoryPublisher) Subscribe

func (p *MemoryPublisher) Subscribe(taskID string) (<-chan *Result, error)

Subscribe returns a channel that receives updates for a task.

type Result

type Result struct {
	// TaskID uniquely identifies the task.
	TaskID string

	// Status indicates the current state of the result.
	Status ResultStatus

	// Output contains the task's output data.
	// Empty for pending or failed tasks.
	Output []byte

	// Error contains the error message if Status is StatusFailed.
	Error string

	// Metadata contains additional key-value data about the result.
	Metadata map[string]string

	// CreatedAt is when the result was first created.
	CreatedAt time.Time

	// UpdatedAt is when the result was last updated.
	UpdatedAt time.Time
}

Result represents a task's output.

func (*Result) Clone

func (r *Result) Clone() *Result

Clone returns a deep copy of the result.
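
A deep copy matters here because Output is a slice and Metadata is a map, so a plain struct copy would still share their backing storage. The sketch below shows one way such a copy could be written; Result is a local stand-in for the documented struct and cloneResult is a hypothetical function, not the package's Clone method.

```go
package main

import (
	"fmt"
	"time"
)

// ResultStatus and Result are local stand-ins mirroring the documented types.
type ResultStatus string

type Result struct {
	TaskID    string
	Status    ResultStatus
	Output    []byte
	Error     string
	Metadata  map[string]string
	CreatedAt time.Time
	UpdatedAt time.Time
}

// cloneResult is a hypothetical deep copy: value fields are copied by
// assignment, while the slice and the map get fresh backing storage.
func cloneResult(r *Result) *Result {
	if r == nil {
		return nil
	}
	c := *r
	if r.Output != nil {
		c.Output = make([]byte, len(r.Output))
		copy(c.Output, r.Output)
	}
	if r.Metadata != nil {
		c.Metadata = make(map[string]string, len(r.Metadata))
		for k, v := range r.Metadata {
			c.Metadata[k] = v
		}
	}
	return &c
}

func main() {
	orig := &Result{
		TaskID:   "task-123",
		Output:   []byte("abc"),
		Metadata: map[string]string{"model": "gpt-4"},
	}
	c := cloneResult(orig)
	c.Output[0] = 'X'
	c.Metadata["model"] = "other"
	// The original is unaffected by changes to the clone.
	fmt.Println(string(orig.Output), orig.Metadata["model"])
}
```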

type ResultFilter

type ResultFilter struct {
	// Status filters by result status. Empty means all statuses.
	Status ResultStatus

	// TaskIDPrefix filters by task ID prefix.
	TaskIDPrefix string

	// CreatedAfter filters results created after this time.
	CreatedAfter time.Time

	// CreatedBefore filters results created before this time.
	CreatedBefore time.Time

	// Limit caps the number of results returned. 0 means no limit.
	Limit int

	// Metadata filters by metadata key-value pairs (all must match).
	Metadata map[string]string
}

ResultFilter specifies criteria for listing results.

func (ResultFilter) Matches

func (f ResultFilter) Matches(r *Result) bool

Matches returns true if the result matches the filter criteria.
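
To make the filter semantics concrete, the sketch below shows one plausible reading of the field comments: zero-valued fields are ignored, every Metadata pair must match, and Limit is applied by the listing code rather than by the match itself. The types are local stand-ins, and matches and list are hypothetical functions; the exact boundary behavior of CreatedAfter and CreatedBefore (exclusive here) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// Local stand-ins mirroring the documented types (fields trimmed to
// those the filter inspects).
type ResultStatus string

type Result struct {
	TaskID    string
	Status    ResultStatus
	Metadata  map[string]string
	CreatedAt time.Time
}

type ResultFilter struct {
	Status        ResultStatus
	TaskIDPrefix  string
	CreatedAfter  time.Time
	CreatedBefore time.Time
	Limit         int
	Metadata      map[string]string
}

// matches is a hypothetical implementation: unset criteria are skipped,
// set criteria must all hold.
func matches(f ResultFilter, r *Result) bool {
	if r == nil {
		return false
	}
	if f.Status != "" && r.Status != f.Status {
		return false
	}
	if f.TaskIDPrefix != "" && !strings.HasPrefix(r.TaskID, f.TaskIDPrefix) {
		return false
	}
	if !f.CreatedAfter.IsZero() && !r.CreatedAt.After(f.CreatedAfter) {
		return false
	}
	if !f.CreatedBefore.IsZero() && !r.CreatedAt.Before(f.CreatedBefore) {
		return false
	}
	for k, v := range f.Metadata {
		if got, ok := r.Metadata[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// list applies matches to each result and stops once Limit is reached
// (0 means no limit).
func list(all []*Result, f ResultFilter) []*Result {
	var out []*Result
	for _, r := range all {
		if f.Limit > 0 && len(out) >= f.Limit {
			break
		}
		if matches(f, r) {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	now := time.Now()
	all := []*Result{
		{TaskID: "build-1", Status: "success", CreatedAt: now.Add(-2 * time.Hour)},
		{TaskID: "build-2", Status: "failed", CreatedAt: now.Add(-time.Hour)},
		{TaskID: "test-1", Status: "success", CreatedAt: now},
	}
	f := ResultFilter{Status: "success", TaskIDPrefix: "build-", CreatedAfter: now.Add(-3 * time.Hour)}
	for _, r := range list(all, f) {
		fmt.Println(r.TaskID)
	}
}
```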

type ResultPublisher

type ResultPublisher interface {
	// Publish stores or updates a task result.
	// If the result already exists, it is updated.
	Publish(ctx context.Context, taskID string, result Result) error

	// Get retrieves a result by task ID.
	// Returns ErrNotFound if the result doesn't exist.
	Get(ctx context.Context, taskID string) (*Result, error)

	// Subscribe returns a channel that receives updates for a task.
	// The channel is closed when the task reaches a terminal state
	// or when the subscription is cancelled.
	// If the result already exists, it is sent immediately.
	Subscribe(taskID string) (<-chan *Result, error)

	// List returns results matching the filter criteria.
	List(filter ResultFilter) ([]*Result, error)

	// Delete removes a result by task ID.
	// Returns ErrNotFound if the result doesn't exist.
	Delete(ctx context.Context, taskID string) error

	// Close shuts down the publisher and releases resources.
	Close() error
}

ResultPublisher provides result storage, retrieval, and subscription.
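
Subscribe takes no context, so a consumer that wants a deadline has to combine the returned channel with its own cancellation. The sketch below illustrates that pattern against a plain channel; the types are local stand-ins, and awaitTerminal, runDemo, and errEndedEarly are hypothetical names introduced only for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented types.
type ResultStatus string

const (
	StatusPending ResultStatus = "pending"
	StatusSuccess ResultStatus = "success"
	StatusFailed  ResultStatus = "failed"
)

type Result struct {
	TaskID string
	Status ResultStatus
}

// errEndedEarly is a hypothetical error for a channel that closed before
// a terminal result arrived (for example, a cancelled subscription).
var errEndedEarly = errors.New("subscription ended before a terminal result")

// awaitTerminal reads updates until a success or failed result arrives,
// the channel closes, or ctx is done. It returns the last update seen.
func awaitTerminal(ctx context.Context, ch <-chan *Result) (*Result, error) {
	var last *Result
	for {
		select {
		case <-ctx.Done():
			return last, ctx.Err()
		case r, ok := <-ch:
			if !ok {
				return last, errEndedEarly
			}
			last = r
			if r.Status == StatusSuccess || r.Status == StatusFailed {
				return r, nil
			}
		}
	}
}

// runDemo feeds the given updates through a channel, closes it, and
// waits for a terminal result with a one-second timeout.
func runDemo(updates ...*Result) (*Result, error) {
	ch := make(chan *Result, len(updates))
	for _, u := range updates {
		ch <- u
	}
	close(ch)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return awaitTerminal(ctx, ch)
}

func main() {
	r, err := runDemo(
		&Result{TaskID: "task-123", Status: StatusPending},
		&Result{TaskID: "task-123", Status: StatusSuccess},
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.TaskID, r.Status)
}
```

With a real publisher, the channel passed to awaitTerminal would be the one returned by Subscribe.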

type ResultStatus

type ResultStatus string

ResultStatus represents the state of a task result.

const (
	// StatusPending indicates the task is still in progress.
	StatusPending ResultStatus = "pending"

	// StatusSuccess indicates the task completed successfully.
	StatusSuccess ResultStatus = "success"

	// StatusFailed indicates the task failed.
	StatusFailed ResultStatus = "failed"
)

func (ResultStatus) IsTerminal

func (s ResultStatus) IsTerminal() bool

IsTerminal returns true if the status represents a final state.

func (ResultStatus) Valid

func (s ResultStatus) Valid() bool

Valid returns true if the status is a known value.
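
Given the three documented constants, a natural reading is that success and failed are final while pending is not, and that any other string is invalid. The sketch below encodes that reading on a local stand-in type; it is an assumption about the behavior, not the package's source.

```go
package main

import "fmt"

// ResultStatus is a local stand-in mirroring the documented type.
type ResultStatus string

const (
	StatusPending ResultStatus = "pending"
	StatusSuccess ResultStatus = "success"
	StatusFailed  ResultStatus = "failed"
)

// IsTerminal reports whether no further updates are expected
// (assumed: success and failed only).
func (s ResultStatus) IsTerminal() bool {
	return s == StatusSuccess || s == StatusFailed
}

// Valid reports whether s is one of the three known constants.
func (s ResultStatus) Valid() bool {
	switch s {
	case StatusPending, StatusSuccess, StatusFailed:
		return true
	}
	return false
}

func main() {
	for _, s := range []ResultStatus{StatusPending, StatusSuccess, StatusFailed, "cancelled"} {
		fmt.Printf("%-9s valid=%t terminal=%t\n", s, s.Valid(), s.IsTerminal())
	}
}
```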

type Subscription

type Subscription interface {
	// Results returns the channel for incoming result updates.
	Results() <-chan *Result

	// Cancel cancels the subscription.
	Cancel() error
}

Subscription represents an active result subscription.
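
None of the documented methods return a Subscription, so how the package uses it is not shown here. As an illustration only, the sketch below implements the interface over a buffered channel; chanSubscription, newChanSubscription, and deliver are hypothetical names, and dropping updates when the buffer is full is a design assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Result is a trimmed local stand-in for the documented struct.
type Result struct {
	TaskID string
	Status string
}

// Subscription mirrors the documented interface.
type Subscription interface {
	Results() <-chan *Result
	Cancel() error
}

// chanSubscription is a hypothetical channel-backed implementation.
type chanSubscription struct {
	mu     sync.Mutex
	ch     chan *Result
	closed bool
}

// Compile-time check that the sketch satisfies the interface.
var _ Subscription = (*chanSubscription)(nil)

func newChanSubscription(buf int) *chanSubscription {
	return &chanSubscription{ch: make(chan *Result, buf)}
}

func (s *chanSubscription) Results() <-chan *Result { return s.ch }

// Cancel closes the channel once; repeated calls are harmless.
func (s *chanSubscription) Cancel() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
	return nil
}

// deliver sends an update without blocking. It reports false when the
// subscription is cancelled or its buffer is full.
func (s *chanSubscription) deliver(r *Result) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	select {
	case s.ch <- r:
		return true
	default:
		return false
	}
}

func main() {
	sub := newChanSubscription(16)
	sub.deliver(&Result{TaskID: "task-123", Status: "success"})
	sub.Cancel()
	for r := range sub.Results() {
		fmt.Println(r.TaskID, r.Status)
	}
}
```

Holding the mutex in both deliver and Cancel prevents a send on a closed channel, which would otherwise panic.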
