diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index afa46cd916..025a4c6363 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -69,6 +69,61 @@ def decode_wanted(parts): return wanted +class SsyncBufferedHTTPResponse(bufferedhttp.BufferedHTTPResponse, object): + def __init__(self, *args, **kwargs): + super(SsyncBufferedHTTPResponse, self).__init__(*args, **kwargs) + self.ssync_response_buffer = '' + self.ssync_response_chunk_left = 0 + + def readline(self, size=1024): + """ + Reads a line from the SSYNC response body. + + httplib has no readline and will block on read(x) until x is + read, so we have to do the work ourselves. A bit of this is + taken from Python's httplib itself. + """ + data = self.ssync_response_buffer + self.ssync_response_buffer = '' + while '\n' not in data and len(data) < size: + if self.ssync_response_chunk_left == -1: # EOF-already indicator + break + if self.ssync_response_chunk_left == 0: + line = self.fp.readline() + i = line.find(';') + if i>= 0: + line = line[:i] # strip chunk-extensions + try: + self.ssync_response_chunk_left = int(line.strip(), 16) + except ValueError: + # close the connection as protocol synchronisation is + # probably lost + self.close() + raise exceptions.ReplicationException('Early disconnect') + if self.ssync_response_chunk_left == 0: + self.ssync_response_chunk_left = -1 + break + chunk = self.fp.read(min(self.ssync_response_chunk_left, + size - len(data))) + if not chunk: + # close the connection as protocol synchronisation is + # probably lost + self.close() + raise exceptions.ReplicationException('Early disconnect') + self.ssync_response_chunk_left -= len(chunk) + if self.ssync_response_chunk_left == 0: + self.fp.read(2) # discard the trailing \r\n + data += chunk + if '\n' in data: + data, self.ssync_response_buffer = data.split('\n', 1) + data += '\n' + return data + + +class SsyncBufferedHTTPConnection(bufferedhttp.BufferedHTTPConnection): + response_class = SsyncBufferedHTTPResponse + + class Sender(object): """ Sends SSYNC requests to the object server. @@ -84,9 +139,6 @@ class Sender(object): self.node = node self.job = job self.suffixes = suffixes - self.response = None - self.response_buffer = '' - self.response_chunk_left = 0 # available_map has an entry for each object in given suffixes that # is available to be sync'd; each entry is a hash => dict of timestamps # of data file or tombstone file and/or meta file @@ -110,7 +162,7 @@ class Sender(object): """ if not self.suffixes: return True, {} - connection = None + connection = response = None try: # Double try blocks in case our main error handler fails. try: @@ -119,10 +171,10 @@ class Sender(object): # exceptions.ReplicationException for common issues that will # abort the replication attempt and log a simple error. All # other exceptions will be logged with a full stack trace. - connection = self.connect() - self.missing_check(connection) + connection, response = self.connect() + self.missing_check(connection, response) if self.remote_check_objs is None: - self.updates(connection) + self.updates(connection, response) can_delete_obj = self.available_map else: # when we are initialized with remote_check_objs we don't @@ -167,10 +219,10 @@ class Sender(object): Establishes a connection and starts an SSYNC request with the object server. """ - connection = None + connection = response = None with exceptions.MessageTimeout( self.daemon.conn_timeout, 'connect send'): - connection = bufferedhttp.BufferedHTTPConnection( + connection = SsyncBufferedHTTPConnection( '%s:%s' % (self.node['replication_ip'], self.node['replication_port'])) connection.putrequest('SSYNC', '/%s/%s' % ( @@ -196,60 +248,15 @@ class Sender(object): connection.endheaders() with exceptions.MessageTimeout( self.daemon.node_timeout, 'connect receive'): - self.response = connection.getresponse() - if self.response.status != http.HTTP_OK: - err_msg = self.response.read()[:1024] + response = connection.getresponse() + if response.status != http.HTTP_OK: + err_msg = response.read()[:1024] raise exceptions.ReplicationException( 'Expected status %s; got %s (%s)' % - (http.HTTP_OK, self.response.status, err_msg)) - return connection + (http.HTTP_OK, response.status, err_msg)) + return connection, response - def readline(self): - """ - Reads a line from the SSYNC response body. - - httplib has no readline and will block on read(x) until x is - read, so we have to do the work ourselves. A bit of this is - taken from Python's httplib itself. - """ - data = self.response_buffer - self.response_buffer = '' - while '\n' not in data and len(data) < self.daemon.network_chunk_size: - if self.response_chunk_left == -1: # EOF-already indicator - break - if self.response_chunk_left == 0: - line = self.response.fp.readline() - i = line.find(';') - if i>= 0: - line = line[:i] # strip chunk-extensions - try: - self.response_chunk_left = int(line.strip(), 16) - except ValueError: - # close the connection as protocol synchronisation is - # probably lost - self.response.close() - raise exceptions.ReplicationException('Early disconnect') - if self.response_chunk_left == 0: - self.response_chunk_left = -1 - break - chunk = self.response.fp.read(min( - self.response_chunk_left, - self.daemon.network_chunk_size - len(data))) - if not chunk: - # close the connection as protocol synchronisation is - # probably lost - self.response.close() - raise exceptions.ReplicationException('Early disconnect') - self.response_chunk_left -= len(chunk) - if self.response_chunk_left == 0: - self.response.fp.read(2) # discard the trailing \r\n - data += chunk - if '\n' in data: - data, self.response_buffer = data.split('\n', 1) - data += '\n' - return data - - def missing_check(self, connection): + def missing_check(self, connection, response): """ Handles the sender-side of the MISSING_CHECK step of a SSYNC request. @@ -286,7 +293,7 @@ class Sender(object): while True: with exceptions.MessageTimeout( self.daemon.http_timeout, 'missing_check start wait'): - line = self.readline() + line = response.readline(size=self.daemon.network_chunk_size) if not line: raise exceptions.ReplicationException('Early disconnect') line = line.strip() @@ -298,7 +305,7 @@ class Sender(object): while True: with exceptions.MessageTimeout( self.daemon.http_timeout, 'missing_check line wait'): - line = self.readline() + line = response.readline(size=self.daemon.network_chunk_size) if not line: raise exceptions.ReplicationException('Early disconnect') line = line.strip() @@ -308,7 +315,7 @@ class Sender(object): if parts: self.send_map[parts[0]] = decode_wanted(parts[1:]) - def updates(self, connection): + def updates(self, connection, response): """ Handles the sender-side of the UPDATES step of an SSYNC request. @@ -362,7 +369,7 @@ class Sender(object): while True: with exceptions.MessageTimeout( self.daemon.http_timeout, 'updates start wait'): - line = self.readline() + line = response.readline(size=self.daemon.network_chunk_size) if not line: raise exceptions.ReplicationException('Early disconnect') line = line.strip() @@ -374,7 +381,7 @@ class Sender(object): while True: with exceptions.MessageTimeout( self.daemon.http_timeout, 'updates line wait'): - line = self.readline() + line = response.readline(size=self.daemon.network_chunk_size) if not line: raise exceptions.ReplicationException('Early disconnect') line = line.strip() diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index 226c71402c..1c8a0e3922 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -99,8 +99,8 @@ class TestBaseSsync(BaseTest): return wrapped_send def make_readline_wrapper(readline): - def wrapped_readline(): - data = readline() + def wrapped_readline(size=1024): + data = readline(size=size) add_trace('rx', data) bytes_read = trace.setdefault('readline_bytes', 0) trace['readline_bytes'] = bytes_read + len(data) @@ -108,11 +108,11 @@ class TestBaseSsync(BaseTest): return wrapped_readline def wrapped_connect(): - connection = orig_connect() + connection, response = orig_connect() connection.send = make_send_wrapper( connection.send) - sender.readline = make_readline_wrapper(sender.readline) - return connection + response.readline = make_readline_wrapper(response.readline) + return connection, response return wrapped_connect, trace def _get_object_data(self, path, **kwargs): diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index fefe50a8dc..d21b63b5fc 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -52,7 +52,7 @@ class NullBufferedHTTPConnection(object): pass -class FakeResponse(object): +class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse): def __init__(self, chunk_body=''): self.status = 200 @@ -60,6 +60,8 @@ class FakeResponse(object): if chunk_body: self.fp = six.StringIO( '%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body)) + self.ssync_response_buffer = '' + self.ssync_response_chunk_left = 0 def read(self, *args, **kwargs): return '' @@ -163,8 +165,10 @@ class TestSender(BaseTest): def test_call_calls_others(self): connection = FakeConnection() + response = FakeResponse() self.sender.suffixes = ['abc'] - self.sender.connect = mock.MagicMock(return_value=connection) + self.sender.connect = mock.MagicMock(return_value=(connection, + response)) self.sender.missing_check = mock.MagicMock() self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() @@ -172,8 +176,8 @@ class TestSender(BaseTest): self.assertTrue(success) self.assertEqual(candidates, {}) self.sender.connect.assert_called_once_with() - self.sender.missing_check.assert_called_once_with(connection) - self.sender.updates.assert_called_once_with(connection) + self.sender.missing_check.assert_called_once_with(connection, response) + self.sender.updates.assert_called_once_with(connection, response) self.sender.disconnect.assert_called_once_with(connection) def test_connect(self): @@ -183,7 +187,7 @@ class TestSender(BaseTest): self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] with mock.patch( - 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' + 'swift.obj.ssync_sender.SsyncBufferedHTTPConnection' ) as mock_conn_class: mock_conn = mock_conn_class.return_value mock_resp = mock.MagicMock() @@ -217,7 +221,7 @@ class TestSender(BaseTest): self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] with mock.patch( - 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' + 'swift.obj.ssync_sender.SsyncBufferedHTTPConnection' ) as mock_conn_class: mock_conn = mock_conn_class.return_value mock_resp = mock.MagicMock() @@ -251,7 +255,7 @@ class TestSender(BaseTest): self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] with mock.patch( - 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' + 'swift.obj.ssync_sender.SsyncBufferedHTTPConnection' ) as mock_conn_class: mock_conn = mock_conn_class.return_value mock_resp = mock.MagicMock() @@ -285,7 +289,7 @@ class TestSender(BaseTest): self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] with mock.patch( - 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' + 'swift.obj.ssync_sender.SsyncBufferedHTTPConnection' ) as mock_conn_class: mock_conn = mock_conn_class.return_value mock_resp = mock.MagicMock() @@ -320,7 +324,7 @@ class TestSender(BaseTest): self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] with mock.patch( - 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' + 'swift.obj.ssync_sender.SsyncBufferedHTTPConnection' ) as mock_conn_class: mock_conn = mock_conn_class.return_value mock_resp = mock.MagicMock() @@ -350,7 +354,9 @@ class TestSender(BaseTest): def test_call(self): def patch_sender(sender): connection = FakeConnection() - sender.connect = mock.MagicMock(return_value=connection) + response = FakeResponse() + sender.connect = mock.MagicMock(return_value=(connection, + response)) sender.missing_check = mock.MagicMock() sender.updates = mock.MagicMock() sender.disconnect = mock.MagicMock() @@ -439,7 +445,7 @@ class TestSender(BaseTest): 'frag_index': 0, } self.sender.suffixes = ['abc'] - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' '9d41d8cd98f00b204e9800998ecf0abc\r\n' @@ -448,7 +454,8 @@ class TestSender(BaseTest): ':UPDATES: END\r\n' )) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.connect = mock.MagicMock(return_value=connection) + self.sender.connect = mock.MagicMock(return_value=(connection, + response)) df = mock.MagicMock() df.content_length = 0 self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock( @@ -485,13 +492,14 @@ class TestSender(BaseTest): 'frag_index': 0, } self.sender.suffixes = ['abc'] - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' '9d41d8cd98f00b204e9800998ecf0abc d\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.connect = mock.MagicMock(return_value=connection) + self.sender.connect = mock.MagicMock(return_value=(connection, + response)) self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() @@ -519,12 +527,13 @@ class TestSender(BaseTest): self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'], ['9d41d8cd98f00b204e9800998ecf0abc']) connection = FakeConnection() - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.connect = mock.MagicMock(return_value=connection) + self.sender.connect = mock.MagicMock(return_value=(connection, + response)) self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() @@ -552,13 +561,14 @@ class TestSender(BaseTest): self.sender = ssync_sender.Sender(self.daemon, {}, job, ['abc'], ['9d41d8cd98f00b204e9800998ecf0abc']) connection = FakeConnection() - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' '9d41d8cd98f00b204e9800998ecf0abc d\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.connect = mock.MagicMock(return_value=connection) + self.sender.connect = mock.MagicMock(return_value=(connection, + response)) self.sender.updates = mock.MagicMock() self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() @@ -602,7 +612,7 @@ class TestSender(BaseTest): eventlet.sleep(0.1) with mock.patch.object( - ssync_sender.bufferedhttp, 'BufferedHTTPConnection', + ssync_sender, 'SsyncBufferedHTTPConnection', FakeBufferedHTTPConnection): success, candidates = self.sender() self.assertFalse(success) @@ -628,7 +638,7 @@ class TestSender(BaseTest): missing_check_fn = 'swift.obj.ssync_sender.Sender.missing_check' with mock.patch(missing_check_fn) as mock_missing_check: with mock.patch.object( - ssync_sender.bufferedhttp, 'BufferedHTTPConnection', + ssync_sender, 'SsyncBufferedHTTPConnection', FakeBufferedHTTPConnection): self.sender = ssync_sender.Sender( self.daemon, node, job, ['abc']) @@ -644,63 +654,65 @@ class TestSender(BaseTest): self.assertFalse(mock_missing_check.called) def test_readline_newline_in_buffer(self): - self.sender.response_buffer = 'Has a newline already.\r\nOkay.' - self.assertEqual(self.sender.readline(), 'Has a newline already.\r\n') - self.assertEqual(self.sender.response_buffer, 'Okay.') + response = FakeResponse() + response.ssync_response_buffer = 'Has a newline already.\r\nOkay.' + self.assertEqual(response.readline(), 'Has a newline already.\r\n') + self.assertEqual(response.ssync_response_buffer, 'Okay.') def test_readline_buffer_exceeds_network_chunk_size_somehow(self): - self.daemon.network_chunk_size = 2 - self.sender.response_buffer = '1234567890' - self.assertEqual(self.sender.readline(), '1234567890') - self.assertEqual(self.sender.response_buffer, '') + response = FakeResponse() + response.ssync_response_buffer = '1234567890' + self.assertEqual(response.readline(size=2), '1234567890') + self.assertEqual(response.ssync_response_buffer, '') def test_readline_at_start_of_chunk(self): - self.sender.response = FakeResponse() - self.sender.response.fp = six.StringIO('2\r\nx\n\r\n') - self.assertEqual(self.sender.readline(), 'x\n') + response = FakeResponse() + response.fp = six.StringIO('2\r\nx\n\r\n') + self.assertEqual(response.readline(), 'x\n') def test_readline_chunk_with_extension(self): - self.sender.response = FakeResponse() - self.sender.response.fp = six.StringIO( + response = FakeResponse() + response.fp = six.StringIO( '2 ; chunk=extension\r\nx\n\r\n') - self.assertEqual(self.sender.readline(), 'x\n') + self.assertEqual(response.readline(), 'x\n') def test_readline_broken_chunk(self): - self.sender.response = FakeResponse() - self.sender.response.fp = six.StringIO('q\r\nx\n\r\n') + response = FakeResponse() + response.fp = six.StringIO('q\r\nx\n\r\n') self.assertRaises( - exceptions.ReplicationException, self.sender.readline) - self.assertTrue(self.sender.response.close_called) + exceptions.ReplicationException, response.readline) + self.assertTrue(response.close_called) def test_readline_terminated_chunk(self): - self.sender.response = FakeResponse() - self.sender.response.fp = six.StringIO('b\r\nnot enough') + response = FakeResponse() + response.fp = six.StringIO('b\r\nnot enough') self.assertRaises( - exceptions.ReplicationException, self.sender.readline) - self.assertTrue(self.sender.response.close_called) + exceptions.ReplicationException, response.readline) + self.assertTrue(response.close_called) def test_readline_all(self): - self.sender.response = FakeResponse() - self.sender.response.fp = six.StringIO('2\r\nx\n\r\n0\r\n\r\n') - self.assertEqual(self.sender.readline(), 'x\n') - self.assertEqual(self.sender.readline(), '') - self.assertEqual(self.sender.readline(), '') + response = FakeResponse() + response.fp = six.StringIO('2\r\nx\n\r\n0\r\n\r\n') + self.assertEqual(response.readline(), 'x\n') + self.assertEqual(response.readline(), '') + self.assertEqual(response.readline(), '') def test_readline_all_trailing_not_newline_termed(self): - self.sender.response = FakeResponse() - self.sender.response.fp = six.StringIO( + response = FakeResponse() + response.fp = six.StringIO( '2\r\nx\n\r\n3\r\n123\r\n0\r\n\r\n') - self.assertEqual(self.sender.readline(), 'x\n') - self.assertEqual(self.sender.readline(), '123') - self.assertEqual(self.sender.readline(), '') - self.assertEqual(self.sender.readline(), '') + self.assertEqual(response.readline(), 'x\n') + self.assertEqual(response.readline(), '123') + self.assertEqual(response.readline(), '') + self.assertEqual(response.readline(), '') def test_missing_check_timeout(self): connection = FakeConnection() connection.send = lambda d: eventlet.sleep(1) + response = FakeResponse() self.sender.daemon.node_timeout = 0.01 self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check, - connection) + connection, response) def test_missing_check_has_empty_suffixes(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -719,12 +731,12 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc', 'def'] - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection) + self.sender.missing_check(connection, response) self.assertEqual( ''.join(connection.sent), '17\r\n:MISSING_CHECK: START\r\n\r\n' @@ -761,12 +773,12 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc', 'def'] - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection) + self.sender.missing_check(connection, response) self.assertEqual( ''.join(connection.sent), '17\r\n:MISSING_CHECK: START\r\n\r\n' @@ -809,10 +821,10 @@ class TestSender(BaseTest): } self.sender.suffixes = ['abc'] self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.response = FakeResponse(chunk_body='\r\n') + response = FakeResponse(chunk_body='\r\n') exc = None try: - self.sender.missing_check(connection) + self.sender.missing_check(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') @@ -846,11 +858,11 @@ class TestSender(BaseTest): } self.sender.suffixes = ['abc'] self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=':MISSING_CHECK: START\r\n') exc = None try: - self.sender.missing_check(connection) + self.sender.missing_check(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') @@ -884,10 +896,10 @@ class TestSender(BaseTest): } self.sender.suffixes = ['abc'] self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.response = FakeResponse(chunk_body='OH HAI\r\n') + response = FakeResponse(chunk_body='OH HAI\r\n') exc = None try: - self.sender.missing_check(connection) + self.sender.missing_check(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'OH HAI'") @@ -920,13 +932,13 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' '0123abc dm\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection) + self.sender.missing_check(connection, response) self.assertEqual( ''.join(connection.sent), '17\r\n:MISSING_CHECK: START\r\n\r\n' @@ -960,13 +972,13 @@ class TestSender(BaseTest): 'policy': POLICIES.legacy, } self.sender.suffixes = ['abc'] - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':MISSING_CHECK: START\r\n' '0123abc d extra response parts\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes - self.sender.missing_check(connection) + self.sender.missing_check(connection, response) self.assertEqual(self.sender.send_map, {'0123abc': {'data': True}}) self.assertEqual(self.sender.available_map, @@ -976,18 +988,19 @@ class TestSender(BaseTest): def test_updates_timeout(self): connection = FakeConnection() connection.send = lambda d: eventlet.sleep(1) + response = FakeResponse() self.sender.daemon.node_timeout = 0.01 self.assertRaises(exceptions.MessageTimeout, self.sender.updates, - connection) + connection, response) def test_updates_empty_send_map(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection) + self.sender.updates(connection, response) self.assertEqual( ''.join(connection.sent), '11\r\n:UPDATES: START\r\n\r\n' @@ -996,14 +1009,14 @@ class TestSender(BaseTest): def test_updates_unexpected_response_lines1(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( 'abc\r\n' ':UPDATES: START\r\n' ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection) + self.sender.updates(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'abc'") @@ -1015,14 +1028,14 @@ class TestSender(BaseTest): def test_updates_unexpected_response_lines2(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' 'abc\r\n' ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection) + self.sender.updates(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'abc'") @@ -1050,11 +1063,11 @@ class TestSender(BaseTest): self.sender.send_map = {object_hash: {'data': True}} self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection) + self.sender.updates(connection, response) self.sender.send_delete.assert_called_once_with( connection, '/a/c/o', delete_timestamp) self.assertEqual(self.sender.send_put.mock_calls, []) @@ -1082,11 +1095,11 @@ class TestSender(BaseTest): } self.sender.node = {} self.sender.send_map = {object_hash: {'data': True}} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection) + self.sender.updates(connection, response) self.assertEqual( ''.join(connection.sent), '11\r\n:UPDATES: START\r\n\r\n' @@ -1125,11 +1138,11 @@ class TestSender(BaseTest): self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() self.sender.send_post = mock.MagicMock() - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection) + self.sender.updates(connection, response) self.assertEqual(self.sender.send_delete.mock_calls, []) self.assertEqual(self.sender.send_post.mock_calls, []) self.assertEqual(1, len(self.sender.send_put.mock_calls)) @@ -1172,11 +1185,11 @@ class TestSender(BaseTest): self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() self.sender.send_post = mock.MagicMock() - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection) + self.sender.updates(connection, response) self.assertEqual(self.sender.send_delete.mock_calls, []) self.assertEqual(self.sender.send_put.mock_calls, []) self.assertEqual(1, len(self.sender.send_post.mock_calls)) @@ -1219,11 +1232,11 @@ class TestSender(BaseTest): self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() self.sender.send_post = mock.MagicMock() - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection) + self.sender.updates(connection, response) self.assertEqual(self.sender.send_delete.mock_calls, []) self.assertEqual(1, len(self.sender.send_put.mock_calls)) self.assertEqual(1, len(self.sender.send_post.mock_calls)) @@ -1262,11 +1275,11 @@ class TestSender(BaseTest): self.sender.send_map = {object_hash: {'data': True}} self.sender.send_delete = mock.MagicMock() self.sender.send_put = mock.MagicMock() - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - self.sender.updates(connection) + self.sender.updates(connection, response) args, _kwargs = self.sender.send_put.call_args connection, path, df = args self.assertEqual(path, '/a/c/o') @@ -1279,28 +1292,28 @@ class TestSender(BaseTest): def test_updates_read_response_timeout_start(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - orig_readline = self.sender.readline + orig_readline = response.readline - def delayed_readline(): + def delayed_readline(*args, **kwargs): eventlet.sleep(1) - return orig_readline() + return orig_readline(*args, **kwargs) - self.sender.readline = delayed_readline + response.readline = delayed_readline self.sender.daemon.http_timeout = 0.01 self.assertRaises(exceptions.MessageTimeout, self.sender.updates, - connection) + connection, response) def test_updates_read_response_disconnect_start(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse(chunk_body='\r\n') + response = FakeResponse(chunk_body='\r\n') exc = None try: - self.sender.updates(connection) + self.sender.updates(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') @@ -1312,14 +1325,14 @@ class TestSender(BaseTest): def test_updates_read_response_unexp_start(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( 'anything else\r\n' ':UPDATES: START\r\n' ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection) + self.sender.updates(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'anything else'") @@ -1331,33 +1344,33 @@ class TestSender(BaseTest): def test_updates_read_response_timeout_end(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' ':UPDATES: END\r\n')) - orig_readline = self.sender.readline + orig_readline = response.readline - def delayed_readline(): - rv = orig_readline() + def delayed_readline(*args, **kwargs): + rv = orig_readline(*args, **kwargs) if rv == ':UPDATES: END\r\n': eventlet.sleep(1) return rv - self.sender.readline = delayed_readline + response.readline = delayed_readline self.sender.daemon.http_timeout = 0.01 self.assertRaises(exceptions.MessageTimeout, self.sender.updates, - connection) + connection, response) def test_updates_read_response_disconnect_end(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' '\r\n')) exc = None try: - self.sender.updates(connection) + self.sender.updates(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), 'Early disconnect') @@ -1369,14 +1382,14 @@ class TestSender(BaseTest): def test_updates_read_response_unexp_end(self): connection = FakeConnection() self.sender.send_map = {} - self.sender.response = FakeResponse( + response = FakeResponse( chunk_body=( ':UPDATES: START\r\n' 'anything else\r\n' ':UPDATES: END\r\n')) exc = None try: - self.sender.updates(connection) + self.sender.updates(connection, response) except exceptions.ReplicationException as err: exc = err self.assertEqual(str(exc), "Unexpected response: 'anything else'")

AltStyle によって変換されたページ (->オリジナル) /