event

package
v0.0.0-...-8b7cb91 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 11, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Publish

func Publish[T Event[T]](bus *Bus, event T)

Publish publishes an event to all handlers registered for that event type. Events are queued and delivered asynchronously by worker goroutines. Publish itself takes no context argument.

Example:

Publish(bus, TaskSuspendedEvent{TaskID: taskID})

func SubscriberCount

func SubscriberCount[T Event[T]](bus *Bus) int

SubscriberCount returns the number of subscribers for a given event type. This is primarily useful for testing and debugging.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

func NewBus

func NewBus(metricsRegistry *prometheus.Registry) *Bus

func (*Bus) Close

func (bus *Bus) Close()

Close gracefully shuts down the Bus: it cancels the bus context, closes all channel subscriptions, and waits for all in-flight event deliveries to complete. After Close is called, no new events should be published. Close is safe to call multiple times.
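
The shutdown order described for Close (cancel the context, then wait for in-flight deliveries) can be sketched with a context and a sync.WaitGroup. This is a minimal illustration, not the package's implementation: miniBus, deliver, and runAndClose are hypothetical names introduced only for this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// miniBus is a hypothetical stand-in for Bus that only models shutdown.
type miniBus struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newMiniBus() *miniBus {
	ctx, cancel := context.WithCancel(context.Background())
	return &miniBus{ctx: ctx, cancel: cancel}
}

// deliver runs handler in its own goroutine and tracks it as in-flight.
func (b *miniBus) deliver(handler func(context.Context)) {
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		handler(b.ctx)
	}()
}

// Close cancels the context and waits for in-flight handlers.
// Both steps are idempotent, so calling Close twice is harmless.
func (b *miniBus) Close() {
	b.cancel()
	b.wg.Wait()
}

// IsClosed reports whether the context has been cancelled.
func (b *miniBus) IsClosed() bool { return b.ctx.Err() != nil }

// runAndClose starts n deliveries, closes the bus twice, and returns
// how many handlers ran and whether the bus reports itself closed.
func runAndClose(n int) (int, bool) {
	b := newMiniBus()
	var handled atomic.Int32
	for i := 0; i < n; i++ {
		b.deliver(func(context.Context) { handled.Add(1) })
	}
	b.Close()
	b.Close() // second call is a no-op
	return int(handled.Load()), b.IsClosed()
}

func main() {
	handled, closed := runAndClose(3)
	fmt.Println(handled, closed) // prints "3 true"
}
```

Because Close waits on the WaitGroup, every handler started before Close has finished by the time it returns.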

func (*Bus) IsClosed

func (bus *Bus) IsClosed() bool

type Event

type Event[T any] interface {
	Event()
}

Event is a marker interface that all events must implement. This ensures type safety at compile time for event types.
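
As an illustration of the marker interface, the sketch below declares an event type and a generic function that uses the same `T Event[T]` constraint as Publish and Subscribe. The Event interface is copied locally so the snippet compiles on its own; TaskFinishedEvent and describe are hypothetical names that are not part of this package.

```go
package main

import "fmt"

// Event mirrors the marker interface documented above.
type Event[T any] interface {
	Event()
}

// TaskFinishedEvent is a hypothetical event type used only for illustration.
type TaskFinishedEvent struct {
	TaskID string
}

// Event marks TaskFinishedEvent as an event. The body is intentionally empty.
func (TaskFinishedEvent) Event() {}

// describe accepts only types that satisfy Event[T].
func describe[T Event[T]](e T) string {
	return fmt.Sprintf("%+v", e)
}

func main() {
	fmt.Println(describe(TaskFinishedEvent{TaskID: "t-1"}))
	// describe(42) would not compile: int has no Event method.
}
```

A type without an Event method is rejected by the compiler, which is the compile-time safety the marker interface provides.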

type EventFilter

type EventFilter[T any] func(T) bool
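
EventFilter carries no documentation here, so the following is only a sketch under the assumption that a filter returns true for events that should be delivered. taskEvent is a simplified stand-in for TaskEvent (the real field is a uuid.UUID), and forTask and keep are hypothetical helpers.

```go
package main

import "fmt"

// EventFilter mirrors the type documented above.
type EventFilter[T any] func(T) bool

// taskEvent is a simplified stand-in for TaskEvent with a string ID.
type taskEvent struct {
	TaskID string
}

// forTask returns a filter that accepts only events for the given task.
func forTask(id string) EventFilter[taskEvent] {
	return func(e taskEvent) bool { return e.TaskID == id }
}

// keep returns the events accepted by filter.
// Assumption: true means "deliver this event".
func keep[T any](events []T, filter EventFilter[T]) []T {
	var out []T
	for _, e := range events {
		if filter(e) {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	events := []taskEvent{{"a"}, {"b"}, {"a"}}
	fmt.Println(len(keep(events, forTask("a")))) // prints 2
}
```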

type Handler

type Handler[T any] func(context.Context, T)

Handler is a function type that handles events of type T. Handlers do not return errors and are executed asynchronously.

type MessageBlock

type MessageBlock struct {
	Block    *types.MessageBlock
	Type     MessageBlockType
	Received map[string]bool
}

type MessageBlockType

type MessageBlockType string
const (
	MessageBlockTypeDelta    MessageBlockType = "delta"
	MessageBlockTypeComplete MessageBlockType = "complete"
)

type MessageCreatedEvent

type MessageCreatedEvent struct {
	MessageID uuid.UUID
	TaskID    uuid.UUID
}

func (MessageCreatedEvent) Event

func (MessageCreatedEvent) Event()

type MessageEvent

type MessageEvent struct {
	MessageID uuid.UUID
	TaskID    uuid.UUID
}

func (MessageEvent) Event

func (MessageEvent) Event()

type MessageHub

type MessageHub struct {
	// contains filtered or unexported fields
}

func NewMessageHub

func NewMessageHub(db *memory.Client) (*MessageHub, error)

func (*MessageHub) Publish

func (h *MessageHub) Publish(taskID uuid.UUID, message *v1.SubscribeResponse)

func (*MessageHub) Subscribe

func (h *MessageHub) Subscribe(ctx context.Context, taskID uuid.UUID) iter.Seq2[*v1.SubscribeResponse, error]

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func Subscribe

func Subscribe[T Event[T]](bus *Bus, handler Handler[T], filter EventFilter[T]) *Subscription

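Subscribe registers a handler for events of type T and returns a Subscription that can be used to unsubscribe. The handler is called asynchronously whenever an event of type T is published. The filter argument is an EventFilter[T] predicate over the event.

Example:

sub := Subscribe[TaskSuspendedEvent](bus, func(ctx context.Context, e TaskSuspendedEvent) {
    log.Printf("Task suspended: %s", e.TaskID)
}, func(e TaskSuspendedEvent) bool {
    return true // accept every event (assumes the filter returns true to deliver)
})
defer sub.Unsubscribe()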

func SubscribeChannel

func SubscribeChannel[T Event[T]](bus *Bus, bufferSize int, filter EventFilter[T]) (<-chan T, *Subscription)

SubscribeChannel creates a channel subscription for events of type T. Returns a receive-only channel that will receive all events of type T, and a Subscription that can be used to unsubscribe.

The channel has a buffer size specified by bufferSize. If the buffer is full, events will be dropped to prevent blocking publishers.

The consumer should call Unsubscribe() when done to clean up resources and close the channel.

Example:

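ch, sub := SubscribeChannel[TaskSuspendedEvent](bus, 10, func(e TaskSuspendedEvent) bool {
    return true // accept every event (assumes the filter returns true to deliver)
})
defer sub.Unsubscribe()
for event := range ch {
    log.Printf("Task suspended: %s", event.TaskID)
}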
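
The drop-when-full behavior described for SubscribeChannel is commonly implemented with a non-blocking send. The following is a sketch of that general technique, not the package's own code; trySend and countDelivered are hypothetical helpers.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value
// was accepted. If the buffer is full, the default case runs and the
// value is dropped instead of blocking the sender.
func trySend[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// countDelivered sends n values into a channel with the given buffer
// size while nobody is receiving, and counts the outcomes.
func countDelivered(bufferSize, n int) (delivered, dropped int) {
	ch := make(chan int, bufferSize)
	for i := 0; i < n; i++ {
		if trySend(ch, i) {
			delivered++
		} else {
			dropped++
		}
	}
	return delivered, dropped
}

func main() {
	delivered, dropped := countDelivered(2, 5)
	fmt.Println(delivered, dropped) // prints "2 3"
}
```

A consumer that cannot keep up therefore loses events rather than slowing publishers down, so bufferSize should be sized for the expected burst.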

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

Unsubscribe removes the subscription. For channel subscriptions, it also closes the channel. Safe to call multiple times.
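
One common way to make a channel-closing method safe to call repeatedly is to guard the close with sync.Once. The sketch below shows that technique under the assumption that something similar backs Unsubscribe; chanSub is a hypothetical type, not the package's Subscription.

```go
package main

import (
	"fmt"
	"sync"
)

// chanSub is a hypothetical stand-in for a channel subscription.
type chanSub struct {
	once sync.Once
	ch   chan int
}

// Unsubscribe closes the channel exactly once. Later calls are no-ops.
func (s *chanSub) Unsubscribe() {
	s.once.Do(func() { close(s.ch) })
}

func main() {
	s := &chanSub{ch: make(chan int, 1)}
	s.ch <- 7
	s.Unsubscribe()
	s.Unsubscribe() // without sync.Once this would panic: close of closed channel

	// Buffered values are still readable after close; the loop then ends.
	for v := range s.ch {
		fmt.Println("drained:", v)
	}
	fmt.Println("channel closed")
}
```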

type TaskEvent

type TaskEvent struct {
	TaskID uuid.UUID
}

func (TaskEvent) Event

func (TaskEvent) Event()

type TaskSuspendedEvent

type TaskSuspendedEvent struct {
	TaskID uuid.UUID
}

func (TaskSuspendedEvent) Event

func (TaskSuspendedEvent) Event()
