ratelimit

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 24, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package ratelimit provides rate limit coordination for agent swarms.

Rate limiting is essential when multiple agents access shared resources (APIs, databases, external services) that have capacity constraints. This package provides both local and distributed rate limiters.

Local Rate Limiting

The MemoryLimiter provides per-process rate limiting using token buckets:

limiter := ratelimit.NewMemoryLimiter()
defer limiter.Close()
limiter.SetCapacity("openai-api", 60, time.Minute) // 60 requests per minute

// Blocking: wait until a token is available.
if err := limiter.Acquire(ctx, "openai-api"); err != nil {
    return err // ctx canceled or deadline exceeded, or ErrResourceUnknown
}
defer limiter.Release("openai-api")

// Non-blocking alternative: proceed only if a token is free right now.
if limiter.TryAcquire("openai-api") {
    defer limiter.Release("openai-api")
    // Make request
}

Distributed Rate Limiting

The DistributedLimiter coordinates rate limits across agents via the message bus:

limiter, err := ratelimit.NewDistributedLimiter(ratelimit.DistributedConfig{
    Bus:     nbus,
    AgentID: "agent-1",
})
if err != nil {
    return err
}
defer limiter.Close()
limiter.SetCapacity("shared-api", 100, time.Minute)

// Announce reduced capacity (e.g., after a 429 response)
limiter.AnnounceReduced("shared-api", "received 429 from API")

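When an agent calls AnnounceReduced, a CapacityUpdate is broadcast over the message bus and every agent in the swarm lowers its local limit for that resource (by ReduceFactor, default 0.5), so the swarm as a whole backs off. Capacity is then raised again by RecoveryFactor (default 1.1) every RecoveryInterval (default 30 seconds), capped at the originally configured capacity while MaxRecovery is true.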
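The reduce/recover cycle can be sketched as plain arithmetic. This is a hypothetical model built only from the documented DistributedConfig fields; the helper names `reduce` and `recoverStep`, the floor-at-one rule, and the "grow by at least one token per tick" rule are assumptions, and the real limiter may round or schedule recovery differently.

```go
package main

import (
	"fmt"
	"math"
)

// reduce applies ReduceFactor to the current capacity (hypothetical helper).
// It never drops below 1 so the resource stays usable.
func reduce(current int, reduceFactor float64) int {
	n := int(math.Floor(float64(current) * reduceFactor))
	if n < 1 {
		n = 1
	}
	return n
}

// recoverStep applies RecoveryFactor once, i.e. one RecoveryInterval tick
// (hypothetical helper). It grows by at least one token per tick and, when
// maxRecovery is set, never exceeds the originally configured capacity.
func recoverStep(current, original int, recoveryFactor float64, maxRecovery bool) int {
	n := int(math.Floor(float64(current) * recoveryFactor))
	if n <= current {
		n = current + 1
	}
	if maxRecovery && n > original {
		n = original
	}
	return n
}

func main() {
	original := 100
	c := reduce(original, 0.5) // defaults: ReduceFactor 0.5
	fmt.Println("after AnnounceReduced:", c)
	for i := 1; c < original; i++ {
		c = recoverStep(c, original, 1.1, true) // defaults: RecoveryFactor 1.1, MaxRecovery true
		fmt.Printf("recovery tick %d: %d\n", i, c)
	}
}
```

Multiplicative decrease with slower multiplicative recovery lets the swarm back off quickly after a 429 while probing back toward the configured limit over several intervals.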

Algorithm

Both implementations use the token bucket algorithm with refill:

  • Tokens are added at a fixed rate based on capacity/window
  • Each Acquire consumes one token
  • If no tokens available, Acquire blocks (or TryAcquire returns false)
  • Release returns a token to the bucket (optional, for request tracking)

Best Practices

  • Set capacity slightly below the provider's actual limit to leave a safety margin
  • Call Release after each request so that InFlight tracks requests in progress, not just the request rate
  • Monitor AnnounceReduced events in production (for example, with OnCapacityChange) to detect limit issues
  • Use TryAcquire with a fallback for non-critical requests
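The last practice, TryAcquire with a fallback, might look like the following sketch. The `tryAcquirer` interface is a trimmed subset of RateLimiter, and `fetchSummary`, the cached value, and the `fixedLimiter` stub are hypothetical stand-ins for a real client and limiter.

```go
package main

import "fmt"

// tryAcquirer is the subset of RateLimiter this sketch needs.
type tryAcquirer interface {
	TryAcquire(resource string) bool
	Release(resource string)
}

// fetchSummary is a hypothetical non-critical request: if no token is free
// right now, it serves a cached value instead of waiting.
func fetchSummary(rl tryAcquirer, cached string, call func() string) string {
	if !rl.TryAcquire("openai-api") {
		return cached // fallback: skip the request entirely
	}
	defer rl.Release("openai-api")
	return call()
}

// fixedLimiter is a hypothetical stub that admits exactly n requests.
type fixedLimiter struct{ n int }

func (f *fixedLimiter) TryAcquire(string) bool {
	if f.n > 0 {
		f.n--
		return true
	}
	return false
}

func (f *fixedLimiter) Release(string) {}

func main() {
	rl := &fixedLimiter{n: 1}
	call := func() string { return "fresh summary" }
	fmt.Println(fetchSummary(rl, "cached summary", call)) // fresh summary
	fmt.Println(fetchSummary(rl, "cached summary", call)) // cached summary
}
```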


Constants

const SubjectPrefix = "ratelimit."

SubjectPrefix is the message bus subject prefix for rate limit messages.

Variables

var (
	ErrClosed            = errors.New("limiter closed")
	ErrResourceUnknown   = errors.New("unknown resource")
	ErrCapacityExhausted = errors.New("capacity exhausted")
	ErrInvalidCapacity   = errors.New("invalid capacity")
	ErrInvalidWindow     = errors.New("invalid window")
	ErrInvalidConfig     = errors.New("invalid configuration")
)

Common errors.

Functions

This section is empty.

Types

type Capacity

type Capacity struct {
	// Resource is the unique identifier for the rate-limited resource.
	Resource string

	// Available is the current number of available tokens.
	Available int

	// Total is the maximum capacity (tokens per window).
	Total int

	// Window is the refill period.
	Window time.Duration

	// InFlight tracks requests currently in progress (if Release is used).
	InFlight int
}

Capacity describes the rate limit configuration for a resource.

type CapacityUpdate

type CapacityUpdate struct {
	// Resource that changed.
	Resource string `json:"resource"`

	// AgentID that sent the update.
	AgentID string `json:"agent_id"`

	// NewCapacity is the suggested new total capacity.
	NewCapacity int `json:"new_capacity"`

	// Reason for the change.
	Reason string `json:"reason"`

	// Timestamp of the update.
	Timestamp time.Time `json:"timestamp"`
}

CapacityUpdate is broadcast when capacity changes in the swarm.

type DistributedConfig

type DistributedConfig struct {
	// Bus is the message bus for coordination.
	Bus bus.MessageBus

	// AgentID is the unique identifier for this agent.
	AgentID string

	// ReduceFactor is the multiplier when reducing capacity (0-1).
	// Default: 0.5 (reduce by 50%)
	ReduceFactor float64

	// RecoveryInterval is how often to attempt capacity recovery.
	// Default: 30 seconds
	RecoveryInterval time.Duration

	// RecoveryFactor is the multiplier when recovering capacity (>1).
	// Default: 1.1 (increase by 10%)
	RecoveryFactor float64

	// MaxRecovery caps recovery at original capacity.
	// Default: true
	MaxRecovery bool
}

DistributedConfig configures a distributed rate limiter.

func DefaultDistributedConfig

func DefaultDistributedConfig() DistributedConfig

DefaultDistributedConfig returns configuration with sensible defaults.

func (*DistributedConfig) Validate

func (c *DistributedConfig) Validate() error

Validate checks the configuration and returns a non-nil error if it is invalid.

type DistributedLimiter

type DistributedLimiter struct {
	// contains filtered or unexported fields
}

DistributedLimiter coordinates rate limits across a swarm via the message bus.

func NewDistributedLimiter

func NewDistributedLimiter(config DistributedConfig) (*DistributedLimiter, error)

NewDistributedLimiter creates a new distributed rate limiter.

func (*DistributedLimiter) Acquire

func (d *DistributedLimiter) Acquire(ctx context.Context, resource string) error

Acquire blocks until a token is available for the resource.

func (*DistributedLimiter) AnnounceReduced

func (d *DistributedLimiter) AnnounceReduced(resource string, reason string)

AnnounceReduced broadcasts that capacity should be reduced.

func (*DistributedLimiter) Close

func (d *DistributedLimiter) Close() error

Close shuts down the limiter.

func (*DistributedLimiter) GetCapacity

func (d *DistributedLimiter) GetCapacity(resource string) *Capacity

GetCapacity returns the current capacity info for a resource.

func (*DistributedLimiter) OnCapacityChange

func (d *DistributedLimiter) OnCapacityChange(cb OnCapacityChange)

OnCapacityChange sets a callback for capacity change notifications.

func (*DistributedLimiter) Release

func (d *DistributedLimiter) Release(resource string)

Release returns a token to the resource bucket.

func (*DistributedLimiter) SetCapacity

func (d *DistributedLimiter) SetCapacity(resource string, capacity int, window time.Duration)

SetCapacity configures the rate limit for a resource.

func (*DistributedLimiter) TryAcquire

func (d *DistributedLimiter) TryAcquire(resource string) bool

TryAcquire attempts to acquire a token without blocking.

type MemoryLimiter

type MemoryLimiter struct {
	// contains filtered or unexported fields
}

MemoryLimiter provides local rate limiting using token buckets. It is safe for concurrent use.

func NewMemoryLimiter

func NewMemoryLimiter() *MemoryLimiter

NewMemoryLimiter creates a new in-memory rate limiter.

func (*MemoryLimiter) Acquire

func (m *MemoryLimiter) Acquire(ctx context.Context, resource string) error

Acquire blocks until a token is available for the resource.

func (*MemoryLimiter) AnnounceReduced

func (m *MemoryLimiter) AnnounceReduced(resource string, reason string)

AnnounceReduced only logs the announcement locally for the memory limiter; it does not broadcast to other agents.

func (*MemoryLimiter) Close

func (m *MemoryLimiter) Close() error

Close shuts down the limiter.

func (*MemoryLimiter) GetCapacity

func (m *MemoryLimiter) GetCapacity(resource string) *Capacity

GetCapacity returns the current capacity info for a resource.

func (*MemoryLimiter) Release

func (m *MemoryLimiter) Release(resource string)

Release returns a token to the resource bucket.

func (*MemoryLimiter) SetCapacity

func (m *MemoryLimiter) SetCapacity(resource string, capacity int, window time.Duration)

SetCapacity configures the rate limit for a resource.

func (*MemoryLimiter) TryAcquire

func (m *MemoryLimiter) TryAcquire(resource string) bool

TryAcquire attempts to acquire a token without blocking.

type OnCapacityChange

type OnCapacityChange func(update *CapacityUpdate)

OnCapacityChange is a callback for capacity change notifications.

type RateLimiter

type RateLimiter interface {
	// Acquire blocks until a token is available for the resource.
	// Returns context.Canceled or context.DeadlineExceeded if context ends.
	// Returns ErrResourceUnknown if the resource has no configured capacity.
	Acquire(ctx context.Context, resource string) error

	// TryAcquire attempts to acquire a token without blocking.
	// Returns true if a token was acquired, false otherwise.
	TryAcquire(resource string) bool

	// Release returns a token to the resource bucket.
	// This is optional and useful for tracking in-flight requests.
	// Has no effect if the resource is unknown or already at capacity.
	Release(resource string)

	// SetCapacity configures the rate limit for a resource.
	// capacity is the number of tokens per window.
	// window is the time period for refill (e.g., time.Minute).
	SetCapacity(resource string, capacity int, window time.Duration)

	// AnnounceReduced broadcasts that capacity should be reduced.
	// reason describes why (e.g., "received 429 response").
	// For distributed limiters, this notifies other agents.
	// For local limiters, this is a no-op.
	AnnounceReduced(resource string, reason string)

	// GetCapacity returns the current capacity info for a resource.
	// Returns nil if the resource is unknown.
	GetCapacity(resource string) *Capacity

	// Close shuts down the limiter and releases resources.
	Close() error
}

RateLimiter coordinates rate limits for shared resources.

type WatchOption

type WatchOption func(*watchConfig)

WatchOption configures watching behavior.

func WithCapacityCallback

func WithCapacityCallback(cb OnCapacityChange) WatchOption

WithCapacityCallback sets a callback for capacity changes.
