Documentation ¶
Overview ¶
Package ratelimit provides rate limit coordination for agent swarms.
Rate limiting is essential when multiple agents access shared resources (APIs, databases, external services) that have capacity constraints. This package provides both local and distributed rate limiters.
Local Rate Limiting ¶
The MemoryLimiter provides per-process rate limiting using token buckets:
	limiter := ratelimit.NewMemoryLimiter()
	limiter.SetCapacity("openai-api", 60, time.Minute) // 60 requests per minute

	// Block until a token is available
	if err := limiter.Acquire(ctx, "openai-api"); err != nil {
		return err // context ended or resource unknown
	}
	defer limiter.Release("openai-api")

	// Non-blocking attempt
	if limiter.TryAcquire("openai-api") {
		defer limiter.Release("openai-api")
		// Make request
	}
Distributed Rate Limiting ¶
The DistributedLimiter coordinates rate limits across agents via the message bus:
	limiter, err := ratelimit.NewDistributedLimiter(ratelimit.DistributedConfig{
		Bus:     nbus,
		AgentID: "agent-1",
	})
	if err != nil {
		return err // e.g. invalid configuration
	}
	defer limiter.Close()
	limiter.SetCapacity("shared-api", 100, time.Minute)

	// Announce reduced capacity (e.g., after 429 response)
	limiter.AnnounceReduced("shared-api", "received 429 from API")
When an agent announces reduced capacity, all agents in the swarm automatically adjust their local rate limits to share the remaining capacity.
Algorithm ¶
Both implementations use the token bucket algorithm with refill:
- Tokens are added at a fixed rate based on capacity/window
- Each Acquire consumes one token
- If no tokens are available, Acquire blocks and TryAcquire returns false
- Release returns a token to the bucket (optional, for request tracking)
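The rules above can be sketched as a small standalone token bucket. This is an illustration only, not the package's internal code: the bucket type, its fields, and the seconds helper are hypothetical, and the clock is passed in as an offset since creation so the refill arithmetic is easy to follow.

```go
package main

import (
	"sync"
	"time"
)

// bucket is a hypothetical token bucket, not the package's internal type.
type bucket struct {
	mu       sync.Mutex
	capacity float64       // maximum number of tokens
	tokens   float64       // tokens currently available
	window   time.Duration // time needed to refill an empty bucket
	last     time.Duration // clock reading at the last refill
}

// seconds converts a whole number of seconds to a time.Duration.
func seconds(n int) time.Duration { return time.Duration(n) * time.Second }

// newBucket starts full, with the clock at zero.
func newBucket(capacity int, window time.Duration) *bucket {
	return &bucket{capacity: float64(capacity), tokens: float64(capacity), window: window}
}

// tryAcquire refills at capacity/window tokens per unit of time, then
// consumes one token if at least one is available. now is the time elapsed
// since the bucket was created (a stand-in for a real clock).
func (b *bucket) tryAcquire(now time.Duration) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if elapsed := now - b.last; elapsed > 0 {
		b.tokens += b.capacity * float64(elapsed) / float64(b.window)
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// release returns one token, never exceeding capacity.
func (b *bucket) release() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.tokens < b.capacity {
		b.tokens++
	}
}
```

A blocking Acquire could be layered on top by retrying tryAcquire until it succeeds or the context ends; that part is omitted here.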
Best Practices ¶
- Set capacity slightly below actual limits for safety margin
- Use Release to track in-flight requests, not just rate
- Monitor AnnounceReduced events in production (for example via OnCapacityChange) to detect limit issues
- Use TryAcquire with fallback for non-critical requests
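As an illustration of the last practice, the sketch below serves a fallback value when no token is free. The tryLimiter interface copies the TryAcquire and Release signatures from RateLimiter; countingLimiter and fetchOrFallback are hypothetical names used only for this example.

```go
package main

// tryLimiter is the subset of RateLimiter this sketch needs.
type tryLimiter interface {
	TryAcquire(resource string) bool
	Release(resource string)
}

// countingLimiter is a hypothetical stand-in for a real limiter. It hands out
// at most free tokens and is not safe for concurrent use.
type countingLimiter struct{ free int }

func (c *countingLimiter) TryAcquire(string) bool {
	if c.free == 0 {
		return false
	}
	c.free--
	return true
}

func (c *countingLimiter) Release(string) { c.free++ }

// fetchOrFallback calls fetch only when a token is immediately available and
// returns fallback otherwise, so non-critical work never waits on the limiter.
func fetchOrFallback(l tryLimiter, resource string, fetch func() string, fallback string) string {
	if !l.TryAcquire(resource) {
		return fallback
	}
	defer l.Release(resource)
	return fetch()
}
```

A cached or default response is a typical fallback; critical requests would use Acquire with a context deadline instead.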
Index ¶
- Constants
- Variables
- type Capacity
- type CapacityUpdate
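- type DistributedConfig
- func DefaultDistributedConfig() DistributedConfig
- func (c *DistributedConfig) Validate() error
- type DistributedLimiter
- func NewDistributedLimiter(config DistributedConfig) (*DistributedLimiter, error)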
- func (d *DistributedLimiter) Acquire(ctx context.Context, resource string) error
- func (d *DistributedLimiter) AnnounceReduced(resource string, reason string)
- func (d *DistributedLimiter) Close() error
- func (d *DistributedLimiter) GetCapacity(resource string) *Capacity
- func (d *DistributedLimiter) OnCapacityChange(cb OnCapacityChange)
- func (d *DistributedLimiter) Release(resource string)
- func (d *DistributedLimiter) SetCapacity(resource string, capacity int, window time.Duration)
- func (d *DistributedLimiter) TryAcquire(resource string) bool
- type MemoryLimiter
- func NewMemoryLimiter() *MemoryLimiter
- func (m *MemoryLimiter) Acquire(ctx context.Context, resource string) error
- func (m *MemoryLimiter) AnnounceReduced(resource string, reason string)
- func (m *MemoryLimiter) Close() error
- func (m *MemoryLimiter) GetCapacity(resource string) *Capacity
- func (m *MemoryLimiter) Release(resource string)
- func (m *MemoryLimiter) SetCapacity(resource string, capacity int, window time.Duration)
- func (m *MemoryLimiter) TryAcquire(resource string) bool
- type OnCapacityChange
- type RateLimiter
- type WatchOption
- func WithCapacityCallback(cb OnCapacityChange) WatchOption
Constants ¶
const SubjectPrefix = "ratelimit."
SubjectPrefix is the message bus subject prefix for rate limit messages.
Variables ¶
var (
	ErrClosed            = errors.New("limiter closed")
	ErrResourceUnknown   = errors.New("unknown resource")
	ErrCapacityExhausted = errors.New("capacity exhausted")
	ErrInvalidCapacity   = errors.New("invalid capacity")
	ErrInvalidWindow     = errors.New("invalid window")
	ErrInvalidConfig     = errors.New("invalid configuration")
)
Common errors.
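Callers would normally match these sentinel values with errors.Is so that wrapped errors are still recognized. The sketch below redeclares two of the errors locally so it compiles on its own; nextStep and wrapAcquire are hypothetical helpers, and treating ErrClosed as a possible result of Acquire is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of two of the sentinel errors, so the sketch is self-contained.
var (
	ErrClosed          = errors.New("limiter closed")
	ErrResourceUnknown = errors.New("unknown resource")
)

// wrapAcquire mimics a caller adding context to an error with %w.
func wrapAcquire(resource string, err error) error {
	return fmt.Errorf("acquire %s: %w", resource, err)
}

// nextStep maps an error from Acquire to a suggested reaction. errors.Is
// sees through wrapping, so wrapped sentinel errors still match.
func nextStep(err error) string {
	switch {
	case err == nil:
		return "proceed"
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		return "stop: context ended"
	case errors.Is(err, ErrResourceUnknown):
		return "call SetCapacity first"
	case errors.Is(err, ErrClosed):
		return "stop: limiter closed"
	default:
		return "unexpected error"
	}
}
```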
Functions ¶
This section is empty.
Types ¶
type Capacity ¶
type Capacity struct {
// Resource is the unique identifier for the rate-limited resource.
Resource string
// Available is the current number of available tokens.
Available int
// Total is the maximum capacity (tokens per window).
Total int
// Window is the refill period.
Window time.Duration
// InFlight tracks requests currently in progress (if Release is used).
InFlight int
}
Capacity describes the rate limit configuration for a resource.
type CapacityUpdate ¶
type CapacityUpdate struct {
// Resource that changed.
Resource string `json:"resource"`
// AgentID that sent the update.
AgentID string `json:"agent_id"`
// NewCapacity is the suggested new total capacity.
NewCapacity int `json:"new_capacity"`
// Reason for the change.
Reason string `json:"reason"`
// Timestamp of the update.
Timestamp time.Time `json:"timestamp"`
}
CapacityUpdate is broadcast when capacity changes in the swarm.
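The struct tags suggest that updates travel as JSON. As a sketch under that assumption, the example below copies the struct locally and round-trips it with encoding/json; encodeUpdate and decodeUpdate are hypothetical helpers, and the actual wire format used on the bus is not stated in this documentation.

```go
package main

import (
	"encoding/json"
	"time"
)

// CapacityUpdate is a local copy of the documented struct.
type CapacityUpdate struct {
	Resource    string    `json:"resource"`
	AgentID     string    `json:"agent_id"`
	NewCapacity int       `json:"new_capacity"`
	Reason      string    `json:"reason"`
	Timestamp   time.Time `json:"timestamp"`
}

// encodeUpdate serializes an update using the field names from the tags.
func encodeUpdate(u CapacityUpdate) ([]byte, error) {
	return json.Marshal(u)
}

// decodeUpdate parses a payload such as:
//
//	{"resource":"shared-api","agent_id":"agent-2","new_capacity":25,
//	 "reason":"received 429 from API","timestamp":"2024-01-02T03:04:05Z"}
func decodeUpdate(data []byte) (CapacityUpdate, error) {
	var u CapacityUpdate
	err := json.Unmarshal(data, &u)
	return u, err
}
```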
type DistributedConfig ¶
type DistributedConfig struct {
// Bus is the message bus for coordination.
Bus bus.MessageBus
// AgentID is the unique identifier for this agent.
AgentID string
// ReduceFactor is the multiplier when reducing capacity (0-1).
// Default: 0.5 (reduce by 50%)
ReduceFactor float64
// RecoveryInterval is how often to attempt capacity recovery.
// Default: 30 seconds
RecoveryInterval time.Duration
// RecoveryFactor is the multiplier when recovering capacity (>1).
// Default: 1.1 (increase by 10%)
RecoveryFactor float64
// MaxRecovery caps recovery at original capacity.
// Default: true
MaxRecovery bool
}
DistributedConfig configures a distributed rate limiter.
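To make the interplay of ReduceFactor, RecoveryFactor, and MaxRecovery concrete, here is a sketch of the arithmetic they imply. The function names, the rounding, and the floor of 1 are assumptions made for this example; the limiter's actual rounding rules are not documented here.

```go
package main

import "math"

// reduceCapacity scales current by reduceFactor (for example 0.5), never
// dropping below 1 so the resource stays usable.
func reduceCapacity(current int, reduceFactor float64) int {
	next := int(math.Round(float64(current) * reduceFactor))
	if next < 1 {
		next = 1
	}
	return next
}

// recoverCapacity scales current by recoveryFactor (for example 1.1). It
// always grows by at least 1 and stops at original when maxRecovery is true.
func recoverCapacity(current, original int, recoveryFactor float64, maxRecovery bool) int {
	next := int(math.Round(float64(current) * recoveryFactor))
	if next <= current {
		next = current + 1
	}
	if maxRecovery && next > original {
		next = original
	}
	return next
}
```

With the documented defaults, a capacity of 100 would drop to 50 after one reduction, and the first recovery step in this sketch would raise it to 55.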
func DefaultDistributedConfig ¶
func DefaultDistributedConfig() DistributedConfig
DefaultDistributedConfig returns configuration with sensible defaults.
func (*DistributedConfig) Validate ¶
func (c *DistributedConfig) Validate() error
Validate checks the configuration.
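The checks Validate performs are not listed here. Based only on the field comments, a validation pass might look like the sketch below. configSketch, defaultsSketch, and validateSketch are hypothetical, the Bus field is left out because its type lives in another package, and the exact bounds (for example whether a ReduceFactor of exactly 1 is accepted) are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrInvalidConfig is a local copy of the package's sentinel error.
var ErrInvalidConfig = errors.New("invalid configuration")

// configSketch mirrors DistributedConfig without the Bus field.
type configSketch struct {
	AgentID          string
	ReduceFactor     float64
	RecoveryInterval time.Duration
	RecoveryFactor   float64
	MaxRecovery      bool
}

// defaultsSketch returns the defaults stated in the field comments.
func defaultsSketch() configSketch {
	return configSketch{
		ReduceFactor:     0.5,
		RecoveryInterval: 30 * time.Second,
		RecoveryFactor:   1.1,
		MaxRecovery:      true,
	}
}

// validateSketch rejects values outside the ranges the comments describe.
// Every failure wraps ErrInvalidConfig so callers can match it with errors.Is.
func validateSketch(c configSketch) error {
	switch {
	case c.AgentID == "":
		return fmt.Errorf("%w: AgentID is required", ErrInvalidConfig)
	case c.ReduceFactor <= 0 || c.ReduceFactor >= 1:
		return fmt.Errorf("%w: ReduceFactor must be between 0 and 1", ErrInvalidConfig)
	case c.RecoveryFactor <= 1:
		return fmt.Errorf("%w: RecoveryFactor must be greater than 1", ErrInvalidConfig)
	case c.RecoveryInterval <= 0:
		return fmt.Errorf("%w: RecoveryInterval must be positive", ErrInvalidConfig)
	}
	return nil
}
```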
type DistributedLimiter ¶
type DistributedLimiter struct {
// contains filtered or unexported fields
}
DistributedLimiter coordinates rate limits across a swarm via message bus.
func NewDistributedLimiter ¶
func NewDistributedLimiter(config DistributedConfig) (*DistributedLimiter, error)
NewDistributedLimiter creates a new distributed rate limiter.
func (*DistributedLimiter) Acquire ¶
func (d *DistributedLimiter) Acquire(ctx context.Context, resource string) error
Acquire blocks until a token is available for the resource.
func (*DistributedLimiter) AnnounceReduced ¶
func (d *DistributedLimiter) AnnounceReduced(resource string, reason string)
AnnounceReduced broadcasts that capacity should be reduced.
func (*DistributedLimiter) Close ¶
func (d *DistributedLimiter) Close() error
Close shuts down the limiter.
func (*DistributedLimiter) GetCapacity ¶
func (d *DistributedLimiter) GetCapacity(resource string) *Capacity
GetCapacity returns the current capacity info for a resource.
func (*DistributedLimiter) OnCapacityChange ¶
func (d *DistributedLimiter) OnCapacityChange(cb OnCapacityChange)
OnCapacityChange sets a callback for capacity change notifications.
func (*DistributedLimiter) Release ¶
func (d *DistributedLimiter) Release(resource string)
Release returns a token to the resource bucket.
func (*DistributedLimiter) SetCapacity ¶
func (d *DistributedLimiter) SetCapacity(resource string, capacity int, window time.Duration)
SetCapacity configures the rate limit for a resource.
func (*DistributedLimiter) TryAcquire ¶
func (d *DistributedLimiter) TryAcquire(resource string) bool
TryAcquire attempts to acquire a token without blocking.
type MemoryLimiter ¶
type MemoryLimiter struct {
// contains filtered or unexported fields
}
MemoryLimiter provides local rate limiting using token buckets. It is safe for concurrent use.
func NewMemoryLimiter ¶
func NewMemoryLimiter() *MemoryLimiter
NewMemoryLimiter creates a new in-memory rate limiter.
func (*MemoryLimiter) Acquire ¶
func (m *MemoryLimiter) Acquire(ctx context.Context, resource string) error
Acquire blocks until a token is available for the resource.
func (*MemoryLimiter) AnnounceReduced ¶
func (m *MemoryLimiter) AnnounceReduced(resource string, reason string)
AnnounceReduced has no coordinating effect for the memory limiter. It logs locally but does not broadcast to other agents.
func (*MemoryLimiter) GetCapacity ¶
func (m *MemoryLimiter) GetCapacity(resource string) *Capacity
GetCapacity returns the current capacity info for a resource.
func (*MemoryLimiter) Release ¶
func (m *MemoryLimiter) Release(resource string)
Release returns a token to the resource bucket.
func (*MemoryLimiter) SetCapacity ¶
func (m *MemoryLimiter) SetCapacity(resource string, capacity int, window time.Duration)
SetCapacity configures the rate limit for a resource.
func (*MemoryLimiter) TryAcquire ¶
func (m *MemoryLimiter) TryAcquire(resource string) bool
TryAcquire attempts to acquire a token without blocking.
type OnCapacityChange ¶
type OnCapacityChange func(update *CapacityUpdate)
OnCapacityChange is a callback for capacity change notifications.
type RateLimiter ¶
type RateLimiter interface {
// Acquire blocks until a token is available for the resource.
// Returns context.Canceled or context.DeadlineExceeded if context ends.
// Returns ErrResourceUnknown if the resource has no configured capacity.
Acquire(ctx context.Context, resource string) error
// TryAcquire attempts to acquire a token without blocking.
// Returns true if a token was acquired, false otherwise.
TryAcquire(resource string) bool
// Release returns a token to the resource bucket.
// This is optional and useful for tracking in-flight requests.
// Has no effect if the resource is unknown or already at capacity.
Release(resource string)
// SetCapacity configures the rate limit for a resource.
// capacity is the number of tokens per window.
// window is the time period for refill (e.g., time.Minute).
SetCapacity(resource string, capacity int, window time.Duration)
// AnnounceReduced broadcasts that capacity should be reduced.
// reason describes why (e.g., "received 429 response").
// For distributed limiters, this notifies other agents.
// For local limiters, this is a no-op.
AnnounceReduced(resource string, reason string)
// GetCapacity returns the current capacity info for a resource.
// Returns nil if the resource is unknown.
GetCapacity(resource string) *Capacity
// Close shuts down the limiter and releases resources.
Close() error
}
RateLimiter coordinates rate limits for shared resources.
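Because both limiters share this interface, call sites can be written once against it. The sketch below pairs Acquire with a deferred Release around a request. tokenLimiter copies two method signatures from RateLimiter; withToken, callShared, stubLimiter, and errUnknown are hypothetical names for this example.

```go
package main

import (
	"context"
	"errors"
)

// tokenLimiter is the subset of RateLimiter used by withToken.
type tokenLimiter interface {
	Acquire(ctx context.Context, resource string) error
	Release(resource string)
}

// withToken runs fn while holding a token for resource. The token is
// released when fn returns, and fn is skipped if Acquire fails.
func withToken(ctx context.Context, l tokenLimiter, resource string, fn func() error) error {
	if err := l.Acquire(ctx, resource); err != nil {
		return err
	}
	defer l.Release(resource)
	return fn()
}

// errUnknown stands in for the package's ErrResourceUnknown.
var errUnknown = errors.New("unknown resource")

// stubLimiter is a hypothetical test double that counts calls and can be
// told to fail every Acquire with err.
type stubLimiter struct {
	acquired, released int
	err                error
}

func (s *stubLimiter) Acquire(ctx context.Context, _ string) error {
	if s.err != nil {
		return s.err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	s.acquired++
	return nil
}

func (s *stubLimiter) Release(string) { s.released++ }

// callShared shows a typical call site.
func callShared(l tokenLimiter) error {
	return withToken(context.Background(), l, "shared-api", func() error {
		return nil // the rate-limited request would go here
	})
}
```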
type WatchOption ¶
type WatchOption func(*watchConfig)
WatchOption configures watching behavior.
func WithCapacityCallback ¶
func WithCapacityCallback(cb OnCapacityChange) WatchOption
WithCapacityCallback sets a callback for capacity changes.
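WatchOption follows the functional options pattern. This documentation does not show which function accepts WatchOption values or which fields watchConfig holds, so the sketch below is only a guess at the general shape: the onChange field and the applyOptions helper are hypothetical, and the types are trimmed local copies.

```go
package main

// CapacityUpdate is a trimmed local copy of the documented struct.
type CapacityUpdate struct {
	Resource    string
	NewCapacity int
}

// OnCapacityChange matches the documented callback type.
type OnCapacityChange func(update *CapacityUpdate)

// watchConfig is unexported in the real package; this field is assumed.
type watchConfig struct {
	onChange OnCapacityChange
}

// WatchOption matches the documented option type.
type WatchOption func(*watchConfig)

// WithCapacityCallback returns an option that stores cb in the config.
func WithCapacityCallback(cb OnCapacityChange) WatchOption {
	return func(c *watchConfig) { c.onChange = cb }
}

// applyOptions builds a config by running each option in order, so later
// options override earlier ones.
func applyOptions(opts ...WatchOption) *watchConfig {
	cfg := &watchConfig{}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}
```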