Don't apply timeout to Pool.get operation (leaks)
The connection timeout to a memcache server is performed by using the "with Timeout()" construct over the sock.connect() call in the .create() method. In addition, the same construct was being applied to the Pool.get() call in ._get_conns(). If the maximum number of connections was already created, and the Pool.get() called took longer than the connect timeout, then the error handling path would add a place holder to the connection pool. Eventlet's Pool class allows for additional items to be added to the pool, over and above the max_size setting. This additional place holder will eventually be pulled and a new connection created to take its place. The fix is to remove the timeout construct in the _get_conns() method. In addition, we also apply the unit test patch mentioned in the review comments for Patch Set 6 of https://review.openstack.org/45134, located at http://paste.openstack.org/show/47288/. Fixes bug 1235027 Change-Id: I786cabefe3e8ddf7d92feb7ebc7cfb613d60a1da Signed-off-by: Peter Portante <peter.portante@redhat.com>
This commit is contained in:
2 changed files with 39 additions and 11 deletions
@@ -197,10 +197,21 @@ class MemcacheRing(object):
continue
sock = None
try:
with Timeout(self._connect_timeout):
fp, sock = self._client_cache[server].get()
# NOTE: We do NOT place a Timeout over the MemcacheConnPool's
# get() method. The MemcacheConnPool's create() method already
# places a timeout around the connect() system call, which we
# catch below. It is possible for the underlying Queue of the
# MemcacheConnPool to be contended such that this greenlet is
# waiting for one or more connections to be freed up. If there
# is a particularly slow memcache server causing that problme,
# then the IO_TIMEOUT will catch that behavior, so we do not
# have to place a Timeout here.
fp, sock = self._client_cache[server].get()
yield server, fp, sock
except (Exception, Timeout) as e:
# Typically a Timeout exception caught here is the one raised
# by the create() method of this server's MemcacheConnPool
# object.
self._exception_occurred(
server, e, action='connecting', sock=sock)
@@ -358,10 +358,14 @@ class TestMemcached(unittest.TestCase):
# track clients waiting for connections
connected = []
connections = Queue()
errors = []
def wait_connect(addr):
connected.append(addr)
connections.get()
val = connections.get()
if val is not None:
errors.append(val)
mock_sock.connect = wait_connect
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
@@ -369,7 +373,7 @@ class TestMemcached(unittest.TestCase):
# sanity
self.assertEquals(1, len(memcache_client._client_cache))
for server, pool in memcache_client._client_cache.items():
self.assertEquals(2, pool.max_size)
self.assertEqual(2, pool.max_size)
# make 10 requests "at the same time"
p = GreenPool()
@@ -377,18 +381,31 @@ class TestMemcached(unittest.TestCase):
p.spawn(memcache_client.set, 'key', 'value')
for i in range(3):
sleep(0.1)
self.assertEquals(2, len(connected))
self.assertEqual(2, len(connected))
# give out a connection
connections.put(None)
# at this point, only one connection should have actually been
# created, the other is in the creation step, and the rest of the
# clients are not attempting to connect. we let this play out a
# bit to verify.
for i in range(3):
sleep(0.1)
self.assertEquals(2, len(connected))
# finish up
for i in range(8):
connections.put(None)
self.assertEquals(2, len(connected))
self.assertEqual(2, len(connected))
# finish up, this allows the final connection to be created, so
# that all the other clients can use the two existing connections
# and no others will be created.
connections.put(None)
connections.put('nono')
self.assertEqual(2, len(connected))
p.waitall()
self.assertEquals(2, len(connected))
self.assertEqual(2, len(connected))
self.assertEqual(0, len(errors),
"A client was allowed a third connection")
connections.get_nowait()
self.assertTrue(connections.empty())
if __name__ == '__main__':
Reference in New Issue
Block a user
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.