diff --git a/README.md b/README.md index 99039841684494409d98c9ba7130dba6cf26f53d..73c47f9e025f9d961f915e3e8346e4924f54dc10 100644 --- a/README.md +++ b/README.md @@ -109,8 +109,7 @@ It contains an event-driven behavior tree that can realize sequential, branching | run | It's an ELF. It loads one or more lib\*.so file which specified by parameter `-l xxx`, then run Modules in side | | mqtt | MQTT Client | | coroutine | coroutine function | -| http | Implemented HTTP Server and Client modules on the basis of network | -| websocket | Implemented WebSocket Server and Client modules based on HTTP middleware, RFC 6455 | +| http | Implemented HTTP Server and Client, middleware, and SSE (Server-Sent Events) modules on the basis of network | | alarm | Realized 4 commonly used alarm clocks: CRON alarm clock, single alarm clock, weekly cycle alarm clock, weekday alarm clock | | flow | Contains multi-level state machine and behavior tree to solve the problem of action flow in asynchronous mode | | crypto | Implemented the commonly used AES and MD5 encryption and decryption calculations | diff --git a/README_CN.md b/README_CN.md index dbc8b80c529f773c9c989c62405f6c3bbc05250f..9f06b47b42250602cd6841731293b7ad77d12339 100644 --- a/README_CN.md +++ b/README_CN.md @@ -111,7 +111,7 @@ trace模块能记录被标记的函数每次执行的时间点与时长,可导 | run | 执行器 | 是个可执行程序,可加载多个由参数`-l xxx`指定的动态库,并运行其中的Module | | mqtt | MQTT客户端库 | | | coroutine | 协程库 | 众所周知,异步框架不方便处理顺序性业务,协程弥补之 | -| http | HTTP库 | 在network的基础上实现了HTTP的Server与Client模块 | +| http | HTTP库 | 在network的基础上实现了HTTP的Server与Client、中间件、SSE(服务端推送事件)模块 | | websocket | WebSocket库 | 在http的基础上实现了WebSocket的Server与Client模块,遵循RFC 6455 | | alarm | 闹钟库 | 实现了4种常用的闹钟:CRON闹钟、单次闹钟、星期循环闹钟、工作日闹钟 | | flow | 流程库| 含多层级状态机与行为树,解决异步模式下动行流程问题 | diff --git a/documents/modules/README.md b/documents/modules/README.md index f03c7c46d35b3b720e4acda010edf7361add3c70..f9d31a82f106d9c195fda96bd7fbbe2a271786f9 100644 --- a/documents/modules/README.md +++ b/documents/modules/README.md @@ -17,7 +17,7 @@ cpp-tbox is an event-driven C++ service application development library that pro | **network** | Network communication | TCP/UDP/UART communication and byte stream abstraction | [network.md](network.md) | | **terminal** | Interactive terminal | Runtime command interaction, similar to Bash shell | [terminal.md](terminal.md) | | **log** | Log channels | File/stdout/syslog and other log outputs | [log.md](log.md) | -| **http** | HTTP service | Express-style HTTP server/client and middleware | [http.md](http.md) | +| **http** | HTTP service | Express-style HTTP server/client, middleware, and SSE (Server-Sent Events) | [http.md](http.md) | | **websocket** | WebSocket service | WebSocket server/client, RFC 6455, HTTP middleware-based | [websocket.md](websocket.md) | | **coroutine** | Coroutine | Coroutine scheduler and Channel/Mutex helper components | [coroutine.md](coroutine.md) | | **alarm** | Timer alarm | Cron/Oneshot/Weekly/Workday timers | [alarm.md](alarm.md) | diff --git a/documents/modules/README_CN.md b/documents/modules/README_CN.md index 78323902d40b7e8d7b5ff2665b209ce9a8f9c011..591bd511819a4af982b88f41c72be44bd456f6a7 100644 --- a/documents/modules/README_CN.md +++ b/documents/modules/README_CN.md @@ -17,7 +17,7 @@ cpp-tbox 是一个基于事件驱动的 C++ 服务应用开发库,提供完整 | **network** | 网络通信 | TCP/UDP/UART 通信与字节流抽象 | [network_CN.md](network_CN.md) | | **terminal** | 交互终端 | 运行时命令交互,类似 Bash shell | [terminal_CN.md](terminal_CN.md) | | **log** | 日志通道 | 文件/stdout/syslog 等日志输出 | [log_CN.md](log_CN.md) | -| **http** | HTTP 服务 | Express 式 HTTP 服务端与客户端、中间件 | [http_CN.md](http_CN.md) | +| **http** | HTTP 服务 | Express 式 HTTP 服务端与客户端、中间件、SSE(服务端推送事件) | [http_CN.md](http_CN.md) | | **websocket** | WebSocket 服务 | WebSocket 服务端与客户端,RFC 6455,基于 HTTP 中间件 | [websocket_CN.md](websocket_CN.md) | | **coroutine** | 协程 | 协程调度器与 Channel/Mutex 等辅助组件 | [coroutine_CN.md](coroutine_CN.md) | | **alarm** | 定时闹钟 | Cron/Oneshot/Weekly/Workday 定时器 | [alarm_CN.md](alarm_CN.md) | diff --git a/documents/modules/http.md b/documents/modules/http.md index 43901057f2149f6b545896c6f526a443c137cded..3dc160ff58bbe58108a37d8ec80b9671b0c1b755 100644 --- a/documents/modules/http.md +++ b/documents/modules/http.md @@ -351,6 +351,101 @@ http_client.setReconnectDelayCalcFunc( 6. **Calling external APIs**: Use Client to make HTTP requests to other services 7. **Service-to-service communication**: Client with auto-reconnect for reliable inter-service HTTP calls +## SSE — Server-Sent Events + +The http module also includes an SSE (Server-Sent Events) sub-package, implementing server-side event push based on the W3C/WHATWG EventSource specification. SSE uses standard HTTP long-lived responses (200 OK) to stream events to the browser, requiring no protocol upgrade like WebSocket. + +### Header Files + +```cpp +#include //! SSE event data structure +#include //! SSE server +#include //! SSE connection (internal) +``` + +### SseServer — SSE Server + +SseServer runs on top of an HTTP server as a middleware. It detects SSE requests (Accept: text/event-stream), sets 200 OK response headers, and takes over the TcpConnection via the `upgrade_cb` mechanism to provide continuous event streaming. + +| Method | Description | +|------|------| +| `SseServer(loop)` | Constructor | +| `initialize(http_server, url_path)` | Initialize: associate with an HTTP server; `url_path` controls URL matching | +| `start()` | Start (registers as HTTP middleware) | +| `stop()` | Stop (unregisters middleware, closes all SSE connections) | +| `cleanup()` | Cleanup | +| `state()` | Get current state (None/Inited/Running) | +| `send(client, data)` | Send data to a client (simple text, event type "message") | +| `send(client, event)` | Send SseEvent to a client | +| `sendToAll(data)` | Broadcast data to all clients | +| `sendToAll(event)` | Broadcast SseEvent to all clients | +| `close(client)` | Close a client connection | +| `sendHeartbeat(client, comment)` | Send heartbeat comment line | +| `setHeartbeatInterval(ms)` | Set auto-heartbeat interval (default: 0 = disabled) | +| `isClientValid(client)` | Check if a client connection is still valid | +| `peerAddr(client)` | Get client address (IP:port) | +| `getLastEventId(client)` | Get the Last-Event-ID from browser reconnect | +| `getUrl(client)` | Get the URL path the client connected to | +| `setContext(client, ctx, deleter)` | Set context data for a client | +| `getContext(client)` | Get context data for a client | +| `setConnectedCallback(cb)` | Set callback: new client connected | +| `setDisconnectedCallback(cb)` | Set callback: client disconnected | +| `IsSseRequest(req)` | Static: check if an HTTP request is a valid SSE request | + +**URL path matching rules:** Same as WsServer — prefix match if url_path ends with `/`, exact match otherwise, empty string matches all. + +**SSE vs WebSocket:** + +| Feature | WebSocket | SSE | +|---------|-----------|------| +| HTTP status code | 101 Switching Protocols | 200 OK | +| Data direction | Bidirectional | Server→Client only | +| Data format | Binary frames | Plain text (`data:`/`event:`/`id:` fields) | +| Client message callback | Yes | No (unidirectional) | +| Heartbeat | Ping/Pong frames | Comment lines + timer | +| Reconnection | Manual implementation | Browser auto-reconnect + Last-Event-ID | +| Module | Separate `websocket` module | Inside `http` module | + +### SseEvent — SSE Event Data Structure + +```cpp +struct SseEvent { + std::string id; //! Event ID (optional), for Last-Event-ID reconnection + std::string event; //! Event type (optional, default "message") + std::string data; //! Data (required, supports multiline) + int retry = 0; //! Reconnect interval in ms (optional) + + //! Format event as SSE text protocol + //! Multiline data auto-splits into multiple `data:` lines + std::string toString() const; +}; +``` + +### SSE Example: Event Push + +> Full example in `examples/http/server/sse/` + +```cpp +#include +#include + +SseServer sse_srv(sp_loop); +sse_srv.initialize(&http_srv, "/sse/events"); +sse_srv.setHeartbeatInterval(std::chrono::seconds(15)); + +sse_srv.setConnectedCallback([](const SseServer::ConnToken &token) { + LogInfo("sse client connected"); + sse_srv.send(token, "Welcome!"); +}); + +//! Push events every 5 seconds +SseEvent evt; +evt.id = "42"; +evt.event = "tick"; +evt.data = "{\"time\":\"2026-06-16 10:30:00\"}"; +sse_srv.sendToAll(evt); +``` + ## Important Notes 1. **Middleware invocation order**: Middleware added via `use()` executes in the order it was added diff --git a/documents/modules/http_CN.md b/documents/modules/http_CN.md index 3ae84594c3ed2ad9655b08a68eb78eed2a002d59..0ea996e49798a844c2efcfa803d849093530bd70 100644 --- a/documents/modules/http_CN.md +++ b/documents/modules/http_CN.md @@ -353,6 +353,101 @@ http_client.setReconnectDelayCalcFunc( 6. **调用外部 API**:使用 Client 向其它服务发起 HTTP 请求 7. **服务间通信**:使用 Client 配合自动重连,实现可靠的服务间 HTTP 调用 +## SSE — Server-Sent Events(服务端推送事件) + +http 模块还包含 SSE(Server-Sent Events)子包,实现基于 W3C/WHATWG EventSource 规范的服务端事件推送。SSE 使用标准 HTTP 长响应(200 OK)向浏览器流式推送事件,不像 WebSocket 需要协议升级。 + +### 头文件 + +```cpp +#include //! SSE 事件数据结构 +#include //! SSE 服务端 +#include //! SSE 连接(内部类) +``` + +### SseServer — SSE 服务端 + +SseServer 运行在 HTTP 服务器之上,作为中间件存在。它检测 SSE 请求(Accept 头包含 text/event-stream),设置 200 OK 响应头,通过 `upgrade_cb` 机制接管 TcpConnection,提供持续的事件推送。 + +| 方法 | 说明 | +|------|------| +| `SseServer(loop)` | 构造 | +| `initialize(http_server, url_path)` | 初始化:关联到 HTTP 服务器;`url_path` 控制 URL 匹配规则 | +| `start()` | 启动(注册为 HTTP 中间件) | +| `stop()` | 停止(反注册中间件,关闭所有 SSE 连接) | +| `cleanup()` | 清理 | +| `state()` | 获取当前状态 (None/Inited/Running) | +| `send(client, data)` | 向指定客户端发送数据(简单文本) | +| `send(client, event)` | 向指定客户端发送 SseEvent | +| `sendToAll(data)` | 向所有客户端广播数据 | +| `sendToAll(event)` | 向所有客户端广播 SseEvent | +| `close(client)` | 关闭指定客户端连接 | +| `sendHeartbeat(client, comment)` | 发送心跳注释行 | +| `setHeartbeatInterval(ms)` | 设置自动心跳间隔(默认 0 = 禁用) | +| `isClientValid(client)` | 检查客户端连接是否有效 | +| `peerAddr(client)` | 获取客户端地址 | +| `getLastEventId(client)` | 获取浏览器重连时的 Last-Event-ID | +| `getUrl(client)` | 获取客户端连接的 URL 路径 | +| `setContext(client, ctx, deleter)` | 设置上下文数据 | +| `getContext(client)` | 获取上下文数据 | +| `setConnectedCallback(cb)` | 设置回调:客户端连接 | +| `setDisconnectedCallback(cb)` | 设置回调:客户端断开 | +| `IsSseRequest(req)` | 静态方法:检查是否为 SSE 请求 | + +**URL 路径匹配规则:** 与 WsServer 一致——url_path 以 `/` 结尾为前缀匹配,不以 `/` 结尾为全量匹配,空字符串匹配所有。 + +**SSE 与 WebSocket 对比:** + +| 特性 | WebSocket | SSE | +|------|-----------|------| +| HTTP 状态码 | 101 Switching Protocols | 200 OK | +| 数据方向 | 双向 | 仅服务端→客户端 | +| 数据格式 | 二进制帧 | 纯文本(data:/event:/id: 字段) | +| 客户端消息回调 | 有 | 无(单向) | +| 心跳 | Ping/Pong 帧 | 注释行 + 定时器 | +| 重连机制 | 自行实现 | 浏览器自动重连 + Last-Event-ID | +| 模块位置 | 独立 `websocket` 模块 | `http` 模块内 | + +### SseEvent — SSE 事件数据结构 + +```cpp +struct SseEvent { + std::string id; //! 事件ID(可选),用于 Last-Event-ID 断线续传 + std::string event; //! 事件类型(可选,默认 "message") + std::string data; //! 数据(必须,支持多行) + int retry = 0; //! 重连间隔毫秒数(可选) + + //! 将事件格式化为 SSE 文本协议格式 + //! 多行 data 自动拆分为多个 `data:` 行 + std::string toString() const; +}; +``` + +### SSE 示例:事件推送 + +> 完整示例见 `examples/http/server/sse/` + +```cpp +#include +#include + +SseServer sse_srv(sp_loop); +sse_srv.initialize(&http_srv, "/sse/events"); +sse_srv.setHeartbeatInterval(std::chrono::seconds(15)); + +sse_srv.setConnectedCallback([](const SseServer::ConnToken &token) { + LogInfo("sse 客户端已连接"); + sse_srv.send(token, "欢迎!"); +}); + +//! 每 5 秒推送事件 +SseEvent evt; +evt.id = "42"; +evt.event = "tick"; +evt.data = "{\"time\":\"2026-06-16 10:30:00\"}"; +sse_srv.sendToAll(evt); +``` + ## 注意事项 1. **中间件调用顺序**:`use()` 添加的中间件按添加顺序执行 diff --git a/examples/http/server/sse/Makefile b/examples/http/server/sse/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..38adaa2d2fbe40e94919d6e37bdf29b2dbadd181 --- /dev/null +++ b/examples/http/server/sse/Makefile @@ -0,0 +1,37 @@ +# +# .============. +# // M A K E / \ +# // C++ DEV / \ +# // E A S Y / \/ \ +# ++ ----------. \/\ . +# \\ \ \ /\ / +# \\ \ \ / +# \\ \ \ / +# -============' +# +# Copyright (c) 2026 Hevake and contributors, all rights reserved. +# +# This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) +# Use of this source code is governed by MIT license that can be found +# in the LICENSE file in the root of the source tree. All contributing +# project authors may be found in the CONTRIBUTORS.md file in the root +# of the source tree. +# + +PROJECT := examples/http/server/sse +EXE_NAME := ${PROJECT} + +CPP_SRC_FILES := push.cpp + +CXXFLAGS := -DMODULE_ID='"$(EXE_NAME)"' $(CXXFLAGS) +LDFLAGS += \ + -ltbox_http \ + -ltbox_network \ + -ltbox_eventx \ + -ltbox_event \ + -ltbox_log \ + -ltbox_util \ + -ltbox_base \ + -lpthread -ldl + +include $(TOP_DIR)/mk/exe_common.mk diff --git a/examples/http/server/sse/push.cpp b/examples/http/server/sse/push.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6ffe6f689c49cf65c55db902c107db9a37d73ad1 --- /dev/null +++ b/examples/http/server/sse/push.cpp @@ -0,0 +1,200 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ + +/** + * SSE 定时推送示例 + * + * 功能: + * - 创建 HTTP 服务器,挂载 SSE 服务到 /sse/events + * - 每 5 秒向所有 SSE 客户端推送当前时间事件 + * - SSE 连接自动心跳(每 15 秒发送注释行) + * - HTTP 主页提供浏览器 EventSource 客户端代码 + * - Ctrl+C 优雅退出 + * + * 用法: + * ./sse_push [bind_addr] + * 示例: ./sse_push 0.0.0.0:8080 + * 浏览器访问: http://127.0.0.1:8080/ + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace tbox; +using namespace tbox::event; +using namespace tbox::http; +using namespace tbox::http::server; +using namespace tbox::http::sse; + +//! 获取当前时间字符串 +static std::string getTimeString() +{ + auto now = std::chrono::system_clock::now(); + auto time_t_now = std::chrono::system_clock::to_time_t(now); + char buf[64]; + std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time_t_now)); + return buf; +} + +//! HTML 主页:包含 EventSource 客户端 JavaScript +static const std::string kIndexHtml = + "\n" + " SSE Push Demo\n" + "
\n" + "

