coregx/stream

🌊 stream - Real-time Communications for Go 1.25+

Server-Sent Events and WebSocket implementations - Zero external dependencies, RFC-compliant, production-ready


⚡ Quick Start

SSE (Server-Sent Events)

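package main

import (
	"log"
	"net/http"
	"time"

	"github.com/coregx/stream/sse"
)

func main() {
	http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		// Switch the response to an event stream; reject the request if that fails.
		conn, err := sse.Upgrade(w, r)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		defer conn.Close()

		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case t := <-ticker.C:
				// Send the current time once per second as a "time" event.
				event := sse.NewEvent(t.Format(time.RFC3339)).WithType("time")
				conn.Send(event)
			case <-conn.Done():
				// The client disconnected.
				return
			}
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}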

WebSocket

package main

import (
	"log"
	"net/http"

	"github.com/coregx/stream/websocket"
)

func main() {
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		conn, err := websocket.Upgrade(w, r, nil)
		if err != nil {
			log.Printf("upgrade failed: %v", err)
			return
		}
		defer conn.Close()

		// Echo every message back until the client disconnects or a read fails.
		for {
			msgType, data, err := conn.Read()
			if err != nil {
				break
			}
			conn.Write(msgType, data)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}

Broadcasting with Hub:

hub := websocket.NewHub()
go hub.Run()
defer hub.Close()

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
	conn, err := websocket.Upgrade(w, r, nil)
	if err != nil {
		return
	}
	hub.Register(conn)
	defer hub.Unregister(conn)

	for {
		_, data, err := conn.Read()
		if err != nil {
			// Stop on disconnect or read failure instead of looping forever.
			return
		}
		hub.Broadcast(data)
	}
})

🌟 Why stream?

Zero Dependencies, Maximum Control

// Pure stdlib - no external dependencies in production
import "github.com/coregx/stream/sse"
import "github.com/coregx/stream/websocket"

stream is built with zero external dependencies for production code. No vendor lock-in, no dependency hell, just pure Go stdlib implementations of SSE and WebSocket protocols.

RFC-Compliant Protocols

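The SSE implementation follows the text/event-stream format and the WebSocket implementation follows RFC 6455, so standard clients such as the browser EventSource and WebSocket APIs can connect without adapters.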

Broadcasting Made Simple

// Hub pattern for efficient broadcasting
hub := websocket.NewHub()
go hub.Run() // started as in the Quick Start
hub.BroadcastText("Hello, everyone!")
hub.BroadcastJSON(Message{Type: "update", Data: "..."})

Built-in Hub pattern for efficient message broadcasting to multiple clients with minimal allocations.
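To make the hub pattern concrete, here is a minimal stdlib-only sketch of a broadcaster. It is not the library's websocket.Hub: the miniHub type, its methods, and the use of buffered channels in place of connections are all assumptions made for illustration. A client whose buffer is full is skipped so that one slow client cannot stall the rest.

```go
package main

import (
	"fmt"
	"sync"
)

// miniHub is a hypothetical broadcaster used only to illustrate the pattern.
type miniHub struct {
	mu      sync.RWMutex
	clients map[chan []byte]struct{}
}

func newMiniHub() *miniHub {
	return &miniHub{clients: make(map[chan []byte]struct{})}
}

// Register adds a client and returns the channel its messages arrive on.
func (h *miniHub) Register(buffer int) chan []byte {
	ch := make(chan []byte, buffer)
	h.mu.Lock()
	h.clients[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

// Unregister removes a client and closes its channel.
func (h *miniHub) Unregister(ch chan []byte) {
	h.mu.Lock()
	if _, ok := h.clients[ch]; ok {
		delete(h.clients, ch)
		close(ch)
	}
	h.mu.Unlock()
}

// Broadcast offers msg to every client and returns how many accepted it.
// Clients with a full buffer are skipped rather than blocking the others.
func (h *miniHub) Broadcast(msg []byte) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	delivered := 0
	for ch := range h.clients {
		select {
		case ch <- msg:
			delivered++
		default: // slow client: drop this message for it
		}
	}
	return delivered
}

func main() {
	hub := newMiniHub()
	a, b := hub.Register(1), hub.Register(1)
	n := hub.Broadcast([]byte("Hello, everyone!"))
	fmt.Println(n, string(<-a), string(<-b))
	hub.Unregister(a)
	hub.Unregister(b)
}
```

Dropping messages for slow clients is one possible policy; disconnecting them or blocking with a timeout are alternatives with different trade-offs.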


📦 Installation

go get github.com/coregx/stream

Requirements: Go 1.25+ (uses encoding/json/v2 and modern generics)


🚀 Features

SSE (Server-Sent Events)

  • ✅ RFC Compliant - text/event-stream standard
  • ✅ Event Types - Named events (message, update, custom)
  • ✅ Event IDs - Client reconnection with Last-Event-ID
  • ✅ Retry Control - Configurable reconnection delays
  • ✅ Automatic Flushing - Real-time event delivery
  • ✅ Graceful Shutdown - Clean connection closure
  • ✅ 92.3% Test Coverage - 215 tests, comprehensive validation
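The event type, ID, and retry features above correspond to fields in the text/event-stream wire format. The sketch below renders one event by hand using only the standard library. The formatEvent helper is hypothetical and is not part of the sse package; it assumes the id and event type contain no newlines.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatEvent renders one event in the text/event-stream wire format.
// An empty id or eventType and a non-positive retryMs are omitted.
func formatEvent(id, eventType string, retryMs int, data string) string {
	var b strings.Builder
	if id != "" {
		b.WriteString("id: " + id + "\n")
	}
	if eventType != "" {
		b.WriteString("event: " + eventType + "\n")
	}
	if retryMs > 0 {
		b.WriteString("retry: " + strconv.Itoa(retryMs) + "\n")
	}
	// Each line of the payload needs its own "data:" field.
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n") // a blank line tells the client to dispatch the event
	return b.String()
}

func main() {
	fmt.Printf("%q\n", formatEvent("msg-123", "alert", 3000, "disk full\ncheck /var"))
}
```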

WebSocket

  • ✅ RFC 6455 Compliant - Full WebSocket protocol
  • ✅ Text & Binary - Both message types supported
  • ✅ Control Frames - Ping/Pong, Close handshake
  • ✅ Broadcasting Hub - Efficient multi-client messaging
  • ✅ Connection Management - Auto cleanup, timeouts
  • ✅ Frame Masking - Client-to-server masking (RFC requirement)
  • ✅ 84.3% Test Coverage - 99 tests, production-ready
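Two of the RFC 6455 mechanisms listed above are small enough to sketch with the standard library: the Sec-WebSocket-Accept value the server returns during the handshake, and the XOR masking applied to client-to-server payloads. The acceptKey and maskPayload helpers are hypothetical names and do not show how this library implements either step. The inputs in main are the example values from RFC 6455.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed string RFC 6455 appends to the client's key.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// acceptKey computes the Sec-WebSocket-Accept header value for a given
// Sec-WebSocket-Key: base64(SHA-1(key + GUID)).
func acceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

// maskPayload XORs payload in place with the 4-byte masking key.
// The same operation both masks and unmasks.
func maskPayload(key [4]byte, payload []byte) {
	for i := range payload {
		payload[i] ^= key[i%4]
	}
}

func main() {
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="))

	key := [4]byte{0x37, 0xfa, 0x21, 0x3d}
	payload := []byte("Hello")
	maskPayload(key, payload)
	fmt.Printf("% x\n", payload) // masked bytes
	maskPayload(key, payload)
	fmt.Println(string(payload)) // back to the original text
}
```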

Common Features

  • 🚀 Zero Dependencies - Pure stdlib implementation
  • 🎯 Type-Safe - Modern Go 1.25+ with generics
  • ⚡ High Performance - <100 μs broadcasts, minimal allocations
  • 🧪 Well-Tested - 314 tests total, 84.3% coverage
  • 🏢 Production Ready - Used in coregx ecosystem
  • 📚 Comprehensive Docs - Guides, examples, API reference

📚 Documentation


🎯 Use Cases

Real-time Dashboards (SSE)

// Push live metrics to dashboard
conn.Send(sse.NewEvent(metricsJSON).WithType("metrics"))

Perfect for server-to-client updates: live metrics, notifications, stock prices, or any real-time data stream.
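For readers who want to see what a server-to-client push involves at the HTTP level, the following sketch streams metrics events with plain net/http, without this library. The metricsHandler and record helpers are hypothetical, and the payloads are assumed to be single-line JSON strings. The handler sets the event-stream content type and flushes after every event so each one reaches the client immediately.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// metricsHandler streams the given payloads as "metrics" events and returns.
func metricsHandler(payloads []string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")

		for _, p := range payloads {
			select {
			case <-r.Context().Done(): // the client went away
				return
			default:
			}
			fmt.Fprintf(w, "event: metrics\ndata: %s\n\n", p)
			flusher.Flush() // send the event now instead of waiting for the buffer to fill
		}
	}
}

// record runs the handler against an in-memory recorder and returns
// the response content type and body.
func record(payloads []string) (contentType, body string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	metricsHandler(payloads)(rec, req)
	return rec.Header().Get("Content-Type"), rec.Body.String()
}

func main() {
	ct, body := record([]string{`{"cpu":0.42}`, `{"cpu":0.57}`})
	fmt.Println(ct)
	fmt.Print(body)
}
```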

Chat Applications (WebSocket)

// Bidirectional messaging
hub.BroadcastText(fmt.Sprintf("%s: %s", username, message))

Full-duplex communication for chat, collaborative editing, multiplayer games.

Live Notifications (SSE)

// Server pushes notifications
event := sse.NewEvent(notification).WithType("alert").WithRetry(3000)
conn.Send(event)

Lightweight server push for notifications without WebSocket overhead.

IoT & Sensors (WebSocket)

// Binary data streaming
conn.Write(websocket.BinaryMessage, sensorData)

Efficient binary data transfer for IoT devices, sensors, telemetry.
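As an illustration of what a binary payload might contain, the sketch below packs a sensor sample into six bytes with encoding/binary. The reading type and its layout (2-byte sensor ID, 4-byte IEEE 754 temperature, big-endian) are assumptions invented for this example; the library does not prescribe a payload format.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// reading is a hypothetical sensor sample.
type reading struct {
	SensorID    uint16
	Temperature float32 // degrees Celsius
}

// encode packs a reading into 6 big-endian bytes:
// 2 bytes for the sensor ID, 4 bytes for the temperature bits.
func encode(r reading) []byte {
	buf := make([]byte, 6)
	binary.BigEndian.PutUint16(buf[0:2], r.SensorID)
	binary.BigEndian.PutUint32(buf[2:6], math.Float32bits(r.Temperature))
	return buf
}

// decode is the inverse of encode and rejects payloads of the wrong size.
func decode(buf []byte) (reading, error) {
	if len(buf) != 6 {
		return reading{}, fmt.Errorf("want 6 bytes, got %d", len(buf))
	}
	return reading{
		SensorID:    binary.BigEndian.Uint16(buf[0:2]),
		Temperature: math.Float32frombits(binary.BigEndian.Uint32(buf[2:6])),
	}, nil
}

func main() {
	payload := encode(reading{SensorID: 7, Temperature: 21.5})
	fmt.Printf("% x\n", payload)

	r, err := decode(payload)
	fmt.Println(r, err)
}
```

The resulting byte slice is the kind of value that would be passed as sensorData in the call above.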


📊 Benchmarks

SSE Performance

BenchmarkSSE_Send-8            50000    23.4 μs/op    0 allocs/op
BenchmarkSSE_Broadcast-8       30000    47.2 μs/op    1 allocs/op
BenchmarkSSE_E2E_Latency-8     20000    68.5 μs/op    2 allocs/op

WebSocket Performance

BenchmarkWS_Echo-8               100000    15.3 μs/op    0 allocs/op
BenchmarkWS_Broadcast-8           50000    32.1 μs/op    1 allocs/op
BenchmarkWS_Hub_1000clients-8     10000    156 μs/op     3 allocs/op

High throughput: >20,000 messages/sec per connection with minimal allocations. Every benchmark above completes in under 100 μs per operation except the 1000-client hub broadcast (156 μs).
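The per-operation times above can be turned into rough throughput figures by taking the reciprocal. The sketch below does that arithmetic for each benchmark line. It assumes operations run strictly one after another, so it gives a back-of-the-envelope figure rather than a measured one; the opsPerSecond helper is a name made up for this example.

```go
package main

import "fmt"

// opsPerSecond converts a latency in microseconds per operation into
// operations per second, assuming operations run strictly sequentially.
func opsPerSecond(microsPerOp float64) float64 {
	return 1e6 / microsPerOp
}

func main() {
	results := []struct {
		name   string
		micros float64
	}{
		{"BenchmarkSSE_Send", 23.4},
		{"BenchmarkSSE_Broadcast", 47.2},
		{"BenchmarkSSE_E2E_Latency", 68.5},
		{"BenchmarkWS_Echo", 15.3},
		{"BenchmarkWS_Broadcast", 32.1},
		{"BenchmarkWS_Hub_1000clients", 156},
	}
	for _, r := range results {
		fmt.Printf("%-28s %8.0f ops/sec\n", r.name, opsPerSecond(r.micros))
	}

	// Spread across 1000 clients, the hub broadcast costs well under 1 μs per client.
	fmt.Printf("hub cost per client: %.3f μs\n", 156.0/1000)
}
```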


🔧 Advanced Usage

SSE with Custom Headers

conn, err := sse.Upgrade(w, r)
if err != nil {
	http.Error(w, err.Error(), http.StatusBadRequest)
	return
}

// Set custom retry interval
event := sse.NewEvent("data").WithRetry(5000) // 5 seconds

// Named event types
event = sse.NewEvent("update").WithType("user-joined")

// Event IDs for reconnection
event = sse.NewEvent("data").WithID("msg-123")
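Event IDs matter because a reconnecting browser sends the last ID it received in the Last-Event-ID request header, which lets the server replay what was missed. The sketch below shows one way to do that replay. It assumes sequential integer IDs and an in-memory history, and the storedEvent type and missedEvents function are hypothetical; the library's own replay support, if any, is not shown here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// storedEvent is a hypothetical record in a server-side event history.
type storedEvent struct {
	ID   int
	Data string
}

// missedEvents returns the events whose ID is greater than lastEventID.
// An empty or non-numeric value is treated as a first connection,
// so nothing is replayed.
func missedEvents(lastEventID string, history []storedEvent) []storedEvent {
	last, err := strconv.Atoi(lastEventID)
	if err != nil {
		return nil
	}
	var missed []storedEvent
	for _, ev := range history {
		if ev.ID > last {
			missed = append(missed, ev)
		}
	}
	return missed
}

func main() {
	history := []storedEvent{{ID: 1, Data: "a"}, {ID: 2, Data: "b"}, {ID: 3, Data: "c"}}

	// Simulate a browser reconnecting after it last saw event 1.
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	req.Header.Set("Last-Event-ID", "1")

	for _, ev := range missedEvents(req.Header.Get("Last-Event-ID"), history) {
		fmt.Printf("id: %d\ndata: %s\n\n", ev.ID, ev.Data)
	}
}
```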

WebSocket with Configuration

opts := &websocket.UpgradeOptions{
	ReadBufferSize:  4096,
	WriteBufferSize: 4096,
	CheckOrigin: func(r *http.Request) bool {
		return r.Header.Get("Origin") == "https://example.com"
	},
}
conn, err := websocket.Upgrade(w, r, opts)
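When more than one origin needs access, the single string comparison in CheckOrigin can be replaced with an allowlist. The sketch below is one way to write that check with the standard library. The originAllowed helper and the example origins are assumptions; it compares scheme and host exactly, so look-alike hosts such as https://example.com.evil.net are rejected.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// originAllowed reports whether origin has the same scheme and host
// (including any port) as one of the allowed entries.
func originAllowed(origin string, allowed []string) bool {
	u, err := url.Parse(origin)
	if err != nil || u.Scheme == "" || u.Host == "" {
		return false // missing, opaque ("null"), or malformed origin
	}
	got := u.Scheme + "://" + u.Host
	for _, a := range allowed {
		if strings.EqualFold(got, a) {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"https://example.com", "https://app.example.com"}
	for _, o := range []string{
		"https://example.com",
		"https://example.com.evil.net",
		"http://example.com",
		"null",
	} {
		fmt.Printf("%-32s %v\n", o, originAllowed(o, allowed))
	}
}
```

Inside the options above, the callback would then read the header and delegate: return originAllowed(r.Header.Get("Origin"), allowed).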

Graceful Shutdown

// SSE
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case <-ctx.Done():
	conn.Close()
case <-conn.Done():
	// Client disconnected
}

// WebSocket Hub
hub.Close() // Gracefully closes all connections
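Connection-level shutdown is usually paired with stopping the HTTP server itself. The sketch below shows that step with net/http only; serveUntil and demo are hypothetical helpers, and the 50 ms and 5 s durations are arbitrary. In a real program the context would more likely come from signal.NotifyContext. Note that http.Server.Shutdown does not wait for hijacked connections such as WebSockets, and it waits for open SSE handlers only until its context expires, so closing the hub and SSE connections as shown above is still needed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serveUntil serves handler on ln until ctx is cancelled, then gives
// in-flight requests up to grace to finish before returning.
func serveUntil(ctx context.Context, ln net.Listener, handler http.Handler, grace time.Duration) error {
	srv := &http.Server{Handler: handler}
	serveErr := make(chan error, 1)
	go func() { serveErr <- srv.Serve(ln) }()

	select {
	case err := <-serveErr:
		return err // the server stopped before shutdown was requested
	case <-ctx.Done():
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		return err
	}
	// After a clean shutdown, Serve reports http.ErrServerClosed.
	if err := <-serveErr; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// demo starts a server on a random loopback port and stops it after 50 ms.
func demo() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return serveUntil(ctx, ln, http.NotFoundHandler(), 5*time.Second)
}

func main() {
	fmt.Println("shutdown error:", demo())
}
```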

🀝 Sister Projects

Part of the coregx ecosystem - production-ready Go libraries:

  • fursy - HTTP Router with generics, OpenAPI, RFC 9457
  • relica - Database Query Builder (coming soon)
  • stream - Real-time Communications (this library)

📊 Status

Metric          Value
Version         v0.1.0 (Production Ready)
Test Coverage   84.3% (SSE: 92.3%, WebSocket: 84.3%)
Tests           314 total (215 SSE, 99 WebSocket)
Test Lines      9,245 lines
Benchmarks      23 (E2E latency, throughput, load tests)
Dependencies    0 (production)
Go Version      1.25+

📄 License

MIT License - see LICENSE file for details.


🙏 Acknowledgments

Special Thanks

Professor Ancha Baranova - This project would not have been possible without her invaluable help and support. Her assistance was crucial in making all coregx projects a reality.


Built with ❤️ for the Go community by coregx
