Fix: Notify app of client disconnection when request is in progress. #1556
gourav-kandoria wants to merge 2 commits into
ac000
commented
Feb 5, 2025
Would you mind splitting the python changes into a separate commit? (within this PR...)
Would you mind splitting the python changes into a separate commit? (within this PR...)
Sure, doing the changes as per the suggestions.
86682b6 to
bd552e5
Compare
@ac000 I have made the changes. Just one more thing to highlight: per the ASGI spec, the exception should be a subclass of OSError ("If send() is called on a closed connection the server should raise a server-specific subclass of OSError."). We are raising PyExc_RuntimeError, which is fine for abnormal cases, but here PyExc_OSError could be raised instead. Please see this for reference: https://docs.python.org/3/c-api/exceptions.html#standard-exceptions
This caused issues, especially with WebSocket connections and SSE events, where the app continued to send data to the router, which could not deliver it to the client because of the disconnection. Changes made: the router now sends a port message (_NXT_PORT_MSG_CLIENT_ERROR) to notify the app of the client disconnection. On the app side, this message is handled by calling the close_handler callback, if one is registered.
bd552e5 to
7caee0e
Compare
ac000
commented
Feb 6, 2025
Thanks @gourav-kandoria and the whitespace checker no longer complains :-)
ac000
commented
Feb 6, 2025
@ac000 I have made the changes. Just one more thing to highlight: per the ASGI spec, the exception should be a subclass of OSError ("If send() is called on a closed connection the server should raise a server-specific subclass of OSError."). We are raising PyExc_RuntimeError, which is fine for abnormal cases, but here PyExc_OSError could be raised instead. Please see this for reference: https://docs.python.org/3/c-api/exceptions.html#standard-exceptions
Yeah, OSError would make more sense here...
But then we'd have to flow errno through...
Yeah, OSError would make more sense here...
But then we'd have to flow errno through...
You mean, to flow the error all the way from the router to the app, passing the exact errno?
ac000
commented
Feb 6, 2025
Yeah, OSError would make more sense here...
But then we'd have to flow errno through...
You mean, to flow the error all the way from the router to the app, passing the exact errno?
Right.
As this error will happen in the router process that is where errno(3p) will be set. In this case I guess it's likely to be either ECONNRESET or EPIPE.
I'm not saying you need to do that as that's likely not a trivial change. Or we could just hardcode say ECONNRESET
However, what you have there now looks reasonable to me, but I'd like to get @hongzhidao 's overall input on this.
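As a side note on the errno discussion above, here is a minimal Python sketch of how an errno value maps onto the OSError hierarchy. The helper name closed_connection_error is hypothetical and is not part of Unit; the sketch only shows what an application would observe if the server raised an OSError carrying ECONNRESET or EPIPE.

```python
import errno
import os


def closed_connection_error(err=errno.ECONNRESET):
    """Hypothetical helper: build the exception a server could raise
    when send() is called on a closed connection."""
    # The OSError constructor picks the matching subclass from errno,
    # so ECONNRESET yields ConnectionResetError and EPIPE yields
    # BrokenPipeError.
    return OSError(err, os.strerror(err))


reset = closed_connection_error()
pipe = closed_connection_error(errno.EPIPE)

# Both are OSError subclasses, as the ASGI spec asks for.
print(type(reset).__name__, type(pipe).__name__)
```

Either hardcoded value would therefore satisfy an application that catches OSError; only code that inspects the errno or the exact subclass would see the difference.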
ac000
commented
Feb 6, 2025
Was there an open issue about this?
hongzhidao
commented
Feb 7, 2025
However, what you have there now looks reasonable to me, but I'd like to get @hongzhidao 's overall input on this.
Will do it, thanks for the contribution.
I'm not saying you need to do that as that's likely not a trivial change. Or we could just hardcode say ECONNRESET
Got it. So for now, will raise ECONNRESET
...ASGI spec
For HTTP connections:
- If the app is sending data using the send callable, according to the ASGI spec, it should throw an exception in case of client disconnection. Previously, even if we processed the client_error message and set the http->closed state, it wouldn't throw an error because it wasn't handled. This change ensures that the exception is raised as per the ASGI spec.
For WebSocket connections:
- If the app is awaiting on receive, it would get a 'websocket.disconnect' event. However, if the app continues to send data using the send callable after receiving this event, it wouldn't raise an error because ws->state = NXT_WS_DISCONNECTED was never set in that case. According to the ASGI spec, if send is called after receiving a 'websocket.disconnect' event or on a closed client, it should raise an exception. This change ensures that the exception is raised as per the ASGI spec.
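To illustrate the behaviour the commit message describes from the application's side, here is a small sketch of a sender loop that stops once send() raises an OSError subclass. The make_fake_send helper is a hypothetical stand-in for the server's send callable so the example runs on its own; it is not Unit's implementation.

```python
import asyncio


async def stream_events(send, limit=5):
    """Send SSE events until done or until send() reports a closed client."""
    sent = 0
    for i in range(limit):
        try:
            await send({
                'type': 'http.response.body',
                'body': f"event: count\ndata: {i}\n\n".encode('utf-8'),
                'more_body': True,
            })
        except OSError:
            # send() on a closed connection is expected to raise an
            # OSError subclass, so stop producing data.
            break
        sent += 1
    return sent


def make_fake_send(fail_after):
    """Hypothetical send callable that fails after `fail_after` messages."""
    calls = 0

    async def send(message):
        nonlocal calls
        if calls >= fail_after:
            raise ConnectionResetError("Connection Closed")
        calls += 1

    return send


delivered = asyncio.run(stream_events(make_fake_send(fail_after=3)))
print(delivered)
```

Without the exception, a loop like this has no way to notice the disconnect unless it also awaits receive() for 'http.disconnect'.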
7caee0e to
8deb39c
Compare
ac000
commented
Feb 7, 2025
Yeah, even though I said that, I'm not sure it's the right thing to do...
Do you have some reproducer for this issue?
I've been trying to myself, but no luck, I am unable to trigger either of nxt_http_request_error() or nxt_http_request_error_handler() which are the only places I see us setting r->error.
Python also seems to be notified (I.e. if m['type'] == 'websocket.disconnect': triggers) about the websocket closure even if you kill -9 the client (the kernel closes the socket) .
The only way I can get things gummed up is if I firewall the websocket after the client connects.
With this hack
import time

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        print("WebSocket!")
        while True:
            m = await receive()
            if m['type'] == 'websocket.connect':
                print("Waiting for websocket.accept")
                await send({'type': 'websocket.accept'})
                print("Got WebSocket connection")

            time.sleep(5)
            send(
                {
                    'type': 'websocket.send',
                    'bytes': m.get('bytes', None),
                    'text': "Test",
                }
            )
If I Ctrl-c the client while the app is sleeping, the router process never attempts to send the message (even though AFAICT the python app has called send() sending the data to the router process), perhaps because Unit knows the socket is closed.
So at the moment I am having a bit of a doubt about this whole thing...
Heh, just noticed that you pointed out an open issue... after a quick skim, I'm even more confused now... that doesn't seem to be about WebSockets and this PR does...
gourav-kandoria
commented
Feb 8, 2025
Heh, just noticed that you pointed out an open issue... after a quick skim, I'm even more confused now... that doesn't seem to be about WebSockets and this PR does...
Oh sorry, my bad for mixing things up. For the issue I mentioned earlier, I am sharing the app-side code and the client-side script I used to test it. I will explain why I also changed the WebSocket-related files once the changes for this issue are verified.
application :
import asyncio

async def application_sse(scope, receive, send):
    if scope['type'] == 'http':
        headers = [
            (b'content-type', b'text/event-stream'),
            (b'cache-control', b'no-cache'),
            (b'connection', b'keep-alive'),
        ]
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': headers,
        })
        # Run the sender and the disconnect watcher concurrently.
        send_task = asyncio.create_task(send_messages_sse(send))
        receive_task = asyncio.create_task(receive_messages_sse(receive))
        await asyncio.gather(send_task, receive_task)

async def receive_messages_sse(receive):
    message = await receive()
    print(f'message received: {message}')
    if message['type'] == 'http.disconnect':
        return

async def send_messages_sse(send):
    i = 0
    while True:
        try:
            message = f"event: count\ndata: {i}\n\n"
            print(f'message sent: {message}')
            await send({
                'type': 'http.response.body',
                'body': message.encode('utf-8'),
                'more_body': True
            })
            i += 2
            await asyncio.sleep(2)
        except Exception as err:
            # Expected once send() starts failing on a closed client.
            print(f'err : {err}')
            break
client side script:
import asyncio
import aiohttp

async def listen_to_events():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8001') as response:
            async for line in response.content:
                if line:
                    print(line.decode('utf-8').strip())

if __name__ == '__main__':
    asyncio.run(listen_to_events())
used ctrl+c to close the connection from client side
ac000
commented
Feb 8, 2025
Hmm, I'd never heard of Server-Sent events before, even though they pre-date WebSockets...
But, yes, I can reproduce this with the above application + curl(1). Thanks.
An interesting observation is with WebSockets we see (from router to client)
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendmsg(16, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\315y\0\0\1\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
Ctrl-C'ing the client and everything behaves as expected. The connection is closed, the router process sees this and informs the Python application.
With Server-Sent Events we see instead...
[pid 31002] writev(27, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 6\n\n", iov_len=22}], 2) = 28
[pid 31002] writev(27, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 8\n\n", iov_len=22}], 2) = 28
[pid 31002] writev(27, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 10\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 31002] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=30999, si_uid=1000} ---
Once we get -EPIPE we call into nxt_http_request_error_handler() & nxt_http_request_close_handler()
We seem to use writev(3p) for HTTP sockets and sendto(2) for WebSockets... not sure why, probably not that important, just an interesting curiosity.
With your first patch we now see
[pid 31734] writev(11, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 6\n\n", iov_len=22}], 2) = 28
[pid 31734] writev(11, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 8\n\n", iov_len=22}], 2) = 28
[pid 31734] writev(11, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 10\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 31734] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=31731, si_uid=1000} ---
[pid 31734] sendmsg(31, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\363{\0\0\1\0!\1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
So the router process is now informing the Python application, though in this case it's still writing to stdout... AFAICT the application doesn't get the http.disconnect message...
Applying this bit of the second patch
--- a/src/python/nxt_python_asgi_http.c
+++ b/src/python/nxt_python_asgi_http.c
@@ -368,6 +368,11 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
                             "sent, after response already completed");
     }

+    if (nxt_slow_path(http->closed)) {
+        return PyErr_Format(PyExc_ConnectionResetError,
+                            "Connection Closed ");
+    }
+
     if (nxt_slow_path(http->send_future != NULL)) {
         return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
     }
We don't seem to hit that if () statement as we call this function before we call nxt_http_request_error_handler()
r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...
While I remember, this line will need wrapping as it just goes over the 80 character limit...
r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...
I just noticed some strange behaviour on my system. I was assuming that the port message received from the router would call nxt_unit_process_client_error, and that the following flow would then happen:
nxt_unit_process_client_error -> nxt_py_asgi_close_handle -> nxt_py_asgi_http_close_handler.
If this happens, everything seems to work correctly. But what I noticed is that if, after starting Unit, I attach a debugger to the application process, then in nxt_unit_process_client_error I am able to get the req object from this statement:
req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
and even if I then stop the debugger, I always get this request object upon disconnection for subsequent test requests.
But if I start Unit and don't attach a debugger to the application process, this statement (req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0)) returns NULL.
Is it the case that in debugging mode this object is kept in memory, and otherwise not?
gourav-kandoria
commented
Feb 8, 2025
r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...
http->closed is supposed to be set in the function nxt_py_asgi_http_close_handler, which is called when a debugger is attached to the application and not when no debugger is attached, as I mentioned in the comment above.
ac000
commented
Feb 8, 2025
Certainly don't be surprised that debugging can change the behaviour of the program...
Certainly don't be surprised that debugging can change the behaviour of the program...
Oh okay. So, in the PR, the purpose of the changes I made around nxt_unit_request_hash_find was to make sure that the req object is not removed from the hash while the request is still in progress, and is removed only once nxt_unit_request_info_release is called after the request completes. But it is getting removed in some other way.
So, I have two questions now:
- Is it fine to take this approach, keeping performance and memory implications in mind?
- If yes, how do I make sure that I can always get this request object from the hash until it is released by nxt_unit_request_info_release?
ac000
commented
Feb 10, 2025
I think we need to step back for a minute as there may be a more fundamental issue here.
With a WebSocket
(strace(1)ing the Python app)
epoll_wait(12,
<-- Ctrl-C the client
[{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, -1) = 1
epoll_wait(12, [{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, 0) = 1
recvmsg(8, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\243\205\0\0\1\0 \1\0\0\0\0", iov_len=16384}], msg_iovlen=1, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=34211, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 16
recvmsg(8, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(12, [], 3, 0) = 0
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \0\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
epoll_wait(12,
So you can see that python app is sitting in epoll_wait(2), when the socket is closed, the router process notifies the python app
However, with a HTTP connection
epoll_wait(14,
<-- Ctrl-C the client
No notification is sent to the python app...
When a WebSocket is closed nxt_py_asgi_close_handler() is called but not when a HTTP connection is closed.
gourav-kandoria
commented
Feb 10, 2025
I think we need to step back for a minute as there may be a more fundamental issue here.
sure.
gourav-kandoria
commented
Feb 10, 2025
Just to explain what's in the PR
In the HTTP SSE case, the code in the PR behaves this way: if the app sends data to the router and the router fails to write it to the socket because of an error such as a closed connection, it sets the r->error flag to true and does all the resource cleanup. Finally, nxt_router_http_request_done is called, which is where I have plugged in the code that notifies the app using the client error message.
So, the router only notifies the app when it fails to write bytes to the client, not when the client connection is actually closed.
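The gap between "client closed" and "write fails" can be reproduced outside Unit. The following is a rough sketch over a loopback TCP connection, assuming a Linux-like TCP stack; writes_until_error is a hypothetical name. It counts how many writes still succeed after the peer has gone away, which mirrors the writev() that returned 29 after the Ctrl-C in the traces before the next one failed with EPIPE.

```python
import errno
import socket
import time


def writes_until_error(max_writes=10):
    """Count writes that succeed after the peer closed, and return the
    first error seen (or None if no write failed)."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen(1)
    client = socket.create_connection(listener.getsockname())
    server, _ = listener.accept()
    listener.close()

    client.close()        # stands in for Ctrl-C on the client
    time.sleep(0.1)

    ok = 0
    err = None
    try:
        for _ in range(max_writes):
            server.send(b"event: count\ndata: 0\n\n")
            ok += 1
            time.sleep(0.1)   # give the peer's RST time to arrive
    except OSError as e:
        # Python ignores SIGPIPE by default, so the failure surfaces as
        # an exception (typically BrokenPipeError) instead of a signal.
        err = e
    finally:
        server.close()
    return ok, err


ok, err = writes_until_error()
print(ok, type(err).__name__ if err else None)
```

At least one write is accepted by the kernel after the close, so a notification that is driven only by write failures always arrives late, and never arrives if the application stops sending.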
ac000
commented
Feb 10, 2025
... not when client connection is actually closed.
This is the bit we need to fix first, then the other bit may not even be an issue...
Just some notes...
The message (NXT_PORT_MSG_WEBSOCKET_LAST) that notifies about the websocket disconnect is sent from nxt_http_websocket_error_handler()
In the router process when closing a WebSocket
[pid 36046] epoll_wait(17,
<-- Ctrl-C the client
[{events=EPOLLIN|EPOLLRDHUP, data={u32=671094160, u64=139939295532432}}], 32, 20167) = 1
[pid 36046] recvfrom(27, "", 135, 0, NULL, NULL) = 0
[pid 36046] fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(0x88, 0x1), ...}) = 0
[pid 36046] write(1, "nxt_h1p_conn_ws_error: \n", 24) = 24
[pid 36046] sendmsg(16, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\313\214\0\0\2\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
[pid 36046] write(1, "nxt_h1p_request_close: \n", 24) = 24
[pid 36046] write(1, "nxt_h1p_shutdown: \n", 19) = 19
[pid 36046] epoll_wait(17, [{events=EPOLLIN, data={u32=671091920, u64=139939295530192}}], 32, 0) = 1
[pid 36046] recvmsg(23, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\315\214\0\0\0\0 \0\0\0\0\0", iov_len=16}, {iov_base="", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=36045, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 16
[pid 36046] recvmsg(23, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 36046] write(1, "nxt_h1p_closing: \n", 18) = 18
[pid 36046] epoll_ctl(17, EPOLL_CTL_MOD, 23, {events=EPOLLIN|EPOLLRDHUP|EPOLLET, data={u32=671091920, u64=139939295530192}}) = 0
[pid 36046] epoll_ctl(17, EPOLL_CTL_DEL, 27, 0xa35c24) = 0
[pid 36046] epoll_wait(17, [], 32, 0) = 0
[pid 36046] close(27) = 0
[pid 36046] epoll_wait(17,
epoll_wait(2) returns EPOLLRDHUP, good...
With a HTTP connection
[pid 36043] epoll_wait(3,
<-- Ctrl-C the client
<unfinished ...>
[pid 36046] <... epoll_wait resumed>[{events=EPOLLIN|EPOLLRDHUP, data={u32=671099104, u64=139939295537376}}], 32, -1) = 1
[pid 36046] epoll_wait(17,
Again we get EPOLLRDHUP from epoll_wait(2), good, but we immediately go back to epoll_wait(2) again...
With a WebSocket when the connection is closed we call
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                   ev->task, ev, ev->data);
in nxt_epoll_poll().
When a HTTP connection is closed, we don't.
This seems to be due to ev->read == NXT_EVENT_BLOCKED in the HTTP case and ev->read == NXT_EVENT_DEFAULT in the WebSocket case...
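For reference, the kernel-side half of this can be seen in isolation. The sketch below is Linux-only and uses Python's select.epoll rather than Unit's event engine; peer_close_events is a hypothetical name. It registers a connected socket for EPOLLIN | EPOLLRDHUP and shows that the peer's close is reported even though nothing is ever read from the socket. What the server then does with that event is a separate decision.

```python
import select
import socket


def peer_close_events(timeout=1.0):
    """Return the epoll events reported for a socket whose peer closed."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen(1)
    client = socket.create_connection(listener.getsockname())
    server, _ = listener.accept()

    ep = select.epoll()
    ep.register(server.fileno(), select.EPOLLIN | select.EPOLLRDHUP)
    try:
        client.close()                 # stands in for Ctrl-C on the client
        return ep.poll(timeout=timeout)
    finally:
        ep.close()
        server.close()
        listener.close()


events = peer_close_events()
for fd, mask in events:
    print(fd, bool(mask & select.EPOLLRDHUP))
```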
(struck through) As a POC, this hack fixes the python SSE app (end of struck-through text)
Forget that, this causes the router process to crash...
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index d53df1bc..05efc01a 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -937,17 +939,23 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)

         ev->epoll_eof = ((events & EPOLLRDHUP) != 0);

+        if (events & EPOLLRDHUP)
+            ev->read = NXT_EVENT_DEFAULT;
+
ac000
commented
Feb 10, 2025
When using a WebSocket we get (after Ctrl-C'ing the client)
(gdb) bt
#0 nxt_epoll_block_read (engine=0x1728050, ev=0x7f9d740028e0) at src/nxt_epoll_engine.c:512
#1 0x000000000043b5dd in nxt_conn_io_read (task=task@entry=0x7f9d740039e0, obj=0x7f9d740028e0,
data=0x7f9d74003d00) at src/nxt_conn_read.c:97
#2 0x0000000000413f97 in nxt_event_engine_start (engine=engine@entry=0x1728050) at src/nxt_event_engine.c:542
#3 0x000000000041c5b4 in nxt_router_thread_start (data=0x170a910) at src/nxt_router.c:3717
#4 0x0000000000412865 in nxt_thread_trampoline (data=0x170a910) at src/nxt_thread.c:126
#5 0x00007f9d81689d22 in start_thread (arg=<optimized out>) at pthread_create.c:443
#6 0x00007f9d8170ed40 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
Calling nxt_epoll_block_read() is good...
With a HTTP connection we get
(gdb) bt
#0 nxt_epoll_enable_read (engine=0xc9d9a0, ev=0x7fcf80000cd0) at src/nxt_epoll_engine.c:418
#1 0x000000000040bbfd in nxt_port_queue_read_handler (task=task@entry=0xc9d9a0, obj=0x7fcf80000cd0,
data=<optimized out>) at src/nxt_port_socket.c:1013
#2 0x0000000000413f97 in nxt_event_engine_start (engine=engine@entry=0xc9d9a0) at src/nxt_event_engine.c:542
#3 0x000000000041c5b4 in nxt_router_thread_start (data=0xc8a390) at src/nxt_router.c:3717
#4 0x0000000000412865 in nxt_thread_trampoline (data=0xc8a390) at src/nxt_thread.c:126
#5 0x00007fcf86e89d22 in start_thread (arg=<optimized out>) at pthread_create.c:443
#6 0x00007fcf86f0ed40 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
A completely different code path...
strace(1)ing the router process
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 20\n\n", iov_len=23}], 2) = 29
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 22\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 22\n\n", iov_len=23}], 2) = 29
<-- Ctrl-C the client
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 24\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 24\n\n", iov_len=23}], 2) = 29
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 26\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 26\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 37087] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=37084, si_uid=1000} ---
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 28\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 30\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 32\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
...
The UNIX domain socket I guess is the Python app talking to the router process...
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 22\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
Python app sends event: count\ndata: 22\n\n to the router process, then
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 22\n\n", iov_len=23}], 2) = 29
The router process then sends event: count\ndata: 22\n\n to the client.
After Ctrl-C'ing the client we don't attempt to read from the client (like we do in the WebSocket case), I guess because we're not expecting to be reading any data from the client after the initial request.
That's fine, but we do need to properly handle EPOLLRDHUP, hmm, but even then that'll only fix it for epoll(7) we still need to worry about all the other poll mechanisms...
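On the portability point, one mechanism-independent way to notice a closed peer is to treat read readiness followed by a zero-length read as EOF. The sketch below uses Python's selectors module, which picks epoll, kqueue, poll or select depending on the platform. The function name peer_has_closed is hypothetical, and this only illustrates the general technique, not how Unit's event engines are structured.

```python
import selectors
import socket


def peer_has_closed(sock, timeout=1.0):
    """Return True if the peer has closed its end of `sock`."""
    sel = selectors.DefaultSelector()
    sel.register(sock, selectors.EVENT_READ)
    try:
        ready = sel.select(timeout)
    finally:
        sel.close()
    if not ready:
        return False          # nothing to read, connection still open
    try:
        # MSG_PEEK leaves any real data in the buffer; an empty result
        # means end-of-file, i.e. the peer closed the connection.
        return sock.recv(1, socket.MSG_PEEK) == b''
    except ConnectionError:
        return True


a, b = socket.socketpair()
before = peer_has_closed(a, timeout=0.1)
b.close()
after = peer_has_closed(a)
a.close()
print(before, after)
```

The cost of this approach is that the server has to keep read interest enabled on a connection it otherwise expects to be silent.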
After Ctrl-C'ing the client we don't attempt to read from the client (like we do in the WebSocket case), I guess because we're not expecting to be reading any data from the client after the initial request.
Makes sense. I just want to understand why it would not be okay to notify the app of the disconnection at the time of writing back to the socket. I get that, from the app's perspective, it would look as if some bytes had been sent when they actually had not, because the router would have discarded them and only then notified the app of the disconnection.
ac000
commented
Feb 12, 2025
Makes sense. I just want to understand why it would not be okay to notify the app of the disconnection at the time of writing back to the socket. I get that, from the app's perspective, it would look as if some bytes had been sent when they actually had not, because the router would have discarded them and only then notified the app of the disconnection.
I dunno, maybe that is the right approach... however at the moment if you have an application that starts a chunked transfer, (E.g. a SSE application) but then doesn't send any data you can effectively DOS the server by opening and closing a bunch of connections to the application, as you then end up with a bunch of connections in CLOSE_WAIT. E.g.
$ ss -tnp state CLOSE-WAIT | sed 's/\[[^[]*\]/[::1]/g'
Recv-Q Send-Q Local Address:Port Peer Address:Port Process
1      0      [::1]:8000         [::1]:59800       users:(("unitd",pid=41460,fd=31))
1      0      [::1]:8000         [::1]:54674       users:(("unitd",pid=41460,fd=27))
1      0      [::1]:8000         [::1]:35932       users:(("unitd",pid=41460,fd=30))
1      0      [::1]:8000         [::1]:35918       users:(("unitd",pid=41460,fd=29))
See this cloudflare article for the gory details.
Hmm, do you want to re-work your patches to not do any WebSocket stuff as that seems to be handled correctly as is and use EPIPE for the OSError? (although I think you were only doing that for the websocket case).
ac000
commented
Feb 13, 2025
So in trying a variation of your patches to try and handle the client closing the connection, I have (hopefully it's all there as I tried to leave most of my debug code behind...)
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 772fb41a..e801a2ee 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
     /* Status report. */
     nxt_port_handler_t status;

+    nxt_port_handler_t client_close;
+
     nxt_port_handler_t oosm;
     nxt_port_handler_t shm_ack;
     nxt_port_handler_t read_queue;
@@ -115,6 +117,8 @@ typedef enum {
     _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
     _NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),

+    _NXT_PORT_MSG_CLIENT_CLOSE = nxt_port_handler_idx(client_close),
+
     _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
     _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
     _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
@@ -160,6 +164,8 @@ typedef enum {
     NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
     NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),

+    NXT_PORT_MSG_CLIENT_CLOSE = nxt_msg_last(_NXT_PORT_MSG_CLIENT_CLOSE),
+
     NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
     NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
     NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 44ea823b..bf8f5ff1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5276,6 +5276,8 @@ nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t *r;

+    printf("%s: \n", __func__);
+
     r = obj;

     nxt_debug(task, "router http request error (rpc_data %p)",
               r->req_rpc_data);
@@ -5295,11 +5297,22 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t *r;

+    printf("%s: \n", __func__);
+
     r = data;

     nxt_debug(task, "router http request done (rpc_data %p)",
               r->req_rpc_data);

     if (r->req_rpc_data != NULL) {
+        nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data;
+
+        printf("%s: Sending [NXT_PORT_MSG_CLIENT_CLOSE] message / [%d]...\n",
+               __func__, req_rpc_data->stream);
+        nxt_port_socket_write(task, req_rpc_data->app_port,
+                              NXT_PORT_MSG_CLIENT_CLOSE, -1,
+                              req_rpc_data->stream,
+                              task->thread->engine->port->id, NULL);
+
         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
     }
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 966a6c0f..fe62861c 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
     nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+    nxt_unit_recv_msg_t *recv_msg);
 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
     nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
         rc = nxt_unit_process_websocket(ctx, &recv_msg);
         break;

+    case _NXT_PORT_MSG_CLIENT_CLOSE:
+        printf("%s: Got message [NXT_PORT_MSG_CLIENT_CLOSE]\n", __func__);
+        rc = nxt_unit_process_client_close(ctx, &recv_msg);
+        break;
+
     case _NXT_PORT_MSG_REMOVE_PID:
         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1377,18 +1384,18 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,

     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

+    res = nxt_unit_request_hash_add(ctx, req);
+    if (nxt_slow_path(res != NXT_UNIT_OK)) {
+        nxt_unit_req_warn(req, "failed to add request to hash");
+
+        nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+        return NXT_UNIT_ERROR;
+    }
+
     if (req->content_length
         > (uint64_t) (req->content_buf->end - req->content_buf->free))
     {
-        res = nxt_unit_request_hash_add(ctx, req);
-        if (nxt_slow_path(res != NXT_UNIT_OK)) {
-            nxt_unit_req_warn(req, "failed to add request to hash");
-
-            nxt_unit_request_done(req, NXT_UNIT_ERROR);
-
-            return NXT_UNIT_ERROR;
-        }
-
         /*
          * If application have separate data handler, we may start
          * request processing and process data when it is arrived.
@@ -1418,7 +1425,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
     nxt_unit_mmap_buf_t *b;
     nxt_unit_request_info_t *req;

-    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
     if (req == NULL) {
         return NXT_UNIT_OK;
     }
@@ -1723,6 +1730,34 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
 }


+static int
+nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+    nxt_unit_recv_msg_t *recv_msg)
+{
+    nxt_unit_impl_t *lib;
+    nxt_unit_callbacks_t *cb;
+    nxt_unit_request_info_t *req;
+
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
+    if (req == NULL) {
+        printf("%s: ERROR [req] not found for stream [%d]\n", __func__,
+               recv_msg->stream);
+        return NXT_UNIT_OK;
+    }
+
+    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+    cb = &lib->callbacks;
+
+    if (cb->close_handler) {
+        cb->close_handler(req);
+    } else {
+        nxt_unit_request_done(req, NXT_UNIT_ERROR);
+    }
+
+    return NXT_UNIT_OK;
+}
+
+
 static int
 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
 {
@@ -6530,6 +6565,7 @@ nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
     case NXT_OK:
         req_impl->in_hash = 1;
+        printf("%s: Added req for stream [%d]\n", __func__, *stream);
         return NXT_UNIT_OK;

     default:
@@ -6557,6 +6593,7 @@ nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
     pthread_mutex_lock(&ctx_impl->mutex);

     if (remove) {
+        printf("%s: Removing req for stream [%d]\n", __func__, stream);
         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);

     } else {
But I think I see the issue you were having with looking up req in the hash table: it seems to be simply because req isn't being added.
I.e. we don't hit this code
+    res = nxt_unit_request_hash_add(ctx, req);
+    if (nxt_slow_path(res != NXT_UNIT_OK)) {
+        nxt_unit_req_warn(req, "failed to add request to hash");
+
+        nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+        return NXT_UNIT_ERROR;
+    }
+
Trying it earlier, i.e. above the if () statement, does result in it being added, but also in general breakage...
Perhaps @hongzhidao has some idea?
See this cloudflare article for the gory details.
nice read.
Hmm, do you want to re-work your patches to not do any WebSocket stuff, as that seems to be handled correctly as is, and use EPIPE for the OSError? (although I think you were only doing that for the websocket case).
Well, I guess keeping this would still make sense:
ws->state = NXT_WS_DISCONNECTED;
if (ws->receive_future == NULL) {
    return;
}
If no receive is awaiting, then when send is called later, this state makes it raise an exception, as the ASGI spec requires. Basically, the point here is to keep the state correct regardless of whether a receive future is awaiting or not, as is done in nxt_py_asgi_http_close_handler.
Also, even if an exception occurs on the router side, it is not propagated to the application: looking at the nxt_py_asgi_websocket_close_handler function, there isn't anything that tells us which OSError occurred.
So, in the absence of any particular error code, we should raise an exception that is at least a subclass of OSError, as per the ASGI spec.
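To illustrate what this means for an application, here is a minimal sketch, not Unit's implementation. It assumes the server raises an OSError carrying EPIPE when send() is called after the client has gone; `make_fake_send` and `stream_events` are hypothetical names used only for this example, with the fake send standing in for the server side.

```python
import asyncio
import errno


def make_fake_send(fail_after):
    """Hypothetical stand-in for the server's send(): fails once the client is gone."""
    calls = 0

    async def send(message):
        nonlocal calls
        if calls >= fail_after:
            # OSError with errno EPIPE is instantiated as BrokenPipeError,
            # which is a subclass of OSError.
            raise OSError(errno.EPIPE, "client closed the connection")
        calls += 1

    return send


async def stream_events(send, limit=5):
    """Send up to `limit` body chunks, stopping quietly on a closed connection."""
    sent = 0
    try:
        for _ in range(limit):
            await send({
                "type": "http.response.body",
                "body": b"tick\n",
                "more_body": True,
            })
            sent += 1
    except OSError:
        # The client is gone: stop producing data instead of looping forever.
        pass
    return sent


sent = asyncio.run(stream_events(make_fake_send(fail_after=2)))
```

Because the application only catches OSError, it does not need to know which server-specific subclass was raised.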
ac000
commented
Feb 13, 2025
Summary of issues.
With the following python ASGI application
async def application(scope, receive, send):
    while True:
        m = await receive()
        if m['type'] == 'http.disconnect':
            print("Client Disconnect")
            break

        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [[b"content-type", b"text/plain"]],
            }
        )
        await send(
            {
                'type': 'http.response.body',
                'body': b"Testing...\n",
                'more_body': True
            }
        )
Opening a connection with curl and then Ctrl-C'ing it
$ curl rhel-9:8000/
Testing...
^C
Results in a connection stuck in CLOSE_WAIT and a leaked file descriptor
tcp CLOSE-WAIT 1 0 [::1]:8000 [::1]:47140
This patch at least gets rid of the connection being stuck in CLOSE_WAIT, but it doesn't look like the application is notified.
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index d53df1bc..2d5b6fd0 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -936,18 +950,26 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
 #if (NXT_HAVE_EPOLL_EDGE)
             ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
+            if (ev->epoll_eof)
+                ev->read = NXT_EVENT_INACTIVE;
 #endif
With this python ASGI application
import asyncio


async def application(scope, receive, send):
    if scope['type'] == 'http':
        headers = [
            (b'content-type', b'text/event-stream'),
            (b'cache-control', b'no-cache'),
            (b'connection', b'keep-alive'),
        ]
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': headers,
        })
        send_task = asyncio.create_task(send_messages_sse(send))
        receive_task = asyncio.create_task(receive_messages_sse(receive))
        await asyncio.gather(send_task, receive_task)


async def receive_messages_sse(receive):
    message = await receive()
    print(f'message received: {message}')
    if message['type'] == 'http.disconnect':
        print(f'http.disconnect')
        return


async def send_messages_sse(send):
    i = 0
    while True:
        try:
            message = f"event: count\ndata: {i}\n\n"
            print(f'message sent: {message}')
            await send({
                'type': 'http.response.body',
                'body': message.encode('utf-8'),
                'more_body': True
            })
            i += 2
            await asyncio.sleep(2)
        except Exception as err:
            print(f'err : {err}')
            break
The above patch causes the router process to segfault.
Without the above patch, if you open a connection to the application then Ctrl-C it, you don't get the CLOSE_WAIT issue, but the application also isn't notified that the connection is closed and so keeps sending messages to the router process.
With this slightly modified version of @gourav-kandoria 's patch
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 772fb41a..e801a2ee 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
     /* Status report. */
     nxt_port_handler_t status;

+    nxt_port_handler_t client_close;
+
     nxt_port_handler_t oosm;
     nxt_port_handler_t shm_ack;
     nxt_port_handler_t read_queue;
@@ -115,6 +117,8 @@ typedef enum {
     _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
     _NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),

+    _NXT_PORT_MSG_CLIENT_CLOSE = nxt_port_handler_idx(client_close),
+
     _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
     _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
     _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
@@ -160,6 +164,8 @@ typedef enum {
     NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
     NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),

+    NXT_PORT_MSG_CLIENT_CLOSE = nxt_msg_last(_NXT_PORT_MSG_CLIENT_CLOSE),
+
     NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
     NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
     NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 44ea823b..bf8f5ff1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5276,6 +5276,8 @@ nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t *r;

+    printf("%s: \n", __func__);
+
     r = obj;

     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
@@ -5295,11 +5297,22 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t *r;

+    printf("%s: \n", __func__);
+
     r = data;

     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);

     if (r->req_rpc_data != NULL) {
+        nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data;
+
+        printf("%s: Sending [NXT_PORT_MSG_CLIENT_CLOSE] message / [%d]...\n",
+               __func__, req_rpc_data->stream);
+        nxt_port_socket_write(task, req_rpc_data->app_port,
+                              NXT_PORT_MSG_CLIENT_CLOSE, -1,
+                              req_rpc_data->stream,
+                              task->thread->engine->port->id, NULL);
+
         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
     }

diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 966a6c0f..866d1e1d 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
     nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+    nxt_unit_recv_msg_t *recv_msg);
 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
     nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
         rc = nxt_unit_process_websocket(ctx, &recv_msg);
         break;

+    case _NXT_PORT_MSG_CLIENT_CLOSE:
+        printf("%s: Got message [NXT_PORT_MSG_CLIENT_CLOSE]\n", __func__);
+        rc = nxt_unit_process_client_close(ctx, &recv_msg);
+        break;
+
     case _NXT_PORT_MSG_REMOVE_PID:
         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1418,7 +1425,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
     nxt_unit_mmap_buf_t *b;
     nxt_unit_request_info_t *req;

-    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
     if (req == NULL) {
         return NXT_UNIT_OK;
     }
@@ -1723,6 +1730,35 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
 }


+static int
+nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+    nxt_unit_recv_msg_t *recv_msg)
+{
+    nxt_unit_impl_t *lib;
+    nxt_unit_callbacks_t *cb;
+    nxt_unit_request_info_t *req;
+
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
+    if (req == NULL) {
+        printf("%s: ERROR [req] not found for stream [%d]\n", __func__,
+               recv_msg->stream);
+        return NXT_UNIT_OK;
+    }
+
+    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+    cb = &lib->callbacks;
+
+    if (cb->close_handler) {
+        printf("%s: Calling [cb->close_handler(req)]\n", __func__);
+        cb->close_handler(req);
+    } else {
+        nxt_unit_request_done(req, NXT_UNIT_ERROR);
+    }
+
+    return NXT_UNIT_OK;
+}
+
+
 static int
 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
 {
@@ -4826,10 +4862,22 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
             continue;
         }

+#if 1
+        printf("%s: Adding req to hash table\n", __func__);
+        res = nxt_unit_request_hash_add(ctx, req);
+        if (nxt_slow_path(res != NXT_UNIT_OK)) {
+            nxt_unit_req_warn(req, "failed to add request to hash");
+
+            nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+            continue;
+        }
+#endif
+
         if (req->content_length
             > (uint64_t) (req->content_buf->end - req->content_buf->free))
         {
-            res = nxt_unit_request_hash_add(ctx, req);
+#if 0
             if (nxt_slow_path(res != NXT_UNIT_OK)) {
                 nxt_unit_req_warn(req, "failed to add request to hash");

@@ -4837,7 +4885,7 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
                 continue;
             }
-
+#endif
             /*
              * If application have separate data handler, we may start
              * request processing and process data when it is arrived.
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
index 702f4d8d..2ed6f964 100644
--- a/src/python/nxt_python_asgi.c
+++ b/src/python/nxt_python_asgi.c
@@ -626,10 +626,13 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
 static void
 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
 {
+    printf("%s: \n", __func__);
+
     if (req->request->websocket_handshake) {
         nxt_py_asgi_websocket_close_handler(req);

     } else {
+        printf("%s: Calling [nxt_py_asgi_http_close_handler(req)]\n", __func__);
         nxt_py_asgi_http_close_handler(req);
     }
 }
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c
index cdd6357e..81c97b7e 100644
--- a/src/python/nxt_python_asgi_http.c
+++ b/src/python/nxt_python_asgi_http.c
@@ -362,6 +362,12 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
     Py_ssize_t body_len, body_off;
     nxt_py_asgi_ctx_data_t *ctx_data;

+    printf("%s: \n", __func__);
+
+    if (http->closed) {
+        return PyErr_Format(PyExc_RuntimeError, "peer closed conenction");
+    }
+
     if (nxt_slow_path(http->complete)) {
         return PyErr_Format(PyExc_RuntimeError,
                             "Unexpected ASGI message 'http.response.body' "
@@ -646,6 +652,7 @@ nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
     if (nxt_fast_path(http != NULL)) {
         http->closed = 1;

+        printf("%s: Calling [nxt_py_asgi_http_emit_disconnect(http)]\n", __func__);
         nxt_py_asgi_http_emit_disconnect(http);
     }
 }
On the first connection attempt and Ctrl-C it seems to do the right thing and the application stops sending messages.
However, on subsequent connections it's back to the original problem, as it doesn't seem to be adding req to the hash table. I notice its stream ID has changed from 8 to 9, so perhaps things didn't get cleaned up properly previously.
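The lookup problem can be illustrated with a toy model. This sketch is not Unit's code: `RequestTable` and `process_client_close` are hypothetical stand-ins for the request hash and the close-message handler, assuming requests are keyed by stream ID. It shows why a close message for a stream that was never added is silently dropped.

```python
class RequestTable:
    """Toy stand-in for the per-context request hash, keyed by stream ID."""

    def __init__(self):
        self._by_stream = {}

    def add(self, stream, req):
        self._by_stream[stream] = req

    def find(self, stream, remove=False):
        # Mirrors the find-or-remove flag of the C lookup function.
        if remove:
            return self._by_stream.pop(stream, None)
        return self._by_stream.get(stream)


def process_client_close(table, stream, close_handler):
    """Return True if a request was found and notified, else False."""
    req = table.find(stream)
    if req is None:
        return False  # request was never registered: nothing to notify
    close_handler(req)
    return True


table = RequestTable()
closed = []
table.add(8, "request on stream 8")
first = process_client_close(table, 8, closed.append)   # registered stream
second = process_client_close(table, 9, closed.append)  # stream never added
```

In this model the second close message finds nothing, which matches the symptom of the application not being notified on later connections.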
Hi @gourav-kandoria @ac000,
This looks like a topic of whether to support client abort, just my two cents.
- It can be an option like ignore_client_abort, notify_client_abort, etc., and the default behavior is ignoring it so that we don't change any behavior on the client by default.
- The option seems to be in the applications object.
- I feel we need to call nxt_conn_read() with a closed handler in the correct place to track the client connection. It seems not a good idea to rely on r->error or r->closed.
- Welcome to add unit tests on it.
ac000
commented
Feb 15, 2025
@hongzhidao Thanks for your input.
It can be an option like ignore_client_abort, notify_client_abort, etc, and the default behaviour is ignoring it so that we don't change any behaviour on the client by default.
Hmm, I'm curious why you think this should be optional behaviour and not just the standard behaviour of a properly functioning server?
I feel we need to call nxt_conn_read() with a closed handler in the correct place to track the client connection. It seems not a good idea to rely on r->error or r->closed.
Let's deal with the case where the client end of the connection has been closed (for whatever reason).
On Linux with epoll(7) and EPOLLRDHUP we are notified when this happens in nxt_epoll_poll() and with WebSockets this seems to be handled properly and the socket is properly closed on the server and the application is notified with a websocket.disconnect.
However, with an HTTP connection things seem to go awry due to NXT_EVENT_BLOCKED (not really sure what is blocked and why) being set: we hit this condition and then we don't go through things like nxt_conn_io_read(), which would detect this via reading 0 bytes, although that should be unnecessary due to EPOLLRDHUP.
But then we also need some way of notifying the application that the client has closed the connection.
I guess this all does just confirm my suspicion that Unit was never designed with Server-sent events applications in mind...
@ac000 You are welcome.
Hmm, I'm curious why you think this should be optional behaviour and not just the standard behaviour of a properly functioning server?
Since this is a change in behavior, I tend to keep the original behavior by default; for example, sometimes the connection is broken but the application still wants to continue processing the request. But you are welcome to continue the discussion.
Let's deal with the case where the client end of the connection has been closed (for whatever reason).
NXT_EVENT_BLOCKED (not really sure what is blocked and why) being set
In Unit, it's about how the event engine triggers handlers. In the traditional approach, when an event happens its callback handler is usually called immediately. But in Unit, we need to actively call nxt_conn_read(); only then will its handler be called. That's why NXT_EVENT_BLOCKED was introduced.
reading 0 bytes, although that should be unnecessary due to EPOLLRDHUP.
Yep, epoll has such an event type, but Unit also supports other event engines.
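As an illustration of the portable "read returns 0 bytes" check that works regardless of the event engine, here is a minimal sketch using plain sockets. It is not how Unit does it; `peer_has_closed` is a hypothetical helper, and a local socket pair stands in for the client connection.

```python
import select
import socket


def peer_has_closed(sock: socket.socket) -> bool:
    """Return True if the peer has shut down its end of a stream socket."""
    readable, _, _ = select.select([sock], [], [], 0)
    if not readable:
        return False  # nothing pending, the connection still looks open
    # MSG_PEEK leaves any real data in the buffer; b"" means end-of-file.
    return sock.recv(1, socket.MSG_PEEK) == b""


server_side, client_side = socket.socketpair()
before_close = peer_has_closed(server_side)
client_side.close()  # stands in for Ctrl-C on the curl client
after_close = peer_has_closed(server_side)
server_side.close()
```

The check only works if something actually polls or reads the socket while the request is in progress, which is the part under discussion here.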
Since this is a change in behavior, I tend to keep the original behavior by default; for example, sometimes the connection is broken but the application still wants to continue processing the request. But you are welcome to continue the discussion.
Yes, makes sense. But consider traditional servers: even if they don't read after the initial request, at the time of sending data back the application immediately knows that it is writing to a broken socket. Here, this information is not propagated back to the application. I also think adding options like notify_client_abort would only be an extra bit of hassle for users, because any framework or application is usually written assuming the way servers traditionally work, and some languages like Python and Java have specifications like ASGI, WSGI and Servlet which assume the same thing. So my suggestion would be: if a request is in progress and a disconnected socket is detected while sending data back, notify the application process and then act according to the language-specific specification.
However with a HTTP connection things seem to go awry due to NXT_EVENT_BLOCKED (not really sure what is blocked and why) being set, we hit this condition and then we don't go through things like nxt_conn_io_read() which would detect this via reading 0 bytes,
I am not sure, but it might be blocked from reading. When the first request has been completely read, a second request might be pipelined after it, since in HTTP/1.1 multiple requests can be served on the same TCP connection. So only after the first response is sent back should the router start reading and processing the second request's bytes. But sure, EPOLLRDHUP and EPOLLHUP can be listened for.
ac000
commented
Feb 17, 2025
and with WebSockets this seems to be handled properly and the socket is properly closed on the server and the application is notified with a websocket.disconnect.
Hmm, not with
import time

from fastapi import FastAPI, WebSocket

application = FastAPI()


@application.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    print("Listening on WebSocket")
    await websocket.accept()
    while True:
        await websocket.send_text(f"Testing...")
        time.sleep(2)
When ^Cing the client, the socket is closed on the server but it seems the application continues to want to send data...
gourav-kandoria
commented
Feb 17, 2025
and with WebSockets this seems to be handled properly and the socket is properly closed on the server and the application is notified with a websocket.disconnect.
Hmm, not with
import time

from fastapi import FastAPI, WebSocket

application = FastAPI()


@application.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    print("Listening on WebSocket")
    await websocket.accept()
    while True:
        await websocket.send_text(f"Testing...")
        time.sleep(2)
When ^Cing the client, the socket is closed on the server but it seems the application continues to want to send data...
If you try the websocket file patch it should work
ac000
commented
Feb 17, 2025
If you try the websocket file patch it should work
Hmm, actually it doesn't, because unit is half doing the right thing, it's detecting the client has closed the connection and closes the socket, no error is detected or set (r->error).
[pid 91251] recvfrom(27<TCPv6:[[::1]:8000->[::1]:40818]>, "", 135, 0, NULL, NULL) = 0
[pid 91251] sendmsg(16<UNIX:[210062->210061,@"1d818"]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0pd\1\0\1\0!\1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
[pid 91251] sendto(27<TCPv6:[[::1]:8000->[::1]:40818]>, "\211\0", 2, 0, NULL, 0) = 2
[pid 91251] close(27<TCPv6:[[::1]:8000->[::1]:40818]>) = 0
Unit read 0 bytes from the client signalling eof/connection close/shutdown. It just doesn't tell the application. I'd be happy if I could get to this state with HTTP connections, then the rest should fall into place...
ac000
commented
Feb 17, 2025
The other problem this causes (that the application is still trying to send data to the client) is that running unit in the foreground and ^C'ing it leaves the application processes still running...
Hmm, actually it doesn't, because unit is half doing the right thing, it's detecting the client has closed the connection and closes the socket, no error is detected or set (r->error).
It might have something to do with a variable "notify"
These are the two different logs, one where the exception occurred in the application and one where it didn't:
2025/02/18 22:19:23.137 [debug] 8249#8252 *114 http websocket error handler
2025/02/18 22:19:23.137 [debug] 8249#8252 *114 port{8405,0} 107: enqueue 16 notify 1, 0
2025/02/18 22:29:53.867 [debug] 15231#15237 *27 http websocket error handler
2025/02/18 22:29:53.867 [debug] 15231#15237 *27 port{15233,0} 23: enqueue 16 notify 0, 0
Now this is even more confusing: why is it 1 with simple ASGI apps and 0 with a framework like FastAPI?
The other problem this causes (that the application is still trying to send data to the client) is that running unit in the foreground and ^C'ing it leaves the application processes still running...
Maybe in the case of FastAPI, after the websocket connection is established, port messages are not getting delivered to the app. When we Ctrl+C the foreground-running Unit, the application process should receive the _NXT_PORT_MSG_QUIT message and quit. But here it is not getting this message. The same is happening for client disconnection: the NXT_PORT_MSG_WEBSOCKET_LAST message is not getting delivered to the application process.
Got the issue: this isn't an issue with Unit but with this block of code. The loop never yields the CPU, so the event loop gets no time to process incoming messages.
while True:
    await websocket.send_text(f"Testing...")
    print('sent message')
    time.sleep(2)
If asyncio.sleep(2) is awaited instead of time.sleep(2), the app gets the disconnection notification.
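The difference can be reproduced without any server. The following sketch is only an illustration: `sender`, `run` and the two pause helpers are hypothetical names, and a callback scheduled with `call_soon` stands in for the disconnect notification arriving from the server.

```python
import asyncio
import time


async def sender(pause, events):
    # Stands in for the websocket send loop.
    for _ in range(3):
        events.append("send")
        await pause()


async def run(pause):
    events = []
    loop = asyncio.get_running_loop()
    # Stands in for the disconnect notification waiting to be processed.
    loop.call_soon(events.append, "disconnect")
    await sender(pause, events)
    return list(events)  # snapshot taken when the sender finishes


async def blocking_pause():
    time.sleep(0.01)  # blocks the thread, never yields to the event loop


async def cooperative_pause():
    await asyncio.sleep(0.01)  # yields, so pending callbacks can run


blocking = asyncio.run(run(blocking_pause))
cooperative = asyncio.run(run(cooperative_pause))
```

With the blocking pause the notification is not processed while the loop is sending; with the cooperative pause it is handled right after the first send.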
gourav-kandoria
commented
Feb 24, 2025
@ac000 @hongzhidao, how do we proceed on this?
ac000
commented
Feb 24, 2025
Not forgotten about this, but was looking at some other issues.
In simple terms we just need to make Unit recognise when a client closes the connection. Still trying to figure out the how...
gourav-kandoria
commented
Oct 6, 2025
@ac000 anything on this. seems like world has forgotten about this pr 😂😂
Previously, the app was not notified when the client disconnected. This caused issues especially in cases of websocket connections and SSE Events where the app continued to send data to the router, which could not deliver it to the client due to the disconnection.
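Changes made:
- Added functionality to send a port message (_NXT_PORT_MSG_CLIENT_ERROR) to notify the app of client disconnection.
- On the app side, handled this message and called the close_handler callback if one is registered.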
This ensures that the app is properly informed of client disconnections and can handle them according to the ASGI spec.