SSE Push Demo

\n" + "\n" + "\n" + "

AltStyle によって変換されたページ (->オリジナル) /

\n"; + +int main(int argc, char **argv) +{ + std::string bind_addr = "0.0.0.0:8080"; + + if (argc == 2) + bind_addr = argv[1]; + + LogOutput_Enable(); + + LogInfo("enter"); + + auto sp_loop = Loop::New(); + auto sp_sig_event = sp_loop->newSignalEvent(); + + //! 心跳定时器 + auto sp_push_timer = sp_loop->newTimerEvent(); + + SetScopeExitAction( + [=] { + delete sp_push_timer; + delete sp_sig_event; + delete sp_loop; + } + ); + + //! 创建 HTTP 服务器 + Server http_srv(sp_loop); + if (!http_srv.initialize(network::SockAddr::FromString(bind_addr), 1)) { + LogErr("init http server fail"); + return 0; + } + //http_srv.setContextLogEnable(true); + + //! 创建 SSE 服务,挂载到 /sse/events + SseServer sse_srv(sp_loop); + if (!sse_srv.initialize(&http_srv, "/sse/events")) { + LogErr("init sse server fail"); + return 0; + } + //sse_srv.setContextLogEnable(true); + + //! 设置自动心跳(每 15 秒发送注释行,保持连接活跃) + sse_srv.setHeartbeatInterval(std::chrono::seconds(15)); + + //! 设置 SSE 连接回调 + sse_srv.setConnectedCallback([&](const SseServer::ConnToken &token) { + auto addr = sse_srv.peerAddr(token); + LogInfo("sse client connected from %s", addr.toString().c_str()); + + //! 向新连接发送欢迎消息 + sse_srv.send(token, "Welcome! SSE connection established."); + }); + + sse_srv.setDisconnectedCallback([](const SseServer::ConnToken &token) { + LogInfo("sse client disconnected"); + }); + + //! 定时推送:每 5 秒向所有客户端推送当前时间 + int event_id = 0; + sp_push_timer->initialize(std::chrono::seconds(5), Event::Mode::kPersist); + sp_push_timer->setCallback([&] { + //! 构造 SSE 事件 + SseEvent evt; + evt.id = std::to_string(++event_id); + evt.event = "tick"; + evt.data = "{\"time\":\"" + getTimeString() + "\",\"id\":" + std::to_string(event_id) + "}"; + + //! 向所有客户端推送 + sse_srv.sendToAll(evt); + LogDbg("push event id:%d to clients", event_id); + }); + + //! 添加 HTTP 主页处理 + http_srv.use( + [&](ContextSptr ctx, const NextFunc &next) { + if (ctx->req().url.path == "/") { + ctx->res().status_code = StatusCode::k200_OK; + ctx->res().headers["Content-Type"] = "text/html; charset=utf-8"; + ctx->res().body = kIndexHtml; + return; + } + next(); + } + ); + + //! 启动服务 + http_srv.start(); + sse_srv.start(); + sp_push_timer->enable(); + + //! Ctrl+C 退出 + sp_sig_event->initialize(SIGINT, Event::Mode::kPersist); + sp_sig_event->enable(); + sp_sig_event->setCallback( + [&] (int) { + LogInfo("stopping..."); + sp_push_timer->disable(); + sse_srv.stop(); + http_srv.stop(); + sp_loop->exitLoop(); + } + ); + + LogInfo("start, listen at %s", bind_addr.c_str()); + sp_loop->runLoop(); + LogInfo("stop"); + + sse_srv.cleanup(); + http_srv.cleanup(); + + LogInfo("exit"); + return 0; +} diff --git a/modules/http/CMakeLists.txt b/modules/http/CMakeLists.txt index ff4e474913e644aa361feb665d114bae470ce77d..2eed1c885290b01cb9011bd7853d6d4b8007a235 100644 --- a/modules/http/CMakeLists.txt +++ b/modules/http/CMakeLists.txt @@ -42,6 +42,9 @@ set(TBOX_HTTP_SOURCES server/middlewares/form_data.cpp server/middlewares/form_data_middleware.cpp server/middlewares/file_downloader_middleware.cpp + server/sse/sse_event.cpp + server/sse/sse_connection.cpp + server/sse/sse_server_impl.cpp client/client.cpp client/client_impl.cpp client/respond_parser.cpp) @@ -51,7 +54,9 @@ set(TBOX_HTTP_TEST_SOURCES respond_test.cpp request_test.cpp url_test.cpp - server/request_parser_test.cpp) + server/request_parser_test.cpp + server/sse/sse_event_test.cpp + server/sse/sse_server_impl_test.cpp add_library(${TBOX_LIBRARY_NAME} ${TBOX_BUILD_LIB_TYPE} ${TBOX_HTTP_SOURCES}) add_library(tbox::${TBOX_LIBRARY_NAME} ALIAS ${TBOX_LIBRARY_NAME}) @@ -112,6 +117,14 @@ install( DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/tbox/http/client ) +install( + FILES + server/sse/sse_event.h + server/sse/sse_server.h + server/sse/sse_connection.h + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/tbox/http/server/sse +) + # generate and install export file install( EXPORT ${TBOX_LIBRARY_NAME}_targets diff --git a/modules/http/Makefile b/modules/http/Makefile index 775eadbaff5c16e855e9f461e13954c83ced8bbb..de0442c884c3cc5931817e2962ab0b213d99270d 100644 --- a/modules/http/Makefile +++ b/modules/http/Makefile @@ -37,6 +37,8 @@ HEAD_FILES = \ server/middlewares/form_data.h \ server/middlewares/form_data_middleware.h \ server/middlewares/file_downloader_middleware.h \ + server/sse/sse_event.h \ + server/sse/sse_server.h \ client/client.h \ CPP_SRC_FILES = \ @@ -52,6 +54,9 @@ CPP_SRC_FILES = \ server/middlewares/form_data.cpp \ server/middlewares/form_data_middleware.cpp \ server/middlewares/file_downloader_middleware.cpp \ + server/sse/sse_event.cpp \ + server/sse/sse_connection.cpp \ + server/sse/sse_server_impl.cpp \ client/client.cpp \ client/client_impl.cpp \ client/respond_parser.cpp \ @@ -65,6 +70,8 @@ TEST_CPP_SRC_FILES = \ request_test.cpp \ url_test.cpp \ server/request_parser_test.cpp \ + server/sse/sse_event_test.cpp \ + server/sse/sse_server_impl_test.cpp \ TEST_LDFLAGS := $(LDFLAGS) -ltbox_network -ltbox_log -ltbox_eventx -ltbox_event -ltbox_util -ltbox_base -ldl diff --git a/modules/http/respond.cpp b/modules/http/respond.cpp index f527d082051f6949036adf711898dd950bebf038..4619bfebf84c502dfa63bc9cb6adb3fa4c717143 100644 --- a/modules/http/respond.cpp +++ b/modules/http/respond.cpp @@ -40,7 +40,10 @@ std::string Respond::toString() const has_content_length = true; } - if (!has_content_length) + //! 当 upgrade_cb 已设置时(WebSocket 101、SSE 200 等),不自动添加 Content-Length + //! 原因:升级/流式响应后面是持续的数据流(WebSocket 帧、SSE 事件),不是定长 body + //! Content-Length 会误导浏览器认为响应已完成,阻止流式数据接收 + if (!has_content_length && !upgrade_cb) oss << "Content-Length: " << body.length() << CRLF; oss << CRLF; diff --git a/modules/http/server/server_imp.cpp b/modules/http/server/server_imp.cpp index 56ac94975cae8ff2e78efa9e16083e623e5c8ab4..d70adc30f329d04f7beb2365a8bf5f40dde37414 100644 --- a/modules/http/server/server_imp.cpp +++ b/modules/http/server/server_imp.cpp @@ -265,15 +265,18 @@ void Server::Impl::commitRespond(const TcpServer::ConnToken &ct, int index, Resp //! 处理协议升级请求(WebSocket 101、SSE 200 等) //! 触发条件:res->upgrade_cb 已设置(中间件负责设置) if (res->upgrade_cb) { + //! 注意:必须在 std::move(upgrade_cb) 之前调用 toString() + //! 否则 move 后 upgrade_cb 为空,toString() 会误判为普通响应而添加 Content-Length + //! SSE 响应添加 Content-Length:0 会导致浏览器认为响应已完成并断开重连 + const string content = res->toString(); + //! 发送响应后,将 TcpConnection 从 HTTP 服务器分离,交给升级协议 auto upgrade_cb = std::move(res->upgrade_cb); - { - const string &content = res->toString(); - tcp_server_.send(ct, content.data(), content.size()); - delete res; - if (context_log_enable_) - LogDbg("RES: [%s]", content.c_str()); - } + delete res; + + tcp_server_.send(ct, content.data(), content.size()); + if (context_log_enable_) + LogDbg("RES: [%s]", content.c_str()); //! 当前回调结束后立即执行 detach(使用 runNext,更高效) //! 因为 commitRespond() 是在 Loop 线程中执行的 diff --git a/modules/http/server/sse/sse_connection.cpp b/modules/http/server/sse/sse_connection.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4468e615a33b9f3ff50550c34e509d4e26c96926 --- /dev/null +++ b/modules/http/server/sse/sse_connection.cpp @@ -0,0 +1,191 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include "sse_connection.h" + +#include +#include +#include + +namespace tbox { +namespace http { +namespace sse { + +using namespace std::placeholders; + +SseConnection::SseConnection(event::Loop *wp_loop, + network::TcpConnection *tcp_conn, + const std::string &url, + const std::string &last_event_id) + : wp_loop_(wp_loop) + , sp_tcp_conn_(tcp_conn) + , url_(url) + , last_event_id_(last_event_id) +{ + TBOX_ASSERT(wp_loop != nullptr); + TBOX_ASSERT(tcp_conn != nullptr); + + //! 设置 TcpConnection 的回调 + //! SSE 是单向推送协议(服务端→客户端),不需要处理客户端发送的数据 + //! 但必须保持 receiveCallback 注册(阈值=0),否则底层 BufferedFd 会停止监听 + //! socket 读事件,导致:(1) 无法检测浏览器关闭连接;(2) TCP 写事件也可能受影响 + //! 与 WsConnection 一样,设置空函数体回调而非 nullptr,确保读事件持续监听 + sp_tcp_conn_->setReceiveCallback([](util::Buffer &buff) { buff.hasReadAll(); }, 0); + sp_tcp_conn_->setDisconnectedCallback(std::bind(&SseConnection::onTcpDisconnected, this)); + sp_tcp_conn_->setSendCompleteCallback(std::bind(&SseConnection::onTcpSendCompleted, this)); +} + +SseConnection::~SseConnection() +{ + TBOX_ASSERT(cb_level_ == 0); + + if (sp_tcp_conn_ == nullptr) + return; + + //! 先取消 TcpConnection 的回调,防止断开时回调到已销毁的 SseConnection + //! 注意:receiveCallback 不能设为 nullptr,否则会停止 socket 读事件监听 + //! 设置空函数体回调即可 + sp_tcp_conn_->setDisconnectedCallback(nullptr); + sp_tcp_conn_->setSendCompleteCallback(nullptr); + + sp_tcp_conn_->disconnect(); + auto tcp_conn = sp_tcp_conn_; + sp_tcp_conn_ = nullptr; + wp_loop_->runNext([tcp_conn] { CHECK_DELETE_OBJ(tcp_conn); }, + "SseConnection::~SseConnection, delete tcp_conn"); +} + +//! === 发送事件 === + +bool SseConnection::send(const std::string &data) +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! 简单数据:只输出 data 字段 + //! 格式:"data: xxx\n\n" + std::string sse_text = "data: " + data + "\n\n"; + + if (context_log_enable_) + LogDbg("SEND: %s", sse_text.c_str()); + + return sp_tcp_conn_->send(sse_text.data(), sse_text.size()); +} + +bool SseConnection::send(const SseEvent &event) +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! 完整事件:调用 SseEvent::toString() 格式化后发送 + std::string sse_text = event.toString(); + + if (context_log_enable_) + LogDbg("SEND: %s", sse_text.c_str()); + + return sp_tcp_conn_->send(sse_text.data(), sse_text.size()); +} + +bool SseConnection::sendHeartbeat(const std::string &comment) +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! 心跳注释行:": \n\n" + //! 浏览器 EventSource 会忽略以 ":" 开头的行,用于保持连接活跃 + std::string sse_text = ": " + comment + "\n\n"; + + if (context_log_enable_) + LogDbg("SEND: %s", sse_text.c_str()); + + return sp_tcp_conn_->send(sse_text.data(), sse_text.size()); +} + +bool SseConnection::close() +{ + if (sp_tcp_conn_ == nullptr) + return false; + + //! SSE 没有特殊的关闭协议,直接断开 TCP 连接 + sp_tcp_conn_->disconnect(); + return true; +} + +//! === 客户端信息 === + +network::SockAddr SseConnection::peerAddr() const +{ + if (sp_tcp_conn_ != nullptr) + return sp_tcp_conn_->peerAddr(); + return network::SockAddr(); +} + +bool SseConnection::isExpired() const +{ + return sp_tcp_conn_ == nullptr || sp_tcp_conn_->isExpired(); +} + +//! === 上下文数据 === + +void SseConnection::setContext(void *context, ContextDeleter &&deleter) +{ + if (sp_tcp_conn_ != nullptr) + sp_tcp_conn_->setContext(context, std::move(deleter)); +} + +void* SseConnection::getContext() const +{ + if (sp_tcp_conn_ != nullptr) + return sp_tcp_conn_->getContext(); + return nullptr; +} + +//! === TCP 回调 === + +void SseConnection::onTcpDisconnected() +{ + LogInfo("sse disconnected"); + + //! 通知 SseServer(通过 close_cb_ 绑定了 ConnToken) + if (close_cb_) { + ++cb_level_; + close_cb_(); + --cb_level_; + } + + //! 清理 TcpConnection:先断空指针,延后删除 + //! 必须在 close_cb_() 之后清理,否则回调中 getContext() 拿到空值 + auto tcp_conn = sp_tcp_conn_; + sp_tcp_conn_ = nullptr; + wp_loop_->runNext([tcp_conn] { CHECK_DELETE_OBJ(tcp_conn); }, + "SseConnection::onTcpDisconnected, delete tcp_conn"); +} + +void SseConnection::onTcpSendCompleted() +{ + if (send_complete_cb_) { + ++cb_level_; + send_complete_cb_(); + --cb_level_; + } +} + +} +} +} diff --git a/modules/http/server/sse/sse_connection.h b/modules/http/server/sse/sse_connection.h new file mode 100644 index 0000000000000000000000000000000000000000..e9e936ef659b3e428b3e41e74950eb93e093e6cc --- /dev/null +++ b/modules/http/server/sse/sse_connection.h @@ -0,0 +1,114 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_CONNECTION_H_20260616 +#define TBOX_HTTP_SSE_CONNECTION_H_20260616 + +#include +#include +#include + +#include "sse_event.h" + +namespace tbox { +namespace http { +namespace sse { + +//! SSE 连接 +//! 包装从 HTTP 升级后分离出来的 TcpConnection,提供 SSE 事件推送功能 +//! 生命期由 SseServer 通过 Cabinet 管理,用户通过 ConnToken 访问 +//! SSE 是单向推送协议(服务端→客户端),不需要解析客户端数据 +class SseConnection { + public: + //! 内部回调:SseServer::Impl 绑定 ConnToken,不传递 SseConnection* + using CloseCallback = std::function; + using SendCompleteCallback = std::function; + + ~SseConnection(); + + NONCOPYABLE(SseConnection); + IMMOVABLE(SseConnection); + + public: + //! 设置回调(由 SseServer::Impl 调用,绑定 ConnToken) + void setCloseCallback(const CloseCallback &cb) { close_cb_ = cb; } + void setSendCompleteCallback(const SendCompleteCallback &cb) { send_complete_cb_ = cb; } + void setContextLogEnable(bool enable) { context_log_enable_ = enable; } + + public: + //! 发送简单数据(event 类型默认 "message") + bool send(const std::string &data); + + //! 发送完整 SSE 事件 + bool send(const SseEvent &event); + + //! 发送心跳注释行(保持连接活跃) + //! 格式:": \n\n" + bool sendHeartbeat(const std::string &comment = "keep-alive"); + + //! 关闭连接(断开 TcpConnection) + bool close(); + + //! 获取客户端地址 + network::SockAddr peerAddr() const; + + //! 获取浏览器重连时携带的 Last-Event-ID + std::string getLastEventId() const { return last_event_id_; } + + //! 获取客户端连接的 URL 路径 + std::string getUrl() const { return url_; } + + //! 连接是否已失效 + bool isExpired() const; + + //! 设置/获取上下文数据(委托给底层 TcpConnection) + using ContextDeleter = network::TcpConnection::ContextDeleter; + void setContext(void *context, ContextDeleter &&deleter = nullptr); + void* getContext() const; + + private: + //! 仅由 SseServer 创建(生命期由 Cabinet 管理) + SseConnection(event::Loop *wp_loop, + network::TcpConnection *tcp_conn, + const std::string &url, + const std::string &last_event_id); + + void onTcpDisconnected(); + void onTcpSendCompleted(); + + private: + event::Loop *wp_loop_; + network::TcpConnection *sp_tcp_conn_; + std::string url_; + std::string last_event_id_; //! 浏览器重连时的 Last-Event-ID + + CloseCallback close_cb_; + SendCompleteCallback send_complete_cb_; + + int cb_level_ = 0; + bool context_log_enable_ = false; + + friend class SseServer; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_CONNECTION_H_20260616 diff --git a/modules/http/server/sse/sse_event.cpp b/modules/http/server/sse/sse_event.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0bc17b275113b57740113b841e76de8a81049d58 --- /dev/null +++ b/modules/http/server/sse/sse_event.cpp @@ -0,0 +1,78 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include "sse_event.h" + +#include + +namespace tbox { +namespace http { +namespace sse { + +std::string SseEvent::toString() const +{ + //! SSE 协议格式(W3C/WHATWG EventSource 规范): + //! 每个字段以 "field: value\n" 格式输出 + //! 事件以空行 "\n" 结束(标志事件完成) + //! + //! 字段输出顺序:retry → id → event → data → 空行 + //! 顺序不影响浏览器解析,但统一顺序便于调试 + + std::string result; + + //! retry 字段(可选,仅在 retry> 0 时输出) + //! 告知浏览器断线后多久自动重连 + if (retry> 0) + result += "retry: " + std::to_string(retry) + "\n"; + + //! id 字段(可选) + //! 浏览器重连时通过 Last-Event-ID 头部携带此值 + if (!id.empty()) + result += "id: " + id + "\n"; + + //! event 字段(可选) + //! 默认为 "message",浏览器通过 .onmessage 监听 + //! 自定义 event 类型通过 .addEventListener(event, ...) 监听 + //! 不输出默认值 "message",减少传输量 + if (!event.empty() && event != "message") + result += "event: " + event + "\n"; + + //! data 字段(必须) + //! 多行 data 自动拆分为多个 `data:` 行 + //! 例如 data="line1\nline2" 输出为 "data: line1\ndata: line2\n" + if (!data.empty()) { + std::istringstream iss(data); + std::string line; + while (std::getline(iss, line)) + result += "data: " + line + "\n"; + } else { + //! data 为空时仍需输出空 data 行(保持协议完整性) + result += "data:\n"; + } + + //! 事件结束标志(空行) + //! 浏览器 EventSource 在收到空行时认为事件完成并触发回调 + result += "\n"; + + return result; +} + +} +} +} diff --git a/modules/http/server/sse/sse_event.h b/modules/http/server/sse/sse_event.h new file mode 100644 index 0000000000000000000000000000000000000000..20e5fa427fa3cfb24c25ec038478b8c87bfa7947 --- /dev/null +++ b/modules/http/server/sse/sse_event.h @@ -0,0 +1,73 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_EVENT_H_20260616 +#define TBOX_HTTP_SSE_EVENT_H_20260616 + +#include +#include + +namespace tbox { +namespace http { +namespace sse { + +//! SSE 事件(Server-Sent Events, W3C/WHATWG 规范) +//! 对应 SSE 协议中的字段:id、event、data、retry +//! toString() 将事件格式化为标准 SSE 文本格式 +struct SseEvent { + //! 事件 ID(可选) + //! 对应 `id:` 字段,浏览器重连时通过 Last-Event-ID 头部携带此值 + //! 用于实现断线续传:服务端可根据 Last-Event-ID 从断点继续推送 + std::string id; + + //! 事件类型(可选) + //! 对应 `event:` 字段,默认为 "message" + //! 浏览器 EventSource 对象通过 .onmessage 或 .addEventListener(event, ...) 监听 + std::string event; + + //! 数据(必须) + //! 对应 `data:` 字段,支持多行文本 + //! toString() 会自动将多行 data 拆分为多个 `data:` 行 + std::string data; + + //! 重连间隔毫秒数(可选) + //! 对应 `retry:` 字段,告知浏览器断线后多久自动重连 + //! 仅在 retry> 0 时输出 + int retry = 0; + + //! 将事件格式化为 SSE 文本协议格式 + //! 输出规则(W3C/WHATWG EventSource 规范): + //! - retry> 0 时输出 "retry: \n" + //! - id 非空时输出 "id: \n" + //! - event 非空且不等于 "message" 时输出 "event: \n" + //! - data 按行拆分,每行输出 "data: \n" + //! - 最后以空行 "\n" 结束(标志事件完成) + //! + //! 示例输出: + //! "id: 42\nevent: update\ndata: hello\n\n" + //! 多行 data: + //! "data: line1\ndata: line2\n\n" + std::string toString() const; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_EVENT_H_20260616 diff --git a/modules/http/server/sse/sse_event_test.cpp b/modules/http/server/sse/sse_event_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6a5460c7601f063505ed72a1ceaca431319152aa --- /dev/null +++ b/modules/http/server/sse/sse_event_test.cpp @@ -0,0 +1,140 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include +#include "sse_event.h" + +namespace tbox { +namespace http { +namespace sse { + +//! === SseEvent 构造测试 === + +TEST(SseEvent, DefaultValues) +{ + SseEvent evt; + EXPECT_EQ(evt.id, ""); + EXPECT_EQ(evt.event, ""); + EXPECT_EQ(evt.data, ""); + EXPECT_EQ(evt.retry, 0); +} + +//! === SseEvent::toString() 格式化测试 === + +TEST(SseEvent, ToStringSimpleData) +{ + //! 简单数据:只输出 data 字段 + SseEvent evt; + evt.data = "hello world"; + + //! 预期输出:"data: hello world\n\n" + EXPECT_EQ(evt.toString(), "data: hello world\n\n"); +} + +TEST(SseEvent, ToStringWithId) +{ + //! 带 id 的数据 + SseEvent evt; + evt.id = "42"; + evt.data = "hello"; + + //! 预期输出:"id: 42\ndata: hello\n\n" + EXPECT_EQ(evt.toString(), "id: 42\ndata: hello\n\n"); +} + +TEST(SseEvent, ToStringWithEvent) +{ + //! 带 event 类型(非默认 "message") + SseEvent evt; + evt.event = "update"; + evt.data = "status ok"; + + //! 预期输出:"event: update\ndata: status ok\n\n" + EXPECT_EQ(evt.toString(), "event: update\ndata: status ok\n\n"); +} + +TEST(SseEvent, ToStringWithDefaultEvent) +{ + //! event 为 "message"(默认值)时不输出 event 字段 + SseEvent evt; + evt.event = "message"; + evt.data = "hello"; + + //! 预期输出:"data: hello\n\n"(不输出 "event: message") + EXPECT_EQ(evt.toString(), "data: hello\n\n"); +} + +TEST(SseEvent, ToStringWithRetry) +{ + //! 带 retry 字段 + SseEvent evt; + evt.retry = 3000; + evt.data = "hello"; + + //! 预期输出:"retry: 3000\ndata: hello\n\n" + EXPECT_EQ(evt.toString(), "retry: 3000\ndata: hello\n\n"); +} + +TEST(SseEvent, ToStringRetryZeroNotOutput) +{ + //! retry = 0时不输出 retry 字段 + SseEvent evt; + evt.retry = 0; + evt.data = "hello"; + + //! 预期输出:"data: hello\n\n"(不输出 "retry: 0") + EXPECT_EQ(evt.toString(), "data: hello\n\n"); +} + +TEST(SseEvent, ToStringMultilineData) +{ + //! 多行 data 自动拆分为多个 data: 行 + SseEvent evt; + evt.data = "line1\nline2\nline3"; + + //! 预期输出:"data: line1\ndata: line2\ndata: line3\n\n" + EXPECT_EQ(evt.toString(), "data: line1\ndata: line2\ndata: line3\n\n"); +} + +TEST(SseEvent, ToStringCompleteEvent) +{ + //! 完整事件:所有字段 + SseEvent evt; + evt.id = "123"; + evt.event = "update"; + evt.data = "status ok"; + evt.retry = 5000; + + //! 预期输出:"retry: 5000\nid: 123\nevent: update\ndata: status ok\n\n" + EXPECT_EQ(evt.toString(), "retry: 5000\nid: 123\nevent: update\ndata: status ok\n\n"); +} + +TEST(SseEvent, ToStringEmptyData) +{ + //! data 为空时输出空 data 行 + SseEvent evt; + evt.id = "1"; + + //! 预期输出:"id: 1\ndata:\n\n" + EXPECT_EQ(evt.toString(), "id: 1\ndata:\n\n"); +} + +} +} +} diff --git a/modules/http/server/sse/sse_server.h b/modules/http/server/sse/sse_server.h new file mode 100644 index 0000000000000000000000000000000000000000..2f0ac1f68827d9813984f4b846b06341f42d173f --- /dev/null +++ b/modules/http/server/sse/sse_server.h @@ -0,0 +1,135 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_SERVER_H_20260616 +#define TBOX_HTTP_SSE_SERVER_H_20260616 + +#include +#include + +#include +#include +#include +#include + +#include "sse_event.h" + +namespace tbox { +namespace http { + +namespace server { +class Server; +} + +namespace sse { + +//! SSE 服务器(Server-Sent Events, W3C/WHATWG 规范) +//! 基于 HTTP 服务器运行,本身即为 HTTP 中间件 +//! 检测 SSE 请求(Accept: text/event-stream),设置 200 响应头 +//! 通过 upgrade_cb 机制接管 TcpConnection,提供 SSE 事件推送功能 +//! SSE 是单向推送协议(服务端→客户端),无 MessageCallback +//! 通过 Cabinet 管理 SseConnection 生命期,用户通过 ConnToken 操作连接 +class SseServer { + public: + using ConnToken = cabinet::Token; + + explicit SseServer(event::Loop *wp_loop); + ~SseServer(); + + NONCOPYABLE(SseServer); + IMMOVABLE(SseServer); + + public: + //! 初始化:关联到 HTTP 服务器 + //! URL 路径匹配规则: + //! - url_path 以 '/' 结尾:前缀匹配,如 "/sse/" 匹配 "/sse/aa"、" /sse/bb/cc" + //! - url_path 不以 '/' 结尾:全量匹配,如 "/sse" 仅匹配 "/sse" + //! - url_path 为空字符串:匹配所有 SSE 请求 + bool initialize(http::server::Server *http_server, const std::string &url_path = ""); + bool start(); + void stop(); + void cleanup(); + + enum class State { kNone, kInited, kRunning }; + State state() const; + + public: + //! 设置回调(SSE 是单向推送,无 MessageCallback) + using ConnectedCallback = std::function; + using DisconnectedCallback = std::function; + + void setConnectedCallback(const ConnectedCallback &cb); + void setDisconnectedCallback(const DisconnectedCallback &cb); + + public: + //! 向指定客户端发送数据(简单文本,event 类型默认 "message") + //! 格式:"data: \n\n" + bool send(const ConnToken &client, const std::string &data); + + //! 向指定客户端发送完整 SSE 事件 + bool send(const ConnToken &client, const SseEvent &event); + + //! 向所有客户端广播数据 + bool sendToAll(const std::string &data); + + //! 向所有客户端广播事件 + bool sendToAll(const SseEvent &event); + + //! 关闭指定客户端连接 + bool close(const ConnToken &client); + + //! 发送心跳注释行(保持连接活跃) + //! 格式:": \n\n",浏览器 EventSource 忽略以 ":" 开头的行 + bool sendHeartbeat(const ConnToken &client, const std::string &comment = "keep-alive"); + + //! 设置自动心跳间隔(默认 0 = 禁用) + //! 启用后,定时器每 interval 毫秒向所有连接发送 ": keep-alive\n\n" + void setHeartbeatInterval(std::chrono::milliseconds interval); + + //! 检查客户端连接是否有效 + bool isClientValid(const ConnToken &client) const; + + //! 获取客户端地址(含 IP 与端口) + network::SockAddr peerAddr(const ConnToken &client) const; + + //! 获取客户端请求的 Last-Event-ID(浏览器重连时携带) + //! 用于实现断线续传:服务端可根据此值从断点继续推送 + std::string getLastEventId(const ConnToken &client) const; + + //! 获取客户端连接的 URL 路径 + std::string getUrl(const ConnToken &client) const; + + //! 设置/获取客户端连接的上下文数据 + using ContextDeleter = std::function; + void setContext(const ConnToken &client, void *context, ContextDeleter &&deleter = nullptr); + void* getContext(const ConnToken &client) const; + + void setContextLogEnable(bool enable); + + class Impl; + + private: + Impl *impl_; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_SERVER_H_20260616 diff --git a/modules/http/server/sse/sse_server_impl.cpp b/modules/http/server/sse/sse_server_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..33c8cf9547b6c9801986e2ba39ddd9d65070107b --- /dev/null +++ b/modules/http/server/sse/sse_server_impl.cpp @@ -0,0 +1,505 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include "sse_server.h" +#include "sse_server_impl.h" + +#include +#include +#include +#include + +#include +#include + +#undef MODULE_ID +#define MODULE_ID "tbox.http.sse" + +namespace tbox { +namespace http { +namespace sse { + +using namespace std::placeholders; + +//! === 生命周期 === + +SseServer::Impl::Impl(SseServer *wp_parent, event::Loop *wp_loop) + : wp_parent_(wp_parent) + , wp_loop_(wp_loop) + , sp_heartbeat_timer_(wp_loop->newTimerEvent()) +{ } + +SseServer::Impl::~Impl() +{ + TBOX_ASSERT(cb_level_ == 0); + cleanup(); + CHECK_DELETE_RESET_OBJ(sp_heartbeat_timer_); +} + +bool SseServer::Impl::initialize(http::server::Server *http_server, const std::string &url_path) +{ + if (state_ != SseServer::State::kNone) + return false; + + //! 记录 URL 路径 + url_path_ = url_path; + + //! 记录 HTTP 服务器指针(不立即注册中间件,等 start() 时注册) + wp_http_server_ = http_server; + + state_ = SseServer::State::kInited; + return true; +} + +bool SseServer::Impl::start() +{ + if (state_ != SseServer::State::kInited) + return false; + + //! 注册自身到 HTTP 服务器(SseServer::Impl 即为 Middleware) + mw_token_ = wp_http_server_->use(this); + + //! 如果心跳间隔已设置,启用心跳定时器 + if (heartbeat_interval_.count()> 0) { + sp_heartbeat_timer_->initialize(heartbeat_interval_, event::Event::Mode::kPersist); + sp_heartbeat_timer_->setCallback(std::bind(&SseServer::Impl::onHeartbeatTimer, this)); + sp_heartbeat_timer_->enable(); + } + + state_ = SseServer::State::kRunning; + return true; +} + +void SseServer::Impl::stop() +{ + if (state_ != SseServer::State::kRunning) + return; + + //! 从 HTTP 服务器反注册中间件 + wp_http_server_->unuse(mw_token_); + mw_token_.reset(); + + //! 停止心跳定时器 + sp_heartbeat_timer_->disable(); + + //! 清除 SseConnection 内部回调,防止断开时回调到 Impl + sse_conns_.foreach([](SseConnection *conn) { + conn->setCloseCallback(nullptr); + conn->setSendCompleteCallback(nullptr); + }); + + //! 删除所有 SseConnection(析构时会断开并延后删除 TcpConnection) + sse_conns_.foreach([](SseConnection *conn) { delete conn; }); + sse_conns_.clear(); + + state_ = SseServer::State::kInited; +} + +void SseServer::Impl::cleanup() +{ + if (state_ == SseServer::State::kNone) + return; + + if (state_ == SseServer::State::kRunning) + stop(); + + wp_http_server_ = nullptr; + + connected_cb_ = nullptr; + disconnected_cb_ = nullptr; + heartbeat_interval_ = std::chrono::milliseconds(0); + + state_ = SseServer::State::kNone; +} + +//! === Middleware 接口实现 === +void SseServer::Impl::handle(http::server::ContextSptr sp_ctx, const http::server::NextFunc &next) +{ + auto &req = sp_ctx->req(); + + if (!IsSseRequest(req)) { + //! 非 SSE 请求,传递给下一个中间件 + next(); + } + + //! URL 路径匹配规则: + //! - url_path_ 以 '/' 结尾:前缀匹配 + //! - url_path_ 不以 '/' 结尾:全量匹配 + //! - url_path_ 为空字符串:匹配所有 SSE 请求 + if (!url_path_.empty()) { + bool matched = false; + if (url_path_.back() == '/') { + //! 前缀匹配 + matched = util::string::IsStartWith(req.url.path, url_path_); + } else { + //! 全量匹配 + matched = (req.url.path == url_path_); + } + if (!matched) { + //! 不是本服务关心的 URL,传递给下一个中间件 + next(); + return; + } + } + + LogDbg("sse request: %s", req.url.path.c_str()); + + auto &res = sp_ctx->res(); + + //! 设置 200 OK 响应(SSE 不是协议升级,使用 200) + res.status_code = http::StatusCode::k200_OK; + res.http_ver = http::HttpVer::k1_1; + + //! SSE 必需的响应头 + res.headers["Content-Type"] = "text/event-stream"; + res.headers["Cache-Control"] = "no-cache"; + res.headers["Connection"] = "keep-alive"; + + //! 从请求中提取 Last-Event-ID(浏览器重连时携带) + std::string last_event_id; + auto id_iter = req.headers.find("Last-Event-ID"); + if (id_iter != req.headers.end()) + last_event_id = id_iter->second; + + //! 注册升级回调:HTTP 服务器发送 200 响应后,将 TcpConnection 交给 SseServer + //! 与 WebSocket 使用同一套 upgrade_cb 机制 + res.upgrade_cb = std::bind(&SseServer::Impl::onSseUpgrade, this, _1, req.url.path, last_event_id); + + //! SSE 请求已处理,不再调用 next() +} + +//! === 升级与连接管理 === +void SseServer::Impl::onSseUpgrade(network::TcpConnection *tcp_conn, + const std::string &url_path, + const std::string &last_event_id) +{ + RECORD_SCOPE(); + LogDbg("sse upgrade: new connection from %s", tcp_conn->peerAddr().toString().c_str()); + + //! 创建 SseConnection,并存入 Cabinet + //! 传入 URL 路径和 Last-Event-ID,供用户后续查询 + SseConnection *sse_conn = new SseConnection(wp_loop_, tcp_conn, url_path, last_event_id); + ConnToken sse_token = sse_conns_.alloc(sse_conn); + + //! 设置 SseConnection 的回调(bind 捕获 ConnToken,不传递 SseConnection*) + sse_conn->setCloseCallback(std::bind(&SseServer::Impl::onSseDisconnected, this, sse_token)); + sse_conn->setContextLogEnable(context_log_enable_); + + //! 通知用户(传递 ConnToken) + if (connected_cb_) { + ++cb_level_; + connected_cb_(sse_token); + --cb_level_; + } +} + +void SseServer::Impl::onSseDisconnected(const ConnToken &client) +{ + RECORD_SCOPE(); + LogDbg("sse disconnected"); + + //! 先通知用户(此时 ConnToken 在 Cabinet 中仍有效) + //! 用户可通过 ConnToken 调用 SseServer 方法获取连接信息 + if (disconnected_cb_) { + ++cb_level_; + disconnected_cb_(client); + --cb_level_; + } + + //! 从 Cabinet 中移除并获取指针 + SseConnection *sse_conn = sse_conns_.free(client); + + //! 延后删除 SseConnection(确保回调中还能访问对象) + wp_loop_->runNext([sse_conn] { CHECK_DELETE_OBJ(sse_conn); }, + "SseServer::onSseDisconnected, delete sse_conn"); +} + +//! === 心跳定时器 === +void SseServer::Impl::onHeartbeatTimer() +{ + //! 向所有连接发送心跳注释行 + sse_conns_.foreach([](SseConnection *conn) { + conn->sendHeartbeat("keep-alive"); + }); +} + +void SseServer::Impl::setHeartbeatInterval(std::chrono::milliseconds interval) +{ + heartbeat_interval_ = interval; + + //! 如果已在运行中,动态调整心跳定时器 + if (state_ == SseServer::State::kRunning) { + sp_heartbeat_timer_->disable(); + + if (interval.count()> 0) { + sp_heartbeat_timer_->initialize(interval, event::Event::Mode::kPersist); + sp_heartbeat_timer_->setCallback(std::bind(&SseServer::Impl::onHeartbeatTimer, this)); + sp_heartbeat_timer_->enable(); + } + } +} + +//! === 通过 ConnToken 操作连接 === +bool SseServer::Impl::send(const ConnToken &client, const std::string &data) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->send(data); + return false; +} + +bool SseServer::Impl::send(const ConnToken &client, const SseEvent &event) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->send(event); + return false; +} + +bool SseServer::Impl::sendToAll(const std::string &data) +{ + bool all_ok = true; + sse_conns_.foreach([&](SseConnection *conn) { + if (!conn->send(data)) + all_ok = false; + }); + return all_ok; +} + +bool SseServer::Impl::sendToAll(const SseEvent &event) +{ + bool all_ok = true; + sse_conns_.foreach([&](SseConnection *conn) { + if (!conn->send(event)) + all_ok = false; + }); + return all_ok; +} + +bool SseServer::Impl::close(const ConnToken &client) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->close(); + return false; +} + +bool SseServer::Impl::sendHeartbeat(const ConnToken &client, const std::string &comment) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->sendHeartbeat(comment); + return false; +} + +bool SseServer::Impl::isClientValid(const ConnToken &client) const +{ + return sse_conns_.at(client) != nullptr; +} + +network::SockAddr SseServer::Impl::peerAddr(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->peerAddr(); + return network::SockAddr(); +} + +std::string SseServer::Impl::getLastEventId(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->getLastEventId(); + return ""; +} + +std::string SseServer::Impl::getUrl(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->getUrl(); + return ""; +} + +void SseServer::Impl::setContext(const ConnToken &client, void *context, ContextDeleter &&deleter) +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + sse_conn->setContext(context, std::move(deleter)); +} + +void* SseServer::Impl::getContext(const ConnToken &client) const +{ + auto sse_conn = sse_conns_.at(client); + if (sse_conn != nullptr) + return sse_conn->getContext(); + return nullptr; +} + +void SseServer::Impl::setContextLogEnable(bool enable) +{ + context_log_enable_ = enable; + sse_conns_.foreach([&](SseConnection *conn) { + conn->setContextLogEnable(enable); + }); +} + +bool SseServer::Impl::IsSseRequest(const http::Request &req) +{ + //! SSE 请求检测条件: + //! 1) 必须是 GET 方法 + //! 2) Accept 头必须包含 "text/event-stream" + //! 3) 不检查 Upgrade 头(SSE 不是协议升级) + + if (req.method != http::Method::kGet) + return false; + + auto accept_iter = req.headers.find("Accept"); + if (accept_iter == req.headers.end()) + return false; + + //! 检查 Accept 头是否包含 text/event-stream + //! 注意:Accept 头可能包含多个值,如 "text/event-stream, text/html" + if (accept_iter->second.find("text/event-stream") == std::string::npos) + return false; + + return true; +} + +//! === SseServer 外部接口 === +SseServer::SseServer(event::Loop *wp_loop) + : impl_(new Impl(this, wp_loop)) +{ + TBOX_ASSERT(wp_loop != nullptr); +} + +SseServer::~SseServer() +{ + CHECK_DELETE_RESET_OBJ(impl_); +} + +bool SseServer::initialize(http::server::Server *http_server, const std::string &url_path) +{ + TBOX_ASSERT(http_server != nullptr); + return impl_->initialize(http_server, url_path); +} + +bool SseServer::start() +{ + return impl_->start(); +} + +void SseServer::stop() +{ + impl_->stop(); +} + +void SseServer::cleanup() +{ + impl_->cleanup(); +} + +SseServer::State SseServer::state() const +{ + return impl_->state(); +} + +void SseServer::setConnectedCallback(const ConnectedCallback &cb) +{ + impl_->setConnectedCallback(cb); +} + +void SseServer::setDisconnectedCallback(const DisconnectedCallback &cb) +{ + impl_->setDisconnectedCallback(cb); +} + +bool SseServer::send(const ConnToken &client, const std::string &data) +{ + return impl_->send(client, data); +} + +bool SseServer::send(const ConnToken &client, const SseEvent &event) +{ + return impl_->send(client, event); +} + +bool SseServer::sendToAll(const std::string &data) +{ + return impl_->sendToAll(data); +} + +bool SseServer::sendToAll(const SseEvent &event) +{ + return impl_->sendToAll(event); +} + +bool SseServer::close(const ConnToken &client) +{ + return impl_->close(client); +} + +bool SseServer::sendHeartbeat(const ConnToken &client, const std::string &comment) +{ + return impl_->sendHeartbeat(client, comment); +} + +void SseServer::setHeartbeatInterval(std::chrono::milliseconds interval) +{ + impl_->setHeartbeatInterval(interval); +} + +bool SseServer::isClientValid(const ConnToken &client) const +{ + return impl_->isClientValid(client); +} + +network::SockAddr SseServer::peerAddr(const ConnToken &client) const +{ + return impl_->peerAddr(client); +} + +std::string SseServer::getLastEventId(const ConnToken &client) const +{ + return impl_->getLastEventId(client); +} + +std::string SseServer::getUrl(const ConnToken &client) const +{ + return impl_->getUrl(client); +} + +void SseServer::setContext(const ConnToken &client, void *context, ContextDeleter &&deleter) +{ + impl_->setContext(client, context, std::move(deleter)); +} + +void* SseServer::getContext(const ConnToken &client) const +{ + return impl_->getContext(client); +} + +void SseServer::setContextLogEnable(bool enable) +{ + impl_->setContextLogEnable(enable); +} + +} +} +} diff --git a/modules/http/server/sse/sse_server_impl.h b/modules/http/server/sse/sse_server_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..de0121de147171ecdbd0c219907c1dc67adb147a --- /dev/null +++ b/modules/http/server/sse/sse_server_impl.h @@ -0,0 +1,136 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#ifndef TBOX_HTTP_SSE_SERVER_IMPL_H_20260616 +#define TBOX_HTTP_SSE_SERVER_IMPL_H_20260616 + +#include +#include +#include +#include +#include +#include + +#include "../server.h" +#include "../middleware.h" +#include "../context.h" + +#include "sse_server.h" +#include "sse_connection.h" + +namespace tbox { +namespace http { +namespace sse { + +//! SseServer::Impl 同时充当 HTTP 中间件 +//! 检测 SSE 请求(Accept: text/event-stream),设置 200 响应头,注册 upgrade_cb +//! 通过 Cabinet 管理 SseConnection 生命期,所有操作基于 ConnToken +//! SSE 是单向推送协议,无 MessageCallback +class SseServer::Impl : public http::server::Middleware { + public: + Impl(SseServer *wp_parent, event::Loop *wp_loop); + virtual ~Impl(); + + public: + bool initialize(http::server::Server *http_server, const std::string &url_path = ""); + bool start(); + void stop(); + void cleanup(); + + SseServer::State state() const { return state_; } + + public: + void setConnectedCallback(const SseServer::ConnectedCallback &cb) { connected_cb_ = cb; } + void setDisconnectedCallback(const SseServer::DisconnectedCallback &cb) { disconnected_cb_ = cb; } + + public: + //! 通过 ConnToken 操作连接 + bool send(const ConnToken &client, const std::string &data); + bool send(const ConnToken &client, const SseEvent &event); + bool sendToAll(const std::string &data); + bool sendToAll(const SseEvent &event); + bool close(const ConnToken &client); + bool sendHeartbeat(const ConnToken &client, const std::string &comment); + bool isClientValid(const ConnToken &client) const; + network::SockAddr peerAddr(const ConnToken &client) const; + std::string getLastEventId(const ConnToken &client) const; + std::string getUrl(const ConnToken &client) const; + void setHeartbeatInterval(std::chrono::milliseconds interval); + + //! 上下文数据操作(委托到 SseConnection → TcpConnection) + using ContextDeleter = network::TcpConnection::ContextDeleter; + void setContext(const ConnToken &client, void *context, ContextDeleter &&deleter = nullptr); + void* getContext(const ConnToken &client) const; + + void setContextLogEnable(bool enable); + + //! 静态辅助方法 + static bool IsSseRequest(const http::Request &req); + + public: + //! Middleware 接口:处理 HTTP 请求,检测 SSE 请求 + virtual void handle(http::server::ContextSptr sp_ctx, const http::server::NextFunc &next) override; + + private: + //! 当 HTTP 服务器发送 200 响应后回调此函数 + //! 将 TcpConnection 从 HTTP 服务器分离,交给 SseServer 管理 + void onSseUpgrade(network::TcpConnection *tcp_conn, + const std::string &url_path, + const std::string &last_event_id); + + //! 当 SseConnection 断开时回调(参数为 ConnToken) + void onSseDisconnected(const ConnToken &client); + + //! 心跳定时器回调:向所有连接发送注释行 + void onHeartbeatTimer(); + + private: + SseServer *wp_parent_; + event::Loop *wp_loop_; + event::TimerEvent *sp_heartbeat_timer_ = nullptr; //! 心跳定时器(可选,默认禁用) + http::server::Server *wp_http_server_ = nullptr; + + //! URL 路径匹配规则: + //! - url_path_ 以 '/' 结尾:前缀匹配,如 "/sse/" 匹配 "/sse/aa" + //! - url_path_ 不以 '/' 结尾:全量匹配,如 "/sse" 仅匹配 "/sse" + //! - url_path_ 为空字符串:匹配所有 SSE 请求 + std::string url_path_; + + //! 中间件 token(由 HTTP Server 的 use() 返回,用于 unuse() 反注册) + http::server::MiddlewareToken mw_token_; + + //! SseConnection 容器(生命期管理) + cabinet::Cabinet sse_conns_; + + std::chrono::milliseconds heartbeat_interval_{0}; + + SseServer::State state_ = SseServer::State::kNone; + + SseServer::ConnectedCallback connected_cb_; + SseServer::DisconnectedCallback disconnected_cb_; + + int cb_level_ = 0; + bool context_log_enable_ = false; +}; + +} +} +} + +#endif //TBOX_HTTP_SSE_SERVER_IMPL_H_20260616 diff --git a/modules/http/server/sse/sse_server_impl_test.cpp b/modules/http/server/sse/sse_server_impl_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a534cbb4805d0396174f4944e836459e7f73568e --- /dev/null +++ b/modules/http/server/sse/sse_server_impl_test.cpp @@ -0,0 +1,90 @@ +/* + * .============. + * // M A K E / \ + * // C++ DEV / \ + * // E A S Y / \/ \ + * ++ ----------. \/\ . + * \\ \ \ /\ / + * \\ \ \ / + * \\ \ \ / + * -============' + * + * Copyright (c) 2026 Hevake and contributors, all rights reserved. + * + * This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox) + * Use of this source code is governed by MIT license that can be found + * in the LICENSE file in the root of the source tree. All contributing + * project authors may be found in the CONTRIBUTORS.md file in the root + * of the source tree. + */ +#include +#include +#include "sse_server_impl.h" + +namespace tbox { +namespace http { +namespace sse { + +TEST(SseServerImpl, DetectSseRequest) +{ + //! 合法的 SSE 请求:GET + Accept: text/event-stream + http::Request req; + req.method = http::Method::kGet; + req.http_ver = http::HttpVer::k1_1; + req.headers["Accept"] = "text/event-stream"; + + EXPECT_TRUE(SseServer::Impl::IsSseRequest(req)); +} + +TEST(SseServerImpl, DetectSseRequestWithMultipleAccept) +{ + //! Accept 头包含多个值时,仍能检测到 text/event-stream + http::Request req; + req.method = http::Method::kGet; + req.headers["Accept"] = "text/event-stream, text/html;q=0.9"; + + EXPECT_TRUE(SseServer::Impl::IsSseRequest(req)); +} + +TEST(SseServerImpl, DetectNonSseRequestNoAccept) +{ + //! 缺少 Accept 头:不是 SSE 请求 + http::Request req; + req.method = http::Method::kGet; + + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); +} + +TEST(SseServerImpl, DetectNonSseRequestHtmlAccept) +{ + //! Accept 头不包含 text/event-stream:不是 SSE 请求 + http::Request req; + req.method = http::Method::kGet; + req.headers["Accept"] = "text/html"; + + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); +} + +TEST(SseServerImpl, DetectNonSsePostRequest) +{ + //! POST 方法:不是 SSE 请求(SSE 必须是 GET) + http::Request req; + req.method = http::Method::kPost; + req.headers["Accept"] = "text/event-stream"; + + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); +} + +TEST(SseServerImpl, DetectNonSsePutRequest) +{ + //! PUT 方法:不是 SSE 请求 + http::Request req; + req.method = http::Method::kPut; + req.headers["Accept"] = "text/event-stream"; + + EXPECT_FALSE(SseServer::Impl::IsSseRequest(req)); +} + +} +} +}