bagpipe: work on notification without port info
This change updates the bagpipe driver so that information is always retrieved from the db, which fixes the cases where the port info is absent from the notification. Closes-Bug: 1520629 Change-Id: I41a4e77b33d0b8790302b58ae625b41d28b2df9a
This commit is contained in:
Thomas Morin
committed by
Thomas Morin
parent
75a22fda5f
commit
2a4ca063d2
2 changed files with 73 additions and 48 deletions
@@ -26,6 +26,7 @@ from neutron.extensions import portbindings
from neutron.i18n import _LE
from neutron import manager
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
from networking_bagpipe.agent.bgpvpn import rpc_client
@@ -248,25 +249,26 @@ class BaGPipeBGPVPNDriver(driver_api.BGPVPNDriver):
self.agent_rpc.delete_bgpvpn(context, formated_bgpvpn)
def _get_port(self, context, port_id):
# we need to look in the db for two reasons:
# - the port dict, as provided by the registry callback has no
# port binding information
# - for some notifications, the current port is not included
_core_plugin = manager.NeutronManager.get_plugin()
# TODO(tmorin): should not need an admin context
return _core_plugin.get_port(n_context.get_admin_context(), port_id)
def _get_port_host(self, context, port_id):
# the port dict, as provided by the registry callback
# has no binding:host_id information, so let's retrieve full port info
port = self._get_port(context, port_id)
def _get_port_host(self, context, port):
if portbindings.HOST_ID not in port:
raise Exception("cannot determine host_id for port %s, "
"aborting BGPVPN update", port_id)
"aborting BGPVPN update", port['id'])
return port.get(portbindings.HOST_ID)
def notify_port_updated(self, context, port):
LOG.info("notify_port_updated on port %s status %s",
port['id'],
port['status'])
def notify_port_updated(self, context, port_id):
LOG.info("notify_port_updated on port %s", port_id)
port = self._get_port(context, port_id)
port_bgpvpn_info = {'id': port['id'],
'network_id': port['network_id']}
@@ -275,9 +277,10 @@ class BaGPipeBGPVPNDriver(driver_api.BGPVPNDriver):
LOG.info("Port %s is DHCP, ignoring", port['id'])
return
agent_host = self._get_port_host(context, port['id'])
agent_host = self._get_port_host(context, port)
if port['status'] == const.PORT_STATUS_ACTIVE:
LOG.info("notify_port_updated, active")
bgpvpn_network_info = (
self._retrieve_bgpvpn_network_info_for_port(context, port)
)
@@ -289,14 +292,15 @@ class BaGPipeBGPVPNDriver(driver_api.BGPVPNDriver):
port_bgpvpn_info,
agent_host)
elif port['status'] == const.PORT_STATUS_DOWN:
LOG.info("notify_port_updated, down")
self.agent_rpc.detach_port_from_bgpvpn(context,
port_bgpvpn_info,
agent_host)
else:
LOG.info("no action since new port status is %s", port['status'])
LOG.info("new port status is %s, no action", port['status'])
def remove_port_from_bgpvpn_agent(self, context, port_id):
LOG.info("remove_port_from_bgpvpn_agent on port %s", port_id)
def notify_port_deleted(self, context, port_id):
LOG.info("notify_port_deleted on port %s", port_id)
port = self._get_port(context, port_id)
@@ -307,17 +311,24 @@ class BaGPipeBGPVPNDriver(driver_api.BGPVPNDriver):
LOG.info("Port %s is DHCP, ignoring", port['id'])
return
agent_host = self._get_port_host(context, port_id)
agent_host = self._get_port_host(context, port)
self.agent_rpc.detach_port_from_bgpvpn(context,
port_bgpvpn_info,
agent_host)
@log_helpers.log_method_call
def registry_port_updated(self, resource, event, trigger, **kwargs):
try:
context = kwargs.get('context')
port_dict = kwargs.get('port')
self.notify_port_updated(context, port_dict)
port = kwargs.get('port')
origin_port = kwargs.get('origin_port')
# In notifications coming from ml2/plugin.py update_port
# it is possible that 'port' may be None, in this
# case we will use origin_port
if port is None:
port = origin_port
self.notify_port_updated(context, port['id'])
except Exception as e:
LOG.exception(_LE("Error during notification processing "
"%(resource)s%(event)s, %(trigger)s, "
@@ -328,11 +339,12 @@ class BaGPipeBGPVPNDriver(driver_api.BGPVPNDriver):
'kwargs': kwargs,
'exc': e})
@log_helpers.log_method_call
def registry_port_deleted(self, resource, event, trigger, **kwargs):
try:
context = kwargs.get('context')
port_id = kwargs.get('port_id')
self.remove_port_from_bgpvpn_agent(context, port_id)
self.notify_port_deleted(context, port_id)
except Exception as e:
LOG.exception(_LE("Error during notification processing "
"%(resource)s%(event)s, %(trigger)s, "
@@ -22,6 +22,8 @@ from neutron.common.constants import DEVICE_OWNER_DHCP
from neutron.common.constants import PORT_STATUS_ACTIVE
from neutron.common.constants import PORT_STATUS_DOWN
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.ml2 import config as ml2_config
from neutron.plugins.ml2 import rpc as ml2_rpc
@@ -180,14 +182,19 @@ class TestBagpipeServiceDriverCallbacks(TestBagpipeCommon):
return {'id': port['id'],
'network_id': port['network_id']}
def _update_port_status(self, port, status):
network_id = port['port']['network_id']
some_network = {'id': network_id}
self.plugin.get_network = mock.Mock(return_value=some_network)
self.plugin.update_port_status(self.ctxt, port['port']['id'],
status, TESTHOST)
def test_bagpipe_callback_to_rpc_update_active(self):
# REVISIT(tmorin): could avoid mocking get_host_port
# by setting binding:host_id at port creation
# as in _test_create_port_binding_profile
with self.port() as port, \
mock.patch.object(self.bagpipe_driver, '_get_port_host',
return_value=TESTHOST):
port['port']['status'] = PORT_STATUS_ACTIVE
with self.port(arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: TESTHOST}) as port:
self._update_port_status(port, PORT_STATUS_ACTIVE)
self.bagpipe_driver.registry_port_updated(
None, None, None,
context=self.ctxt,
@@ -197,12 +204,14 @@ class TestBagpipeServiceDriverCallbacks(TestBagpipeCommon):
mock.ANY,
self._build_expected_return_active(port['port']),
TESTHOST)
self.assertFalse(self.mock_detach_rpc.called)
def test_bagpipe_callback_to_rpc_update_down(self):
with self.port() as port, \
mock.patch.object(self.bagpipe_driver, '_get_port_host',
return_value=TESTHOST):
port['port']['status'] = PORT_STATUS_DOWN
with self.port(arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: TESTHOST}) as port:
self._update_port_status(port, PORT_STATUS_DOWN)
self.bagpipe_driver.registry_port_updated(
None, None, None,
context=self.ctxt,
@@ -212,12 +221,12 @@ class TestBagpipeServiceDriverCallbacks(TestBagpipeCommon):
mock.ANY,
self._build_expected_return_down(port['port']),
TESTHOST)
self.assertFalse(self.mock_attach_rpc.called)
def test_bagpipe_callback_to_rpc_deleted(self):
with self.port() as port, \
mock.patch.object(self.bagpipe_driver, '_get_port_host',
return_value=TESTHOST):
port['port']['status'] = PORT_STATUS_DOWN
with self.port(arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: TESTHOST}) as port:
self._update_port_status(port, PORT_STATUS_DOWN)
self.bagpipe_driver.registry_port_deleted(
None, None, None,
context=self.ctxt,
@@ -227,41 +236,45 @@ class TestBagpipeServiceDriverCallbacks(TestBagpipeCommon):
mock.ANY,
self._build_expected_return_down(port['port']),
TESTHOST)
self.assertFalse(self.mock_attach_rpc.called)
def test_bagpipe_callback_to_rpc_update_active_ignore_DHCP(self):
with self.port(device_owner=DEVICE_OWNER_DHCP) as port, \
mock.patch.object(self.bagpipe_driver, '_get_port_host',
return_value=TESTHOST):
port['port']['status'] = PORT_STATUS_ACTIVE
with self.port(device_owner=DEVICE_OWNER_DHCP,
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: TESTHOST}) as port:
self._update_port_status(port, PORT_STATUS_ACTIVE)
self.bagpipe_driver.registry_port_updated(
None, None, None,
context=self.ctxt,
port=port['port']
)
self.assertFalse(self.mock_attach_rpc.called)
self.assertFalse(self.mock_detach_rpc.called)
def test_bagpipe_callback_to_rpc_update_down_ignore_DHCP(self):
with self.port(device_owner=DEVICE_OWNER_DHCP) as port, \
mock.patch.object(self.bagpipe_driver, '_get_port_host',
return_value=TESTHOST):
port['port']['status'] = PORT_STATUS_DOWN
with self.port(device_owner=DEVICE_OWNER_DHCP,
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: TESTHOST}) as port:
self._update_port_status(port, PORT_STATUS_DOWN)
self.bagpipe_driver.registry_port_updated(
None, None, None,
context=self.ctxt,
port=port['port']
)
self.assertFalse(self.mock_attach_rpc.called)
self.assertFalse(self.mock_detach_rpc.called)
def test_bagpipe_callback_to_rpc_deleted_ignore_DHCP(self):
with self.port(device_owner=DEVICE_OWNER_DHCP) as port, \
mock.patch.object(self.bagpipe_driver, '_get_port_host',
return_value=TESTHOST):
port['port']['status'] = PORT_STATUS_DOWN
with self.port(device_owner=DEVICE_OWNER_DHCP,
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: TESTHOST}) as port:
self._update_port_status(port, PORT_STATUS_DOWN)
self.bagpipe_driver.registry_port_deleted(
None, None, None,
context=self.ctxt,
port_id=port['port']['id']
)
self.assertFalse(self.mock_attach_rpc.called)
self.assertFalse(self.mock_detach_rpc.called)
def test_delete_port_to_bgpvpn_rpc(self):
@@ -363,6 +376,6 @@ class TestBagpipeServiceDriverCallbacks(TestBagpipeCommon):
context=self.ctxt,
port=None
)
self.assertEqual(self.mock_attach_rpc.call_count, 0)
self.assertEqual(self.mock_detach_rpc.call_count, 0)
self.assertEqual(log_exc.call_count, 1)
self.assertFalse(self.mock_attach_rpc.called)
self.assertFalse(self.mock_detach_rpc.called)
self.assertTrue(log_exc.called)
Reference in New Issue
Block a user
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.