Go SDK API

Creating a Queue

func New(opts ...Option) (*Queue, error)

Call it as q, err := seqdelay.New(opts...).

Options

| Option | Default | Description |
| --- | --- | --- |
| WithRedis(addr) | required | Redis standalone |
| WithRedisOptions(opts) | | Redis with full options (password, TLS, DB) |
| WithRedisClient(client) | | Pre-configured Redis client |
| WithRedisSentinel(addrs, master) | | Redis Sentinel |
| WithRedisCluster(addrs) | | Redis Cluster |
| WithTickInterval(d) | 1ms | Time wheel precision |
| WithWheelCapacity(n) | 4096 | Wheel slot count (power of 2) |
| WithReadyQueueSize(n) | 1024 | Per-topic channel buffer |
| WithMaxTopics(n) | 1024 | Max topic count |
| WithPopTimeout(d) | 30s | Default HTTP pop timeout |
| WithInstanceID(id) | auto | Instance ID for distributed lock |
| WithLockTTL(d) | 500ms | Distributed lock TTL |
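
WithWheelCapacity expects a power of two. The table does not say whether other values are rejected or rounded, so a caller may want to normalize the number first. The sketch below is one way to do that; `nextPow2` and `revolution` are hypothetical helpers, not part of the SDK. `revolution` only computes tick interval times slot count, the time one full turn of a wheel of that size covers.

```go
package main

import (
	"fmt"
	"math/bits"
	"time"
)

// nextPow2 returns the smallest power of two that is >= n.
// Values below 1 are treated as 1. Hypothetical helper, not part of the SDK.
func nextPow2(n int) int {
	if n <= 1 {
		return 1
	}
	return 1 << bits.Len(uint(n-1))
}

// revolution is the time covered by one full turn of a wheel with the given
// tick interval and slot count.
func revolution(tick time.Duration, slots int) time.Duration {
	return tick * time.Duration(slots)
}

func main() {
	fmt.Println(nextPow2(5000))                     // 8192
	fmt.Println(revolution(time.Millisecond, 4096)) // 4.096s
}
```

With the documented defaults (1ms tick, 4096 slots) one turn covers about four seconds. How the library schedules longer delays is not described here.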

Methods

Start

func (q *Queue) Start(ctx context.Context) error

Recovers tasks from Redis, starts the time wheel, and begins leader election.

Add

func (q *Queue) Add(ctx context.Context, task *Task) error

Adds a delayed task. The combination of Task.Topic and Task.ID must be unique; Add returns ErrDuplicateTask on conflict.

Pop

func (q *Queue) Pop(ctx context.Context, topic string) (*Task, error)

Pull a ready task. Blocks until a task is available. Sets task state to ACTIVE and starts TTR countdown.

Finish

func (q *Queue) Finish(ctx context.Context, topic, id string) error

Mark a task as completed. Validates state is ACTIVE. Returns ErrInvalidState otherwise.
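
A pull-based worker typically pairs Pop with Finish. The sketch below shows one possible loop, written against a small interface so that it runs without Redis. `popFinisher`, `consume`, `fakeQueue` and `errDrained` are hypothetical names introduced for illustration; the real *Queue is assumed to satisfy the interface because its documented method signatures match. It is also assumed that a popped task that is never finished is redelivered once its TTR expires, as described for callbacks.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task mirrors the fields of the documented struct that this sketch uses.
type Task struct {
	ID    string
	Topic string
	Body  []byte
}

// popFinisher lists the two documented Queue methods the loop needs.
type popFinisher interface {
	Pop(ctx context.Context, topic string) (*Task, error)
	Finish(ctx context.Context, topic, id string) error
}

// consume pops tasks from topic until Pop fails, for example because ctx was
// cancelled. A task is finished only when handle succeeds.
func consume(ctx context.Context, q popFinisher, topic string, handle func(*Task) error) error {
	for {
		task, err := q.Pop(ctx, topic)
		if err != nil {
			return err
		}
		if err := handle(task); err != nil {
			continue // no Finish: the task stays ACTIVE until its TTR runs out
		}
		if err := q.Finish(ctx, task.Topic, task.ID); err != nil {
			return err
		}
	}
}

// fakeQueue is an in-memory stand-in used only to exercise consume.
type fakeQueue struct {
	ready    []*Task
	finished []string
}

var errDrained = errors.New("fake queue drained")

func (f *fakeQueue) Pop(ctx context.Context, topic string) (*Task, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if len(f.ready) == 0 {
		return nil, errDrained // the real Pop is documented to block instead
	}
	t := f.ready[0]
	f.ready = f.ready[1:]
	return t, nil
}

func (f *fakeQueue) Finish(ctx context.Context, topic, id string) error {
	f.finished = append(f.finished, topic+":"+id)
	return nil
}

// demo runs the loop over two tasks; the handler rejects the second one.
func demo() ([]string, error) {
	q := &fakeQueue{ready: []*Task{
		{ID: "1", Topic: "email", Body: []byte("ok")},
		{ID: "2", Topic: "email", Body: []byte("bad")},
	}}
	err := consume(context.Background(), q, "email", func(t *Task) error {
		if string(t.Body) == "bad" {
			return errors.New("handler failed")
		}
		return nil
	})
	return q.finished, err
}

func main() {
	finished, err := demo()
	fmt.Println(finished, err) // [email:1] fake queue drained
}
```

If handling can take longer than the TTR, Finish may find the task no longer ACTIVE; a production loop would probably treat ErrInvalidState as a signal to skip the task rather than stop.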

Cancel

func (q *Queue) Cancel(ctx context.Context, topic, id string) error

Cancel a task from any state.

Get

func (q *Queue) Get(ctx context.Context, topic, id string) (*Task, error)

Query task details and current state.

OnExpire

func (q *Queue) OnExpire(topic string, fn func(context.Context, *Task) error) error

Register a callback for a topic. Mutually exclusive with Pop on the same topic.

  • return nil: the task is finished automatically (auto Finish)
  • return error: Finish is not called, and the task is redelivered after its TTR expires
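
The return-value rules above suggest separating failures that a retry can fix from those it cannot. The sketch below builds a function with the callback shape OnExpire takes and calls it directly, so it runs without the SDK. The `reminder` payload, `newReminderHandler` and the `send` function are hypothetical, and dropping malformed bodies is a design choice of this example, not documented library behaviour.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
)

// Task mirrors the fields of the documented struct that this sketch uses.
type Task struct {
	ID    string
	Topic string
	Body  []byte
}

// reminder is a hypothetical JSON payload carried in Task.Body.
type reminder struct {
	UserID int    `json:"user_id"`
	Text   string `json:"text"`
}

// newReminderHandler returns a function with the callback shape OnExpire takes.
// send is a hypothetical delivery function supplied by the caller.
func newReminderHandler(send func(context.Context, reminder) error) func(context.Context, *Task) error {
	return func(ctx context.Context, t *Task) error {
		var r reminder
		if err := json.Unmarshal(t.Body, &r); err != nil {
			// A malformed body cannot succeed on redelivery, so log it and
			// return nil to let the task be finished.
			log.Printf("dropping %s:%s: %v", t.Topic, t.ID, err)
			return nil
		}
		if err := send(ctx, r); err != nil {
			// Returning an error skips Finish, so the task is redelivered.
			return fmt.Errorf("send reminder for %s:%s: %w", t.Topic, t.ID, err)
		}
		return nil
	}
}

// demo calls the handler the way the queue would for an expired task.
func demo() (okErr, malformedErr, failedErr error) {
	ctx := context.Background()
	good := &Task{ID: "42", Topic: "reminder", Body: []byte(`{"user_id":7,"text":"renew plan"}`)}
	bad := &Task{ID: "43", Topic: "reminder", Body: []byte(`not json`)}

	deliver := newReminderHandler(func(context.Context, reminder) error { return nil })
	fail := newReminderHandler(func(context.Context, reminder) error { return errors.New("smtp down") })

	return deliver(ctx, good), deliver(ctx, bad), fail(ctx, good)
}

func main() {
	okErr, malformedErr, failedErr := demo()
	fmt.Println(okErr, malformedErr, failedErr != nil) // <nil> <nil> true
	// With the real SDK the handler would be registered once per topic:
	//   err := q.OnExpire("reminder", newReminderHandler(send))
}
```

Because MaxRetries defaults to 0 (unlimited), a handler that always returns an error for the same task would be invoked again after every TTR, which is why the sketch finishes undecodable tasks instead of failing them.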

Shutdown

func (q *Queue) Shutdown(ctx context.Context) error

Graceful shutdown. Drains the time wheel and stops all callback goroutines.
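
Start and Shutdown are usually wired to the process lifetime. The sketch below shows one way to do it, under two assumptions the text above does not confirm: that Start returns once background work is running, and that Shutdown gives up when its context expires. `lifecycle`, `run` and `fakeQueue` are hypothetical names; the fake stands in for the real queue so the example runs on its own.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lifecycle lists the documented Start and Shutdown methods.
type lifecycle interface {
	Start(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

// run starts q, waits for ctx to be cancelled, then allows Shutdown at most
// grace to finish.
func run(ctx context.Context, q lifecycle, grace time.Duration) error {
	if err := q.Start(ctx); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	<-ctx.Done()

	// ctx is already cancelled here, so Shutdown gets a fresh context with
	// its own deadline.
	stopCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	if err := q.Shutdown(stopCtx); err != nil {
		return fmt.Errorf("shutdown: %w", err)
	}
	return nil
}

// fakeQueue records calls; it stands in for the real queue in this sketch.
type fakeQueue struct{ calls []string }

func (f *fakeQueue) Start(context.Context) error {
	f.calls = append(f.calls, "start")
	return nil
}

func (f *fakeQueue) Shutdown(ctx context.Context) error {
	f.calls = append(f.calls, "shutdown")
	return ctx.Err() // nil unless the grace period has already elapsed
}

// demo cancels the context after a short delay to simulate a stop signal.
func demo() ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	q := &fakeQueue{}
	err := run(ctx, q, time.Second)
	return q.calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err) // [start shutdown] <nil>
}
```

In a service, ctx would more likely come from `signal.NotifyContext(context.Background(), os.Interrupt)` than from a timeout.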

Task Struct

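type Task struct {
	ID         string        // required; unique within Topic
	Topic      string        // required
	Body       []byte        // payload
	Delay      time.Duration // must be positive
	TTR        time.Duration // countdown starts when the task is popped
	MaxRetries int           // 0 = unlimited
}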

Errors

| Error | Description |
| --- | --- |
| ErrDuplicateTask | Task with same topic:id already exists |
| ErrTaskNotFound | Task not found |
| ErrInvalidState | Invalid state for this operation |
| ErrTopicConflict | Topic already has a callback |
| ErrTooManyTopics | Max topic count exceeded |
| ErrQueueFull | Ready queue buffer full |
| ErrClosed | Queue is closed |
| ErrRedisRequired | Redis connection not provided |
| ErrInvalidTask | Missing ID or Topic |
| ErrInvalidDelay | Delay must be positive |
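
Assuming these errors are exported sentinel values that can be matched with `errors.Is` (the naming suggests so, but the page does not state it), callers can branch on them. The sketch below treats ErrDuplicateTask as success so that retrying the same Add is harmless. The local error variables, `adder`, `addOnce` and `memQueue` are stand-ins defined here so the example runs without the SDK; with the real package the errors would be its own exported values.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Task mirrors the documented struct.
type Task struct {
	ID         string
	Topic      string
	Body       []byte
	Delay      time.Duration
	TTR        time.Duration
	MaxRetries int
}

// Local stand-ins for three of the documented errors.
var (
	ErrDuplicateTask = errors.New("task already exists")
	ErrInvalidTask   = errors.New("missing ID or Topic")
	ErrInvalidDelay  = errors.New("delay must be positive")
)

// adder is a hypothetical interface holding the documented Add method.
type adder interface {
	Add(ctx context.Context, task *Task) error
}

// addOnce treats ErrDuplicateTask as success; every other error is returned.
func addOnce(ctx context.Context, q adder, t *Task) error {
	err := q.Add(ctx, t)
	if errors.Is(err, ErrDuplicateTask) {
		return nil
	}
	return err
}

// memQueue is an in-memory stand-in applying the checks the table describes.
type memQueue struct{ seen map[string]bool }

func (m *memQueue) Add(_ context.Context, t *Task) error {
	key := t.Topic + ":" + t.ID
	switch {
	case t.ID == "" || t.Topic == "":
		return ErrInvalidTask
	case t.Delay <= 0:
		return ErrInvalidDelay
	case m.seen[key]:
		return fmt.Errorf("add %s: %w", key, ErrDuplicateTask)
	}
	m.seen[key] = true
	return nil
}

// demo adds the same task twice, then a task without a delay.
func demo() (first, second, noDelay error) {
	ctx := context.Background()
	q := &memQueue{seen: map[string]bool{}}
	t := &Task{ID: "order-1", Topic: "payment", Delay: 15 * time.Minute, TTR: 30 * time.Second}
	first = addOnce(ctx, q, t)
	second = addOnce(ctx, q, t) // duplicate, reported as success
	noDelay = addOnce(ctx, q, &Task{ID: "order-2", Topic: "payment"})
	return first, second, noDelay
}

func main() {
	first, second, noDelay := demo()
	fmt.Println(first, second, errors.Is(noDelay, ErrInvalidDelay)) // <nil> <nil> true
}
```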