Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add incremental updating of open streams count and closed_streams state #1185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
kahuang wants to merge 9 commits into python-hyper:master
base: master
Choose a base branch
Loading
from kahuang:master

Conversation

@kahuang
Copy link

@kahuang kahuang commented Feb 17, 2019

This fixes #1184

Copy link
Author

kahuang commented Feb 17, 2019

On the old code, here is how test_concurrent_stream_open_performance runs:

============================================================= FAILURES =============================================================
___________________________ TestConcurrentStreamOpenPerformance.test_concurrent_stream_open_performance ____________________________
self = <test_concurrent_stream_open.TestConcurrentStreamOpenPerformance object at 0x107d86a50>
frame_factory = <helpers.FrameFactory object at 0x107d86cd0>
 def test_concurrent_stream_open_performance(self, frame_factory):
 """
 Opening many concurrent streams is constant time operation
 """
 num_concurrent_streams = 10000
 c = h2.connection.H2Connection()
 c.initiate_connection()
 start = time.time()
 for i in xrange(num_concurrent_streams):
 c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False)
 c.clear_outbound_data_buffer()
 end = time.time()
 
 run_time = end - start
> assert run_time < 3
E assert 36.598387002944946 < 3
test/test_concurrent_stream_open.py:51: AssertionError
==================================================== 1 failed in 36.66 seconds =====================================================

New code:

============================================================================================================ test session starts =============================================================================================================
platform darwin -- Python 2.7.15, pytest-3.4.2, py-1.7.0, pluggy-0.6.0
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/Users/andrew/workspace/hyper-h2/.hypothesis/examples')
rootdir: /Users/andrew/workspace/hyper-h2, inifile:
plugins: xdist-1.22.2, profiling-1.6.0, forked-1.0.2, cov-2.5.1, hypothesis-4.5.11
collected 1 item 
test/test_concurrent_stream_open.py . [100%]
========================================================================================================== 1 passed in 1.06 seconds ==========================================================================================================

Note that for this example, I commented out the second "test" in the test_concurrent_stream_open_performance function since the old code fails the first "test".

As you can see, this is a 36X perf improvement

Copy link
Author

kahuang commented Feb 17, 2019

So the lints I can clean up (I assume I can just run autopep8 or something similar?), but the code coverage tests are failing for some versions of python for these lines:

h2/stream.py 479 0 106 1 99% 809->814

Pasting the code here for reference:

def sync_state_change(func):
 def wrapper(self, *args, **kwargs):
 # Collect state at the beginning.
 start_state = self.state_machine.state
 started_open = self.open
 started_closed = not started_open
 # Do the state change (if any).
 result = func(self, *args, **kwargs)
 # Collect state at the end.
 end_state = self.state_machine.state
 ended_open = self.open
 ended_closed = not ended_open
 # If at any point we've tranwsitioned to the CLOSED state
 # from any other state, close our stream.
 if end_state == StreamState.CLOSED and start_state != end_state:
 if self._close_stream_callback:
 self._close_stream_callback(self.stream_id)
 # Clear callback so we only call this once per stream
 self._close_stream_callback = None
 # If we were open, but are now closed, decrement
 # the open stream count, and call the close callback.
 if started_open and ended_closed:
 if self._decrement_open_stream_count_callback:
 self._decrement_open_stream_count_callback(self.stream_id,
 -1,)
 # Clear callback so we only call this once per stream
 self._decrement_open_stream_count_callback = None
 # If we were closed, but are now open, increment
 # the open stream count.
 elif started_closed and ended_open:
> if self._increment_open_stream_count_callback:
> self._increment_open_stream_count_callback(self.stream_id,
> 1,)
> # Clear callback so we only call this once per stream
> self._increment_open_stream_count_callback = None
 return result
 return wrapper

Which is odd, since I can insert a print statement there and verify that the code is getting called, not to mention the counts of open outbound/inbound streams would be completely wrong if that code wasn't getting called.

Is this a quirk with the code coverage tool?

Copy link
Author

kahuang commented Feb 17, 2019

Oh I see, it's because the if() never evaluates to False. If I remove the conditional the coverage tests pass

The reason that conditional is there is for defensive reasons. A function wrapped by sync_state_change can call another function that is also wrapped by sync_state_change, and we don't want to update state twice.

I can write a specific test to exercise this behavior for that conditional

Copy link
Author

kahuang commented Feb 21, 2019

@Lukasa @pgjones This is ready for review (picking these names based on recent merge commits)

Copy link
Member

pgjones commented Apr 21, 2019

Thanks, I've managed to find time to understand the problem - but I'm not sure about the solution. It would be helpful if @Lukasa could comment on the general solution and how it fits in with the codebase. I'd then be happy to comment on the details.

Copy link
Author

kahuang commented Jun 2, 2019

Any updates on this @pgjones @Lukasa ?

ardzoht reacted with thumbs up emoji

Copy link

dimaqq commented Mar 23, 2020

I'm very glad someone already thought that this open stream count may be a problem 😍

However the @sync_state_change annotation on every method in H2Stream here seems invasive and possibly error-prone. Its flip-side, callbacks, don't seem very Pythonic 🤔

The open stream count is essentially a cache; could it be implemented in some other way?
For example, could it be something akin to a weakset/dict?
Or, perhaps the count could be recalculated on demand (i.e. on new stream or stream end mark cache dirty; next time recalculate)?
Or, perhaps stream state machine could have an output STREAM_CLOSED that the connection receives after pumping stream's events to the state machine, at which point stream can be removed from .streams as opposed to removal during counting?
Or, maybe, .streams could be simply split into .inbound_streams/.outbound_streams/.promises so that only specific set is ever evaluated (against own or peer's MAX_CONCURRENT_STREAMS)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Reviewers

No reviews

Assignees

No one assigned

Labels

None yet

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

Quadratic performance of h2.connection.H2Connection._open_streams

AltStyle によって変換されたページ (->オリジナル) /