lenqwang/queue

Task Queue

A robust, extensible, plugin-based task queue with retry logic, concurrency control, and event handling.

Features

  • Concurrency Control: Limit the number of tasks running simultaneously
  • Retry Logic: Configurable retry strategies (linear, exponential backoff)
  • Plugin System: Extend functionality with plugins
  • Event-Driven: Listen to task lifecycle events
  • Type-Safe: Full TypeScript support
  • State Management: Query queue state at any time

Basic Usage

import { Queue } from "./queue";
const queue = new Queue<number>({
  concurrency: 3,
  maxRetry: 3,
  retryWaitTimeInSeconds: 2,
});
queue.on("success", (result, context) => {
  console.log(`Task ${context.id} completed:`, result);
});
queue.on("all-done", (errors) => {
  console.log("All tasks completed. Errors:", errors.length);
});
// Add tasks
queue.addTask(async () => {
  return 42;
});
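
The `concurrency` option caps how many tasks run at the same time. As a rough illustration of that idea only (this is not the library's implementation, and `runWithLimit` is a hypothetical helper introduced here), a fixed pool of workers can pull tasks from a shared index:

```typescript
// Hypothetical helper, not part of this library:
// runs async tasks with at most `limit` of them in flight at once.
type Task<T> = () => Promise<T>;

async function runWithLimit<T>(tasks: Task<T>[], limit: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0; // index of the next unclaimed task

  // Each worker keeps claiming the next unclaimed task until none remain.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  // Never start more workers than there are tasks, and always at least one.
  const workerCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Results are stored by task index, so the output order matches the input order even when tasks finish out of order. This sketch has no retry or error handling; a rejected task rejects the whole call.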

Plugins

Logger Plugin

import { LoggerPlugin } from "./plugins/logger-plugin";
queue.use(new LoggerPlugin({
  logLevel: "info",
  prefix: "[MyQueue]"
}));

Metrics Plugin

import { MetricsPlugin } from "./plugins/metrics-plugin";
const metricsPlugin = new MetricsPlugin();
queue.use(metricsPlugin);
// Later...
const metrics = metricsPlugin.getMetrics();
console.log(metrics);
// {
//   totalTasks: 100,
//   successfulTasks: 95,
//   failedTasks: 5,
//   retriedTasks: 10,
//   totalRetries: 15,
//   averageExecutionTime: 234
// }
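
To make the fields in that sample output concrete, here is a hedged sketch of how such counters could be accumulated. `MetricsAccumulator` and its `record` method are hypothetical, and the interpretation of each field (for example, `retriedTasks` counting tasks retried at least once, `totalRetries` counting every retry attempt, and durations measured in milliseconds) is an assumption rather than documented behavior of `MetricsPlugin`.

```typescript
// Field names mirror the sample output; their exact semantics are assumed.
interface Metrics {
  totalTasks: number;
  successfulTasks: number;
  failedTasks: number;
  retriedTasks: number;
  totalRetries: number;
  averageExecutionTime: number;
}

// Hypothetical accumulator, not the library's MetricsPlugin.
class MetricsAccumulator {
  private metrics: Metrics = {
    totalTasks: 0,
    successfulTasks: 0,
    failedTasks: 0,
    retriedTasks: 0,
    totalRetries: 0,
    averageExecutionTime: 0,
  };
  private totalExecutionTime = 0;

  // Record one finished task: outcome, number of retries it needed, and duration.
  record(succeeded: boolean, retries: number, durationMs: number): void {
    this.metrics.totalTasks++;
    if (succeeded) this.metrics.successfulTasks++;
    else this.metrics.failedTasks++;
    if (retries > 0) this.metrics.retriedTasks++;
    this.metrics.totalRetries += retries;
    this.totalExecutionTime += durationMs;
    this.metrics.averageExecutionTime =
      this.totalExecutionTime / this.metrics.totalTasks;
  }

  // Return a copy so callers cannot mutate the internal counters.
  getMetrics(): Metrics {
    return { ...this.metrics };
  }
}
```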

Rate Limiter Plugin

import { RateLimiterPlugin } from "./plugins/rate-limiter-plugin";
queue.use(new RateLimiterPlugin({
  maxTasksPerWindow: 10,
  windowInSeconds: 60
}));
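
The option names suggest a window-based limit: at most `maxTasksPerWindow` tasks per `windowInSeconds`. One common way to enforce that is a fixed-window counter, sketched below. `FixedWindowLimiter` is hypothetical, and whether `RateLimiterPlugin` uses a fixed window, a sliding window, or something else is not stated in this README.

```typescript
// Hypothetical fixed-window limiter; the real plugin's algorithm may differ.
class FixedWindowLimiter {
  private maxTasksPerWindow: number;
  private windowMs: number;
  // Negative infinity guarantees that the first call opens a new window.
  private windowStart = Number.NEGATIVE_INFINITY;
  private count = 0;

  constructor(maxTasksPerWindow: number, windowInSeconds: number) {
    this.maxTasksPerWindow = maxTasksPerWindow;
    this.windowMs = windowInSeconds * 1000;
  }

  // Returns true and counts the task if it fits in the current window.
  // `nowMs` is injectable so the behavior can be tested without real waiting.
  tryAcquire(nowMs: number = Date.now()): boolean {
    if (nowMs - this.windowStart >= this.windowMs) {
      this.windowStart = nowMs; // open a new window
      this.count = 0;
    }
    if (this.count >= this.maxTasksPerWindow) return false;
    this.count++;
    return true;
  }
}
```

A fixed window is simple but allows bursts at window boundaries, since up to twice the limit can pass in a short span straddling two windows.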

Retry Strategies

Linear Retry

import { Queue } from "./queue";
import { LinearRetryStrategy } from "./retry-strategy";
const queue = new Queue({
  concurrency: 2,
  maxRetry: 3,
  retryWaitTimeInSeconds: 5,
  retryStrategy: new LinearRetryStrategy(5) // Wait 5s between retries
});

Exponential Backoff

import { Queue } from "./queue";
import { ExponentialBackoffStrategy } from "./retry-strategy";
const queue = new Queue({
  concurrency: 2,
  maxRetry: 5,
  retryWaitTimeInSeconds: 1,
  retryStrategy: new ExponentialBackoffStrategy(1, 30) // 1s, 2s, 4s, 8s, 16s, max 30s
});
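
The delay schedules described in the two comments above can be written as plain functions. This is a sketch of the arithmetic only: `linearDelay` and `exponentialDelay` are hypothetical helpers, and the 1-based `attempt` numbering is an assumption, not the library's documented interface.

```typescript
// Hypothetical helpers illustrating the two schedules.
// `attempt` is assumed to be 1 for the first retry, 2 for the second, and so on.

// Same wait before every retry, matching the "Wait 5s between retries" comment.
function linearDelay(_attempt: number, waitSeconds: number): number {
  return waitSeconds;
}

// Wait doubles on each retry, capped at `maxSeconds`.
function exponentialDelay(
  attempt: number,
  baseSeconds: number,
  maxSeconds: number,
): number {
  return Math.min(baseSeconds * 2 ** (attempt - 1), maxSeconds);
}

// With base 1s, cap 30s, and 5 retries this reproduces the schedule in the comment.
const delays = [1, 2, 3, 4, 5].map((attempt) => exponentialDelay(attempt, 1, 30));
console.log(delays); // [ 1, 2, 4, 8, 16 ]
```

A sixth retry would compute 32s and be capped to 30s, which is where the second constructor argument comes into play.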

Events

  • success: Task completed successfully (listener receives the result and the task context)
  • error: Task failed after all retries
  • retry: Task is being retried
  • all-done: All tasks completed (listener receives the collected errors)
  • task-start: Task execution started
  • task-end: Task execution ended
  • queue-drain: Queue is empty and all tasks completed
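
For readers unfamiliar with the `on`/`off` pattern, the sketch below shows how listeners for these events can be registered, removed, and invoked. `TinyEmitter` is a hypothetical stand-in written for illustration; only the event names are taken from the list above, and the payloads are left untyped because this README does not document them for every event.

```typescript
// Event names from the list above; payload shapes are intentionally left as unknown.
type QueueEvent =
  | "success"
  | "error"
  | "retry"
  | "all-done"
  | "task-start"
  | "task-end"
  | "queue-drain";

type Listener = (...args: unknown[]) => void;

// Hypothetical minimal emitter, not the library's Queue class.
class TinyEmitter {
  private listeners = new Map<QueueEvent, Set<Listener>>();

  on(event: QueueEvent, listener: Listener): void {
    const set = this.listeners.get(event) ?? new Set<Listener>();
    set.add(listener);
    this.listeners.set(event, set);
  }

  // Removal only works with the same function reference that was passed to `on`.
  off(event: QueueEvent, listener: Listener): void {
    this.listeners.get(event)?.delete(listener);
  }

  emit(event: QueueEvent, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(...args);
    }
  }
}
```

Because removal matches by reference, keep the listener in a variable if you intend to call `off` later; an inline arrow function passed to `on` cannot be removed this way.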

Custom Plugins

import type { QueuePlugin, TaskContext, QueueState } from "./queue-types";
class CustomPlugin<T> implements QueuePlugin<T> {
  name = "custom";
  async onTaskStart(context: TaskContext<T>, state: QueueState<T>) {
    // Your logic here
  }
  async onTaskSuccess(result: T, context: TaskContext<T>, state: QueueState<T>) {
    // Your logic here
  }
}
queue.use(new CustomPlugin());
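
As a more concrete example of what a plugin body might do, the sketch below measures how long each task takes between the two hooks shown above. To stay self-contained it declares simplified local stand-ins for `TaskContext` and `QueuePlugin`: only `context.id` and the two hook names appear in this README, while the `string` type of `id`, the omission of the `state` argument, and the `TimingPlugin` class itself are assumptions.

```typescript
// Simplified local stand-ins for the library's types (assumed shapes).
interface TaskContext {
  id: string;
}

interface QueuePlugin<T> {
  name: string;
  onTaskStart?(context: TaskContext): Promise<void>;
  onTaskSuccess?(result: T, context: TaskContext): Promise<void>;
}

// Hypothetical plugin: records the duration of each successful task by ID.
class TimingPlugin<T> implements QueuePlugin<T> {
  name = "timing";
  readonly durations = new Map<string, number>();
  private startedAt = new Map<string, number>();
  private now: () => number;

  // The clock is injectable so the plugin can be tested deterministically.
  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  async onTaskStart(context: TaskContext): Promise<void> {
    this.startedAt.set(context.id, this.now());
  }

  async onTaskSuccess(_result: T, context: TaskContext): Promise<void> {
    const start = this.startedAt.get(context.id);
    if (start === undefined) return; // start was never observed for this task
    this.durations.set(context.id, this.now() - start);
    this.startedAt.delete(context.id);
  }
}
```

Against the real library, the class would implement the `QueuePlugin<T>` imported from `./queue-types` and be registered with `queue.use(new TimingPlugin())`, as in the example above.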

API

Queue Methods

  • addTask(task): Add a single task; returns the task ID
  • addTasks(tasks): Add multiple tasks; returns an array of task IDs
  • stop(): Stop the queue and clear pending tasks
  • getState(): Get current queue state
  • use(plugin): Add a plugin
  • removePlugin(name): Remove a plugin by name
  • getPlugin(name): Get a plugin by name
  • on(event, listener): Add event listener
  • off(event, listener): Remove event listener

Options

interface ExtendedQueueOptions {
  concurrency: number; // Max concurrent tasks
  maxRetry: number; // Max retry attempts
  retryWaitTimeInSeconds: number; // Base wait time for retries
  retryStrategy?: RetryStrategy; // Custom retry strategy
  stopOnError?: boolean; // Stop queue on error (default: true)
}
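
The interface marks `stopOnError` as optional with a default of `true`. A hedged sketch of how such a default could be applied is shown below. `resolveOptions` and `QueueOptionsSketch` are hypothetical names, `retryStrategy` is omitted because its type is not shown in this README, and the range checks are an assumption about sensible values rather than documented behavior.

```typescript
// Local copy of the documented fields, minus `retryStrategy`.
interface QueueOptionsSketch {
  concurrency: number;
  maxRetry: number;
  retryWaitTimeInSeconds: number;
  stopOnError?: boolean;
}

// Hypothetical helper: validates the numeric fields and fills in the default.
function resolveOptions(
  options: QueueOptionsSketch,
): Required<QueueOptionsSketch> {
  if (!Number.isInteger(options.concurrency) || options.concurrency < 1) {
    throw new RangeError("concurrency must be a positive integer");
  }
  if (!Number.isInteger(options.maxRetry) || options.maxRetry < 0) {
    throw new RangeError("maxRetry must be a non-negative integer");
  }
  if (options.retryWaitTimeInSeconds < 0) {
    throw new RangeError("retryWaitTimeInSeconds must not be negative");
  }
  // `??` keeps an explicit `false` and only replaces undefined or null.
  return { ...options, stopOnError: options.stopOnError ?? true };
}
```

Using `??` rather than `||` matters here: with `||`, an explicit `stopOnError: false` would be overwritten by the default.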
