A lightweight, spec-compliant Server-Sent Events (SSE) client for Swift.
- Full implementation of the Server-Sent Events specification
- Automatic connection management and reconnection with configurable retry intervals
- Event parsing that handles all standard fields (
id,event,data,retry) - Support for different line break formats (
LF,CR,CRLF) - Multi-line data aggregation
-
AsyncSequencesupport for streaming events - Works on all Apple platforms and Linux
- Swift 6.0+ / Xcode 16+
Add the following to your Package.swift file:
dependencies: [ .package(url: "https://github.com/mattt/EventSource.git", from: "1.4.0") ]
EventSource can optionally integrate with AsyncHTTPClient through a package trait (Swift 6.1+):
dependencies: [ .package( url: "https://github.com/mattt/EventSource.git", from: "1.4.0", traits: ["AsyncHTTPClient"] ) ]
Build and test with the trait enabled:
swift build --traits AsyncHTTPClient
swift test --traits AsyncHTTPClientNote
AsyncHTTPClient uses SwiftNIO instead of Foundation URL Loading System.
If traffic is routed through AsyncHTTPClient, URLProtocol-based interception
does not apply.
On Linux, EventSource starts with URLSession transport and switches to
AsyncHTTPClient only after a retryable URLSession failure. Once switched,
that EventSource instance continues using AsyncHTTPClient for reconnect attempts.
Create an EventSource with a URL to establish
a persistent connection to an SSE endpoint.
The API mirrors the JavaScript EventSource interface
with event handlers for connection lifecycle management.
import EventSource import Foundation // Initialize with SSE endpoint URL let sse = EventSource(url: URL(string: "https://example.com/events")!) // Create an EventSource with the URL let sse = EventSource(url: url) // Set up event handlers sse.onOpen = { print("Connection established") } sse.onMessage = { e in print("Received event: \(e.event): \(e.data)") } sse.onError = { error in if let error = error { print("Error: \(error)") } else { print("Connection closed") } } // Later, when done Task { await sse.close() }
Alternatively, you can process server-sent event data with
Swift's modern AsyncSequence API for greater flexibility and control.
Use this approach when you need custom request configuration
or direct integration with existing async / URL Loading System code.
import EventSource import Foundation Task { // Create a request to the SSE endpoint let url = URL(string: "https://example.com/events")! let request = URLRequest(url: url) do { let (stream, _) = try await URLSession.shared.bytes(for: request) // Iterate through events as they arrive for try await event in stream.events { switch event.event { case "update": handleUpdate(event.data) case "error": handleError(event.data) default: print("Received: \(event.data)") } } } catch { print("Stream error: \(error.localizedDescription)") } }
Use the low-level parser directly to process raw SSE data. This approach is ideal for custom networking stacks, testing, or scenarios where you need precise control over state management.
import EventSource import Foundation // Create a parser instance let parser = EventSource.Parser() // Process raw SSE data byte-by-byte let rawData = """ id: 123 event: update data: {"key": "value"} data: Another message """.utf8 // Parse the data Task { // Feed bytes to the parser for byte in rawData { await parser.consume(byte) } await parser.finish() // Extract all parsed events while let event = await parser.getNextEvent() { handleEvent(event) } // Access state for reconnection logic print("Last Event ID: \(await parser.getLastEventId())") print("Reconnection time: \(await parser.getReconnectionTime())ms") }
Inference providers like Anthropic and OpenAI offer APIs with endpoints that stream tokens as they're generated. Here's a simplified example of how you might consume server-sent events for such an API:
import EventSource import Foundation // Simple model for LLM token streaming struct TokenChunk: Codable { let text: String let isComplete: Bool } // Create a request to the LLM streaming endpoint let url = URL(string: "https://api.example.com/completions")! var request = URLRequest(url: url) request.httpMethod = "POST" request.setValue("application/json", forHTTPHeaderField: "Content-Type") request.setValue("Bearer YOUR_API_KEY", forHTTPHeaderField: "Authorization") // Track the full response var completedText = "" // Process the stream asynchronously Task { do { // Get a byte stream from URLSession let (byteStream, response) = try await URLSession.shared.bytes(for: request) // Ensure response is valid guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 200, let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type"), contentType.contains("text/event-stream") else { throw NSError(domain: NSURLErrorDomain, code: NSURLErrorBadServerResponse, userInfo: nil) } let decoder = JSONDecoder() // Stream events asynchronously for try await event in byteStream.events { // Decode each chunk as it arrives let chunk = try decoder.decode(TokenChunk.self, from: Data(event.data.utf8)) // Add the new token to our result completedText += chunk.text print("Text so far: \(completedText)") // Check if the response is complete if chunk.isComplete { print("Final response: \(completedText)") break } } } catch { print("Stream error: \(error.localizedDescription)") } }
This project is available under the MIT license. See the LICENSE file for more info.