From e950ea15cbf69db65c617341b5961ebe6fec904e Mon Sep 17 00:00:00 2001 From: Hevake Date: 2026年6月17日 15:46:06 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat(http-sse):=E5=AE=9E=E7=8E=B0=E4=BA=86?= =?UTF-8?q?=20Http=20=E7=9A=84=20SSE=20=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 3 +- README_CN.md | 2 +- documents/modules/README.md | 2 +- documents/modules/README_CN.md | 2 +- documents/modules/http.md | 95 ++++ documents/modules/http_CN.md | 95 ++++ examples/http/server/sse/Makefile | 37 ++ examples/http/server/sse/push.cpp | 200 +++++++ modules/http/CMakeLists.txt | 16 +- modules/http/Makefile | 8 + modules/http/respond.cpp | 5 +- modules/http/server/server_imp.cpp | 17 +- modules/http/server/sse/sse_connection.cpp | 201 +++++++ modules/http/server/sse/sse_connection.h | 114 ++++ .../http/server/sse/sse_connection_test.cpp | 49 ++ modules/http/server/sse/sse_event.cpp | 78 +++ modules/http/server/sse/sse_event.h | 73 +++ modules/http/server/sse/sse_event_test.cpp | 140 +++++ modules/http/server/sse/sse_server.h | 134 +++++ modules/http/server/sse/sse_server_impl.cpp | 522 ++++++++++++++++++ modules/http/server/sse/sse_server_impl.h | 137 +++++ modules/http/server/sse/sse_server_test.cpp | 92 +++ 22 files changed, 2008 insertions(+), 14 deletions(-) create mode 100644 examples/http/server/sse/Makefile create mode 100644 examples/http/server/sse/push.cpp create mode 100644 modules/http/server/sse/sse_connection.cpp create mode 100644 modules/http/server/sse/sse_connection.h create mode 100644 modules/http/server/sse/sse_connection_test.cpp create mode 100644 modules/http/server/sse/sse_event.cpp create mode 100644 modules/http/server/sse/sse_event.h create mode 100644 modules/http/server/sse/sse_event_test.cpp create mode 100644 modules/http/server/sse/sse_server.h create mode 100644 modules/http/server/sse/sse_server_impl.cpp create mode 100644 modules/http/server/sse/sse_server_impl.h create mode 100644 modules/http/server/sse/sse_server_test.cpp diff --git a/README.md b/README.md index 9903984..73c47f9 100644 --- a/README.md +++ b/README.md @@ -109,8 +109,7 @@ It contains an event-driven behavior tree that can realize sequential, branching | run | It's an ELF. It loads one or more lib\*.so file which specified by parameter `-l xxx`, then run Modules in side | | mqtt | MQTT Client | | coroutine | coroutine function | -| http | Implemented HTTP Server and Client modules on the basis of network | -| websocket | Implemented WebSocket Server and Client modules based on HTTP middleware, RFC 6455 | +| http | Implemented HTTP Server and Client, middleware, and SSE (Server-Sent Events) modules on the basis of network | | alarm | Realized 4 commonly used alarm clocks: CRON alarm clock, single alarm clock, weekly cycle alarm clock, weekday alarm clock | | flow | Contains multi-level state machine and behavior tree to solve the problem of action flow in asynchronous mode | | crypto | Implemented the commonly used AES and MD5 encryption and decryption calculations | diff --git a/README_CN.md b/README_CN.md index dbc8b80..9f06b47 100644 --- a/README_CN.md +++ b/README_CN.md @@ -111,7 +111,7 @@ trace模块能记录被标记的函数每次执行的时间点与时长,可导 | run | 执行器 | 是个可执行程序,可加载多个由参数`-l xxx`指定的动态库,并运行其中的Module | | mqtt | MQTT客户端库 | | | coroutine | 协程库 | 众所周知,异步框架不方便处理顺序性业务,协程弥补之 | -| http | HTTP库 | 在network的基础上实现了HTTP的Server与Client模块 | +| http | HTTP库 | 在network的基础上实现了HTTP的Server与Client、中间件、SSE(服务端推送事件)模块 | | websocket | WebSocket库 | 在http的基础上实现了WebSocket的Server与Client模块,遵循RFC 6455 | | alarm | 闹钟库 | 实现了4种常用的闹钟:CRON闹钟、单次闹钟、星期循环闹钟、工作日闹钟 | | flow | 流程库| 含多层级状态机与行为树,解决异步模式下动行流程问题 | diff --git a/documents/modules/README.md b/documents/modules/README.md index f03c7c4..f9d31a8 100644 --- a/documents/modules/README.md +++ b/documents/modules/README.md @@ -17,7 +17,7 @@ cpp-tbox is an event-driven C++ service application development library that pro | **network** | Network communication | TCP/UDP/UART communication and byte stream abstraction | [network.md](network.md) | | **terminal** | Interactive terminal | Runtime command interaction, similar to Bash shell | [terminal.md](terminal.md) | | **log** | Log channels | File/stdout/syslog and other log outputs | [log.md](log.md) | -| **http** | HTTP service | Express-style HTTP server/client and middleware | [http.md](http.md) | +| **http** | HTTP service | Express-style HTTP server/client, middleware, and SSE (Server-Sent Events) | [http.md](http.md) | | **websocket** | WebSocket service | WebSocket server/client, RFC 6455, HTTP middleware-based | [websocket.md](websocket.md) | | **coroutine** | Coroutine | Coroutine scheduler and Channel/Mutex helper components | [coroutine.md](coroutine.md) | | **alarm** | Timer alarm | Cron/Oneshot/Weekly/Workday timers | [alarm.md](alarm.md) | diff --git a/documents/modules/README_CN.md b/documents/modules/README_CN.md index 7832390..591bd51 100644 --- a/documents/modules/README_CN.md +++ b/documents/modules/README_CN.md @@ -17,7 +17,7 @@ cpp-tbox 是一个基于事件驱动的 C++ 服务应用开发库,提供完整 | **network** | 网络通信 | TCP/UDP/UART 通信与字节流抽象 | [network_CN.md](network_CN.md) | | **terminal** | 交互终端 | 运行时命令交互,类似 Bash shell | [terminal_CN.md](terminal_CN.md) | | **log** | 日志通道 | 文件/stdout/syslog 等日志输出 | [log_CN.md](log_CN.md) | -| **http** | HTTP 服务 | Express 式 HTTP 服务端与客户端、中间件 | [http_CN.md](http_CN.md) | +| **http** | HTTP 服务 | Express 式 HTTP 服务端与客户端、中间件、SSE(服务端推送事件) | [http_CN.md](http_CN.md) | | **websocket** | WebSocket 服务 | WebSocket 服务端与客户端,RFC 6455,基于 HTTP 中间件 | [websocket_CN.md](websocket_CN.md) | | **coroutine** | 协程 | 协程调度器与 Channel/Mutex 等辅助组件 | [coroutine_CN.md](coroutine_CN.md) | | **alarm** | 定时闹钟 | Cron/Oneshot/Weekly/Workday 定时器 | [alarm_CN.md](alarm_CN.md) | diff --git a/documents/modules/http.md b/documents/modules/http.md index 4390105..3dc160f 100644 --- a/documents/modules/http.md +++ b/documents/modules/http.md @@ -351,6 +351,101 @@ http_client.setReconnectDelayCalcFunc( 6. **Calling external APIs**: Use Client to make HTTP requests to other services 7. **Service-to-service communication**: Client with auto-reconnect for reliable inter-service HTTP calls +## SSE — Server-Sent Events + +The http module also includes an SSE (Server-Sent Events) sub-package, implementing server-side event push based on the W3C/WHATWG EventSource specification. SSE uses standard HTTP long-lived responses (200 OK) to stream events to the browser, requiring no protocol upgrade like WebSocket. + +### Header Files + +```cpp +#include //! SSE event data structure +#include //! SSE server +#include //! SSE connection (internal) +``` + +### SseServer — SSE Server + +SseServer runs on top of an HTTP server as a middleware. It detects SSE requests (Accept: text/event-stream), sets 200 OK response headers, and takes over the TcpConnection via the `upgrade_cb` mechanism to provide continuous event streaming. + +| Method | Description | +|------|------| +| `SseServer(loop)` | Constructor | +| `initialize(http_server, url_path)` | Initialize: associate with an HTTP server; `url_path` controls URL matching | +| `start()` | Start (registers as HTTP middleware) | +| `stop()` | Stop (unregisters middleware, closes all SSE connections) | +| `cleanup()` | Cleanup | +| `state()` | Get current state (None/Inited/Running) | +| `send(client, data)` | Send data to a client (simple text, event type "message") | +| `send(client, event)` | Send SseEvent to a client | +| `sendToAll(data)` | Broadcast data to all clients | +| `sendToAll(event)` | Broadcast SseEvent to all clients | +| `close(client)` | Close a client connection | +| `sendHeartbeat(client, comment)` | Send heartbeat comment line | +| `setHeartbeatInterval(ms)` | Set auto-heartbeat interval (default: 0 = disabled) | +| `isClientValid(client)` | Check if a client connection is still valid | +| `peerAddr(client)` | Get client address (IP:port) | +| `getLastEventId(client)` | Get the Last-Event-ID from browser reconnect | +| `getUrl(client)` | Get the URL path the client connected to | +| `setContext(client, ctx, deleter)` | Set context data for a client | +| `getContext(client)` | Get context data for a client | +| `setConnectedCallback(cb)` | Set callback: new client connected | +| `setDisconnectedCallback(cb)` | Set callback: client disconnected | +| `IsSseRequest(req)` | Static: check if an HTTP request is a valid SSE request | + +**URL path matching rules:** Same as WsServer — prefix match if url_path ends with `/`, exact match otherwise, empty string matches all. + +**SSE vs WebSocket:** + +| Feature | WebSocket | SSE | +|---------|-----------|------| +| HTTP status code | 101 Switching Protocols | 200 OK | +| Data direction | Bidirectional | Server→Client only | +| Data format | Binary frames | Plain text (`data:`/`event:`/`id:` fields) | +| Client message callback | Yes | No (unidirectional) | +| Heartbeat | Ping/Pong frames | Comment lines + timer | +| Reconnection | Manual implementation | Browser auto-reconnect + Last-Event-ID | +| Module | Separate `websocket` module | Inside `http` module | + +### SseEvent — SSE Event Data Structure + +```cpp +struct SseEvent { + std::string id; //! Event ID (optional), for Last-Event-ID reconnection + std::string event; //! Event type (optional, default "message") + std::string data; //! Data (required, supports multiline) + int retry = 0; //! Reconnect interval in ms (optional) + + //! Format event as SSE text protocol + //! Multiline data auto-splits into multiple `data:` lines + std::string toString() const; +}; +``` + +### SSE Example: Event Push + +> Full example in `examples/http/server/sse/` + +```cpp +#include +#include + +SseServer sse_srv(sp_loop); +sse_srv.initialize(&http_srv, "/sse/events"); +sse_srv.setHeartbeatInterval(std::chrono::seconds(15)); + +sse_srv.setConnectedCallback([](const SseServer::ConnToken &token) { + LogInfo("sse client connected"); + sse_srv.send(token, "Welcome!"); +}); + +//! Push events every 5 seconds +SseEvent evt; +evt.id = "42"; +evt.event = "tick"; +evt.data = "{\"time\":\"2026-06-16 10:30:00\"}"; +sse_srv.sendToAll(evt); +``` + ## Important Notes 1. **Middleware invocation order**: Middleware added via `use()` executes in the order it was added diff --git a/documents/modules/http_CN.md b/documents/modules/http_CN.md index 3ae8459..0ea996e 100644 --- a/documents/modules/http_CN.md +++ b/documents/modules/http_CN.md @@ -353,6 +353,101 @@ http_client.setReconnectDelayCalcFunc( 6. **调用外部 API**:使用 Client 向其它服务发起 HTTP 请求 7. **服务间通信**:使用 Client 配合自动重连,实现可靠的服务间 HTTP 调用 +## SSE — Server-Sent Events(服务端推送事件) + +http 模块还包含 SSE(Server-Sent Events)子包,实现基于 W3C/WHATWG EventSource 规范的服务端事件推送。SSE 使用标准 HTTP 长响应(200 OK)向浏览器流式推送事件,不像 WebSocket 需要协议升级。 + +### 头文件 + +```cpp +#include //! SSE 事件数据结构 +#include //! SSE 服务端 +#include //! SSE 连接(内部类) +``` + +### SseServer — SSE 服务端 + +SseServer 运行在 HTTP 服务器之上,作为中间件存在。它检测 SSE 请求(Accept 头包含 text/event-stream),设置 200 OK 响应头,通过 `upgrade_cb` 机制接管 TcpConnection,提供持续的事件推送。 + +| 方法 | 说明 | +|------|------| +| `SseServer(loop)` | 构造 | +| `initialize(http_server, url_path)` | 初始化:关联到 HTTP 服务器;`url_path` 控制 URL 匹配规则 | +| `start()` | 启动(注册为 HTTP 中间件) | +| `stop()` | 停止(反注册中间件,关闭所有 SSE 连接) | +| `cleanup()` | 清理 | +| `state()` | 获取当前状态 (None/Inited/Running) | +| `send(client, data)` | 向指定客户端发送数据(简单文本) | +| `send(client, event)` | 向指定客户端发送 SseEvent | +| `sendToAll(data)` | 向所有客户端广播数据 | +| `sendToAll(event)` | 向所有客户端广播 SseEvent | +| `close(client)` | 关闭指定客户端连接 | +| `sendHeartbeat(client, comment)` | 发送心跳注释行 | +| `setHeartbeatInterval(ms)` | 设置自动心跳间隔(默认 0 = 禁用) | +| `isClientValid(client)` | 检查客户端连接是否有效 | +| `peerAddr(client)` | 获取客户端地址 | +| `getLastEventId(client)` | 获取浏览器重连时的 Last-Event-ID | +| `getUrl(client)` | 获取客户端连接的 URL 路径 | +| `setContext(client, ctx, deleter)` | 设置上下文数据 | +| `getContext(client)` | 获取上下文数据 | +| `setConnectedCallback(cb)` | 设置回调:客户端连接 | +| `setDisconnectedCallback(cb)` | 设置回调:客户端断开 | +| `IsSseRequest(req)` | 静态方法:检查是否为 SSE 请求 | + +**URL 路径匹配规则:** 与 WsServer 一致——url_path 以 `/` 结尾为前缀匹配,不以 `/` 结尾为全量匹配,空字符串匹配所有。 + +**SSE 与 WebSocket 对比:** + +| 特性 | WebSocket | SSE | +|------|-----------|------| +| HTTP 状态码 | 101 Switching Protocols | 200 OK | +| 数据方向 | 双向 | 仅服务端→客户端 | +| 数据格式 | 二进制帧 | 纯文本(data:/event:/id: 字段) | +| 客户端消息回调 | 有 | 无(单向) | +| 心跳 | Ping/Pong 帧 | 注释行 + 定时器 | +| 重连机制 | 自行实现 | 浏览器自动重连 + Last-Event-ID | +| 模块位置 | 独立 `websocket` 模块 | `http` 模块内 | + +### SseEvent — SSE 事件数据结构 + +```cpp +struct SseEvent { + std::string id; //! 事件ID(可选),用于 Last-Event-ID 断线续传 + std::string event; //! 事件类型(可选,默认 "message") + std::string data; //! 数据(必须,支持多行) + int retry = 0; //! 重连间隔毫秒数(可选) + + //! 将事件格式化为 SSE 文本协议格式 + //! 多行 data 自动拆分为多个 `data:` 行 + std::string toString() const; +}; +``` + +### SSE 示例:事件推送 + +> 完整示例见 `examples/http/server/sse/` + +```cpp +#include +#include + +SseServer sse_srv(sp_loop); +sse_srv.initialize(&http_srv, "/sse/events"); +sse_srv.setHeartbeatInterval(std::chrono::seconds(15)); + +sse_srv.setConnectedCallback([](const SseServer::ConnToken &token) { + LogInfo("sse 客户端已连接"); + sse_srv.send(token, "欢迎!"); +}); + +//! 每 5 秒推送事件 +SseEvent evt; +evt.id = "42"; +evt.event = "tick"; +evt.data = "{\"time\":\"2026-06-16 10:30:00\"}"; +sse_srv.sendToAll(evt); +``` + ## 注意事项 1. **中间件调用顺序**:`use()` 添加的中间件按添加顺序执行 diff --git a/examples/http/server/sse/Makefile b/examples/http/server/sse/Makefile new file mode 100644 index 0000000..38adaa2 --- /dev/null +++ b/examples/http/server/sse/Makefile @@ -0,0 +1,37 @@ +# +# .============. +# // M A K E / \ +# // C++ DEV / \ +# // E A S Y / \/ \ +# ++ ----------. \/\ . +# \\ \ \ /\ / +# \\ \ \ / +# \\ \ \ / +# -============' +# +# Copyright (c) 2026 Hevake and contributors, all rights reserved. +# +# This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) +# Use of this source code is governed by MIT license that can be found +# in the LICENSE file in the root of the source tree. All contributing +# project authors may be found in the CONTRIBUTORS.md file in the root +# of the source tree. +# + +PROJECT := examples/http/server/sse +EXE_NAME := ${PROJECT} + +CPP_SRC_FILES := push.cpp + +CXXFLAGS := -DMODULE_ID='"$(EXE_NAME)"' $(CXXFLAGS) +LDFLAGS += \ + -ltbox_http \ + -ltbox_network \ + -ltbox_eventx \ + -ltbox_event \ + -ltbox_log \ + -ltbox_util \ + -ltbox_base \ + -lpthread -ldl + +include $(TOP_DIR)/mk/exe_common.mk diff --git a/examples/http/server/sse/push.cpp b/examples/http/server/sse/push.cpp new file mode 100644 index 0000000..6ffe6f6 --- /dev/null +++ b/examples/http/server/sse/push.cpp @@ -0,0 +1,200 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ + +/** + * SSE 定时推送示例 + * + * 功能: + * - 创建 HTTP 服务器,挂载 SSE 服务到 /sse/events + * - 每 5 秒向所有 SSE 客户端推送当前时间事件 + * - SSE 连接自动心跳(每 15 秒发送注释行) + * - HTTP 主页提供浏览器 EventSource 客户端代码 + * - Ctrl+C 优雅退出 + * + * 用法: + * ./sse_push [bind_addr] + * 示例: ./sse_push 0.0.0.0:8080 + * 浏览器访问: http://127.0.0.1:8080/ + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace tbox; +using namespace tbox::event; +using namespace tbox::http; +using namespace tbox::http::server; +using namespace tbox::http::sse; + +//! 获取当前时间字符串 +static std::string getTimeString() +{ + auto now = std::chrono::system_clock::now(); + auto time_t_now = std::chrono::system_clock::to_time_t(now); + char buf[64]; + std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time_t_now)); + return buf; +} + +//! HTML 主页:包含 EventSource 客户端 JavaScript +static const std::string kIndexHtml = + "\n" + " SSE Push Demo\n" + "
\n" + "

