-
Notifications
You must be signed in to change notification settings - Fork 169
Add incremental updating of open streams count and closed_streams state #1185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
a633092 to
8d6df03
Compare
quadratic performance with large number of open streams
kahuang
commented
Feb 17, 2019
On the old code, here is how test_concurrent_stream_open_performance runs:
============================================================= FAILURES =============================================================
___________________________ TestConcurrentStreamOpenPerformance.test_concurrent_stream_open_performance ____________________________
self = <test_concurrent_stream_open.TestConcurrentStreamOpenPerformance object at 0x107d86a50>
frame_factory = <helpers.FrameFactory object at 0x107d86cd0>
def test_concurrent_stream_open_performance(self, frame_factory):
"""
Opening many concurrent streams is constant time operation
"""
num_concurrent_streams = 10000
c = h2.connection.H2Connection()
c.initiate_connection()
start = time.time()
for i in xrange(num_concurrent_streams):
c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False)
c.clear_outbound_data_buffer()
end = time.time()
run_time = end - start
> assert run_time < 3
E assert 36.598387002944946 < 3
test/test_concurrent_stream_open.py:51: AssertionError
==================================================== 1 failed in 36.66 seconds =====================================================
New code:
============================================================================================================ test session starts =============================================================================================================
platform darwin -- Python 2.7.15, pytest-3.4.2, py-1.7.0, pluggy-0.6.0
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/Users/andrew/workspace/hyper-h2/.hypothesis/examples')
rootdir: /Users/andrew/workspace/hyper-h2, inifile:
plugins: xdist-1.22.2, profiling-1.6.0, forked-1.0.2, cov-2.5.1, hypothesis-4.5.11
collected 1 item
test/test_concurrent_stream_open.py . [100%]
========================================================================================================== 1 passed in 1.06 seconds ==========================================================================================================
Note that for this example, I commented out the second "test" in the test_concurrent_stream_open_performance function since the old code fails the first "test".
As you can see, this is a 36X perf improvement
kahuang
commented
Feb 17, 2019
So the lints I can clean up (I assume I can just run autopep8 or something similar?), but the code coverage tests are failing for some versions of python for these lines:
h2/stream.py 479 0 106 1 99% 809->814
Pasting the code here for reference:
def sync_state_change(func):
def wrapper(self, *args, **kwargs):
# Collect state at the beginning.
start_state = self.state_machine.state
started_open = self.open
started_closed = not started_open
# Do the state change (if any).
result = func(self, *args, **kwargs)
# Collect state at the end.
end_state = self.state_machine.state
ended_open = self.open
ended_closed = not ended_open
# If at any point we've tranwsitioned to the CLOSED state
# from any other state, close our stream.
if end_state == StreamState.CLOSED and start_state != end_state:
if self._close_stream_callback:
self._close_stream_callback(self.stream_id)
# Clear callback so we only call this once per stream
self._close_stream_callback = None
# If we were open, but are now closed, decrement
# the open stream count, and call the close callback.
if started_open and ended_closed:
if self._decrement_open_stream_count_callback:
self._decrement_open_stream_count_callback(self.stream_id,
-1,)
# Clear callback so we only call this once per stream
self._decrement_open_stream_count_callback = None
# If we were closed, but are now open, increment
# the open stream count.
elif started_closed and ended_open:
> if self._increment_open_stream_count_callback:
> self._increment_open_stream_count_callback(self.stream_id,
> 1,)
> # Clear callback so we only call this once per stream
> self._increment_open_stream_count_callback = None
return result
return wrapper
Which is odd, since I can insert a print statement there and verify that the code is getting called, not to mention the counts of open outbound/inbound streams would be completely wrong if that code wasn't getting called.
Is this a quirk with the code coverage tool?
kahuang
commented
Feb 17, 2019
Oh I see, it's because the if() never evaluates to False. If I remove the conditional the coverage tests pass
The reason that conditional is there is for defensive reasons. A function wrapped by sync_state_change can call another function that is also wrapped by sync_state_change, and we don't want to update state twice.
I can write a specific test to exercise this behavior for that conditional
kahuang
commented
Feb 21, 2019
pgjones
commented
Apr 21, 2019
Thanks, I've managed to find time to understand the problem - but I'm not sure about the solution. It would be helpful if @Lukasa could comment on the general solution and how it fits in with the codebase. I'd then be happy to comment on the details.
kahuang
commented
Jun 2, 2019
dimaqq
commented
Mar 23, 2020
I'm very glad someone already thought that this open stream count may be a problem 😍
However the @sync_state_change annotation on every method in H2Stream here seems invasive and possibly error-prone. Its flip-side, callbacks, don't seem very Pythonic 🤔
The open stream count is essentially a cache; could it be implemented in some other way?
For example, could it be something akin to a weakset/dict?
Or, perhaps the count could be recalculated on demand (i.e. on new stream or stream end mark cache dirty; next time recalculate)?
Or, perhaps stream state machine could have an output STREAM_CLOSED that the connection receives after pumping stream's events to the state machine, at which point stream can be removed from .streams as opposed to removal during counting?
Or, maybe, .streams could be simply split into .inbound_streams/.outbound_streams/.promises so that only specific set is ever evaluated (against own or peer's MAX_CONCURRENT_STREAMS)?
This fixes #1184