Documentation
¶
Overview ¶
Package registry provides agent registration and discovery for swarm coordination.
Overview ¶
The Registry interface enables agents to self-register with capabilities, status, and load. Other agents discover and route to appropriate handlers based on capabilities and availability.
Available Implementations ¶
- MemoryRegistry: In-memory implementation for testing and single-node use
- NATSRegistry: Distributed registry using NATS JetStream KV store
Basic Usage ¶
Register an agent:
reg := registry.NewMemoryRegistry(registry.MemoryConfig{})
err := reg.Register(registry.AgentInfo{
ID: "agent-1",
Name: "Code Review Agent",
Capabilities: []string{"code-review", "testing"},
Status: registry.StatusIdle,
Load: 0.3,
})
Discover agents by capability:
agents, _ := reg.FindByCapability("code-review")
// Returns agents sorted by load (lowest first)
if len(agents) > 0 {
target := agents[0] // Pick the least loaded agent
}
Watch for changes:
events, _ := reg.Watch()
for event := range events {
switch event.Type {
case registry.EventAdded:
fmt.Printf("New agent: %s\n", event.Agent.ID)
case registry.EventUpdated:
fmt.Printf("Agent updated: %s (load=%.2f)\n", event.Agent.ID, event.Agent.Load)
case registry.EventRemoved:
fmt.Printf("Agent removed: %s\n", event.Agent.ID)
}
}
NATS Registry ¶
For distributed deployments, use NATSRegistry with a shared NATS cluster:
import "github.com/vinayprograms/agentkit/bus"
// Reuse bus connection
natsBus, _ := bus.NewNATSBus(bus.NATSConfig{URL: "nats://localhost:4222"})
reg, _ := registry.NewNATSRegistry(natsBus.Conn(), registry.NATSRegistryConfig{
BucketName: "my-swarm-registry",
TTL: 30 * time.Second,
})
Multiple agents across different nodes share the same registry, enabling discovery and load balancing across the swarm.
TTL and Stale Entries ¶
Both implementations support TTL-based expiry. Agents should periodically re-register (heartbeat) to prevent being marked stale:
// Heartbeat every 10 seconds
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
reg.Register(registry.AgentInfo{
ID: myID,
Status: currentStatus,
Load: currentLoad,
})
}
Load Balancing ¶
FindByCapability returns agents sorted by load (lowest first), enabling simple load-aware routing:
agents, _ := reg.FindByCapability("code-review")
agents, _ = reg.List(&registry.Filter{
Capability: "code-review",
Status: registry.StatusIdle,
MaxLoad: 0.8, // Only agents with load <= 80%
})
Package registry provides agent registration and discovery for swarm coordination.
Agents self-register with capabilities, status, and load. Other agents discover and route to appropriate handlers based on capabilities and availability.
Index ¶
- Variables
- func HasCapability(info AgentInfo, capability string) bool
- func MatchesFilter(info AgentInfo, filter *Filter) bool
- func ValidateAgentInfo(info AgentInfo) error
- type AgentInfo
- type Event
- type EventType
- type Filter
- type MemoryConfig
- type MemoryRegistry
- func (r *MemoryRegistry) Close() error
- func (r *MemoryRegistry) Deregister(id string) error
- func (r *MemoryRegistry) FindByCapability(capability string) ([]AgentInfo, error)
- func (r *MemoryRegistry) Get(id string) (*AgentInfo, error)
- func (r *MemoryRegistry) List(filter *Filter) ([]AgentInfo, error)
- func (r *MemoryRegistry) Register(info AgentInfo) error
- func (r *MemoryRegistry) Watch() (<-chan Event, error)
- type NATSRegistry
- func (r *NATSRegistry) Close() error
- func (r *NATSRegistry) Conn() *nats.Conn
- func (r *NATSRegistry) Deregister(id string) error
- func (r *NATSRegistry) FindByCapability(capability string) ([]AgentInfo, error)
- func (r *NATSRegistry) Get(id string) (*AgentInfo, error)
- func (r *NATSRegistry) List(filter *Filter) ([]AgentInfo, error)
- func (r *NATSRegistry) Register(info AgentInfo) error
- func (r *NATSRegistry) Watch() (<-chan Event, error)
- type NATSRegistryConfig
- type Registry
- type Status
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotFound    = errors.New("agent not found")
	ErrClosed      = errors.New("registry closed")
	ErrInvalidID   = errors.New("invalid agent ID")
	ErrDuplicateID = errors.New("duplicate agent ID")
)
Common errors.
Functions ¶
func HasCapability ¶
HasCapability checks if an agent has a specific capability.
func MatchesFilter ¶
MatchesFilter checks if an agent matches the filter criteria.
func ValidateAgentInfo ¶
ValidateAgentInfo checks if agent info is valid.
Types ¶
type AgentInfo ¶
type AgentInfo struct {
// ID uniquely identifies the agent.
ID string
// Name is a human-readable name for the agent.
Name string
// Capabilities lists what the agent can do (e.g., "code-review", "testing").
Capabilities []string
// Status is the agent's current operational state.
Status Status
// Load is the agent's current load (0.0-1.0).
Load float64
// Metadata contains additional key-value pairs.
Metadata map[string]string
// LastSeen is when the agent last updated its registration.
LastSeen time.Time
}
AgentInfo contains registration information for an agent.
type Event ¶
type Event struct {
// Type indicates what happened.
Type EventType
// Agent contains the agent information.
// For removal events, this contains the last known state.
Agent AgentInfo
}
Event represents a change in the registry.
type Filter ¶
type Filter struct {
// Status filters by operational state. Empty means all.
Status Status
// Capability filters to agents with this capability.
Capability string
// MaxLoad filters to agents with load at or below this value.
// Zero means no filter.
MaxLoad float64
}
Filter specifies criteria for listing agents.
type MemoryConfig ¶
type MemoryConfig struct {
// TTL specifies how long before an agent is considered stale.
// Zero means entries never expire.
TTL time.Duration
}
MemoryConfig configures the in-memory registry.
type MemoryRegistry ¶
type MemoryRegistry struct {
// contains filtered or unexported fields
}
MemoryRegistry is an in-memory implementation of Registry. Suitable for testing and single-node deployments.
func NewMemoryRegistry ¶
func NewMemoryRegistry(cfg MemoryConfig) *MemoryRegistry
NewMemoryRegistry creates a new in-memory registry.
func (*MemoryRegistry) Close ¶
func (r *MemoryRegistry) Close() error
Close shuts down the registry.
func (*MemoryRegistry) Deregister ¶
func (r *MemoryRegistry) Deregister(id string) error
Deregister removes an agent from the registry.
func (*MemoryRegistry) FindByCapability ¶
func (r *MemoryRegistry) FindByCapability(capability string) ([]AgentInfo, error)
FindByCapability returns agents with a specific capability.
func (*MemoryRegistry) Get ¶
func (r *MemoryRegistry) Get(id string) (*AgentInfo, error)
Get retrieves a specific agent by ID.
func (*MemoryRegistry) List ¶
func (r *MemoryRegistry) List(filter *Filter) ([]AgentInfo, error)
List returns all agents matching the filter.
func (*MemoryRegistry) Register ¶
func (r *MemoryRegistry) Register(info AgentInfo) error
Register adds or updates an agent in the registry.
func (*MemoryRegistry) Watch ¶
func (r *MemoryRegistry) Watch() (<-chan Event, error)
Watch returns a channel of registry events.
type NATSRegistry ¶
type NATSRegistry struct {
// contains filtered or unexported fields
}
NATSRegistry implements Registry using NATS JetStream KV store. Suitable for distributed deployments across multiple nodes.
func NewNATSRegistry ¶
func NewNATSRegistry(conn *nats.Conn, cfg NATSRegistryConfig) (*NATSRegistry, error)
NewNATSRegistry creates a new NATS registry from an existing connection.
func (*NATSRegistry) Conn ¶
func (r *NATSRegistry) Conn() *nats.Conn
Conn returns the underlying NATS connection.
func (*NATSRegistry) Deregister ¶
func (r *NATSRegistry) Deregister(id string) error
Deregister removes an agent from the registry.
func (*NATSRegistry) FindByCapability ¶
func (r *NATSRegistry) FindByCapability(capability string) ([]AgentInfo, error)
FindByCapability returns agents with a specific capability.
func (*NATSRegistry) Get ¶
func (r *NATSRegistry) Get(id string) (*AgentInfo, error)
Get retrieves a specific agent by ID.
func (*NATSRegistry) List ¶
func (r *NATSRegistry) List(filter *Filter) ([]AgentInfo, error)
List returns all agents matching the filter.
func (*NATSRegistry) Register ¶
func (r *NATSRegistry) Register(info AgentInfo) error
Register adds or updates an agent in the registry.
func (*NATSRegistry) Watch ¶
func (r *NATSRegistry) Watch() (<-chan Event, error)
Watch returns a channel of registry events.
type NATSRegistryConfig ¶
type NATSRegistryConfig struct {
// BucketName is the KV bucket name. Default: "agent-registry"
BucketName string
// TTL for agent entries. Zero means no expiry.
// Note: NATS KV has its own TTL handling.
TTL time.Duration
// Replicas for the KV store (1-5). Default: 1
Replicas int
}
NATSRegistryConfig configures the NATS registry.
func DefaultNATSRegistryConfig ¶
func DefaultNATSRegistryConfig() NATSRegistryConfig
DefaultNATSRegistryConfig returns configuration with sensible defaults.
type Registry ¶
type Registry interface {
// Register adds or updates an agent in the registry.
// If an agent with the same ID exists, it updates the entry.
Register(info AgentInfo) error
// Deregister removes an agent from the registry.
// Returns ErrNotFound if the agent doesn't exist.
Deregister(id string) error
// Get retrieves a specific agent by ID.
// Returns nil, ErrNotFound if not found.
Get(id string) (*AgentInfo, error)
// List returns all agents matching the optional filter.
// Pass nil for no filtering.
List(filter *Filter) ([]AgentInfo, error)
// FindByCapability returns agents with a specific capability.
// Results are sorted by load (lowest first).
FindByCapability(capability string) ([]AgentInfo, error)
// Watch returns a channel of registry events.
// The channel is closed when the registry is closed.
// Multiple watchers are supported.
Watch() (<-chan Event, error)
// Close shuts down the registry client.
Close() error
}
Registry provides agent registration and discovery.