A robust, extensible, plugin-based task queue with retry logic, concurrency control, and event handling.
- Concurrency Control: Limit the number of tasks running simultaneously
- Retry Logic: Configurable retry strategies (linear, exponential backoff)
- Plugin System: Extend functionality with plugins
- Event-Driven: Listen to task lifecycle events
- Type-Safe: Full TypeScript support
- State Management: Query queue state at any time
```typescript
import { Queue } from "./queue";

const queue = new Queue<number>({
  concurrency: 3,
  maxRetry: 3,
  retryWaitTimeInSeconds: 2,
});

queue.on("success", (result, context) => {
  console.log(`Task ${context.id} completed:`, result);
});

queue.on("all-done", (errors) => {
  console.log("All tasks completed. Errors:", errors.length);
});

// Add tasks
queue.addTask(async () => {
  return 42;
});
```
import { LoggerPlugin } from "./plugins/logger-plugin"; queue.use(new LoggerPlugin({ logLevel: "info", prefix: "[MyQueue]" }));
```typescript
import { MetricsPlugin } from "./plugins/metrics-plugin";

const metricsPlugin = new MetricsPlugin();
queue.use(metricsPlugin);

// Later...
const metrics = metricsPlugin.getMetrics();
console.log(metrics);
// {
//   totalTasks: 100,
//   successfulTasks: 95,
//   failedTasks: 5,
//   retriedTasks: 10,
//   totalRetries: 15,
//   averageExecutionTime: 234
// }
```
import { RateLimiterPlugin } from "./plugins/rate-limiter-plugin"; queue.use(new RateLimiterPlugin({ maxTasksPerWindow: 10, windowInSeconds: 60 }));
```typescript
import { LinearRetryStrategy } from "./retry-strategy";

const queue = new Queue({
  concurrency: 2,
  maxRetry: 3,
  retryWaitTimeInSeconds: 5,
  retryStrategy: new LinearRetryStrategy(5) // Wait 5s between retries
});
```
```typescript
import { ExponentialBackoffStrategy } from "./retry-strategy";

const queue = new Queue({
  concurrency: 2,
  maxRetry: 5,
  retryWaitTimeInSeconds: 1,
  retryStrategy: new ExponentialBackoffStrategy(1, 30) // 1s, 2s, 4s, 8s, 16s, max 30s
});
```
- success: Task completed successfully
- error: Task failed after all retries
- retry: Task is being retried
- all-done: All tasks completed
- task-start: Task execution started
- task-end: Task execution ended
- queue-drain: Queue is empty and all tasks completed
```typescript
import type { QueuePlugin, TaskContext, QueueState } from "./queue-types";

class CustomPlugin<T> implements QueuePlugin<T> {
  name = "custom";

  async onTaskStart(context: TaskContext<T>, state: QueueState<T>) {
    // Your logic here
  }

  async onTaskSuccess(result: T, context: TaskContext<T>, state: QueueState<T>) {
    // Your logic here
  }
}

queue.use(new CustomPlugin());
```
- addTask(task): Add a single task, returns task ID
- addTasks(tasks): Add multiple tasks, returns array of task IDs
- stop(): Stop the queue and clear pending tasks
- getState(): Get current queue state
- use(plugin): Add a plugin
- removePlugin(name): Remove a plugin by name
- getPlugin(name): Get a plugin by name
- on(event, listener): Add event listener
- off(event, listener): Remove event listener
```typescript
interface ExtendedQueueOptions {
  concurrency: number; // Max concurrent tasks
  maxRetry: number; // Max retry attempts
  retryWaitTimeInSeconds: number; // Base wait time for retries
  retryStrategy?: RetryStrategy; // Custom retry strategy
  stopOnError?: boolean; // Stop queue on error (default: true)
}
```