Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

condrove10/queue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

History

6 Commits

Repository files navigation

Queue - Concurrent FIFO Queue for Go

A high-performance, thread-safe FIFO (First-In-First-Out) queue implementation in Go with blocking read capabilities. This package is optimized for high-throughput message processing scenarios with minimal overhead.


Features

  • Thread-safe: All operations are protected by read-write mutexes
  • Blocking reads: DequeueBlocking() with context support for cancellation and timeouts
  • Non-blocking reads: Dequeue() for immediate operations
  • High performance: Optimized for minimal overhead and high throughput
  • Graceful shutdown: Queue can be closed to signal completion to waiting consumers

Installation

go github.com/condrove10/queue

Quick Start

package main
import (
 "context"
 "fmt"
 "time"
 "github.com/condrove10/queue"
)
func main() {
 // Create a new queue for strings
 q := queue.New()
 
 // Add some items
 q.Enqueue("hello")
 q.Enqueue("world")
 
 // Remove items (non-blocking)
 value, hasNext := q.Dequeue()
 fmt.Println(value, hasNext) // Output: hello true
 
 value, hasNext = q.Dequeue()
 fmt.Println(value, hasNext) // Output: world false
}

API Reference

Creating a Queue

// Create a new queue for any type T
q := queue.New()

Adding Items

// Enqueue adds an item to the tail of the queue
err := q.Enqueue(item)
if err != nil {
 // Handle error (queue is closed)
}

Removing Items

Non-blocking Dequeue

// Dequeue removes and returns the front item
value, hasNext := q.Dequeue()
// hasNext indicates if more items remain in the queue

Blocking Dequeue

// DequeueBlocking waits until an item is available
ctx := context.Background()
value, err := q.DequeueBlocking(ctx)
if err != nil {
 // Handle error (context cancelled or queue closed)
}

Queue Operations

// Check queue size
size := q.Size()
// Check if empty
empty := q.IsEmpty()
// Clear all items
q.Clear()
// Close the queue (signals waiting consumers)
q.Close()
// Check if closed
closed := q.IsClosed()
open := q.IsOpen()

Examples

Producer-Consumer Pattern

package main
import (
 "context"
 "fmt"
 "sync"
 "time"
 "github.com/condrove10/queue"
)
func main() {
 q := queue.New()
 var wg sync.WaitGroup
 
 // Producer goroutine
 wg.Add(1)
 go func() {
 defer wg.Done()
 defer q.Close() // Signal consumers when done
 
 for i := 0; i < 10; i++ {
 q.Enqueue(i)
 fmt.Printf("Produced: %d\n", i)
 time.Sleep(100 * time.Millisecond)
 }
 }()
 
 // Consumer goroutines
 for i := 0; i < 3; i++ {
 wg.Add(1)
 go func(consumerID int) {
 defer wg.Done()
 ctx := context.Background()
 
 for {
 value, err := q.DequeueBlocking(ctx)
 if err != nil {
 fmt.Printf("Consumer %d: Queue closed\n", consumerID)
 return
 }
 fmt.Printf("Consumer %d consumed: %d\n", consumerID, value)
 }
 }(i)
 }
 
 wg.Wait()
}

With Context Timeout

package main
import (
 "context"
 "fmt"
 "time"
 "github.com/condrove10/queue"
)
func main() {
 q := queue.New()
 
 // Try to dequeue with a 2-second timeout
 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 defer cancel()
 
 value, err := q.DequeueBlocking(ctx)
 if err != nil {
 if err == context.DeadlineExceeded {
 fmt.Println("Timeout: No items available within 2 seconds")
 }
 } else {
 fmt.Printf("Received: %s\n", value)
 }
}

Work Queue with Graceful Shutdown

package main
import (
 "context"
 "fmt"
 "os"
 "os/signal"
 "sync"
 "syscall"
 "github.com/condrove10/queue"
)
type Task struct {
 ID int
 Data string
}
func main() {
 q := queue.New()
 ctx, cancel := context.WithCancel(context.Background())
 var wg sync.WaitGroup
 
 // Handle shutdown signals
 sigCh := make(chan os.Signal, 1)
 signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
 
 go func() {
 <-sigCh
 fmt.Println("\nShutdown signal received...")
 cancel()
 q.Close()
 }()
 
 // Start workers
 for i := 0; i < 3; i++ {
 wg.Add(1)
 go worker(ctx, q, i, &wg)
 }
 
 // Add some tasks
 for i := 0; i < 20; i++ {
 task := Task{ID: i, Data: fmt.Sprintf("task-%d", i)}
 if err := q.Enqueue(task); err != nil {
 break // Queue is closed
 }
 }
 
 // Wait for workers to finish
 wg.Wait()
 fmt.Println("All workers finished")
}
func worker(ctx context.Context, q *queue.Queue, workerID int, wg *sync.WaitGroup) {
 defer wg.Done()
 
 for {
 task, err := q.DequeueBlocking(ctx)
 if err != nil {
 fmt.Printf("Worker %d: Shutting down (%v)\n", workerID, err)
 return
 }
 
 // Process task
 fmt.Printf("Worker %d processing task %d: %s\n", workerID, task.ID, task.Data)
 // Simulate work...
 }
}

Error Handling

The queue defines custom errors:

  • queue.ErrQueueClosed: Returned when trying to enqueue to a closed queue
err := q.Enqueue(item)
if err == queue.ErrQueueClosed {
 // Handle closed queue
}

About

Queue is a concurrency-safe FIFO linked list with blocking read capabilities, optimized for high-throughput message processing with minimal overhead.

Resources

License

Stars

Watchers

Forks

Packages

Contributors

Languages

AltStyle によって変換されたページ (->オリジナル) /