diff --git a/ironic_python_agent/tests/unit/test_utils.py b/ironic_python_agent/tests/unit/test_utils.py index b72fe4dc8..02f2f95d4 100644 --- a/ironic_python_agent/tests/unit/test_utils.py +++ b/ironic_python_agent/tests/unit/test_utils.py @@ -148,45 +148,59 @@ class GetAgentParamsTestCase(ironic_agent_base.IronicAgentTest): self.assertEqual('sdc', vmedia_device_returned) @mock.patch.object(utils, 'execute', autospec=True) - def test__find_device_by_labels(self, execute_mock): - execute_mock.side_effect = [ - processutils.ProcessExecutionError, - ('/dev/fake', ''), - ] - self.assertEqual('/dev/fake', - utils._find_device_by_labels(['l1', 'l2'])) - execute_mock.assert_has_calls([ - mock.call('blkid', '-L', item) - for item in ('l1', 'l2') - ]) - - @mock.patch.object(utils, 'execute', autospec=True) - def test__find_device_by_labels_upper(self, execute_mock): - execute_mock.side_effect = [ - processutils.ProcessExecutionError, - processutils.ProcessExecutionError, - ('/dev/fake', ''), - ] - self.assertEqual('/dev/fake', - utils._find_device_by_labels(['l1', 'l2'])) - execute_mock.assert_has_calls([ - mock.call('blkid', '-L', item) - for item in ('l1', 'l2', 'L1') - ]) - - @mock.patch.object(utils, 'execute', autospec=True) - def test__find_device_by_labels_not_found(self, execute_mock): + def test__find_vmedia_device_by_labels_handles_exec_error(self, + execute_mock): execute_mock.side_effect = processutils.ProcessExecutionError - self.assertIsNone(utils._find_device_by_labels(['l1', 'l2'])) + self.assertIsNone(utils._find_vmedia_device_by_labels(['l1', 'l2'])) + execute_mock.assert_called_once_with('lsblk', '-P', '-oPATH,LABEL') + + @mock.patch.object(utils, 'execute', autospec=True) + def test__find_vmedia_device_by_labels(self, execute_mock): + # NOTE(TheJulia): Case is intentionally mixed here to ensure + # proper matching occurs + disk_list = ('PATH="/dev/sda" LABEL=""\n' + 'PATH="/dev/sda2" LABEL="Meow"\n' + 'PATH="/dev/sda3" LABEL="Recovery HD"\n' + 'PATH="/dev/sda1" LABEL="EFI"\n' + 'PATH="/dev/sdb" LABEL=""\n' + 'PATH="/dev/sdb1" LABEL=""\n' + 'PATH="/dev/sdb2" LABEL=""\n' + 'PATH="/dev/sdc" LABEL="meow"\n') + invalid_disk = ('KNAME="sda1" SIZE="1610612736" TYPE="part" TRAN=""\n' + 'KNAME="sda" SIZE="1610612736" TYPE="disk" ' + 'TRAN="sata"\n') + valid_disk = ('KNAME="sdc" SIZE="1610612736" TYPE="disk" TRAN="usb"\n') + execute_mock.side_effect = [ + (disk_list, ''), + (invalid_disk, ''), + (valid_disk, ''), + ] + self.assertEqual('/dev/sdc', + utils._find_vmedia_device_by_labels(['cat', 'meOw'])) execute_mock.assert_has_calls([ - mock.call('blkid', '-L', item) - for item in ('l1', 'l2', 'L1', 'L2') + mock.call('lsblk', '-P', '-oPATH,LABEL'), + mock.call('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', '/dev/sda2'), + mock.call('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', '/dev/sdc'), ]) - @mock.patch.object(utils, '_find_device_by_labels', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__find_vmedia_device_by_labels_not_found(self, execute_mock): + disk_list = ('PATH="/dev/sdb" LABEL="evil"\n' + 'PATH="/dev/sdb1" LABEL="banana"\n' + 'PATH="/dev/sdb2" LABEL=""\n') + execute_mock.return_value = (disk_list, '') + self.assertIsNone(utils._find_vmedia_device_by_labels(['l1', 'l2'])) + execute_mock.assert_called_once_with('lsblk', '-P', '-oPATH,LABEL') + + @mock.patch.object(utils, '_check_vmedia_device', autospec=True) + @mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True) @mock.patch.object(utils, '_read_params_from_file', autospec=True) @mock.patch.object(ironic_utils, 'mounted', autospec=True) - def test__get_vmedia_params(self, mount_mock, read_params_mock, find_mock): + def test__get_vmedia_params(self, mount_mock, read_params_mock, find_mock, + check_vmedia_mock): + check_vmedia_mock.return_value = True find_mock.return_value = '/dev/fake' mount_mock.return_value.__enter__.return_value = '/tempdir' expected_params = {'a': 'b'} @@ -198,12 +212,15 @@ class GetAgentParamsTestCase(ironic_agent_base.IronicAgentTest): read_params_mock.assert_called_once_with("/tempdir/parameters.txt") self.assertEqual(expected_params, returned_params) - @mock.patch.object(utils, '_find_device_by_labels', autospec=True) + @mock.patch.object(utils, '_check_vmedia_device', autospec=True) + @mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True) @mock.patch.object(utils, '_get_vmedia_device', autospec=True) @mock.patch.object(utils, '_read_params_from_file', autospec=True) @mock.patch.object(ironic_utils, 'mounted', autospec=True) def test__get_vmedia_params_by_device(self, mount_mock, read_params_mock, - get_device_mock, find_mock): + get_device_mock, find_mock, + check_vmedia_mock): + check_vmedia_mock.return_value = True find_mock.return_value = None mount_mock.return_value.__enter__.return_value = '/tempdir' expected_params = {'a': 'b'} @@ -215,15 +232,37 @@ class GetAgentParamsTestCase(ironic_agent_base.IronicAgentTest): mount_mock.assert_called_once_with('/dev/sda') read_params_mock.assert_called_once_with("/tempdir/parameters.txt") self.assertEqual(expected_params, returned_params) + check_vmedia_mock.assert_called_with('/dev/sda') - @mock.patch.object(utils, '_find_device_by_labels', autospec=True) + @mock.patch.object(utils, '_check_vmedia_device', autospec=True) + @mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True) + @mock.patch.object(utils, '_get_vmedia_device', autospec=True) + @mock.patch.object(utils, '_read_params_from_file', autospec=True) + @mock.patch.object(ironic_utils, 'mounted', autospec=True) + def test__get_vmedia_params_by_device_device_invalid( + self, mount_mock, read_params_mock, + get_device_mock, find_mock, + check_vmedia_mock): + check_vmedia_mock.return_value = False + find_mock.return_value = None + expected_params = {} + read_params_mock.return_value = expected_params + get_device_mock.return_value = "sda" + + returned_params = utils._get_vmedia_params() + + mount_mock.assert_not_called() + read_params_mock.assert_not_called + self.assertEqual(expected_params, returned_params) + check_vmedia_mock.assert_called_with('/dev/sda') + + @mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True) @mock.patch.object(utils, '_get_vmedia_device', autospec=True) def test__get_vmedia_params_cannot_find_dev(self, get_device_mock, find_mock): find_mock.return_value = None get_device_mock.return_value = None - self.assertRaises(errors.VirtualMediaBootError, - utils._get_vmedia_params) + self.assertEqual({}, utils._get_vmedia_params()) class TestFailures(testtools.TestCase): @@ -913,22 +952,42 @@ class TestGetEfiPart(testtools.TestCase): self.assertIsNone(utils.get_efi_part_on_device('/dev/sda')) -@mock.patch.object(utils, '_find_device_by_labels', autospec=True) +@mock.patch.object(utils, '_booted_from_vmedia', autospec=True) +@mock.patch.object(utils, '_check_vmedia_device', autospec=True) +@mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True) @mock.patch.object(shutil, 'copy', autospec=True) @mock.patch.object(ironic_utils, 'mounted', autospec=True) @mock.patch.object(utils, 'execute', autospec=True) class TestCopyConfigFromVmedia(testtools.TestCase): - def test_no_vmedia(self, mock_execute, mock_mount, mock_copy, - mock_find_device): + def test_vmedia_found_not_booted_from_vmedia( + self, mock_execute, mock_mount, mock_copy, + mock_find_device, mock_check_vmedia, mock_booted_from_vmedia): + mock_booted_from_vmedia.return_value = False + mock_find_device.return_value = '/dev/fake' + utils.copy_config_from_vmedia() + mock_mount.assert_not_called() + mock_execute.assert_not_called() + mock_copy.assert_not_called() + mock_check_vmedia.assert_not_called() + self.assertTrue(mock_booted_from_vmedia.called) + + def test_no_vmedia( + self, mock_execute, mock_mount, mock_copy, + mock_find_device, mock_check_vmedia, mock_booted_from_vmedia): + mock_booted_from_vmedia.return_value = True mock_find_device.return_value = None utils.copy_config_from_vmedia() mock_mount.assert_not_called() mock_execute.assert_not_called() mock_copy.assert_not_called() + mock_check_vmedia.assert_not_called() + self.assertFalse(mock_booted_from_vmedia.called) - def test_no_files(self, mock_execute, mock_mount, mock_copy, - mock_find_device): + def test_no_files( + self, mock_execute, mock_mount, mock_copy, + mock_find_device, mock_check_vmedia, mock_booted_from_vmedia): + mock_booted_from_vmedia.return_value = True temp_path = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(temp_path)) @@ -940,9 +999,13 @@ class TestCopyConfigFromVmedia(testtools.TestCase): mock_execute.assert_called_once_with('findmnt', '-n', '-oTARGET', '/dev/something') mock_copy.assert_not_called() + self.assertTrue(mock_booted_from_vmedia.called) - def test_mounted_no_files(self, mock_execute, mock_mount, mock_copy, - mock_find_device): + def test_mounted_no_files( + self, mock_execute, mock_mount, mock_copy, + mock_find_device, mock_check_vmedia, mock_booted_from_vmedia): + + mock_booted_from_vmedia.return_value = True mock_execute.return_value = '/some/path', '' mock_find_device.return_value = '/dev/something' utils.copy_config_from_vmedia() @@ -950,10 +1013,14 @@ class TestCopyConfigFromVmedia(testtools.TestCase): 'findmnt', '-n', '-oTARGET', '/dev/something') mock_copy.assert_not_called() mock_mount.assert_not_called() + self.assertTrue(mock_booted_from_vmedia.called) @mock.patch.object(os, 'makedirs', autospec=True) - def test_copy(self, mock_makedirs, mock_execute, mock_mount, mock_copy, - mock_find_device): + def test_copy( + self, mock_makedirs, mock_execute, mock_mount, mock_copy, + mock_find_device, mock_check_vmedia, mock_booted_from_vmedia): + + mock_booted_from_vmedia.return_value = True mock_find_device.return_value = '/dev/something' mock_execute.side_effect = processutils.ProcessExecutionError("") path = tempfile.mkdtemp() @@ -989,10 +1056,14 @@ class TestCopyConfigFromVmedia(testtools.TestCase): mock.call(mock.ANY, '/etc/ironic-python-agent/ironic.crt'), mock.call(mock.ANY, '/etc/ironic-python-agent.d/ironic.conf'), ], any_order=True) + self.assertTrue(mock_booted_from_vmedia.called) @mock.patch.object(os, 'makedirs', autospec=True) - def test_copy_mounted(self, mock_makedirs, mock_execute, mock_mount, - mock_copy, mock_find_device): + def test_copy_mounted( + self, mock_makedirs, mock_execute, mock_mount, + mock_copy, mock_find_device, mock_check_vmedia, + mock_booted_from_vmedia): + mock_booted_from_vmedia.return_value = True mock_find_device.return_value = '/dev/something' path = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(path)) @@ -1026,6 +1097,7 @@ class TestCopyConfigFromVmedia(testtools.TestCase): mock.call(mock.ANY, '/etc/ironic-python-agent.d/ironic.conf'), ], any_order=True) mock_mount.assert_not_called() + self.assertTrue(mock_booted_from_vmedia.called) @mock.patch.object(requests, 'get', autospec=True) @@ -1056,3 +1128,70 @@ class TestStreamingClient(ironic_agent_base.IronicAgentTest): mock_get.assert_called_with("http://url", verify=True, cert=None, stream=True, timeout=60) self.assertEqual(2, mock_get.call_count) + + +class TestCheckVirtualMedia(ironic_agent_base.IronicAgentTest): + + @mock.patch.object(utils, 'execute', autospec=True) + def test_check_vmedia_device(self, mock_execute): + lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="disk" TRAN="usb"\n' + mock_execute.return_value = (lsblk, '') + self.assertTrue(utils._check_vmedia_device('/dev/sdh')) + mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + '/dev/sdh') + + @mock.patch.object(utils, 'execute', autospec=True) + def test_check_vmedia_device_rom(self, mock_execute): + lsblk = 'KNAME="sr0" SIZE="1610612736" TYPE="rom" TRAN="usb"\n' + mock_execute.return_value = (lsblk, '') + self.assertTrue(utils._check_vmedia_device('/dev/sr0')) + mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + '/dev/sr0') + + @mock.patch.object(utils, 'execute', autospec=True) + def test_check_vmedia_device_too_large(self, mock_execute): + lsblk = 'KNAME="sdh" SIZE="1610612736000" TYPE="disk" TRAN="usb"\n' + mock_execute.return_value = (lsblk, '') + self.assertFalse(utils._check_vmedia_device('/dev/sdh')) + mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + '/dev/sdh') + + @mock.patch.object(utils, 'execute', autospec=True) + def test_check_vmedia_device_part(self, mock_execute): + lsblk = ('KNAME="sdh1" SIZE="1610612736" TYPE="part" TRAN=""\n' + 'KNAME="sdh" SIZE="1610612736" TYPE="disk" TRAN="sata"\n') + mock_execute.return_value = (lsblk, '') + self.assertFalse(utils._check_vmedia_device('/dev/sdh1')) + mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + '/dev/sdh1') + + @mock.patch.object(utils, 'execute', autospec=True) + def test_check_vmedia_device_other(self, mock_execute): + lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="other" TRAN="usb"\n' + mock_execute.return_value = (lsblk, '') + self.assertFalse(utils._check_vmedia_device('/dev/sdh')) + mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + '/dev/sdh') + + @mock.patch.object(utils, 'execute', autospec=True) + def test_check_vmedia_device_sata(self, mock_execute): + lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="disk" TRAN="sata"\n' + mock_execute.return_value = (lsblk, '') + self.assertFalse(utils._check_vmedia_device('/dev/sdh')) + mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + '/dev/sdh') + + @mock.patch.object(utils, 'execute', autospec=True) + def test_check_vmedia_device_scsi(self, mock_execute): + lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="other" TRAN="scsi"\n' + mock_execute.return_value = (lsblk, '') + self.assertFalse(utils._check_vmedia_device('/dev/sdh')) + mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + '/dev/sdh') diff --git a/ironic_python_agent/utils.py b/ironic_python_agent/utils.py index 036e72d9d..f045a4fe1 100644 --- a/ironic_python_agent/utils.py +++ b/ironic_python_agent/utils.py @@ -125,15 +125,33 @@ def _get_vmedia_device(): pass -def _find_device_by_labels(labels): - """Find device matching any of the provided labels.""" - for label in labels + [lbl.upper() for lbl in labels]: - try: - path, _e = execute('blkid', '-L', label) - except processutils.ProcessExecutionError: - pass +def _find_vmedia_device_by_labels(labels): + """Find device matching any of the provided labels for virtual media""" + candidates = [] + try: + lsblk_output, _e = execute('lsblk', '-P', '-oPATH,LABEL') + except processutils.ProcessExecutionError as e: + _early_log('Was unable to execute the lsblk command. %s', e) + return + + for device in ironic_utils.parse_device_tags(lsblk_output): + for label in labels: + if label.upper() == device['LABEL'].upper(): + candidates.append(device['PATH']) + + for candidate in candidates: + # We explicitly take the device and run it past _check_vmedia_device + # as there *can* be candidate entries, and we only want to return + # one that seems most likely to be the actual device, and the vmedia + # check code also evaluates the device overall, instead of just the + # block device with a label of some sort. + if _check_vmedia_device(candidate): + return candidate else: - return path.strip() + _early_log('Found possible vmedia candidate %s, however ' + 'the device failed vmedia validity checking.', + candidate) + _early_log('Did not identify any virtual media candidates devices.') def _get_vmedia_params(): @@ -143,18 +161,22 @@ def _get_vmedia_params(): :raises: VirtualMediaBootError when it cannot find the virtual media device """ parameters_file = "parameters.txt" - vmedia_device_file = _find_device_by_labels(['ir-vfd-dev']) + vmedia_device_file = _find_vmedia_device_by_labels(['ir-vfd-dev']) if not vmedia_device_file: - # TODO(rameshg87): This block of code is there only for compatibility - # reasons (so that newer agent can work with older Ironic). Remove - # this after Liberty release. + # This falls back to trying to find a matching device by name/type. + # if not found, it is likely okay to just fail out and treat it as + # No device found as there are multiple ways to launch IPA, and all + # vmedia styles should be treated consistently. vmedia_device = _get_vmedia_device() if not vmedia_device: - msg = "Unable to find virtual media device" - raise errors.VirtualMediaBootError(msg) + return {} vmedia_device_file = os.path.join("/dev", vmedia_device) + if not _check_vmedia_device(vmedia_device_file): + # If the device is not valid, return an empty dictionary. + return {} + with ironic_utils.mounted(vmedia_device_file) as vmedia_mount_point: parameters_file_path = os.path.join(vmedia_mount_point, parameters_file) @@ -201,17 +223,102 @@ def _find_mount_point(device): return path.strip() +def _check_vmedia_device(vmedia_device_file): + """Check if a virtual media device appears valid. + + Explicitly ignores partitions, actual disks, and other itmes that + seem unlikely to be virtual media based items being provided + into the running operating system via a BMC. + + :param vmedia_device_file: Path to the device to examine. + :returns: False by default, True if the device appears to be + valid. + """ + try: + output, _e = execute('lsblk', '-n', '-s', '-P', '-b', + '-oKNAME,TRAN,TYPE,SIZE', + vmedia_device_file) + except processutils.ProcessExecutionError as e: + _early_log('Failed to execute lsblk. lsblk is required for ' + 'virtual media identification. %s', e) + return False + try: + for device in ironic_utils.parse_device_tags(output): + if device['TYPE'] == 'part': + _early_log('Excluding device %s from virtual media' + 'consideration as it is a partition.', + device['KNAME']) + return False + if device['TYPE'] == 'rom': + # Media is a something like /dev/sr0, a Read only media type. + # The kernel decides this by consulting the underlying type + # registered for the scsi transport and thus type used. + # This will most likely be a qemu driven testing VM, + # or an older machine where SCSI transport is directly + # used to convey in a virtual + return True + if device['TYPE'] == 'disk' and device['TRAN'] == 'usb': + # We know from experience on HPE machines, with ilo4/5, we see + # and redfish with edgeline gear, return attachment from + # pci device 0c-03. + # https://linux-hardware.org/?probe=4d2526e9f4 + # https://linux-hardware.org/?id=pci:103c-22f6-1590-00e4 + # + # Dell hardware takes a similar approach, using an Aten usb hub + # which provides the standing connection for the BMC attached + # virtual kvm. + # https://linux-hardware.org/?id=usb:0557-8021 + # + # Supermicro also uses Aten on X11, X10, X8 + # https://linux-hardware.org/?probe=4d0ed95e02 + # + # Lenovo appears in some hardware to use an Emulux Pilot4 + # integrated hub to proivide device access on some hardware. + # https://linux-hardware.org/index.php?id=usb:2a4b-0400 + # + # ??? but the virtual devices appear to be American Megatrends + # https://linux-hardware.org/?probe=076bcef32e + # + # Fujitsu hardware is more uncertian, but appears to be similar + # in use of a USB pass-through + # http://linux-hardware.org/index.php?probe=cca9eab7fe&log=dmesg + if device['SIZE'] != "" and int(device['SIZE']) < 4294967296: + # Device is a usb backed block device which is smaller + # than 4 GiB + return True + else: + _early_log('Device %s appears to not qualify as virtual ' + 'due to the device size. Size: %s', + device['KNAME'], device['SIZE']) + _early_log('Device %s was disqualified as virtual media. ' + 'Type: %s, Transport: %s', + device['KNAME'], device['TYPE'], device['TRAN']) + return False + except KeyError: + return False + + +def _booted_from_vmedia(): + """Indicates if the machine was booted via vmedia.""" + params = _read_params_from_file('/proc/cmdline') + return params.get('boot_method') == 'vmedia' + + def copy_config_from_vmedia(): """Copies any configuration from a virtual media device. Copies files under /etc/ironic-python-agent and /etc/ironic-python-agent.d. """ - vmedia_device_file = _find_device_by_labels( + vmedia_device_file = _find_vmedia_device_by_labels( ['config-2', 'vmedia_boot_iso']) if not vmedia_device_file: _early_log('No virtual media device detected') return - + if not _booted_from_vmedia(): + _early_log('Cannot use configuration from virtual media as the ' + 'agent was not booted from virtual media.') + return + # Determine the device mounted = _find_mount_point(vmedia_device_file) if mounted: _copy_config_from(mounted) diff --git a/releasenotes/notes/check-virtual-media-devices-a9b1f54c3fe7884d.yaml b/releasenotes/notes/check-virtual-media-devices-a9b1f54c3fe7884d.yaml new file mode 100644 index 000000000..264aad04b --- /dev/null +++ b/releasenotes/notes/check-virtual-media-devices-a9b1f54c3fe7884d.yaml @@ -0,0 +1,30 @@ +--- +security: + - | + Addresses a potential vector in which an system authenticated malicious + actor could leveraged data left on disk in some limited cases to make the + API of the ``ironic-python-agent`` attackable, or possibly break cleaning + processes to prevent the machine from being able to be returned to the + available pool. Please see `story 2008749 `_ + for more information. +fixes: + - | + Adds validation of Virtual Media devices in order to prevent existing + partitions on the system from being considered as potential sources of IPA + configuration data. + - | + Adds check into the configuration load from virtual media, to ensure it + only occurs when the machine booted from virtual media. +issues: + - | + Logic around virtual media device validation is now much more strict, + and may not work in all cases. Should you discover a case, please provide + the output from ``lsblk -P -O`` with a virtual media device attached to the + Ironic development community via + `Storyboard `_. + - | + Internal logic to copy configuration data from virtual media now requires + the ``boot_method=vmedia`` flag to be set on the kernel command line of + the bootloader for the virtual media. Operators crafting custom boot + ISOs, should ensure that the appropriate command line is being added in + any custom build processes.

AltStyle によって変換されたページ (->オリジナル) /