SSE Push Demo

\n" + "\n" + "\n" + "

AltStyle によって変換されたページ (->オリジナル) /

\n"; + +int main(int argc, char **argv) +{ + std::string bind_addr = "0.0.0.0:8080"; + + if (argc == 2) + bind_addr = argv[1]; + + LogOutput_Enable(); + + LogInfo("enter"); + + auto sp_loop = Loop::New(); + auto sp_sig_event = sp_loop->newSignalEvent(); + + //! 心跳定时器 + auto sp_push_timer = sp_loop->newTimerEvent(); + + SetScopeExitAction( + [=] { + delete sp_push_timer; + delete sp_sig_event; + delete sp_loop; + } + ); + + //! 创建 HTTP 服务器 + Server http_srv(sp_loop); + if (!http_srv.initialize(network::SockAddr::FromString(bind_addr), 1)) { + LogErr("init http server fail"); + return 0; + } + //http_srv.setContextLogEnable(true); + + //! 创建 SSE 服务,挂载到 /sse/events + SseServer sse_srv(sp_loop); + if (!sse_srv.initialize(&http_srv, "/sse/events")) { + LogErr("init sse server fail"); + return 0; + } + //sse_srv.setContextLogEnable(true); + + //! 设置自动心跳(每 15 秒发送注释行,保持连接活跃) + sse_srv.setHeartbeatInterval(std::chrono::seconds(15)); + + //! 设置 SSE 连接回调 + sse_srv.setConnectedCallback([&](const SseServer::ConnToken &token) { + auto addr = sse_srv.peerAddr(token); + LogInfo("sse client connected from %s", addr.toString().c_str()); + + //! 向新连接发送欢迎消息 + sse_srv.send(token, "Welcome! SSE connection established."); + }); + + sse_srv.setDisconnectedCallback([](const SseServer::ConnToken &token) { + LogInfo("sse client disconnected"); + }); + + //! 定时推送:每 5 秒向所有客户端推送当前时间 + int event_id = 0; + sp_push_timer->initialize(std::chrono::seconds(5), Event::Mode::kPersist); + sp_push_timer->setCallback([&] { + //! 构造 SSE 事件 + SseEvent evt; + evt.id = std::to_string(++event_id); + evt.event = "tick"; + evt.data = "{\"time\":\"" + getTimeString() + "\",\"id\":" + std::to_string(event_id) + "}"; + + //! 向所有客户端推送 + sse_srv.sendToAll(evt); + LogDbg("push event id:%d to clients", event_id); + }); + + //! 添加 HTTP 主页处理 + http_srv.use( + [&](ContextSptr ctx, const NextFunc &next) { + if (ctx->req().url.path == "/") { + ctx->res().status_code = StatusCode::k200_OK; + ctx->res().headers["Content-Type"] = "text/html; charset=utf-8"; + ctx->res().body = kIndexHtml; + return; + } + next(); + } + ); + + //! 启动服务 + http_srv.start(); + sse_srv.start(); + sp_push_timer->enable(); + + //! Ctrl+C 退出 + sp_sig_event->initialize(SIGINT, Event::Mode::kPersist); + sp_sig_event->enable(); + sp_sig_event->setCallback( + [&] (int) { + LogInfo("stopping..."); + sp_push_timer->disable(); + sse_srv.stop(); + http_srv.stop(); + sp_loop->exitLoop(); + } + ); + + LogInfo("start, listen at %s", bind_addr.c_str()); + sp_loop->runLoop(); + LogInfo("stop"); + + sse_srv.cleanup(); + http_srv.cleanup(); + + LogInfo("exit"); + return 0; +} diff --git a/modules/http/CMakeLists.txt b/modules/http/CMakeLists.txt index ff4e474..33c1831 100644 --- a/modules/http/CMakeLists.txt +++ b/modules/http/CMakeLists.txt @@ -42,6 +42,9 @@ set(TBOX_HTTP_SOURCES server/middlewares/form_data.cpp server/middlewares/form_data_middleware.cpp server/middlewares/file_downloader_middleware.cpp + server/sse/sse_event.cpp + server/sse/sse_connection.cpp + server/sse/sse_server_impl.cpp client/client.cpp client/client_impl.cpp client/respond_parser.cpp) @@ -51,7 +54,10 @@ set(TBOX_HTTP_TEST_SOURCES respond_test.cpp request_test.cpp url_test.cpp - server/request_parser_test.cpp) + server/request_parser_test.cpp + server/sse/sse_event_test.cpp + server/sse/sse_server_test.cpp + server/sse/sse_connection_test.cpp) add_library(${TBOX_LIBRARY_NAME} ${TBOX_BUILD_LIB_TYPE} ${TBOX_HTTP_SOURCES}) add_library(tbox::${TBOX_LIBRARY_NAME} ALIAS ${TBOX_LIBRARY_NAME}) @@ -112,6 +118,14 @@ install( DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/tbox/http/client ) +install( + FILES + server/sse/sse_event.h + server/sse/sse_server.h + server/sse/sse_connection.h + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/tbox/http/server/sse +) + # generate and install export file install( EXPORT ${TBOX_LIBRARY_NAME}_targets diff --git a/modules/http/Makefile b/modules/http/Makefile index 775eadb..7b77a50 100644 --- a/modules/http/Makefile +++ b/modules/http/Makefile @@ -37,6 +37,8 @@ HEAD_FILES = \ server/middlewares/form_data.h \ server/middlewares/form_data_middleware.h \ server/middlewares/file_downloader_middleware.h \ + server/sse/sse_event.h \ + server/sse/sse_server.h \ client/client.h \ CPP_SRC_FILES = \ @@ -52,6 +54,9 @@ CPP_SRC_FILES = \ server/middlewares/form_data.cpp \ server/middlewares/form_data_middleware.cpp \ server/middlewares/file_downloader_middleware.cpp \ + server/sse/sse_event.cpp \ + server/sse/sse_connection.cpp \ + server/sse/sse_server_impl.cpp \ client/client.cpp \ client/client_impl.cpp \ client/respond_parser.cpp \ @@ -65,6 +70,9 @@ TEST_CPP_SRC_FILES = \ request_test.cpp \ url_test.cpp \ server/request_parser_test.cpp \ + server/sse/sse_event_test.cpp \ + server/sse/sse_server_test.cpp \ + server/sse/sse_connection_test.cpp \ TEST_LDFLAGS := $(LDFLAGS) -ltbox_network -ltbox_log -ltbox_eventx -ltbox_event -ltbox_util -ltbox_base -ldl diff --git a/modules/http/respond.cpp b/modules/http/respond.cpp index f527d08..4619bfe 100644 --- a/modules/http/respond.cpp +++ b/modules/http/respond.cpp @@ -40,7 +40,10 @@ std::string Respond::toString() const has_content_length = true; } - if (!has_content_length) + //! 当 upgrade_cb 已设置时(WebSocket 101、SSE 200 等),不自动添加 Content-Length + //! 原因:升级/流式响应后面是持续的数据流(WebSocket 帧、SSE 事件),不是定长 body + //! Content-Length 会误导浏览器认为响应已完成,阻止流式数据接收 + if (!has_content_length && !upgrade_cb) oss << "Content-Length: " << body.length() << CRLF; oss << CRLF; diff --git a/modules/http/server/server_imp.cpp b/modules/http/server/server_imp.cpp index 56ac949..d70adc3 100644 --- a/modules/http/server/server_imp.cpp +++ b/modules/http/server/server_imp.cpp @@ -265,15 +265,18 @@ void Server::Impl::commitRespond(const TcpServer::ConnToken &ct, int index, Resp //! 处理协议升级请求(WebSocket 101、SSE 200 等) //! 触发条件:res->upgrade_cb 已设置(中间件负责设置) if (res->upgrade_cb) { + //! 注意:必须在 std::move(upgrade_cb) 之前调用 toString() + //! 否则 move 后 upgrade_cb 为空,toString() 会误判为普通响应而添加 Content-Length + //! SSE 响应添加 Content-Length:0 会导致浏览器认为响应已完成并断开重连 + const string content = res->toString(); + //! 发送响应后,将 TcpConnection 从 HTTP 服务器分离,交给升级协议 auto upgrade_cb = std::move(res->upgrade_cb); - { - const string &content = res->toString(); - tcp_server_.send(ct, content.data(), content.size()); - delete res; - if (context_log_enable_) - LogDbg("RES: [%s]", content.c_str()); - } + delete res; + + tcp_server_.send(ct, content.data(), content.size()); + if (context_log_enable_) + LogDbg("RES: [%s]", content.c_str()); //! 当前回调结束后立即执行 detach(使用 runNext,更高效) //! 因为 commitRespond() 是在 Loop 线程中执行的 diff --git a/modules/http/server/sse/sse_connection.cpp b/modules/http/server/sse/sse_connection.cpp new file mode 100644 index 0000000..9841ff4 --- /dev/null +++ b/modules/http/server/sse/sse_connection.cpp @@ -0,0 +1,201 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include "sse_connection.h" + +#include +#include +#include + +namespace tbox { +namespace http { +namespace sse { + +using namespace std::placeholders; + +SseConnection::SseConnection(event::Loop *wp_loop, + network::TcpConnection *tcp_conn, + const std::string &url, + const std::string &last_event_id) + : wp_loop_(wp_loop) + , sp_tcp_conn_(tcp_conn) + , url_(url) + , last_event_id_(last_event_id) +{ + TBOX_ASSERT(wp_loop != nullptr); + TBOX_ASSERT(tcp_conn != nullptr); + + //! 设置 TcpConnection 的回调 + //! SSE 是单向推送协议(服务端→客户端),不需要处理客户端发送的数据 + //! 但必须保持 receiveCallback 注册(阈值=0),否则底层 BufferedFd 会停止监听 + //! socket 读事件,导致:(1) 无法检测浏览器关闭连接;(2) TCP 写事件也可能受影响 + //! 与 WsConnection 一样,设置空函数体回调而非 nullptr,确保读事件持续监听 + sp_tcp_conn_->setReceiveCallback([](util::Buffer &buff) { buff.hasReadAll(); }, 0); + sp_tcp_conn_->setDisconnectedCallback(std::bind(&SseConnection::onTcpDisconnected, this)); + sp_tcp_conn_->setSendCompleteCallback(std::bind(&SseConnection::onTcpSendCompleted, this)); +} + +SseConnection::~SseConnection() +{ + TBOX_ASSERT(cb_level_ == 0); + + if (sp_tcp_conn_ == nullptr) + return; + + //! 先取消 TcpConnection 的回调,防止断开时回调到已销毁的 SseConnection + //! 注意:receiveCallback 不能设为 nullptr,否则会停止 socket 读事件监听 + //! 设置空函数体回调即可 + sp_tcp_conn_->setDisconnectedCallback(nullptr); + sp_tcp_conn_->setSendCompleteCallback(nullptr); + + sp_tcp_conn_->disconnect(); + auto tcp_conn = sp_tcp_conn_; + sp_tcp_conn_ = nullptr; + wp_loop_->runNext([tcp_conn] { CHECK_DELETE_OBJ(tcp_conn); }, + "SseConnection::~SseConnection, delete tcp_conn"); +} + +//! === 发送事件 === + +bool SseConnection::send(const std::string &data) +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! 简单数据:只输出 data 字段 + //! 格式:"data: xxx\n\n" + std::string sse_text = "data: " + data + "\n\n"; + + if (context_log_enable_) + LogDbg("SEND: %s", sse_text.c_str()); + + return sp_tcp_conn_->send(sse_text.data(), sse_text.size()); +} + +bool SseConnection::send(const SseEvent &event) +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! 完整事件:调用 SseEvent::toString() 格式化后发送 + std::string sse_text = event.toString(); + + if (context_log_enable_) + LogDbg("SEND: %s", sse_text.c_str()); + + return sp_tcp_conn_->send(sse_text.data(), sse_text.size()); +} + +bool SseConnection::sendHeartbeat(const std::string &comment) +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! 心跳注释行:": \n\n" + //! 浏览器 EventSource 会忽略以 ":" 开头的行,用于保持连接活跃 + std::string sse_text = ": " + comment + "\n\n"; + + if (context_log_enable_) + LogDbg("SEND: %s", sse_text.c_str()); + + return sp_tcp_conn_->send(sse_text.data(), sse_text.size()); +} + +bool SseConnection::close() +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! SSE 没有特殊的关闭协议,直接断开 TCP 连接 + sp_tcp_conn_->disconnect(); + return true; +} + +//! === 客户端信息 === + +network::SockAddr SseConnection::peerAddr() const +{ + if (sp_tcp_conn_ != nullptr) + return sp_tcp_conn_->peerAddr(); + return network::SockAddr(); +} + +std::string SseConnection::getLastEventId() const +{ + return last_event_id_; +} + +std::string SseConnection::getUrl() const +{ + return url_; +} + +bool SseConnection::isExpired() const +{ + return sp_tcp_conn_ == nullptr || sp_tcp_conn_->isExpired(); +} + +//! === 上下文数据 === + +void SseConnection::setContext(void *context, ContextDeleter &&deleter) +{ + if (sp_tcp_conn_ != nullptr) + sp_tcp_conn_->setContext(context, std::move(deleter)); +} + +void* SseConnection::getContext() const +{ + if (sp_tcp_conn_ != nullptr) + return sp_tcp_conn_->getContext(); + return nullptr; +} + +//! === TCP 回调 === + +void SseConnection::onTcpDisconnected() +{ + LogInfo("sse disconnected"); + + //! 通知 SseServer(通过 close_cb_ 绑定了 ConnToken) + if (close_cb_) { + ++cb_level_; + close_cb_(); + --cb_level_; + } + + //! 清理 TcpConnection:先断空指针,延后删除 + //! 必须在 close_cb_() 之后清理,否则回调中 getContext() 拿到空值 + auto tcp_conn = sp_tcp_conn_; + sp_tcp_conn_ = nullptr; + wp_loop_->runNext([tcp_conn] { CHECK_DELETE_OBJ(tcp_conn); }, + "SseConnection::onTcpDisconnected, delete tcp_conn"); +} + +void SseConnection::onTcpSendCompleted() +{ + if (send_complete_cb_) { + ++cb_level_; + send_complete_cb_(); + --cb_level_; + } +} + +} +} +} diff --git a/modules/http/server/sse/sse_connection.h b/modules/http/server/sse/sse_connection.h new file mode 100644 index 0000000..d2ee2af --- /dev/null +++ b/modules/http/server/sse/sse_connection.h @@ -0,0 +1,114 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_CONNECTION_H_20260616 +#define TBOX_HTTP_SSE_CONNECTION_H_20260616 + +#include +#include +#include + +#include "sse_event.h" + +namespace tbox { +namespace http { +namespace sse { + +//! SSE 连接 +//! 包装从 HTTP 升级后分离出来的 TcpConnection,提供 SSE 事件推送功能 +//! 生命期由 SseServer 通过 Cabinet 管理,用户通过 ConnToken 访问 +//! SSE 是单向推送协议(服务端→客户端),不需要解析客户端数据 +class SseConnection { + public: + //! 内部回调:SseServer::Impl 绑定 ConnToken,不传递 SseConnection* + using CloseCallback = std::function; + using SendCompleteCallback = std::function; + + ~SseConnection(); + + NONCOPYABLE(SseConnection); + IMMOVABLE(SseConnection); + + public: + //! 设置回调(由 SseServer::Impl 调用,绑定 ConnToken) + void setCloseCallback(const CloseCallback &cb) { close_cb_ = cb; } + void setSendCompleteCallback(const SendCompleteCallback &cb) { send_complete_cb_ = cb; } + void setContextLogEnable(bool enable) { context_log_enable_ = enable; } + + public: + //! 发送简单数据(event 类型默认 "message") + bool send(const std::string &data); + + //! 发送完整 SSE 事件 + bool send(const SseEvent &event); + + //! 发送心跳注释行(保持连接活跃) + //! 格式:": \n\n" + bool sendHeartbeat(const std::string &comment = "keep-alive"); + + //! 关闭连接(断开 TcpConnection) + bool close(); + + //! 获取客户端地址 + network::SockAddr peerAddr() const; + + //! 获取浏览器重连时携带的 Last-Event-ID + std::string getLastEventId() const; + + //! 获取客户端连接的 URL 路径 + std::string getUrl() const; + + //! 连接是否已失效 + bool isExpired() const; + + //! 设置/获取上下文数据(委托给底层 TcpConnection) + using ContextDeleter = network::TcpConnection::ContextDeleter; + void setContext(void *context, ContextDeleter &&deleter = nullptr); + void* getContext() const; + + private: + //! 仅由 SseServer 创建(生命期由 Cabinet 管理) + SseConnection(event::Loop *wp_loop, + network::TcpConnection *tcp_conn, + const std::string &url, + const std::string &last_event_id); + + void onTcpDisconnected(); + void onTcpSendCompleted(); + + private: + event::Loop *wp_loop_; + network::TcpConnection *sp_tcp_conn_; + std::string url_; + std::string last_event_id_; //! 浏览器重连时的 Last-Event-ID + + CloseCallback close_cb_; + SendCompleteCallback send_complete_cb_; + + int cb_level_ = 0; + bool context_log_enable_ = false; + + friend class SseServer; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_CONNECTION_H_20260616 diff --git a/modules/http/server/sse/sse_connection_test.cpp b/modules/http/server/sse/sse_connection_test.cpp new file mode 100644 index 0000000..d578833 --- /dev/null +++ b/modules/http/server/sse/sse_connection_test.cpp @@ -0,0 +1,49 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include +#include "sse_connection.h" + +namespace tbox { +namespace http { +namespace sse { + +//! SseConnection 的完整测试需要集成环境(HTTP Server + SseServer) +//! 因为 TcpConnection 构造函数为 private(仅 TcpAcceptor/TcpConnector 可创建) +//! 这里仅验证 SseConnection 的创建约束与设计合理性 + +TEST(SseConnection, CannotCreateWithoutTcpConnection) +{ + //! SseConnection 构造函数需要 TcpConnection* + //! TcpConnection 的构造函数为 private,无法在单元测试中独立创建 + //! SseConnection 的 send/close/peerAddr 等操作均依赖 TcpConnection + //! 完整的集成测试在 sse_server_test.cpp 中通过 SseServer 来间接验证 +} + +TEST(SseConnection, DesignConstraints) +{ + //! SseConnection 是单向推送协议(服务端→客户端) + //! 不需要 MessageCallback(与 WsConnection 不同) + //! 需要 receiveCallback 注册空函数体(阈值=0)以保持读事件监听 + //! 用于检测浏览器关闭连接和保持 TCP 写事件正常 +} + +} +} +} diff --git a/modules/http/server/sse/sse_event.cpp b/modules/http/server/sse/sse_event.cpp new file mode 100644 index 0000000..0bc17b2 --- /dev/null +++ b/modules/http/server/sse/sse_event.cpp @@ -0,0 +1,78 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include "sse_event.h" + +#include + +namespace tbox { +namespace http { +namespace sse { + +std::string SseEvent::toString() const +{ + //! SSE 协议格式(W3C/WHATWG EventSource 规范): + //! 每个字段以 "field: value\n" 格式输出 + //! 事件以空行 "\n" 结束(标志事件完成) + //! + //! 字段输出顺序:retry → id → event → data → 空行 + //! 顺序不影响浏览器解析,但统一顺序便于调试 + + std::string result; + + //! retry 字段(可选,仅在 retry> 0 时输出) + //! 告知浏览器断线后多久自动重连 + if (retry> 0) + result += "retry: " + std::to_string(retry) + "\n"; + + //! id 字段(可选) + //! 浏览器重连时通过 Last-Event-ID 头部携带此值 + if (!id.empty()) + result += "id: " + id + "\n"; + + //! event 字段(可选) + //! 默认为 "message",浏览器通过 .onmessage 监听 + //! 自定义 event 类型通过 .addEventListener(event, ...) 监听 + //! 不输出默认值 "message",减少传输量 + if (!event.empty() && event != "message") + result += "event: " + event + "\n"; + + //! data 字段(必须) + //! 多行 data 自动拆分为多个 `data:` 行 + //! 例如 data="line1\nline2" 输出为 "data: line1\ndata: line2\n" + if (!data.empty()) { + std::istringstream iss(data); + std::string line; + while (std::getline(iss, line)) + result += "data: " + line + "\n"; + } else { + //! data 为空时仍需输出空 data 行(保持协议完整性) + result += "data:\n"; + } + + //! 事件结束标志(空行) + //! 浏览器 EventSource 在收到空行时认为事件完成并触发回调 + result += "\n"; + + return result; +} + +} +} +} diff --git a/modules/http/server/sse/sse_event.h b/modules/http/server/sse/sse_event.h new file mode 100644 index 0000000..20e5fa4 --- /dev/null +++ b/modules/http/server/sse/sse_event.h @@ -0,0 +1,73 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_EVENT_H_20260616 +#define TBOX_HTTP_SSE_EVENT_H_20260616 + +#include +#include + +namespace tbox { +namespace http { +namespace sse { + +//! SSE 事件(Server-Sent Events, W3C/WHATWG 规范) +//! 对应 SSE 协议中的字段:id、event、data、retry +//! toString() 将事件格式化为标准 SSE 文本格式 +struct SseEvent { + //! 事件 ID(可选) + //! 对应 `id:` 字段,浏览器重连时通过 Last-Event-ID 头部携带此值 + //! 用于实现断线续传:服务端可根据 Last-Event-ID 从断点继续推送 + std::string id; + + //! 事件类型(可选) + //! 对应 `event:` 字段,默认为 "message" + //! 浏览器 EventSource 对象通过 .onmessage 或 .addEventListener(event, ...) 监听 + std::string event; + + //! 数据(必须) + //! 对应 `data:` 字段,支持多行文本 + //! toString() 会自动将多行 data 拆分为多个 `data:` 行 + std::string data; + + //! 重连间隔毫秒数(可选) + //! 对应 `retry:` 字段,告知浏览器断线后多久自动重连 + //! 仅在 retry> 0 时输出 + int retry = 0; + + //! 将事件格式化为 SSE 文本协议格式 + //! 输出规则(W3C/WHATWG EventSource 规范): + //! - retry> 0 时输出 "retry: \n" + //! - id 非空时输出 "id: \n" + //! - event 非空且不等于 "message" 时输出 "event: \n" + //! - data 按行拆分,每行输出 "data: \n" + //! - 最后以空行 "\n" 结束(标志事件完成) + //! + //! 示例输出: + //! "id: 42\nevent: update\ndata: hello\n\n" + //! 多行 data: + //! "data: line1\ndata: line2\n\n" + std::string toString() const; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_EVENT_H_20260616 diff --git a/modules/http/server/sse/sse_event_test.cpp b/modules/http/server/sse/sse_event_test.cpp new file mode 100644 index 0000000..6a5460c --- /dev/null +++ b/modules/http/server/sse/sse_event_test.cpp @@ -0,0 +1,140 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include +#include "sse_event.h" + +namespace tbox { +namespace http { +namespace sse { + +//! === SseEvent 构造测试 === + +TEST(SseEvent, DefaultValues) +{ + SseEvent evt; + EXPECT_EQ(evt.id, ""); + EXPECT_EQ(evt.event, ""); + EXPECT_EQ(evt.data, ""); + EXPECT_EQ(evt.retry, 0); +} + +//! === SseEvent::toString() 格式化测试 === + +TEST(SseEvent, ToStringSimpleData) +{ + //! 简单数据:只输出 data 字段 + SseEvent evt; + evt.data = "hello world"; + + //! 预期输出:"data: hello world\n\n" + EXPECT_EQ(evt.toString(), "data: hello world\n\n"); +} + +TEST(SseEvent, ToStringWithId) +{ + //! 带 id 的数据 + SseEvent evt; + evt.id = "42"; + evt.data = "hello"; + + //! 预期输出:"id: 42\ndata: hello\n\n" + EXPECT_EQ(evt.toString(), "id: 42\ndata: hello\n\n"); +} + +TEST(SseEvent, ToStringWithEvent) +{ + //! 带 event 类型(非默认 "message") + SseEvent evt; + evt.event = "update"; + evt.data = "status ok"; + + //! 预期输出:"event: update\ndata: status ok\n\n" + EXPECT_EQ(evt.toString(), "event: update\ndata: status ok\n\n"); +} + +TEST(SseEvent, ToStringWithDefaultEvent) +{ + //! event 为 "message"(默认值)时不输出 event 字段 + SseEvent evt; + evt.event = "message"; + evt.data = "hello"; + + //! 预期输出:"data: hello\n\n"(不输出 "event: message") + EXPECT_EQ(evt.toString(), "data: hello\n\n"); +} + +TEST(SseEvent, ToStringWithRetry) +{ + //! 带 retry 字段 + SseEvent evt; + evt.retry = 3000; + evt.data = "hello"; + + //! 预期输出:"retry: 3000\ndata: hello\n\n" + EXPECT_EQ(evt.toString(), "retry: 3000\ndata: hello\n\n"); +} + +TEST(SseEvent, ToStringRetryZeroNotOutput) +{ + //! retry = 0时不输出 retry 字段 + SseEvent evt; + evt.retry = 0; + evt.data = "hello"; + + //! 预期输出:"data: hello\n\n"(不输出 "retry: 0") + EXPECT_EQ(evt.toString(), "data: hello\n\n"); +} + +TEST(SseEvent, ToStringMultilineData) +{ + //! 多行 data 自动拆分为多个 data: 行 + SseEvent evt; + evt.data = "line1\nline2\nline3"; + + //! 预期输出:"data: line1\ndata: line2\ndata: line3\n\n" + EXPECT_EQ(evt.toString(), "data: line1\ndata: line2\ndata: line3\n\n"); +} + +TEST(SseEvent, ToStringCompleteEvent) +{ + //! 完整事件:所有字段 + SseEvent evt; + evt.id = "123"; + evt.event = "update"; + evt.data = "status ok"; + evt.retry = 5000; + + //! 预期输出:"retry: 5000\nid: 123\nevent: update\ndata: status ok\n\n" + EXPECT_EQ(evt.toString(), "retry: 5000\nid: 123\nevent: update\ndata: status ok\n\n"); +} + +TEST(SseEvent, ToStringEmptyData) +{ + //! data 为空时输出空 data 行 + SseEvent evt; + evt.id = "1"; + + //! 预期输出:"id: 1\ndata:\n\n" + EXPECT_EQ(evt.toString(), "id: 1\ndata:\n\n"); +} + +} +} +} diff --git a/modules/http/server/sse/sse_server.h b/modules/http/server/sse/sse_server.h new file mode 100644 index 0000000..5e8d255 --- /dev/null +++ b/modules/http/server/sse/sse_server.h @@ -0,0 +1,134 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_SERVER_H_20260616 +#define TBOX_HTTP_SSE_SERVER_H_20260616 + +#include +#include +#include +#include + +#include +#include + +#include "sse_event.h" + +namespace tbox { +namespace http { +namespace sse { + +//! SSE 服务器(Server-Sent Events, W3C/WHATWG 规范) +//! 基于 HTTP 服务器运行,本身即为 HTTP 中间件 +//! 检测 SSE 请求(Accept: text/event-stream),设置 200 响应头 +//! 通过 upgrade_cb 机制接管 TcpConnection,提供 SSE 事件推送功能 +//! SSE 是单向推送协议(服务端→客户端),无 MessageCallback +//! 通过 Cabinet 管理 SseConnection 生命期,用户通过 ConnToken 操作连接 +class SseServer { + public: + using ConnToken = cabinet::Token; + + explicit SseServer(event::Loop *wp_loop); + ~SseServer(); + + NONCOPYABLE(SseServer); + IMMOVABLE(SseServer); + + public: + //! 初始化:关联到 HTTP 服务器 + //! URL 路径匹配规则: + //! - url_path 以 '/' 结尾:前缀匹配,如 "/sse/" 匹配 "/sse/aa"、" /sse/bb/cc" + //! - url_path 不以 '/' 结尾:全量匹配,如 "/sse" 仅匹配 "/sse" + //! - url_path 为空字符串:匹配所有 SSE 请求 + bool initialize(http::server::Server *http_server, const std::string &url_path = ""); + bool start(); + void stop(); + void cleanup(); + + enum class State { kNone, kInited, kRunning }; + State state() const; + + public: + //! 设置回调(SSE 是单向推送,无 MessageCallback) + using ConnectedCallback = std::function; + using DisconnectedCallback = std::function; + + void setConnectedCallback(const ConnectedCallback &cb); + void setDisconnectedCallback(const DisconnectedCallback &cb); + + public: + //! 向指定客户端发送数据(简单文本,event 类型默认 "message") + //! 格式:"data: \n\n" + bool send(const ConnToken &client, const std::string &data); + + //! 向指定客户端发送完整 SSE 事件 + bool send(const ConnToken &client, const SseEvent &event); + + //! 向所有客户端广播数据 + bool sendToAll(const std::string &data); + + //! 向所有客户端广播事件 + bool sendToAll(const SseEvent &event); + + //! 关闭指定客户端连接 + bool close(const ConnToken &client); + + //! 发送心跳注释行(保持连接活跃) + //! 格式:": \n\n",浏览器 EventSource 忽略以 ":" 开头的行 + bool sendHeartbeat(const ConnToken &client, const std::string &comment = "keep-alive"); + + //! 设置自动心跳间隔(默认 0 = 禁用) + //! 启用后,定时器每 interval 毫秒向所有连接发送 ": keep-alive\n\n" + void setHeartbeatInterval(std::chrono::milliseconds interval); + + //! 检查客户端连接是否有效 + bool isClientValid(const ConnToken &client) const; + + //! 获取客户端地址(含 IP 与端口) + network::SockAddr peerAddr(const ConnToken &client) const; + + //! 获取客户端请求的 Last-Event-ID(浏览器重连时携带) + //! 用于实现断线续传:服务端可根据此值从断点继续推送 + std::string getLastEventId(const ConnToken &client) const; + + //! 获取客户端连接的 URL 路径 + std::string getUrl(const ConnToken &client) const; + + //! 设置/获取客户端连接的上下文数据 + using ContextDeleter = std::function; + void setContext(const ConnToken &client, void *context, ContextDeleter &&deleter = nullptr); + void* getContext(const ConnToken &client) const; + + void setContextLogEnable(bool enable); + + public: + //! 检查请求是否为有效的 SSE 请求 + //! 条件:GET 方法 + Accept 头包含 "text/event-stream" + static bool IsSseRequest(const http::Request &req); + + private: + class Impl; + Impl *impl_; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_SERVER_H_20260616 diff --git a/modules/http/server/sse/sse_server_impl.cpp b/modules/http/server/sse/sse_server_impl.cpp new file mode 100644 index 0000000..2c594fd --- /dev/null +++ b/modules/http/server/sse/sse_server_impl.cpp @@ -0,0 +1,522 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include "sse_server.h" +#include "sse_server_impl.h" + +#include +#include +#include +#include + +#include +#include + +#undef MODULE_ID +#define MODULE_ID "tbox.http.sse" + +namespace tbox { +namespace http { +namespace sse { + +using namespace std::placeholders; + +//! === 生命周期 === + +SseServer::Impl::Impl(SseServer *wp_parent, event::Loop *wp_loop) + : wp_parent_(wp_parent) + , wp_loop_(wp_loop) +{ + //! 创建心跳定时器(但不初始化,等 setHeartbeatInterval 时初始化) + sp_heartbeat_timer_ = wp_loop_->newTimerEvent(); +} + +SseServer::Impl::~Impl() +{ + TBOX_ASSERT(cb_level_ == 0); + cleanup(); + CHECK_DELETE_RESET_OBJ(sp_heartbeat_timer_); +} + +bool SseServer::Impl::initialize(http::server::Server *http_server, const std::string &url_path) +{ + if (state_ != SseServer::State::kNone) + return false; + + //! 记录 URL 路径 + url_path_ = url_path; + + //! 记录 HTTP 服务器指针(不立即注册中间件,等 start() 时注册) + wp_http_server_ = http_server; + + state_ = SseServer::State::kInited; + return true; +} + +bool SseServer::Impl::start() +{ + if (state_ != SseServer::State::kInited) + return false; + + //! 注册自身到 HTTP 服务器(SseServer::Impl 即为 Middleware) + mw_token_ = wp_http_server_->use(this); + + //! 如果心跳间隔已设置,启用心跳定时器 + if (heartbeat_interval_.count()> 0) { + sp_heartbeat_timer_->initialize(heartbeat_interval_, event::Event::Mode::kPersist); + sp_heartbeat_timer_->setCallback(std::bind(&SseServer::Impl::onHeartbeatTimer, this)); + sp_heartbeat_timer_->enable(); + } + + state_ = SseServer::State::kRunning; + return true; +} + +void SseServer::Impl::stop() +{ + if (state_ != SseServer::State::kRunning) + return; + + //! 从 HTTP 服务器反注册中间件 + wp_http_server_->unuse(mw_token_); + mw_token_.reset(); + + //! 停止心跳定时器 + sp_heartbeat_timer_->disable(); + + //! 清除 SseConnection 内部回调,防止断开时回调到 Impl + sse_conns_.foreach([](SseConnection *conn) { + conn->setCloseCallback(nullptr); + conn->setSendCompleteCallback(nullptr); + }); + + //! 删除所有 SseConnection(析构时会断开并延后删除 TcpConnection) + sse_conns_.foreach([](SseConnection *conn) { + delete conn; + }); + sse_conns_.clear(); + + state_ = SseServer::State::kInited; +} + +void SseServer::Impl::cleanup() +{ + if (state_ == SseServer::State::kNone) + return; + + if (state_ == SseServer::State::kRunning) + stop(); + + wp_http_server_ = nullptr; + + connected_cb_ = nullptr; + disconnected_cb_ = nullptr; + heartbeat_interval_ = std::chrono::milliseconds(0); + + state_ = SseServer::State::kNone; +} + +//! === Middleware 接口实现 === + +void SseServer::Impl::handle(http::server::ContextSptr sp_ctx, const http::server::NextFunc &next) +{ + auto &req = sp_ctx->req(); + + if (IsSseRequest(req)) { + //! URL 路径匹配规则: + //! - url_path_ 以 '/' 结尾:前缀匹配 + //! - url_path_ 不以 '/' 结尾:全量匹配 + //! - url_path_ 为空字符串:匹配所有 SSE 请求 + if (!url_path_.empty()) { + bool matched = false; + if (url_path_.back() == '/') { + //! 前缀匹配 + matched = util::string::IsStartWith(req.url.path, url_path_); + } else { + //! 全量匹配 + matched = (req.url.path == url_path_); + } + if (!matched) { + //! 不是本服务关心的 URL,传递给下一个中间件 + next(); + return; + } + } + + LogDbg("sse request: %s", req.url.path.c_str()); + + auto &res = sp_ctx->res(); + + //! 设置 200 OK 响应(SSE 不是协议升级,使用 200) + res.status_code = http::StatusCode::k200_OK; + res.http_ver = http::HttpVer::k1_1; + + //! SSE 必需的响应头 + res.headers["Content-Type"] = "text/event-stream"; + res.headers["Cache-Control"] = "no-cache"; + res.headers["Connection"] = "keep-alive"; + + //! 从请求中提取 Last-Event-ID(浏览器重连时携带) + std::string last_event_id; + auto id_iter = req.headers.find("Last-Event-ID"); + if (id_iter != req.headers.end()) + last_event_id = id_iter->second; + + //! 注册升级回调:HTTP 服务器发送 200 响应后,将 TcpConnection 交给 SseServer + //! 与 WebSocket 使用同一套 upgrade_cb 机制 + res.upgrade_cb = std::bind(&SseServer::Impl::onSseUpgrade, this, _1, req.url.path, last_event_id); + + //! SSE 请求已处理,不再调用 next() + return; + } + + //! 非 SSE 请求,传递给下一个中间件 + next(); +} + +//! === 升级与连接管理 === + +void SseServer::Impl::onSseUpgrade(network::TcpConnection *tcp_conn, + const std::string &url_path, + const std::string &last_event_id) +{ + RECORD_SCOPE(); + LogDbg("sse upgrade: new connection from %s", tcp_conn->peerAddr().toString().c_str()); + + //! 创建 SseConnection,并存入 Cabinet + //! 传入 URL 路径和 Last-Event-ID,供用户后续查询 + SseConnection *sse_conn = new SseConnection(wp_loop_, tcp_conn, url_path, last_event_id); + ConnToken sse_token = sse_conns_.alloc(sse_conn); + + //! 设置 SseConnection 的回调(bind 捕获 ConnToken,不传递 SseConnection*) + sse_conn->setCloseCallback(std::bind(&SseServer::Impl::onSseDisconnected, this, sse_token)); + sse_conn->setContextLogEnable(context_log_enable_); + + //! 通知用户(传递 ConnToken) + if (connected_cb_) { + ++cb_level_; + connected_cb_(sse_token); + --cb_level_; + } +} + +void SseServer::Impl::onSseDisconnected(const ConnToken &client) +{ + RECORD_SCOPE(); + LogDbg("sse disconnected"); + + //! 先通知用户(此时 ConnToken 在 Cabinet 中仍有效) + //! 用户可通过 ConnToken 调用 SseServer 方法获取连接信息 + if (disconnected_cb_) { + ++cb_level_; + disconnected_cb_(client); + --cb_level_; + } + + //! 从 Cabinet 中移除并获取指针 + SseConnection *sse_conn = sse_conns_.free(client); + + //! 延后删除 SseConnection(确保回调中还能访问对象) + wp_loop_->runNext([sse_conn] { CHECK_DELETE_OBJ(sse_conn); }, + "SseServer::onSseDisconnected, delete sse_conn"); +} + +//! === 心跳定时器 === + +void SseServer::Impl::onHeartbeatTimer() +{ + //! 向所有连接发送心跳注释行 + sse_conns_.foreach([](SseConnection *conn) { + conn->sendHeartbeat("keep-alive"); + }); +} + +void SseServer::Impl::setHeartbeatInterval(std::chrono::milliseconds interval) +{ + heartbeat_interval_ = interval; + + //! 如果已在运行中,动态调整心跳定时器 + if (state_ == SseServer::State::kRunning) { + sp_heartbeat_timer_->disable(); + + if (interval.count()> 0) { + sp_heartbeat_timer_->initialize(interval, event::Event::Mode::kPersist); + sp_heartbeat_timer_->setCallback(std::bind(&SseServer::Impl::onHeartbeatTimer, this)); + sp_heartbeat_timer_->enable(); + } + } +} + +//! === 通过 ConnToken 操作连接 === + +bool SseServer::Impl::send(const ConnToken &client, const std::string &data) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->send(data); + return false; +} + +bool SseServer::Impl::send(const ConnToken &client, const SseEvent &event) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->send(event); + return false; +} + +bool SseServer::Impl::sendToAll(const std::string &data) +{ + bool all_ok = true; + sse_conns_.foreach([&](SseConnection *conn) { + if (!conn->send(data)) + all_ok = false; + }); + return all_ok; +} + +bool SseServer::Impl::sendToAll(const SseEvent &event) +{ + bool all_ok = true; + sse_conns_.foreach([&](SseConnection *conn) { + if (!conn->send(event)) + all_ok = false; + }); + return all_ok; +} + +bool SseServer::Impl::close(const ConnToken &client) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->close(); + return false; +} + +bool SseServer::Impl::sendHeartbeat(const ConnToken &client, const std::string &comment) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->sendHeartbeat(comment); + return false; +} + +bool SseServer::Impl::isClientValid(const ConnToken &client) const +{ + return sse_conns_.at(client) != nullptr; +} + +network::SockAddr SseServer::Impl::peerAddr(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->peerAddr(); + return network::SockAddr(); +} + +std::string SseServer::Impl::getLastEventId(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->getLastEventId(); + return ""; +} + +std::string SseServer::Impl::getUrl(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->getUrl(); + return ""; +} + +void SseServer::Impl::setContext(const ConnToken &client, void *context, ContextDeleter &&deleter) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + sse_conn->setContext(context, std::move(deleter)); +} + +void* SseServer::Impl::getContext(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->getContext(); + return nullptr; +} + +void SseServer::Impl::setContextLogEnable(bool enable) +{ + context_log_enable_ = enable; + sse_conns_.foreach([&](SseConnection *conn) { + conn->setContextLogEnable(enable); + }); +} + +//! === 静态辅助方法 === + +bool SseServer::Impl::IsSseRequest(const http::Request &req) +{ + //! SSE 请求检测条件: + //! 1) 必须是 GET 方法 + //! 2) Accept 头必须包含 "text/event-stream" + //! 3) 不检查 Upgrade 头(SSE 不是协议升级) + + if (req.method != http::Method::kGet) + return false; + + auto accept_iter = req.headers.find("Accept"); + if (accept_iter == req.headers.end()) + return false; + + //! 检查 Accept 头是否包含 text/event-stream + //! 注意:Accept 头可能包含多个值,如 "text/event-stream, text/html" + if (accept_iter->second.find("text/event-stream") == std::string::npos) + return false; + + return true; +} + +//! === SseServer 外部接口 === + +SseServer::SseServer(event::Loop *wp_loop) + : impl_(new Impl(this, wp_loop)) +{ + TBOX_ASSERT(wp_loop != nullptr); +} + +SseServer::~SseServer() +{ + CHECK_DELETE_RESET_OBJ(impl_); +} + +bool SseServer::initialize(http::server::Server *http_server, const std::string &url_path) +{ + TBOX_ASSERT(http_server != nullptr); + return impl_->initialize(http_server, url_path); +} + +bool SseServer::start() +{ + return impl_->start(); +} + +void SseServer::stop() +{ + impl_->stop(); +} + +void SseServer::cleanup() +{ + impl_->cleanup(); +} + +SseServer::State SseServer::state() const +{ + return impl_->state(); +} + +void SseServer::setConnectedCallback(const ConnectedCallback &cb) +{ + impl_->setConnectedCallback(cb); +} + +void SseServer::setDisconnectedCallback(const DisconnectedCallback &cb) +{ + impl_->setDisconnectedCallback(cb); +} + +bool SseServer::send(const ConnToken &client, const std::string &data) +{ + return impl_->send(client, data); +} + +bool SseServer::send(const ConnToken &client, const SseEvent &event) +{ + return impl_->send(client, event); +} + +bool SseServer::sendToAll(const std::string &data) +{ + return impl_->sendToAll(data); +} + +bool SseServer::sendToAll(const SseEvent &event) +{ + return impl_->sendToAll(event); +} + +bool SseServer::close(const ConnToken &client) +{ + return impl_->close(client); +} + +bool SseServer::sendHeartbeat(const ConnToken &client, const std::string &comment) +{ + return impl_->sendHeartbeat(client, comment); +} + +void SseServer::setHeartbeatInterval(std::chrono::milliseconds interval) +{ + impl_->setHeartbeatInterval(interval); +} + +bool SseServer::isClientValid(const ConnToken &client) const +{ + return impl_->isClientValid(client); +} + +network::SockAddr SseServer::peerAddr(const ConnToken &client) const +{ + return impl_->peerAddr(client); +} + +std::string SseServer::getLastEventId(const ConnToken &client) const +{ + return impl_->getLastEventId(client); +} + +std::string SseServer::getUrl(const ConnToken &client) const +{ + return impl_->getUrl(client); +} + +void SseServer::setContext(const ConnToken &client, void *context, ContextDeleter &&deleter) +{ + impl_->setContext(client, context, std::move(deleter)); +} + +void* SseServer::getContext(const ConnToken &client) const +{ + return impl_->getContext(client); +} + +void SseServer::setContextLogEnable(bool enable) +{ + impl_->setContextLogEnable(enable); +} + +bool SseServer::IsSseRequest(const http::Request &req) +{ + return Impl::IsSseRequest(req); +} + +} +} +} diff --git a/modules/http/server/sse/sse_server_impl.h b/modules/http/server/sse/sse_server_impl.h new file mode 100644 index 0000000..95367f4 --- /dev/null +++ b/modules/http/server/sse/sse_server_impl.h @@ -0,0 +1,137 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_SERVER_IMPL_H_20260616 +#define TBOX_HTTP_SSE_SERVER_IMPL_H_20260616 + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "sse_server.h" +#include "sse_connection.h" + +namespace tbox { +namespace http { +namespace sse { + +//! SseServer::Impl 同时充当 HTTP 中间件 +//! 检测 SSE 请求(Accept: text/event-stream),设置 200 响应头,注册 upgrade_cb +//! 通过 Cabinet 管理 SseConnection 生命期,所有操作基于 ConnToken +//! SSE 是单向推送协议,无 MessageCallback +class SseServer::Impl : public http::server::Middleware { + public: + Impl(SseServer *wp_parent, event::Loop *wp_loop); + virtual ~Impl(); + + public: + bool initialize(http::server::Server *http_server, const std::string &url_path = ""); + bool start(); + void stop(); + void cleanup(); + + SseServer::State state() const { return state_; } + + public: + void setConnectedCallback(const SseServer::ConnectedCallback &cb) { connected_cb_ = cb; } + void setDisconnectedCallback(const SseServer::DisconnectedCallback &cb) { disconnected_cb_ = cb; } + + public: + //! 通过 ConnToken 操作连接 + bool send(const ConnToken &client, const std::string &data); + bool send(const ConnToken &client, const SseEvent &event); + bool sendToAll(const std::string &data); + bool sendToAll(const SseEvent &event); + bool close(const ConnToken &client); + bool sendHeartbeat(const ConnToken &client, const std::string &comment); + bool isClientValid(const ConnToken &client) const; + network::SockAddr peerAddr(const ConnToken &client) const; + std::string getLastEventId(const ConnToken &client) const; + std::string getUrl(const ConnToken &client) const; + void setHeartbeatInterval(std::chrono::milliseconds interval); + + //! 上下文数据操作(委托到 SseConnection → TcpConnection) + using ContextDeleter = network::TcpConnection::ContextDeleter; + void setContext(const ConnToken &client, void *context, ContextDeleter &&deleter = nullptr); + void* getContext(const ConnToken &client) const; + + void setContextLogEnable(bool enable); + + public: + //! Middleware 接口:处理 HTTP 请求,检测 SSE 请求 + virtual void handle(http::server::ContextSptr sp_ctx, const http::server::NextFunc &next) override; + + //! 静态辅助方法 + static bool IsSseRequest(const http::Request &req); + + private: + //! 当 HTTP 服务器发送 200 响应后回调此函数 + //! 将 TcpConnection 从 HTTP 服务器分离,交给 SseServer 管理 + void onSseUpgrade(network::TcpConnection *tcp_conn, + const std::string &url_path, + const std::string &last_event_id); + + //! 当 SseConnection 断开时回调(参数为 ConnToken) + void onSseDisconnected(const ConnToken &client); + + //! 心跳定时器回调:向所有连接发送注释行 + void onHeartbeatTimer(); + + private: + SseServer *wp_parent_; + event::Loop *wp_loop_; + + http::server::Server *wp_http_server_ = nullptr; + //! URL 路径匹配规则: + //! - url_path_ 以 '/' 结尾:前缀匹配,如 "/sse/" 匹配 "/sse/aa" + //! - url_path_ 不以 '/' 结尾:全量匹配,如 "/sse" 仅匹配 "/sse" + //! - url_path_ 为空字符串:匹配所有 SSE 请求 + std::string url_path_; + + //! 中间件 token(由 HTTP Server 的 use() 返回,用于 unuse() 反注册) + http::server::MiddlewareToken mw_token_; + + //! SseConnection 容器(生命期管理) + cabinet::Cabinet sse_conns_; + + //! 心跳定时器(可选,默认禁用) + event::TimerEvent *sp_heartbeat_timer_ = nullptr; + std::chrono::milliseconds heartbeat_interval_{0}; + + SseServer::State state_ = SseServer::State::kNone; + + SseServer::ConnectedCallback connected_cb_; + SseServer::DisconnectedCallback disconnected_cb_; + + int cb_level_ = 0; + bool context_log_enable_ = false; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_SERVER_IMPL_H_20260616 diff --git a/modules/http/server/sse/sse_server_test.cpp b/modules/http/server/sse/sse_server_test.cpp new file mode 100644 index 0000000..d3ad31e --- /dev/null +++ b/modules/http/server/sse/sse_server_test.cpp @@ -0,0 +1,92 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include +#include +#include "sse_server.h" + +namespace tbox { +namespace http { +namespace sse { + +//! === SseServer::IsSseRequest() 请求检测测试 === + +TEST(SseServer, DetectSseRequest) +{ + //! 合法的 SSE 请求:GET + Accept: text/event-stream + http::Request req; + req.method = http::Method::kGet; + req.http_ver = http::HttpVer::k1_1; + req.headers["Accept"] = "text/event-stream"; + + EXPECT_TRUE(SseServer::IsSseRequest(req)); +} + +TEST(SseServer, DetectSseRequestWithMultipleAccept) +{ + //! Accept 头包含多个值时,仍能检测到 text/event-stream + http::Request req; + req.method = http::Method::kGet; + req.headers["Accept"] = "text/event-stream, text/html;q=0.9"; + + EXPECT_TRUE(SseServer::IsSseRequest(req)); +} + +TEST(SseServer, DetectNonSseRequestNoAccept) +{ + //! 缺少 Accept 头:不是 SSE 请求 + http::Request req; + req.method = http::Method::kGet; + + EXPECT_FALSE(SseServer::IsSseRequest(req)); +} + +TEST(SseServer, DetectNonSseRequestHtmlAccept) +{ + //! Accept 头不包含 text/event-stream:不是 SSE 请求 + http::Request req; + req.method = http::Method::kGet; + req.headers["Accept"] = "text/html"; + + EXPECT_FALSE(SseServer::IsSseRequest(req)); +} + +TEST(SseServer, DetectNonSsePostRequest) +{ + //! POST 方法:不是 SSE 请求(SSE 必须是 GET) + http::Request req; + req.method = http::Method::kPost; + req.headers["Accept"] = "text/event-stream"; + + EXPECT_FALSE(SseServer::IsSseRequest(req)); +} + +TEST(SseServer, DetectNonSsePutRequest) +{ + //! PUT 方法:不是 SSE 请求 + http::Request req; + req.method = http::Method::kPut; + req.headers["Accept"] = "text/event-stream"; + + EXPECT_FALSE(SseServer::IsSseRequest(req)); +} + +} +} +} -- Gitee From 8f7951cc6149a07dbf0764cf20d4ef1f2b29ed57 Mon Sep 17 00:00:00 2001 From: Hevake Lee Date: 2026年6月18日 10:07:28 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat(http.sse),=20=E4=BC=98=E5=8C=96=20sse?= =?UTF-8?q?=20=E6=9C=8D=E5=8A=A1=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/http/CMakeLists.txt | 2 +- modules/http/Makefile | 2 +- modules/http/server/sse/sse_server.h | 19 +++++++------ modules/http/server/sse/sse_server_impl.cpp | 5 ---- modules/http/server/sse/sse_server_impl.h | 12 ++++---- ...rver_test.cpp => sse_server_impl_test.cpp} | 28 +++++++++---------- 6 files changed, 31 insertions(+), 37 deletions(-) rename modules/http/server/sse/{sse_server_test.cpp => sse_server_impl_test.cpp} (73%) diff --git a/modules/http/CMakeLists.txt b/modules/http/CMakeLists.txt index 33c1831..9b36124 100644 --- a/modules/http/CMakeLists.txt +++ b/modules/http/CMakeLists.txt @@ -56,7 +56,7 @@ set(TBOX_HTTP_TEST_SOURCES url_test.cpp server/request_parser_test.cpp server/sse/sse_event_test.cpp - server/sse/sse_server_test.cpp + server/sse/sse_server_impl_test.cpp server/sse/sse_connection_test.cpp) add_library(${TBOX_LIBRARY_NAME} ${TBOX_BUILD_LIB_TYPE} ${TBOX_HTTP_SOURCES}) diff --git a/modules/http/Makefile b/modules/http/Makefile index 7b77a50..9c07400 100644 --- a/modules/http/Makefile +++ b/modules/http/Makefile @@ -71,7 +71,7 @@ TEST_CPP_SRC_FILES = \ url_test.cpp \ server/request_parser_test.cpp \ server/sse/sse_event_test.cpp \ - server/sse/sse_server_test.cpp \ + server/sse/sse_server_impl_test.cpp \ server/sse/sse_connection_test.cpp \ TEST_LDFLAGS := $(LDFLAGS) -ltbox_network -ltbox_log -ltbox_eventx -ltbox_event -ltbox_util -ltbox_base -ldl diff --git a/modules/http/server/sse/sse_server.h b/modules/http/server/sse/sse_server.h index 5e8d255..2f0ac1f 100644 --- a/modules/http/server/sse/sse_server.h +++ b/modules/http/server/sse/sse_server.h @@ -20,18 +20,23 @@ #ifndef TBOX_HTTP_SSE_SERVER_H_20260616 #define TBOX_HTTP_SSE_SERVER_H_20260616 -#include +#include +#include + +#include #include #include #include -#include -#include - #include "sse_event.h" namespace tbox { namespace http { + +namespace server { +class Server; +} + namespace sse { //! SSE 服务器(Server-Sent Events, W3C/WHATWG 规范) @@ -117,13 +122,9 @@ class SseServer { void setContextLogEnable(bool enable); - public: - //! 检查请求是否为有效的 SSE 请求 - //! 条件:GET 方法 + Accept 头包含 "text/event-stream" - static bool IsSseRequest(const http::Request &req); + class Impl; private: - class Impl; Impl *impl_; }; diff --git a/modules/http/server/sse/sse_server_impl.cpp b/modules/http/server/sse/sse_server_impl.cpp index 2c594fd..965e012 100644 --- a/modules/http/server/sse/sse_server_impl.cpp +++ b/modules/http/server/sse/sse_server_impl.cpp @@ -512,11 +512,6 @@ void SseServer::setContextLogEnable(bool enable) impl_->setContextLogEnable(enable); } -bool SseServer::IsSseRequest(const http::Request &req) -{ - return Impl::IsSseRequest(req); -} - } } } diff --git a/modules/http/server/sse/sse_server_impl.h b/modules/http/server/sse/sse_server_impl.h index 95367f4..5c035b0 100644 --- a/modules/http/server/sse/sse_server_impl.h +++ b/modules/http/server/sse/sse_server_impl.h @@ -27,9 +27,9 @@ #include #include -#include -#include -#include +#include "../server.h" +#include "../middleware.h" +#include "../context.h" #include "sse_server.h" #include "sse_connection.h" @@ -80,13 +80,13 @@ class SseServer::Impl : public http::server::Middleware { void setContextLogEnable(bool enable); + //! 静态辅助方法 + static bool IsSseRequest(const http::Request &req); + public: //! Middleware 接口:处理 HTTP 请求,检测 SSE 请求 virtual void handle(http::server::ContextSptr sp_ctx, const http::server::NextFunc &next) override; - //! 静态辅助方法 - static bool IsSseRequest(const http::Request &req); - private: //! 当 HTTP 服务器发送 200 响应后回调此函数 //! 将 TcpConnection 从 HTTP 服务器分离,交给 SseServer 管理 diff --git a/modules/http/server/sse/sse_server_test.cpp b/modules/http/server/sse/sse_server_impl_test.cpp similarity index 73% rename from modules/http/server/sse/sse_server_test.cpp rename to modules/http/server/sse/sse_server_impl_test.cpp index d3ad31e..a534cbb 100644 --- a/modules/http/server/sse/sse_server_test.cpp +++ b/modules/http/server/sse/sse_server_impl_test.cpp @@ -19,15 +19,13 @@ */ #include #include -#include "sse_server.h" +#include "sse_server_impl.h" namespace tbox { namespace http { namespace sse { -//! === SseServer::IsSseRequest() 请求检测测试 === - -TEST(SseServer, DetectSseRequest) +TEST(SseServerImpl, DetectSseRequest) { //! 合法的 SSE 请求:GET + Accept: text/event-stream http::Request req; @@ -35,56 +33,56 @@ TEST(SseServer, DetectSseRequest) req.http_ver = http::HttpVer::k1_1; req.headers["Accept"] = "text/event-stream"; - EXPECT_TRUE(SseServer::IsSseRequest(req)); + EXPECT_TRUE(SseServer::Impl::IsSseRequest(req)); } -TEST(SseServer, DetectSseRequestWithMultipleAccept) +TEST(SseServerImpl, DetectSseRequestWithMultipleAccept) { //! Accept 头包含多个值时,仍能检测到 text/event-stream http::Request req; req.method = http::Method::kGet; req.headers["Accept"] = "text/event-stream, text/html;q=0.9"; - EXPECT_TRUE(SseServer::IsSseRequest(req)); + EXPECT_TRUE(SseServer::Impl::IsSseRequest(req)); } -TEST(SseServer, DetectNonSseRequestNoAccept) +TEST(SseServerImpl, DetectNonSseRequestNoAccept) { //! 缺少 Accept 头:不是 SSE 请求 http::Request req; req.method = http::Method::kGet; - EXPECT_FALSE(SseServer::IsSseRequest(req)); + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); } -TEST(SseServer, DetectNonSseRequestHtmlAccept) +TEST(SseServerImpl, DetectNonSseRequestHtmlAccept) { //! Accept 头不包含 text/event-stream:不是 SSE 请求 http::Request req; req.method = http::Method::kGet; req.headers["Accept"] = "text/html"; - EXPECT_FALSE(SseServer::IsSseRequest(req)); + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); } -TEST(SseServer, DetectNonSsePostRequest) +TEST(SseServerImpl, DetectNonSsePostRequest) { //! POST 方法:不是 SSE 请求(SSE 必须是 GET) http::Request req; req.method = http::Method::kPost; req.headers["Accept"] = "text/event-stream"; - EXPECT_FALSE(SseServer::IsSseRequest(req)); + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); } -TEST(SseServer, DetectNonSsePutRequest) +TEST(SseServerImpl, DetectNonSsePutRequest) { //! PUT 方法:不是 SSE 请求 http::Request req; req.method = http::Method::kPut; req.headers["Accept"] = "text/event-stream"; - EXPECT_FALSE(SseServer::IsSseRequest(req)); + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); } } -- Gitee From 0ddb2c3a3f6b1f7baf2ea957556b880b2db2cae2 Mon Sep 17 00:00:00 2001 From: Hevake Lee Date: 2026年6月18日 10:26:50 +0800 Subject: [PATCH 3/4] tidy(sse) --- modules/http/server/sse/sse_connection.cpp | 16 +++------------- modules/http/server/sse/sse_connection.h | 14 +++++++------- modules/http/server/sse/sse_server_impl.cpp | 17 +++-------------- modules/http/server/sse/sse_server_impl.h | 5 ++--- 4 files changed, 15 insertions(+), 37 deletions(-) diff --git a/modules/http/server/sse/sse_connection.cpp b/modules/http/server/sse/sse_connection.cpp index 9841ff4..4468e61 100644 --- a/modules/http/server/sse/sse_connection.cpp +++ b/modules/http/server/sse/sse_connection.cpp @@ -30,9 +30,9 @@ namespace sse { using namespace std::placeholders; SseConnection::SseConnection(event::Loop *wp_loop, - network::TcpConnection *tcp_conn, - const std::string &url, - const std::string &last_event_id) + network::TcpConnection *tcp_conn, + const std::string &url, + const std::string &last_event_id) : wp_loop_(wp_loop) , sp_tcp_conn_(tcp_conn) , url_(url) @@ -136,16 +136,6 @@ network::SockAddr SseConnection::peerAddr() const return network::SockAddr(); } -std::string SseConnection::getLastEventId() const -{ - return last_event_id_; -} - -std::string SseConnection::getUrl() const -{ - return url_; -} - bool SseConnection::isExpired() const { return sp_tcp_conn_ == nullptr || sp_tcp_conn_->isExpired(); diff --git a/modules/http/server/sse/sse_connection.h b/modules/http/server/sse/sse_connection.h index d2ee2af..e9e936e 100644 --- a/modules/http/server/sse/sse_connection.h +++ b/modules/http/server/sse/sse_connection.h @@ -47,8 +47,8 @@ class SseConnection { public: //! 设置回调(由 SseServer::Impl 调用,绑定 ConnToken) - void setCloseCallback(const CloseCallback &cb) { close_cb_ = cb; } - void setSendCompleteCallback(const SendCompleteCallback &cb) { send_complete_cb_ = cb; } + void setCloseCallback(const CloseCallback &cb) { close_cb_ = cb; } + void setSendCompleteCallback(const SendCompleteCallback &cb) { send_complete_cb_ = cb; } void setContextLogEnable(bool enable) { context_log_enable_ = enable; } public: @@ -69,10 +69,10 @@ class SseConnection { network::SockAddr peerAddr() const; //! 获取浏览器重连时携带的 Last-Event-ID - std::string getLastEventId() const; + std::string getLastEventId() const { return last_event_id_; } //! 获取客户端连接的 URL 路径 - std::string getUrl() const; + std::string getUrl() const { return url_; } //! 连接是否已失效 bool isExpired() const; @@ -85,9 +85,9 @@ class SseConnection { private: //! 仅由 SseServer 创建(生命期由 Cabinet 管理) SseConnection(event::Loop *wp_loop, - network::TcpConnection *tcp_conn, - const std::string &url, - const std::string &last_event_id); + network::TcpConnection *tcp_conn, + const std::string &url, + const std::string &last_event_id); void onTcpDisconnected(); void onTcpSendCompleted(); diff --git a/modules/http/server/sse/sse_server_impl.cpp b/modules/http/server/sse/sse_server_impl.cpp index 965e012..144d747 100644 --- a/modules/http/server/sse/sse_server_impl.cpp +++ b/modules/http/server/sse/sse_server_impl.cpp @@ -42,10 +42,8 @@ using namespace std::placeholders; SseServer::Impl::Impl(SseServer *wp_parent, event::Loop *wp_loop) : wp_parent_(wp_parent) , wp_loop_(wp_loop) -{ - //! 创建心跳定时器(但不初始化,等 setHeartbeatInterval 时初始化) - sp_heartbeat_timer_ = wp_loop_->newTimerEvent(); -} + , sp_heartbeat_timer_(wp_loop->newTimerEvent()) +{ } SseServer::Impl::~Impl() { @@ -107,9 +105,7 @@ void SseServer::Impl::stop() }); //! 删除所有 SseConnection(析构时会断开并延后删除 TcpConnection) - sse_conns_.foreach([](SseConnection *conn) { - delete conn; - }); + sse_conns_.foreach([](SseConnection *conn) { delete conn; }); sse_conns_.clear(); state_ = SseServer::State::kInited; @@ -133,7 +129,6 @@ void SseServer::Impl::cleanup() } //! === Middleware 接口实现 === - void SseServer::Impl::handle(http::server::ContextSptr sp_ctx, const http::server::NextFunc &next) { auto &req = sp_ctx->req(); @@ -191,7 +186,6 @@ void SseServer::Impl::handle(http::server::ContextSptr sp_ctx, const http::serve } //! === 升级与连接管理 === - void SseServer::Impl::onSseUpgrade(network::TcpConnection *tcp_conn, const std::string &url_path, const std::string &last_event_id) @@ -238,7 +232,6 @@ void SseServer::Impl::onSseDisconnected(const ConnToken &client) } //! === 心跳定时器 === - void SseServer::Impl::onHeartbeatTimer() { //! 向所有连接发送心跳注释行 @@ -264,7 +257,6 @@ void SseServer::Impl::setHeartbeatInterval(std::chrono::milliseconds interval) } //! === 通过 ConnToken 操作连接 === - bool SseServer::Impl::send(const ConnToken &client, const std::string &data) { auto sse_conn = sse_conns_.at(client); @@ -369,8 +361,6 @@ void SseServer::Impl::setContextLogEnable(bool enable) }); } -//! === 静态辅助方法 === - bool SseServer::Impl::IsSseRequest(const http::Request &req) { //! SSE 请求检测条件: @@ -394,7 +384,6 @@ bool SseServer::Impl::IsSseRequest(const http::Request &req) } //! === SseServer 外部接口 === - SseServer::SseServer(event::Loop *wp_loop) : impl_(new Impl(this, wp_loop)) { diff --git a/modules/http/server/sse/sse_server_impl.h b/modules/http/server/sse/sse_server_impl.h index 5c035b0..c003e0c 100644 --- a/modules/http/server/sse/sse_server_impl.h +++ b/modules/http/server/sse/sse_server_impl.h @@ -103,8 +103,9 @@ class SseServer::Impl : public http::server::Middleware { private: SseServer *wp_parent_; event::Loop *wp_loop_; - + event::TimerEvent *sp_heartbeat_timer_ = nullptr; //! 心跳定时器(可选,默认禁用) http::server::Server *wp_http_server_ = nullptr; + //! URL 路径匹配规则: //! - url_path_ 以 '/' 结尾:前缀匹配,如 "/sse/" 匹配 "/sse/aa" //! - url_path_ 不以 '/' 结尾:全量匹配,如 "/sse" 仅匹配 "/sse" @@ -117,8 +118,6 @@ class SseServer::Impl : public http::server::Middleware { //! SseConnection 容器(生命期管理) cabinet::Cabinet sse_conns_; - //! 心跳定时器(可选,默认禁用) - event::TimerEvent *sp_heartbeat_timer_ = nullptr; std::chrono::milliseconds heartbeat_interval_{0}; SseServer::State state_ = SseServer::State::kNone; -- Gitee From 6c510cfc11522c4b72cdf6cba772250d94e71bfa Mon Sep 17 00:00:00 2001 From: Hevake Lee Date: 2026年6月20日 12:54:18 +0800 Subject: [PATCH 4/4] tidy --- modules/http/CMakeLists.txt | 1 - modules/http/Makefile | 1 - .../http/server/sse/sse_connection_test.cpp | 49 ----------- modules/http/server/sse/sse_server_impl.cpp | 83 +++++++++---------- modules/http/server/sse/sse_server_impl.h | 6 +- 5 files changed, 44 insertions(+), 96 deletions(-) delete mode 100644 modules/http/server/sse/sse_connection_test.cpp diff --git a/modules/http/CMakeLists.txt b/modules/http/CMakeLists.txt index 9b36124..2eed1c8 100644 --- a/modules/http/CMakeLists.txt +++ b/modules/http/CMakeLists.txt @@ -57,7 +57,6 @@ set(TBOX_HTTP_TEST_SOURCES server/request_parser_test.cpp server/sse/sse_event_test.cpp server/sse/sse_server_impl_test.cpp - server/sse/sse_connection_test.cpp) add_library(${TBOX_LIBRARY_NAME} ${TBOX_BUILD_LIB_TYPE} ${TBOX_HTTP_SOURCES}) add_library(tbox::${TBOX_LIBRARY_NAME} ALIAS ${TBOX_LIBRARY_NAME}) diff --git a/modules/http/Makefile b/modules/http/Makefile index 9c07400..de0442c 100644 --- a/modules/http/Makefile +++ b/modules/http/Makefile @@ -72,7 +72,6 @@ TEST_CPP_SRC_FILES = \ server/request_parser_test.cpp \ server/sse/sse_event_test.cpp \ server/sse/sse_server_impl_test.cpp \ - server/sse/sse_connection_test.cpp \ TEST_LDFLAGS := $(LDFLAGS) -ltbox_network -ltbox_log -ltbox_eventx -ltbox_event -ltbox_util -ltbox_base -ldl diff --git a/modules/http/server/sse/sse_connection_test.cpp b/modules/http/server/sse/sse_connection_test.cpp deleted file mode 100644 index d578833..0000000 --- a/modules/http/server/sse/sse_connection_test.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * .============. - * // M A K E / \ - * // C++ DEV / \ - * // E A S Y / \/ \ - * ++ ----------. \/\ . - * \\ \ \ /\ / - * \\ \ \ / - * \\ \ \ / - * -============' - * - * Copyright (c) 2026 Hevake and contributors, all rights reserved. - * - * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) - * Use of this source code is governed by MIT license that can be found - * in the LICENSE file in the root of the source tree. All contributing - * project authors may be found in the CONTRIBUTORS.md file in the root - * of the source tree. - */ -#include -#include "sse_connection.h" - -namespace tbox { -namespace http { -namespace sse { - -//! SseConnection 的完整测试需要集成环境(HTTP Server + SseServer) -//! 因为 TcpConnection 构造函数为 private(仅 TcpAcceptor/TcpConnector 可创建) -//! 这里仅验证 SseConnection 的创建约束与设计合理性 - -TEST(SseConnection, CannotCreateWithoutTcpConnection) -{ - //! SseConnection 构造函数需要 TcpConnection* - //! TcpConnection 的构造函数为 private,无法在单元测试中独立创建 - //! SseConnection 的 send/close/peerAddr 等操作均依赖 TcpConnection - //! 完整的集成测试在 sse_server_test.cpp 中通过 SseServer 来间接验证 -} - -TEST(SseConnection, DesignConstraints) -{ - //! SseConnection 是单向推送协议(服务端→客户端) - //! 不需要 MessageCallback(与 WsConnection 不同) - //! 需要 receiveCallback 注册空函数体(阈值=0)以保持读事件监听 - //! 用于检测浏览器关闭连接和保持 TCP 写事件正常 -} - -} -} -} diff --git a/modules/http/server/sse/sse_server_impl.cpp b/modules/http/server/sse/sse_server_impl.cpp index 144d747..33c8cf9 100644 --- a/modules/http/server/sse/sse_server_impl.cpp +++ b/modules/http/server/sse/sse_server_impl.cpp @@ -133,56 +133,55 @@ void SseServer::Impl::handle(http::server::ContextSptr sp_ctx, const http::serve { auto &req = sp_ctx->req(); - if (IsSseRequest(req)) { - //! URL 路径匹配规则: - //! - url_path_ 以 '/' 结尾:前缀匹配 - //! - url_path_ 不以 '/' 结尾:全量匹配 - //! - url_path_ 为空字符串:匹配所有 SSE 请求 - if (!url_path_.empty()) { - bool matched = false; - if (url_path_.back() == '/') { - //! 前缀匹配 - matched = util::string::IsStartWith(req.url.path, url_path_); - } else { - //! 全量匹配 - matched = (req.url.path == url_path_); - } - if (!matched) { - //! 不是本服务关心的 URL,传递给下一个中间件 - next(); - return; - } - } + if (!IsSseRequest(req)) { + //! 非 SSE 请求,传递给下一个中间件 + next(); + } - LogDbg("sse request: %s", req.url.path.c_str()); + //! URL 路径匹配规则: + //! - url_path_ 以 '/' 结尾:前缀匹配 + //! - url_path_ 不以 '/' 结尾:全量匹配 + //! - url_path_ 为空字符串:匹配所有 SSE 请求 + if (!url_path_.empty()) { + bool matched = false; + if (url_path_.back() == '/') { + //! 前缀匹配 + matched = util::string::IsStartWith(req.url.path, url_path_); + } else { + //! 全量匹配 + matched = (req.url.path == url_path_); + } + if (!matched) { + //! 不是本服务关心的 URL,传递给下一个中间件 + next(); + return; + } + } - auto &res = sp_ctx->res(); + LogDbg("sse request: %s", req.url.path.c_str()); - //! 设置 200 OK 响应(SSE 不是协议升级,使用 200) - res.status_code = http::StatusCode::k200_OK; - res.http_ver = http::HttpVer::k1_1; + auto &res = sp_ctx->res(); - //! SSE 必需的响应头 - res.headers["Content-Type"] = "text/event-stream"; - res.headers["Cache-Control"] = "no-cache"; - res.headers["Connection"] = "keep-alive"; + //! 设置 200 OK 响应(SSE 不是协议升级,使用 200) + res.status_code = http::StatusCode::k200_OK; + res.http_ver = http::HttpVer::k1_1; - //! 从请求中提取 Last-Event-ID(浏览器重连时携带) - std::string last_event_id; - auto id_iter = req.headers.find("Last-Event-ID"); - if (id_iter != req.headers.end()) - last_event_id = id_iter->second; + //! SSE 必需的响应头 + res.headers["Content-Type"] = "text/event-stream"; + res.headers["Cache-Control"] = "no-cache"; + res.headers["Connection"] = "keep-alive"; - //! 注册升级回调:HTTP 服务器发送 200 响应后,将 TcpConnection 交给 SseServer - //! 与 WebSocket 使用同一套 upgrade_cb 机制 - res.upgrade_cb = std::bind(&SseServer::Impl::onSseUpgrade, this, _1, req.url.path, last_event_id); + //! 从请求中提取 Last-Event-ID(浏览器重连时携带) + std::string last_event_id; + auto id_iter = req.headers.find("Last-Event-ID"); + if (id_iter != req.headers.end()) + last_event_id = id_iter->second; - //! SSE 请求已处理,不再调用 next() - return; - } + //! 注册升级回调:HTTP 服务器发送 200 响应后,将 TcpConnection 交给 SseServer + //! 与 WebSocket 使用同一套 upgrade_cb 机制 + res.upgrade_cb = std::bind(&SseServer::Impl::onSseUpgrade, this, _1, req.url.path, last_event_id); - //! 非 SSE 请求,传递给下一个中间件 - next(); + //! SSE 请求已处理,不再调用 next() } //! === 升级与连接管理 === diff --git a/modules/http/server/sse/sse_server_impl.h b/modules/http/server/sse/sse_server_impl.h index c003e0c..de0121d 100644 --- a/modules/http/server/sse/sse_server_impl.h +++ b/modules/http/server/sse/sse_server_impl.h @@ -56,7 +56,7 @@ class SseServer::Impl : public http::server::Middleware { SseServer::State state() const { return state_; } public: - void setConnectedCallback(const SseServer::ConnectedCallback &cb) { connected_cb_ = cb; } + void setConnectedCallback(const SseServer::ConnectedCallback &cb) { connected_cb_ = cb; } void setDisconnectedCallback(const SseServer::DisconnectedCallback &cb) { disconnected_cb_ = cb; } public: @@ -91,8 +91,8 @@ class SseServer::Impl : public http::server::Middleware { //! 当 HTTP 服务器发送 200 响应后回调此函数 //! 将 TcpConnection 从 HTTP 服务器分离,交给 SseServer 管理 void onSseUpgrade(network::TcpConnection *tcp_conn, - const std::string &url_path, - const std::string &last_event_id); + const std::string &url_path, + const std::string &last_event_id); //! 当 SseConnection 断开时回调(参数为 ConnToken) void onSseDisconnected(const ConnToken &client); -- Gitee