stm

package module
v0.0.16
Published: Nov 12, 2025 License: MIT Imports: 14 Imported by: 0

README

STM - Stupid Task Manager

Distributed task queue with crash recovery and multi-instance coordination.

STM v2 is a persistent task queue designed for distributed Go applications that need:

  • Task survival across server restarts
  • Multi-instance coordination (multiple workers processing the same queue)
  • Per-server concurrency limits
  • Priority-based task scheduling
  • Automatic retry with exponential backoff

Two backends available:

  • PostgreSQL: Durable storage, ACID guarantees, transaction pooling compatible
  • Redis: In-memory speed (~3x faster), simpler architecture, automatic TTL cleanup

Choose based on your persistence requirements and throughput needs.

Features

Core (Both Backends)
  • Crash Recovery: Tasks survive server restarts - crashed tasks automatically return to pending state
  • Multi-Instance Safe: Atomic operations prevent duplicate processing across multiple workers
  • Per-Server Concurrency: Limit parallel tasks per backend server (e.g., SSR workers, API endpoints)
  • Priority Scheduling: Higher priority tasks processed first within each provider
  • Hot Config Reload: Update server lists and concurrency limits without restarting workers
  • Scheduled Tasks: Delayed and recurring task execution with retry logic
PostgreSQL Backend
  • ACID Guarantees: Full transaction support with rollback capability
  • Pgbouncer Compatible: Simple protocol mode works with transaction pooling (no prepared statement errors)
  • Advisory Locks: Database-level coordination, combined with FOR UPDATE SKIP LOCKED row locking
  • Persistent Indexes: Optimized composite indexes for high-throughput task grabbing
  • Throughput: 20,000+ tasks/sec sustained
Redis Backend
  • In-Memory Speed: 67,000+ tasks/sec throughput (~3x faster than PostgreSQL)
  • Automatic TTL: Tasks auto-delete after 7 days, scheduled tasks after 2 months
  • Simpler Operations: Sorted sets + atomic counters instead of transactions
  • Pub/Sub Ready: Easy to add instant worker notification (vs polling)
  • No Migrations: Schema-less, just connect and run

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Instance A │     │  Instance B │     │  Instance C │
│  (Leader)   │     │  (Reader)   │     │  (Reader)   │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       └───────────────────┼───────────────────┘
                           │
                  ┌────────▼────────┐
                  │   PostgreSQL    │
                  │   or Redis      │
                  │                 │
                  │  PostgreSQL:    │
                  │  - stm_tasks    │
                  │  - stm_config   │
                  │  - advisory     │
                  │    locks        │
                  │                 │
                  │  Redis:         │
                  │  - sorted sets  │
                  │  - hashes       │
                  │  - atomic       │
                  │    counters     │
                  └─────────────────┘

Leader Mode: Creates schema/config, initializes provider config, starts workers.

Reader Mode: Connects to existing config, starts workers.

All instances coordinate through backend storage - tasks grabbed atomically, server capacity tracked globally.

Quick Start

Installation
go get github.com/coffyg/stm
Basic Usage (Leader Instance)
package main

import (
    "os"
    "time"

    "github.com/rs/zerolog"
    "github.com/coffyg/stm"
)

func main() {
    logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

    // Define providers and their servers
    providers := []stm.ProviderConfig{
        {
            Name: "ssr_rendering",
            Servers: []string{
                "http://ssr-worker-1:8080",
                "http://ssr-worker-2:8080",
            },
        },
        {
            Name: "api_calls",
            Servers: []string{"http://api-backend:3000"},
        },
    }

    // Create leader instance (initializes schema)
    tm, err := stm.NewStupidTaskManager(
        "postgres://user:pass@localhost/mydb",
        "worker-instance-1",  // Unique runner ID
        providers,
        &logger,
        getTimeout,  // func(callbackName, provider string) time.Duration
        25,          // Max open connections
        10,          // Max idle connections
    )
    if err != nil {
        panic(err)
    }
    defer tm.Shutdown()

    // Set server concurrency limits
    tm.SetServerMaxParallel("http://ssr-worker-1:8080", 4)
    tm.SetServerMaxParallel("http://ssr-worker-2:8080", 4)
    tm.SetServerMaxParallel("http://api-backend:3000", 50)

    // Start workers
    tm.Start()

    // Add tasks (MyTask and MyProvider are your own ITask and IProvider implementations)
    task := &MyTask{
        id: "task-123",
        provider: &MyProvider{name: "ssr_rendering"},
        priority: 10,
    }
    tm.AddTask(task)

    // Keep running
    select {}
}

func getTimeout(callbackName, provider string) time.Duration {
    if provider == "ssr_rendering" {
        return 2 * time.Minute
    }
    return 30 * time.Second
}
Adding Reader Instances
// Additional instances connect as readers
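tm, err := stm.NewStupidTaskManagerReader(
    "postgres://user:pass@localhost/mydb",
    "worker-instance-2",  // Different runner ID
    &logger,
    getTimeout,
    25, 10,               // Max open / max idle connections
)
if err != nil {
    panic(err)  // Reader fails fast if the schema is missing
}
tm.Start()  // Joins the worker pool automatically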
Redis Backend Usage
package main

import (
    "os"
    "time"

    "github.com/rs/zerolog"
    "github.com/coffyg/stm"
)

func main() {
    logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

    // Define providers (same as PostgreSQL)
    providers := []stm.ProviderConfig{
        {
            Name: "ssr_rendering",
            Servers: []string{
                "http://ssr-worker-1:8080",
                "http://ssr-worker-2:8080",
            },
        },
    }

    // Create Redis task manager (leader mode)
    tm, err := stm.NewRedisTaskManager(
        "localhost:6379",  // Redis address
        "",                // Password (empty string for no auth)
        0,                 // Database index (0-15)
        "worker-instance-1",
        providers,
        &logger,
        getTimeout,
    )
    if err != nil {
        panic(err)
    }
    defer tm.Shutdown()

    // Set server concurrency limits (same API as PostgreSQL)
    tm.SetServerMaxParallel("http://ssr-worker-1:8080", 4)
    tm.SetServerMaxParallel("http://ssr-worker-2:8080", 4)

    // Start workers
    tm.Start()

    // Add tasks (same API as PostgreSQL)
    task := &MyTask{
        id: "task-123",
        provider: &MyProvider{name: "ssr_rendering"},
        priority: 10,
    }
    tm.AddTask(task)

    select {}
}

Redis Reader Instances:

// Additional instances connect as readers
tm, err := stm.NewRedisTaskManagerReader(
    "localhost:6379",
    "",  // Password (empty string for no auth)
    0,   // Same database index
    "worker-instance-2",  // Different runner ID
    &logger,
    getTimeout,
)
if err != nil {
    panic(err)
}
tm.Start()  // Joins the worker pool automatically

Redis Scheduled Tasks:

// Create Redis scheduler.
// rdb is a *redis.Client already connected to the same Redis database.
scheduler := stm.NewRedisScheduledTaskScheduler(rdb, &logger)
scheduler.Start()
defer scheduler.Shutdown()

// Register callbacks (same API as PostgreSQL)
scheduler.RegisterCallback(&CleanupCallback{})

// Schedule tasks (same API)
taskID, _ := scheduler.RunIn(3*time.Hour, "cleanup_old_files", map[string]interface{}{
    "directory": "/tmp/uploads",
})

Task Interface

Implement the ITask interface:

type ITask interface {
    MarkAsSuccess(t int64)
    MarkAsFailed(t int64, err error)
    GetPriority() int
    GetID() string
    GetMaxRetries() int
    GetRetries() int
    GetCreatedAt() time.Time
    GetTaskGroup() ITaskGroup
    GetProvider() IProvider
    UpdateRetries(int) error
    GetTimeout() time.Duration
    UpdateLastError(string) error
    GetCallbackName() string
    OnComplete()
    OnStart()
}
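
The Quick Start examples use MyTask and MyProvider without defining them. The following is a minimal sketch of what such types could look like, not the package's own implementation. To stay self-contained it declares local copies of the ITask, ITaskGroup, and IProvider interfaces as listed in the package index; real code would use stm.ITask, stm.ITaskGroup, and stm.IProvider. The field names, the retry limit of 3, the 30-second timeout, the "render_page" callback name, and returning a nil task group are all illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the interfaces listed in the package index, so that the
// sketch compiles on its own. Real code would use stm.ITask and friends.
type ITaskGroup interface {
	MarkComplete() error
	GetTaskCount() int
	GetTaskCompletedCount() int
	UpdateTaskCompletedCount(int) error
}

type IProvider interface {
	Handle(task ITask, server string) error
	Name() string
}

type ITask interface {
	MarkAsSuccess(t int64)
	MarkAsFailed(t int64, err error)
	GetPriority() int
	GetID() string
	GetMaxRetries() int
	GetRetries() int
	GetCreatedAt() time.Time
	GetTaskGroup() ITaskGroup
	GetProvider() IProvider
	UpdateRetries(int) error
	GetTimeout() time.Duration
	UpdateLastError(string) error
	GetCallbackName() string
	OnComplete()
	OnStart()
}

// MyProvider is a hypothetical provider that only logs where a task ran.
type MyProvider struct{ name string }

func (p *MyProvider) Name() string { return p.name }

func (p *MyProvider) Handle(task ITask, server string) error {
	fmt.Printf("handling %s on %s\n", task.GetID(), server)
	return nil
}

// MyTask is a hypothetical task; every field here is illustrative.
type MyTask struct {
	id        string
	provider  IProvider
	priority  int
	retries   int
	lastError string
	createdAt time.Time
	status    string
}

func (t *MyTask) GetID() string                      { return t.id }
func (t *MyTask) GetProvider() IProvider             { return t.provider }
func (t *MyTask) GetPriority() int                   { return t.priority }
func (t *MyTask) GetRetries() int                    { return t.retries }
func (t *MyTask) GetMaxRetries() int                 { return 3 } // assumed limit
func (t *MyTask) GetCreatedAt() time.Time            { return t.createdAt }
func (t *MyTask) GetTaskGroup() ITaskGroup           { return nil } // assumed: task has no group
func (t *MyTask) GetTimeout() time.Duration          { return 30 * time.Second }
func (t *MyTask) GetCallbackName() string            { return "render_page" } // made-up name
func (t *MyTask) UpdateRetries(n int) error          { t.retries = n; return nil }
func (t *MyTask) UpdateLastError(msg string) error   { t.lastError = msg; return nil }
func (t *MyTask) MarkAsSuccess(elapsed int64)        { t.status = "success" }
func (t *MyTask) MarkAsFailed(elapsed int64, _ error) { t.status = "failed" }
func (t *MyTask) OnStart()                           { t.status = "running" }
func (t *MyTask) OnComplete()                        {}

// Compile-time check that MyTask satisfies the interface.
var _ ITask = (*MyTask)(nil)

func main() {
	task := &MyTask{id: "task-123", provider: &MyProvider{name: "ssr_rendering"}, priority: 10, createdAt: time.Now()}
	task.OnStart()
	if err := task.GetProvider().Handle(task, "http://ssr-worker-1:8080"); err != nil {
		task.MarkAsFailed(0, err)
	} else {
		task.MarkAsSuccess(0)
	}
	fmt.Println(task.status)
}
```

The `var _ ITask = (*MyTask)(nil)` line makes the compiler report any missing method at build time rather than when the task is first enqueued.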

Scheduled Tasks

STM includes a built-in scheduler for delayed and recurring task execution:

// Create scheduler (db is a *pgxpool.Pool, logger a *zerolog.Logger)
scheduler := stm.NewScheduledTaskScheduler(db, logger)
scheduler.Start()
defer scheduler.Shutdown()

// Implement callback interface
type CleanupCallback struct{}

func (c *CleanupCallback) Name() string {
    return "cleanup_old_files"
}

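func (c *CleanupCallback) Handle(payload map[string]interface{}) error {
    // Checked assertion: a missing or non-string value returns an error instead of panicking
    directory, ok := payload["directory"].(string)
    if !ok {
        return fmt.Errorf("payload is missing a string \"directory\"")
    }
    deleted, err := cleanupOldFiles(directory)  // cleanupOldFiles is your own helper
    fmt.Printf("Cleaned up %d files\n", deleted)
    return err
}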

func (c *CleanupCallback) OnComplete(payload map[string]interface{}) {
    // Called after successful execution
}

func (c *CleanupCallback) OnFail(payload map[string]interface{}, err error) {
    // Called after max retries exhausted
}

// Register callback
scheduler.RegisterCallback(&CleanupCallback{})

// Schedule one-time delayed execution
taskID, _ := scheduler.RunIn(3*time.Hour, "cleanup_old_files", map[string]interface{}{
    "directory": "/tmp/uploads",
})

// Schedule recurring execution (taskID already exists, so assign with = rather than :=)
taskID, _ = scheduler.RunEvery(24*time.Hour, "cleanup_old_files", map[string]interface{}{
    "directory": "/tmp/cache",
})

// Cancel before execution
scheduler.CancelScheduled(taskID)

// Permanently delete
scheduler.DeleteScheduled(taskID)

Features:

  • Crash recovery: Scheduled tasks survive server restarts
  • Multi-instance safe: Only one instance executes each scheduled task
  • Automatic retry: Exponential backoff (10s, 20s, 40s, 60s max)
  • Recurring tasks: Run at fixed intervals indefinitely
  • Callback registry: Register functions at startup, reference by name

Use cases:

  • Delayed user actions ("delete my account in 30 days")
  • Recurring cleanup jobs
  • Reminder escalation chains
  • Rate-limited API calls
  • Gradual feature rollouts

Performance

Benchmark Results

PostgreSQL Backend (PG 17, 25/10 connection pool):

  • 20,663 tasks/sec sustained throughput (1000 concurrent goroutines × 10 tasks)
  • Crash recovery overhead: ~100-200ms per task (database INSERT + SELECT)
  • ACID guarantees: Full transaction support

Redis Backend (Redis 8.2.1):

  • 67,740 tasks/sec sustained throughput (~3.3x faster than PostgreSQL)
  • 10,000 tasks in 147ms (stress test)
  • In-memory operations: Sub-millisecond latency
  • Automatic cleanup: TTL-based expiration (7 days for tasks, 2 months for scheduled)

For comparison, the in-memory SMT v1 reaches 1M+ tasks/sec, but offers no persistence or multi-instance coordination.

Backend Selection Guide

Use PostgreSQL when:

  • ACID guarantees required (transactions, rollback)
  • Long-term task history needed (>7 days)
  • Already using PostgreSQL infrastructure
  • Pgbouncer transaction pooling required
  • Complex queries on task metadata needed

Use Redis when:

  • Maximum throughput critical (50k+ tasks/sec)
  • Tasks can expire after 7 days
  • Already using Redis infrastructure
  • Simpler operations preferred (sorted sets vs SQL)
  • Instant TTL cleanup desired

Use in-memory queues when:

  • Single instance only
  • Tasks are cheap to regenerate
  • No crash recovery needed
  • Maximum throughput critical (>100k tasks/sec)

Configuration

Environment Variables

PostgreSQL:

DATABASE_URL=postgres://user:pass@localhost/dbname
RUNNER_ID=instance-1
MAX_OPEN_CONNS=25
MAX_IDLE_CONNS=10

Redis:

REDIS_ADDR=localhost:6379
REDIS_DB=0
RUNNER_ID=instance-1
Dynamic Configuration

Update provider servers without restart:

tm.UpdateProviderServers("ssr_rendering", []string{
    "http://ssr-worker-1:8080",
    "http://ssr-worker-3:8080",  // New server
})

Update concurrency limits:

tm.SetServerMaxParallel("http://ssr-worker-1:8080", 8)  // Increase to 8 parallel

Config changes are picked up by every instance within 5 seconds through automatic polling.
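
The storage sections describe a version value kept for hot reload (stm_config in PostgreSQL, stm:config:version in Redis). One plausible way to turn a polled version into a reload decision is sketched here. The versionWatcher type is hypothetical, and whether STM's config watcher works exactly this way is an assumption.

```go
package main

import "fmt"

// versionWatcher remembers the last config version this instance applied.
type versionWatcher struct {
	seen int64
}

// check compares the polled version with the last applied one. When they
// differ it runs reload, records the new version, and reports true.
func (w *versionWatcher) check(current int64, reload func()) bool {
	if current == w.seen {
		return false
	}
	reload()
	w.seen = current
	return true
}

func main() {
	w := &versionWatcher{}
	// Values a poller might read from the backend on successive ticks.
	polled := []int64{0, 0, 1, 1, 2}
	for _, v := range polled {
		changed := w.check(v, func() {
			fmt.Println("reloading provider servers and limits, version", v)
		})
		fmt.Println("version", v, "changed:", changed)
	}
}
```

In a real worker the check would run on a ticker; polling a single integer keeps the steady-state cost low because the full configuration is only re-read when the version moves.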

Storage Architecture

PostgreSQL Backend

STM creates the following tables:

  • stm_tasks - Main task queue with priority scheduling
  • stm_scheduled_tasks - Delayed and recurring task execution
  • stm_providers - Provider configuration (servers per provider)
  • stm_server_limits - Per-server concurrency limits
  • stm_server_usage - Active task tracking for capacity control
  • stm_config - Version tracking for hot reload
  • stm_checkpoints - Optional task checkpoint storage
  • stm_task_groups - Task batching and grouping

See stm_schema.sql for full schema.

Optimized indexes for task grabbing:

-- Worker task grabbing (composite index)
CREATE INDEX idx_tasks_pending ON stm_tasks
(runner_hash, provider_name, priority DESC, created_at ASC)
WHERE status = 'pending';

-- Provider discovery (Start())
CREATE INDEX idx_tasks_runner_providers ON stm_tasks
(runner_hash, provider_name);

-- Scheduled task polling
CREATE INDEX idx_scheduled_ready ON stm_scheduled_tasks (next_run_at)
WHERE status = 'pending';

Partial indexes (those with a WHERE clause) minimize write overhead: in the two partial indexes shown above, only rows with status = 'pending' keep an index entry.

Redis Backend

Redis uses the following key patterns:

Task Queue:

  • stm:tasks:{runnerID}:{provider}:pending - Sorted set (score = priority * 1e12 + timestamp)
  • stm:tasks:{runnerID}:{provider}:meta:{taskID} - Hash (task metadata, 7 day TTL)
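
The score formula for the pending set can be illustrated with a short sketch. It assumes the timestamp is in Unix seconds, which the source does not state; with that unit the timestamp stays far below 1e12, so a higher priority outweighs any timestamp difference. The function name queueScore is hypothetical.

```go
package main

import "fmt"

// queueScore combines priority and enqueue time into one sorted-set score,
// following score = priority * 1e12 + timestamp.
// Assumption: the timestamp is Unix seconds, so it never reaches 1e12 and
// cannot spill into the priority part of the score.
func queueScore(priority int, unixSeconds int64) float64 {
	return float64(priority)*1e12 + float64(unixSeconds)
}

func main() {
	lowButLater := queueScore(1, 1_700_000_500)
	highButEarlier := queueScore(10, 1_700_000_000)

	fmt.Printf("%.0f\n", lowButLater)
	fmt.Printf("%.0f\n", highButEarlier)
	fmt.Println("higher priority scores higher:", highButEarlier > lowButLater)
}
```

Redis stores sorted-set scores as double-precision floats, which represent integers exactly up to 2^53 (about 9e15), so under this assumption scores remain exact for priorities up to roughly 9000.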

Scheduled Tasks:

  • stm:scheduled:pending - Sorted set (score = next_run_at timestamp)
  • stm:scheduled:meta:{taskID} - Hash (task metadata, 2 month TTL)

Configuration:

  • stm:config:providers - Hash (provider → servers JSON)
  • stm:config:limits - Hash (server_prefix → max_parallel)
  • stm:config:version - Integer (hot reload version tracking)

Concurrency Control:

  • stm:server:{server}:slots - Atomic counter (current active tasks)

Advantages:

  • No schema migrations required
  • Automatic TTL cleanup
  • Atomic operations (ZPOPMAX, INCR, DECR)
  • In-memory speed

Acknowledgments

Scheduled tasks feature inspired by Absurd Workflows: PostgreSQL Enables Ridiculous Scheduling Patterns - exploring what becomes possible when task queues persist in PostgreSQL instead of memory.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddTask

func AddTask(task ITask, logger *zerolog.Logger)

AddTask is a global helper for adding tasks to the singleton. This is for backward compatibility with SMT v1 code. New code should call tm.AddTask() directly on the instance.

func DelTask

func DelTask(taskID string, interruptFn func(task ITask, server string) error, logger *zerolog.Logger) string

DelTask is a global helper for deleting/cancelling tasks from the singleton. This is for backward compatibility with SMT v1 code. New code should call tm.DelTask() directly on the instance.

func InitRedisTaskQueueManager added in v0.0.9

func InitRedisTaskQueueManager(
	redisAddr string,
	password string,
	dbIndex int,
	runnerID string,
	providers []ProviderConfig,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
) error

InitRedisTaskQueueManager creates the global singleton instance (leader mode). This is for backward compatibility with SMT v1 code. Caller must call RedisTaskQueueManagerInstance.Start() explicitly after init. New code should use NewRedisTaskManager + Start() directly.

func InitRedisTaskQueueManagerReader added in v0.0.9

func InitRedisTaskQueueManagerReader(
	redisAddr string,
	password string,
	dbIndex int,
	runnerID string,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
) error

InitRedisTaskQueueManagerReader creates the global singleton instance (reader mode). This is for backward compatibility with SMT v1 code. Reader connects to existing config initialized by leader instance. Caller must call RedisTaskQueueManagerInstance.Start() explicitly after init. New code should use NewRedisTaskManagerReader + Start() directly.

func InitTaskQueueManager

func InitTaskQueueManager(
	dbConnString string,
	runnerID string,
	providers []ProviderConfig,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
) error

InitTaskQueueManager creates the global singleton instance (leader mode). This is for backward compatibility with SMT v1 code. Caller must call TaskQueueManagerInstance.Start() explicitly after init. New code should use NewStupidTaskManager + Start() directly.

func InitTaskQueueManagerReader

func InitTaskQueueManagerReader(
	dbConnString string,
	runnerID string,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
) error

InitTaskQueueManagerReader creates the global singleton instance (reader mode). This is for backward compatibility with SMT v1 code. Reader connects to existing schema initialized by leader instance. Caller must call TaskQueueManagerInstance.Start() explicitly after init. New code should use NewStupidTaskManagerReader + Start() directly.

func RedisAddTask added in v0.0.9

func RedisAddTask(task ITask, logger *zerolog.Logger)

RedisAddTask is a global helper for adding tasks to the Redis singleton. This is for backward compatibility with SMT v1 code. New code should call tm.AddTask() directly on the instance.

func RedisDelTask added in v0.0.9

func RedisDelTask(taskID string, interruptFn func(task ITask, server string) error, logger *zerolog.Logger) string

RedisDelTask is a global helper for deleting/cancelling tasks from the Redis singleton. This is for backward compatibility with SMT v1 code. New code should call tm.DelTask() directly on the instance.

func RedisRequeueTaskIfNeeded added in v0.0.9

func RedisRequeueTaskIfNeeded(logger *zerolog.Logger, tasks []ITask)

RedisRequeueTaskIfNeeded re-adds tasks to the global Redis singleton. This is for backward compatibility with SMT v1 code.

func RequeueTaskIfNeeded

func RequeueTaskIfNeeded(logger *zerolog.Logger, tasks []ITask)

RequeueTaskIfNeeded re-adds tasks to the global singleton. This is for backward compatibility with SMT v1 code.

Types

type CommandProvider

type CommandProvider struct {
	// contains filtered or unexported fields
}

CommandProvider wraps command execution to implement IProvider interface

func NewCommandProvider

func NewCommandProvider(name string) *CommandProvider

NewCommandProvider creates a new CommandProvider

func (*CommandProvider) Handle

func (cp *CommandProvider) Handle(task ITask, server string) error

Handle implements IProvider for command execution

func (*CommandProvider) Name

func (cp *CommandProvider) Name() string

Name implements IProvider

type CommandTask

type CommandTask struct {
	// contains filtered or unexported fields
}

CommandTask wraps a command function to implement ITask interface

func NewCommandTask

func NewCommandTask(providerName string, provider IProvider, commandFunc func(server string) error, priority int) *CommandTask

NewCommandTask creates a new CommandTask

func (*CommandTask) GetCallbackName

func (ct *CommandTask) GetCallbackName() string

GetCallbackName implements ITask

func (*CommandTask) GetCreatedAt

func (ct *CommandTask) GetCreatedAt() time.Time

GetCreatedAt implements ITask

func (*CommandTask) GetID

func (ct *CommandTask) GetID() string

GetID implements ITask

func (*CommandTask) GetMaxRetries

func (ct *CommandTask) GetMaxRetries() int

GetMaxRetries implements ITask

func (*CommandTask) GetPriority

func (ct *CommandTask) GetPriority() int

GetPriority implements ITask

func (*CommandTask) GetProvider

func (ct *CommandTask) GetProvider() IProvider

GetProvider implements ITask

func (*CommandTask) GetRetries

func (ct *CommandTask) GetRetries() int

GetRetries implements ITask

func (*CommandTask) GetTaskGroup

func (ct *CommandTask) GetTaskGroup() ITaskGroup

GetTaskGroup implements ITask

func (*CommandTask) GetTimeout

func (ct *CommandTask) GetTimeout() time.Duration

GetTimeout implements ITask

func (*CommandTask) MarkAsFailed

func (ct *CommandTask) MarkAsFailed(t int64, err error)

MarkAsFailed implements ITask

func (*CommandTask) MarkAsSuccess

func (ct *CommandTask) MarkAsSuccess(t int64)

MarkAsSuccess implements ITask

func (*CommandTask) OnComplete

func (ct *CommandTask) OnComplete()

OnComplete implements ITask

func (*CommandTask) OnStart

func (ct *CommandTask) OnStart()

OnStart implements ITask

func (*CommandTask) UpdateLastError

func (ct *CommandTask) UpdateLastError(error string) error

UpdateLastError implements ITask

func (*CommandTask) UpdateRetries

func (ct *CommandTask) UpdateRetries(retries int) error

UpdateRetries implements ITask

type ConfigCache

type ConfigCache struct {
	// contains filtered or unexported fields
}

ConfigCache holds provider/server/limit configuration in memory

type IProvider

type IProvider interface {
	Handle(task ITask, server string) error
	Name() string
}

type IScheduledCallback

type IScheduledCallback interface {
	Name() string                                     // Unique identifier for callback registration
	Handle(payload map[string]interface{}) error      // Execute the scheduled task
	OnComplete(payload map[string]interface{})        // Called after successful execution
	OnFail(payload map[string]interface{}, err error) // Called after max retries exhausted
}

IScheduledCallback is the interface for scheduled task handlers
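
To show how a name-based callback registry of this shape can work, here is a minimal sketch with a local copy of the interface so it compiles on its own. The registry type, its register and dispatch methods, and cleanupCallback are hypothetical. The sketch calls OnFail on the first error, whereas the real scheduler is documented to retry before giving up.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirror of the documented interface; real code would use stm.IScheduledCallback.
type IScheduledCallback interface {
	Name() string
	Handle(payload map[string]interface{}) error
	OnComplete(payload map[string]interface{})
	OnFail(payload map[string]interface{}, err error)
}

// registry is a hypothetical name-to-callback lookup table.
type registry struct {
	callbacks map[string]IScheduledCallback
}

func newRegistry() *registry {
	return &registry{callbacks: make(map[string]IScheduledCallback)}
}

// register stores the callback under the name it reports.
func (r *registry) register(cb IScheduledCallback) {
	r.callbacks[cb.Name()] = cb
}

// dispatch looks up a callback by name and runs it once (no retries here).
func (r *registry) dispatch(name string, payload map[string]interface{}) error {
	cb, ok := r.callbacks[name]
	if !ok {
		return fmt.Errorf("no callback registered as %q", name)
	}
	if err := cb.Handle(payload); err != nil {
		cb.OnFail(payload, err)
		return err
	}
	cb.OnComplete(payload)
	return nil
}

// cleanupCallback is an illustrative callback that only validates its payload.
type cleanupCallback struct{ completed, failed bool }

func (c *cleanupCallback) Name() string { return "cleanup_old_files" }

func (c *cleanupCallback) Handle(payload map[string]interface{}) error {
	if _, ok := payload["directory"].(string); !ok {
		return errors.New("payload is missing a string \"directory\"")
	}
	return nil
}

func (c *cleanupCallback) OnComplete(map[string]interface{})     { c.completed = true }
func (c *cleanupCallback) OnFail(map[string]interface{}, error) { c.failed = true }

func main() {
	r := newRegistry()
	r.register(&cleanupCallback{})
	err := r.dispatch("cleanup_old_files", map[string]interface{}{"directory": "/tmp/uploads"})
	fmt.Println("dispatch error:", err)
}
```

Storing only the callback name with the scheduled task is what lets any instance execute it after a restart, provided that instance registered the same name at startup.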

type ITask

type ITask interface {
	MarkAsSuccess(t int64)
	MarkAsFailed(t int64, err error)
	GetPriority() int
	GetID() string
	GetMaxRetries() int
	GetRetries() int
	GetCreatedAt() time.Time
	GetTaskGroup() ITaskGroup
	GetProvider() IProvider
	UpdateRetries(int) error
	GetTimeout() time.Duration
	UpdateLastError(string) error
	GetCallbackName() string
	OnComplete()
	OnStart()
}

Task interfaces (task.go)

type ITaskGroup

type ITaskGroup interface {
	MarkComplete() error
	GetTaskCount() int
	GetTaskCompletedCount() int
	UpdateTaskCompletedCount(int) error
}

type ProviderConfig

type ProviderConfig struct {
	Name    string
	Servers []string
}

ProviderConfig for leader initialization

type RedisScheduledTaskScheduler added in v0.0.8

type RedisScheduledTaskScheduler struct {
	// contains filtered or unexported fields
}

RedisScheduledTaskScheduler manages delayed and recurring task execution using Redis

func NewRedisScheduledTaskScheduler added in v0.0.8

func NewRedisScheduledTaskScheduler(rdb *redis.Client, logger *zerolog.Logger) *RedisScheduledTaskScheduler

NewRedisScheduledTaskScheduler creates a scheduler instance

func (*RedisScheduledTaskScheduler) CancelScheduled added in v0.0.8

func (s *RedisScheduledTaskScheduler) CancelScheduled(taskID string) error

CancelScheduled cancels a scheduled task

func (*RedisScheduledTaskScheduler) DeleteScheduled added in v0.0.8

func (s *RedisScheduledTaskScheduler) DeleteScheduled(taskID string) error

DeleteScheduled permanently deletes a scheduled task

func (*RedisScheduledTaskScheduler) RegisterCallback added in v0.0.8

func (s *RedisScheduledTaskScheduler) RegisterCallback(callback IScheduledCallback)

RegisterCallback registers a scheduled task callback

func (*RedisScheduledTaskScheduler) RunEvery added in v0.0.8

func (s *RedisScheduledTaskScheduler) RunEvery(interval time.Duration, callbackName string, payload map[string]interface{}) (string, error)

RunEvery schedules a recurring task to run at intervals

func (*RedisScheduledTaskScheduler) RunIn added in v0.0.8

func (s *RedisScheduledTaskScheduler) RunIn(delay time.Duration, callbackName string, payload map[string]interface{}) (string, error)

RunIn schedules a one-shot task to run after a delay

func (*RedisScheduledTaskScheduler) RunInWithRetries added in v0.0.8

func (s *RedisScheduledTaskScheduler) RunInWithRetries(delay time.Duration, callbackName string, payload map[string]interface{}, maxRetries int) (string, error)

RunInWithRetries schedules a one-shot task with custom retry limit

func (*RedisScheduledTaskScheduler) Shutdown added in v0.0.8

func (s *RedisScheduledTaskScheduler) Shutdown()

Shutdown stops the scheduler

func (*RedisScheduledTaskScheduler) Start added in v0.0.8

func (s *RedisScheduledTaskScheduler) Start()

Start begins the scheduler polling loop

type RedisTaskManager added in v0.0.8

type RedisTaskManager struct {
	// contains filtered or unexported fields
}

RedisTaskManager - STM with Redis backend (simpler + faster than PostgreSQL)

var (
	// RedisTaskQueueManagerInstance is the global singleton instance
	// This exists for backward compatibility with SMT v1 code.
	// New code should create instances directly via NewRedisTaskManager.
	RedisTaskQueueManagerInstance *RedisTaskManager
)

func NewRedisTaskManager added in v0.0.8

func NewRedisTaskManager(
	redisAddr string,
	password string,
	dbIndex int,
	runnerID string,
	providers []ProviderConfig,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
) (*RedisTaskManager, error)

NewRedisTaskManager creates Redis-backed STM (leader mode). Leader initializes config keys in Redis.

  • redisAddr: "localhost:6379"
  • password: Redis password (empty string for no auth)
  • dbIndex: Redis database index (0-15, use different index per environment)

func NewRedisTaskManagerReader added in v0.0.8

func NewRedisTaskManagerReader(
	redisAddr string,
	password string,
	dbIndex int,
	runnerID string,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
) (*RedisTaskManager, error)

NewRedisTaskManagerReader creates Redis-backed STM (reader mode). Reader connects to existing config.

func (*RedisTaskManager) AddTask added in v0.0.8

func (tm *RedisTaskManager) AddTask(task ITask) bool

AddTask enqueues a task

func (*RedisTaskManager) AddTasks added in v0.0.8

func (tm *RedisTaskManager) AddTasks(tasks []ITask) (count int, err error)

AddTasks enqueues multiple tasks

func (*RedisTaskManager) DelTask added in v0.0.8

func (tm *RedisTaskManager) DelTask(taskID string, interruptFn func(task ITask, server string) error) string

DelTask removes a task from queue or cancels if running

func (*RedisTaskManager) SetServerMaxParallel added in v0.0.8

func (tm *RedisTaskManager) SetServerMaxParallel(serverPrefix string, maxParallel int) error

SetServerMaxParallel updates server concurrency limit

func (*RedisTaskManager) Shutdown added in v0.0.8

func (tm *RedisTaskManager) Shutdown()

Shutdown gracefully stops the task manager

func (*RedisTaskManager) Start added in v0.0.8

func (tm *RedisTaskManager) Start()

Start begins worker goroutines

func (*RedisTaskManager) UpdateProviderServers added in v0.0.8

func (tm *RedisTaskManager) UpdateProviderServers(name string, servers []string) error

UpdateProviderServers updates provider's server list

type RunningTaskInfo

type RunningTaskInfo struct {
	// contains filtered or unexported fields
}

RunningTaskInfo holds information about a currently executing task (copied from SMT v1 for compatibility)

type ScheduledTaskScheduler

type ScheduledTaskScheduler struct {
	// contains filtered or unexported fields
}

ScheduledTaskScheduler manages delayed and recurring task execution

func NewScheduledTaskScheduler

func NewScheduledTaskScheduler(db *pgxpool.Pool, logger *zerolog.Logger) *ScheduledTaskScheduler

NewScheduledTaskScheduler creates a scheduler instance

func (*ScheduledTaskScheduler) CancelScheduled

func (s *ScheduledTaskScheduler) CancelScheduled(taskID string) error

CancelScheduled cancels a scheduled task

func (*ScheduledTaskScheduler) DeleteScheduled

func (s *ScheduledTaskScheduler) DeleteScheduled(taskID string) error

DeleteScheduled permanently deletes a scheduled task from database

func (*ScheduledTaskScheduler) RegisterCallback

func (s *ScheduledTaskScheduler) RegisterCallback(callback IScheduledCallback)

RegisterCallback registers a scheduled task callback

func (*ScheduledTaskScheduler) RunEvery

func (s *ScheduledTaskScheduler) RunEvery(interval time.Duration, callbackName string, payload map[string]interface{}) (string, error)

RunEvery schedules a recurring task to run at intervals

func (*ScheduledTaskScheduler) RunIn

func (s *ScheduledTaskScheduler) RunIn(delay time.Duration, callbackName string, payload map[string]interface{}) (string, error)

RunIn schedules a one-shot task to run after a delay

func (*ScheduledTaskScheduler) RunInWithRetries

func (s *ScheduledTaskScheduler) RunInWithRetries(delay time.Duration, callbackName string, payload map[string]interface{}, maxRetries int) (string, error)

RunInWithRetries schedules a one-shot task with custom retry limit

func (*ScheduledTaskScheduler) Shutdown

func (s *ScheduledTaskScheduler) Shutdown()

Shutdown stops the scheduler

func (*ScheduledTaskScheduler) Start

func (s *ScheduledTaskScheduler) Start()

Start begins the scheduler polling loop

type StupidTaskManager

type StupidTaskManager struct {
	// contains filtered or unexported fields
}

StupidTaskManager - STM v2 with PostgreSQL backend

var (
	// TaskQueueManagerInstance is the global singleton instance
	// This exists for backward compatibility with SMT v1 code.
	// New code should create instances directly via NewStupidTaskManager.
	TaskQueueManagerInstance *StupidTaskManager
)

func NewStupidTaskManager

func NewStupidTaskManager(
	dbConnString string,
	runnerID string,
	providers []ProviderConfig,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
	maxOpenConns int,
	maxIdleConns int,
) (*StupidTaskManager, error)

NewStupidTaskManager creates STM v2 instance (leader mode). Leader is responsible for:

  • Creating database schema if not exists
  • Initializing provider configuration
  • Starting config watcher

Parameters:

  • runnerID: Unique identifier for this instance (e.g. "soulkyn-stm", "pixi-stm")
  • maxOpenConns: Maximum number of open connections to database (0 = unlimited, recommended: 25)
  • maxIdleConns: Maximum number of idle connections (recommended: 10)

Uses pgx/v5 with simple protocol (no prepared statements) for pgbouncer transaction mode compatibility.

func NewStupidTaskManagerReader

func NewStupidTaskManagerReader(
	dbConnString string,
	runnerID string,
	logger *zerolog.Logger,
	getTimeout func(string, string) time.Duration,
	maxOpenConns int,
	maxIdleConns int,
) (*StupidTaskManager, error)

NewStupidTaskManagerReader creates STM v2 instance (reader mode). Reader is responsible for:

  • Verifying schema exists (fail fast if missing)
  • Loading existing configuration from database
  • Starting config watcher

Parameters:

  • runnerID: Unique identifier for this instance (e.g. "soulkyn-stm", "pixi-stm")
  • maxOpenConns: Maximum number of open connections to database (0 = unlimited, recommended: 25)
  • maxIdleConns: Maximum number of idle connections (recommended: 10)

Uses pgx/v5 with simple protocol (no prepared statements) for pgbouncer transaction mode compatibility.

func (*StupidTaskManager) AddTask

func (tm *StupidTaskManager) AddTask(task ITask) bool

AddTask enqueues a task if not already known. Returns true if successfully enqueued.

func (*StupidTaskManager) AddTasks

func (tm *StupidTaskManager) AddTasks(tasks []ITask) (count int, err error)

AddTasks enqueues multiple tasks. Returns count of successfully enqueued tasks.

func (*StupidTaskManager) DelTask

func (tm *StupidTaskManager) DelTask(taskID string, interruptFn func(task ITask, server string) error) string

DelTask removes a task from queue or cancels it if running. Returns: "removed_from_queue", "interrupted_running", "not_found", or "error: <msg>"

func (*StupidTaskManager) SetServerMaxParallel

func (tm *StupidTaskManager) SetServerMaxParallel(serverPrefix string, maxParallel int) error

SetServerMaxParallel updates server concurrency limit

func (*StupidTaskManager) Shutdown

func (tm *StupidTaskManager) Shutdown()

Shutdown gracefully stops the task manager

func (*StupidTaskManager) Start

func (tm *StupidTaskManager) Start()

Start begins worker goroutines for task processing

func (*StupidTaskManager) UpdateProviderServers

func (tm *StupidTaskManager) UpdateProviderServers(name string, servers []string) error

UpdateProviderServers updates provider's server list
