diff --git a/examples/vulnerable_code/command_injection_with_aliases.py b/examples/vulnerable_code/command_injection_with_aliases.py new file mode 100644 index 00000000..4e409c52 --- /dev/null +++ b/examples/vulnerable_code/command_injection_with_aliases.py @@ -0,0 +1,31 @@ +import os +import os as myos +from os import system +from os import system as mysystem +from subprocess import call as mycall, Popen as mypopen + +from flask import Flask, render_template, request + +app = Flask(__name__) + + +@app.route('/menu', methods=['POST']) +def menu(): + param = request.form['suggestion'] + command = 'echo ' + param + '>> ' + 'menu.txt' + + os.system(command) + myos.system(command) + system(command) + mysystem(command) + mycall(command) + mypopen(command) + + with open('menu.txt', 'r') as f: + menu_ctx = f.read() + + return render_template('command_injection.html', menu=menu_ctx) + + +if __name__ == '__main__': + app.run(debug=True) diff --git a/pyt/cfg/alias_helper.py b/pyt/cfg/alias_helper.py index a1c83ab0..920af3f5 100644 --- a/pyt/cfg/alias_helper.py +++ b/pyt/cfg/alias_helper.py @@ -74,3 +74,21 @@ def retrieve_import_alias_mapping(names_list): if alias.asname: import_alias_names[alias.asname] = alias.name return import_alias_names + + +def fully_qualify_alias_labels(label, aliases): + """Replace any aliases in label with the fully qualified name. + + Args: + label -- A label : str representing a name (e.g. myos.system) + aliases -- A dict of {alias: real_name} (e.g. {'myos': 'os'}) + +>>> fully_qualify_alias_labels('myos.mycall', {'myos':'os'}) + 'os.mycall' + """ + for alias, full_name in aliases.items(): + if label == alias: + return full_name + elif label.startswith(alias+'.'): + return full_name + label[len(alias):] + return label diff --git a/pyt/cfg/stmt_visitor.py b/pyt/cfg/stmt_visitor.py index 3b9d5f48..16da1bb0 100644 --- a/pyt/cfg/stmt_visitor.py +++ b/pyt/cfg/stmt_visitor.py @@ -6,6 +6,7 @@ from .alias_helper import ( as_alias_handler, + fully_qualify_alias_labels, handle_aliases_in_init_files, handle_fdid_aliases, not_as_alias_handler, @@ -624,6 +625,11 @@ def add_blackbox_or_builtin_call(self, node, blackbox): # noqa: C901 call_function_label = call_label_visitor.result[:call_label_visitor.result.find('(')] + # Check if function call matches a blackbox/built-in alias and if so, resolve it + # This resolves aliases like "from os import system as mysys" as: mysys -> os.system + local_definitions = self.module_definitions_stack[-1] + call_function_label = fully_qualify_alias_labels(call_function_label, local_definitions.import_alias_mapping) + # Create e.g. ~call_1 = ret_func_foo LHS = CALL_IDENTIFIER + 'call_' + str(saved_function_call_index) RHS = 'ret_' + call_function_label + '(' @@ -810,7 +816,7 @@ def add_module( # noqa: C901 module_path = module[1] parent_definitions = self.module_definitions_stack[-1] - # The only place the import_alias_mapping is updated + # Here, in `visit_Import` and in `visit_ImportFrom` are the only places the `import_alias_mapping` is updated parent_definitions.import_alias_mapping.update(import_alias_mapping) parent_definitions.import_names = local_names @@ -919,10 +925,10 @@ def from_directory_import( if init_exists and not skip_init: package_name = os.path.split(module_path)[1] return self.add_module( - (module[0], init_file_location), - package_name, - local_names, - import_alias_mapping, + module=(module[0], init_file_location), + module_or_package_name=package_name, + local_names=local_names, + import_alias_mapping=import_alias_mapping, is_init=True, from_from=True ) @@ -932,10 +938,10 @@ def from_directory_import( new_init_file_location = os.path.join(full_name, '__init__.py') if os.path.isfile(new_init_file_location): self.add_module( - (real_name, new_init_file_location), - real_name, - local_names, - import_alias_mapping, + module=(real_name, new_init_file_location), + module_or_package_name=real_name, + local_names=local_names, + import_alias_mapping=import_alias_mapping, is_init=True, from_from=True, from_fdid=True @@ -945,10 +951,10 @@ def from_directory_import( else: file_module = (real_name, full_name + '.py') self.add_module( - file_module, - real_name, - local_names, - import_alias_mapping, + module=file_module, + module_or_package_name=real_name, + local_names=local_names, + import_alias_mapping=import_alias_mapping, from_from=True ) return IgnoredNode() @@ -959,10 +965,10 @@ def import_package(self, module, module_name, local_name, import_alias_mapping): init_exists = os.path.isfile(init_file_location) if init_exists: return self.add_module( - (module[0], init_file_location), - module_name, - local_name, - import_alias_mapping, + module=(module[0], init_file_location), + module_or_package_name=module_name, + local_names=local_name, + import_alias_mapping=import_alias_mapping, is_init=True ) else: @@ -1005,10 +1011,10 @@ def handle_relative_import(self, node): # Is it a file? if name_with_dir.endswith('.py'): return self.add_module( - (node.module, name_with_dir), - None, - as_alias_handler(node.names), - retrieve_import_alias_mapping(node.names), + module=(node.module, name_with_dir), + module_or_package_name=None, + local_names=as_alias_handler(node.names), + import_alias_mapping=retrieve_import_alias_mapping(node.names), from_from=True ) return self.from_directory_import( @@ -1031,10 +1037,10 @@ def visit_Import(self, node): retrieve_import_alias_mapping(node.names) ) return self.add_module( - module, - name.name, - name.asname, - retrieve_import_alias_mapping(node.names) + module=module, + module_or_package_name=name.name, + local_names=name.asname, + import_alias_mapping=retrieve_import_alias_mapping(node.names) ) for module in self.project_modules: if name.name == module[0]: @@ -1046,14 +1052,20 @@ def visit_Import(self, node): retrieve_import_alias_mapping(node.names) ) return self.add_module( - module, - name.name, - name.asname, - retrieve_import_alias_mapping(node.names) + module=module, + module_or_package_name=name.name, + local_names=name.asname, + import_alias_mapping=retrieve_import_alias_mapping(node.names) ) for alias in node.names: + # The module is uninspectable (so blackbox or built-in). If it has an alias, we remember + # the alias so we can do fully qualified name resolution for blackbox- and built-in trigger words + # e.g. we want a call to "os.system" be recognised, even if we do "import os as myos" + if alias.asname is not None and alias.asname != alias.name: + local_definitions = self.module_definitions_stack[-1] + local_definitions.import_alias_mapping[name.asname] = alias.name if alias.name not in uninspectable_modules: - log.warn("Cannot inspect module %s", alias.name) + log.warning("Cannot inspect module %s", alias.name) uninspectable_modules.add(alias.name) # Don't repeatedly warn about this return IgnoredNode() @@ -1061,40 +1073,48 @@ def visit_ImportFrom(self, node): # Is it relative? if node.level> 0: return self.handle_relative_import(node) - else: - for module in self.local_modules: - if node.module == module[0]: - if os.path.isdir(module[1]): - return self.from_directory_import( - module, - not_as_alias_handler(node.names), - as_alias_handler(node.names) - ) - return self.add_module( + # not relative + for module in self.local_modules: + if node.module == module[0]: + if os.path.isdir(module[1]): + return self.from_directory_import( module, - None, - as_alias_handler(node.names), - retrieve_import_alias_mapping(node.names), - from_from=True + not_as_alias_handler(node.names), + as_alias_handler(node.names) ) - for module in self.project_modules: - name = module[0] - if node.module == name: - if os.path.isdir(module[1]): - return self.from_directory_import( - module, - not_as_alias_handler(node.names), - as_alias_handler(node.names), - retrieve_import_alias_mapping(node.names) - ) - return self.add_module( + return self.add_module( + module=module, + module_or_package_name=None, + local_names=as_alias_handler(node.names), + import_alias_mapping=retrieve_import_alias_mapping(node.names), + from_from=True + ) + for module in self.project_modules: + name = module[0] + if node.module == name: + if os.path.isdir(module[1]): + return self.from_directory_import( module, - None, + not_as_alias_handler(node.names), as_alias_handler(node.names), - retrieve_import_alias_mapping(node.names), - from_from=True + retrieve_import_alias_mapping(node.names) ) + return self.add_module( + module=module, + module_or_package_name=None, + local_names=as_alias_handler(node.names), + import_alias_mapping=retrieve_import_alias_mapping(node.names), + from_from=True + ) + + # Remember aliases for uninspectable modules such that we can label them fully qualified + # e.g. we want a call to "os.system" be recognised, even if we do "from os import system" + # from os import system as mysystem -> module=os, name=system, asname=mysystem + for name in node.names: + local_definitions = self.module_definitions_stack[-1] + local_definitions.import_alias_mapping[name.asname or name.name] = "{}.{}".format(node.module, name.name) + if node.module not in uninspectable_modules: - log.warn("Cannot inspect module %s", node.module) + log.warning("Cannot inspect module %s", node.module) uninspectable_modules.add(node.module) return IgnoredNode() diff --git a/pyt/vulnerability_definitions/all_trigger_words.pyt b/pyt/vulnerability_definitions/all_trigger_words.pyt index 5642db5c..615e86b6 100644 --- a/pyt/vulnerability_definitions/all_trigger_words.pyt +++ b/pyt/vulnerability_definitions/all_trigger_words.pyt @@ -31,9 +31,10 @@ ] }, "execute(": {}, - "system(": {}, + "os.system(": {}, "filter(": {}, "subprocess.call(": {}, + "subprocess.Popen(": {}, "render_template(": {}, "set_cookie(": {}, "redirect(": {}, @@ -41,7 +42,6 @@ "flash(": {}, "jsonify(": {}, "render(": {}, - "render_to_response(": {}, - "Popen(": {} + "render_to_response(": {} } } diff --git a/tests/main_test.py b/tests/main_test.py index 561d8bd1..fef1e124 100644 --- a/tests/main_test.py +++ b/tests/main_test.py @@ -108,11 +108,11 @@ def test_targets_with_recursive(self): excluded_files = "" included_files = discover_files(targets, excluded_files, True) - self.assertEqual(len(included_files), 33) + self.assertEqual(len(included_files), 34) def test_targets_with_recursive_and_excluded(self): targets = ["examples/vulnerable_code/"] excluded_files = "inter_command_injection.py" included_files = discover_files(targets, excluded_files, True) - self.assertEqual(len(included_files), 32) + self.assertEqual(len(included_files), 33) diff --git a/tests/vulnerabilities/vulnerabilities_across_files_test.py b/tests/vulnerabilities/vulnerabilities_across_files_test.py index bd63b190..d7b8f0d1 100644 --- a/tests/vulnerabilities/vulnerabilities_across_files_test.py +++ b/tests/vulnerabilities/vulnerabilities_across_files_test.py @@ -62,7 +62,7 @@ def test_blackbox_library_call(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code_across_files/blackbox_library_call.py > User input at line 12, source "request.args.get(": - ~call_1 = ret_request.args.get('suggestion') + ~call_1 = ret_flask.request.args.get('suggestion') Reassigned in: File: examples/vulnerable_code_across_files/blackbox_library_call.py > Line 12: param = ~call_1 diff --git a/tests/vulnerabilities/vulnerabilities_base_test_case.py b/tests/vulnerabilities/vulnerabilities_base_test_case.py index c21f81ed..a7e86121 100644 --- a/tests/vulnerabilities/vulnerabilities_base_test_case.py +++ b/tests/vulnerabilities/vulnerabilities_base_test_case.py @@ -11,7 +11,7 @@ def string_compare_alpha(self, output, expected_string): def assertAlphaEqual(self, output, expected_string): self.assertEqual( - [char for char in output if char.isalpha()], - [char for char in expected_string if char.isalpha()] + ''.join(char for char in output if char.isalpha()), + ''.join(char for char in expected_string if char.isalpha()) ) return True diff --git a/tests/vulnerabilities/vulnerabilities_test.py b/tests/vulnerabilities/vulnerabilities_test.py index 5a28aa03..893ec70a 100644 --- a/tests/vulnerabilities/vulnerabilities_test.py +++ b/tests/vulnerabilities/vulnerabilities_test.py @@ -150,7 +150,7 @@ def test_XSS_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/XSS.py > User input at line 6, source "request.args.get(": - ~call_1 = ret_request.args.get('param', 'not set') + ~call_1 = ret_flask.request.args.get('param', 'not set') Reassigned in: File: examples/vulnerable_code/XSS.py > Line 6: param = ~call_1 @@ -186,7 +186,7 @@ def test_path_traversal_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/path_traversal.py > User input at line 15, source "request.args.get(": - ~call_1 = ret_request.args.get('image_name') + ~call_1 = ret_flask.request.args.get('image_name') Reassigned in: File: examples/vulnerable_code/path_traversal.py > Line 15: image_name = ~call_1 @@ -210,7 +210,7 @@ def test_path_traversal_result(self): > Line 19: foo = ~call_2 File: examples/vulnerable_code/path_traversal.py > reaches line 20, sink "send_file(": - ~call_4 = ret_send_file(foo) + ~call_4 = ret_flask.send_file(foo) """ self.assertAlphaEqual(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION) @@ -222,7 +222,7 @@ def test_ensure_saved_scope(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/ensure_saved_scope.py > User input at line 15, source "request.args.get(": - ~call_1 = ret_request.args.get('image_name') + ~call_1 = ret_flask.request.args.get('image_name') Reassigned in: File: examples/vulnerable_code/ensure_saved_scope.py > Line 15: image_name = ~call_1 @@ -232,7 +232,7 @@ def test_ensure_saved_scope(self): > Line 10: save_3_image_name = image_name File: examples/vulnerable_code/ensure_saved_scope.py > reaches line 20, sink "send_file(": - ~call_4 = ret_send_file(image_name) + ~call_4 = ret_flask.send_file(image_name) """ self.assertAlphaEqual( vulnerability_description, @@ -246,7 +246,7 @@ def test_path_traversal_sanitised_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/path_traversal_sanitised.py > User input at line 8, source "request.args.get(": - ~call_1 = ret_request.args.get('image_name') + ~call_1 = ret_flask.request.args.get('image_name') Reassigned in: File: examples/vulnerable_code/path_traversal_sanitised.py > Line 8: image_name = ~call_1 @@ -258,7 +258,7 @@ def test_path_traversal_sanitised_result(self): > Line 12: ~call_4 = ret_os.path.join(~call_5, image_name) File: examples/vulnerable_code/path_traversal_sanitised.py > reaches line 12, sink "send_file(": - ~call_3 = ret_send_file(~call_4) + ~call_3 = ret_flask.send_file(~call_4) This vulnerability is sanitised by: Label: ~call_2 = ret_image_name.replace('..', '') """ @@ -271,7 +271,7 @@ def test_path_traversal_sanitised_2_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/path_traversal_sanitised_2.py > User input at line 8, source "request.args.get(": - ~call_1 = ret_request.args.get('image_name') + ~call_1 = ret_flask.request.args.get('image_name') Reassigned in: File: examples/vulnerable_code/path_traversal_sanitised_2.py > Line 8: image_name = ~call_1 @@ -279,7 +279,7 @@ def test_path_traversal_sanitised_2_result(self): > Line 12: ~call_3 = ret_os.path.join(~call_4, image_name) File: examples/vulnerable_code/path_traversal_sanitised_2.py > reaches line 12, sink "send_file(": - ~call_2 = ret_send_file(~call_3) + ~call_2 = ret_flask.send_file(~call_3) This vulnerability is potentially sanitised by: Label: if '..' in image_name: """ @@ -292,7 +292,7 @@ def test_sql_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/sql/sqli.py > User input at line 26, source "request.args.get(": - ~call_1 = ret_request.args.get('param', 'not set') + ~call_1 = ret_flask.request.args.get('param', 'not set') Reassigned in: File: examples/vulnerable_code/sql/sqli.py > Line 26: param = ~call_1 @@ -347,7 +347,7 @@ def test_XSS_reassign_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/XSS_reassign.py > User input at line 6, source "request.args.get(": - ~call_1 = ret_request.args.get('param', 'not set') + ~call_1 = ret_flask.request.args.get('param', 'not set') Reassigned in: File: examples/vulnerable_code/XSS_reassign.py > Line 6: param = ~call_1 @@ -367,18 +367,18 @@ def test_XSS_sanitised_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/XSS_sanitised.py > User input at line 7, source "request.args.get(": - ~call_1 = ret_request.args.get('param', 'not set') + ~call_1 = ret_flask.request.args.get('param', 'not set') Reassigned in: File: examples/vulnerable_code/XSS_sanitised.py > Line 7: param = ~call_1 File: examples/vulnerable_code/XSS_sanitised.py -> Line 9: ~call_2 = ret_Markup.escape(param) +> Line 9: ~call_2 = ret_flask.Markup.escape(param) File: examples/vulnerable_code/XSS_sanitised.py > Line 9: param = ~call_2 File: examples/vulnerable_code/XSS_sanitised.py > reaches line 12, sink "replace(": ~call_5 = ret_html.replace('{{ param }}', param) - This vulnerability is sanitised by: Label: ~call_2 = ret_Markup.escape(param) + This vulnerability is sanitised by: Label: ~call_2 = ret_flask.Markup.escape(param) """ self.assertAlphaEqual(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION) @@ -394,7 +394,7 @@ def test_XSS_variable_assign_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/XSS_variable_assign.py > User input at line 6, source "request.args.get(": - ~call_1 = ret_request.args.get('param', 'not set') + ~call_1 = ret_flask.request.args.get('param', 'not set') Reassigned in: File: examples/vulnerable_code/XSS_variable_assign.py > Line 6: param = ~call_1 @@ -414,7 +414,7 @@ def test_XSS_variable_multiple_assign_result(self): EXPECTED_VULNERABILITY_DESCRIPTION = """ File: examples/vulnerable_code/XSS_variable_multiple_assign.py > User input at line 6, source "request.args.get(": - ~call_1 = ret_request.args.get('param', 'not set') + ~call_1 = ret_flask.request.args.get('param', 'not set') Reassigned in: File: examples/vulnerable_code/XSS_variable_multiple_assign.py > Line 6: param = ~call_1 @@ -482,6 +482,21 @@ def test_list_append_taints_list(self): ) self.assert_length(vulnerabilities, expected_length=1) + def test_import_bb_or_bi_with_alias(self): + vulnerabilities = self.run_analysis('examples/vulnerable_code/command_injection_with_aliases.py') + + EXPECTED_SINK_TRIGGER_WORDS = [ + 'os.system(', + 'os.system(', + 'os.system(', + 'os.system(', + 'subprocess.call(', + 'subprocess.Popen(' + ] + + for vuln, expected_sink_trigger_word in zip(vulnerabilities, EXPECTED_SINK_TRIGGER_WORDS): + self.assertEqual(vuln.sink_trigger_word, expected_sink_trigger_word) + class EngineDjangoTest(VulnerabilitiesBaseTestCase): def run_analysis(self, path): @@ -516,7 +531,7 @@ def test_django_view_param(self): param File: examples/vulnerable_code/django_XSS.py > reaches line 5, sink "render(": - ~call_1 = ret_render(request, 'templates/xss.html', 'param'param) + ~call_1 = ret_django.shortcuts.render(request, 'templates/xss.html', 'param'param) """ self.assertAlphaEqual(vulnerability_description, EXPECTED_VULNERABILITY_DESCRIPTION)