From 5d1e5fb35fda1d7c1363f6c6898f516604f7965f Mon Sep 17 00:00:00 2001 From: Michael Mintz Date: 2024年10月23日 16:23:10 -0400 Subject: [PATCH 1/6] Add CDP Mode to UC Mode --- help_docs/method_summary.md | 4 + seleniumbase/common/decorators.py | 15 +- seleniumbase/console_scripts/sb_install.py | 8 +- seleniumbase/core/browser_launcher.py | 296 +++- seleniumbase/core/sb_cdp.py | 898 +++++++++++ seleniumbase/core/sb_driver.py | 44 + seleniumbase/fixtures/base_case.py | 214 ++- seleniumbase/fixtures/constants.py | 1 + seleniumbase/fixtures/js_utils.py | 4 + seleniumbase/fixtures/page_actions.py | 106 ++ seleniumbase/fixtures/shared_utils.py | 26 + seleniumbase/plugins/driver_manager.py | 183 ++- seleniumbase/plugins/sb_manager.py | 41 +- seleniumbase/undetected/__init__.py | 2 + .../undetected/cdp_driver/__init__.py | 1 + .../undetected/cdp_driver/_contradict.py | 110 ++ seleniumbase/undetected/cdp_driver/browser.py | 830 +++++++++++ .../undetected/cdp_driver/cdp_util.py | 301 ++++ seleniumbase/undetected/cdp_driver/config.py | 322 ++++ .../undetected/cdp_driver/connection.py | 625 ++++++++ seleniumbase/undetected/cdp_driver/element.py | 1150 ++++++++++++++ seleniumbase/undetected/cdp_driver/tab.py | 1319 +++++++++++++++++ seleniumbase/undetected/patcher.py | 10 +- 23 files changed, 6396 insertions(+), 114 deletions(-) create mode 100644 seleniumbase/core/sb_cdp.py create mode 100644 seleniumbase/undetected/cdp_driver/__init__.py create mode 100644 seleniumbase/undetected/cdp_driver/_contradict.py create mode 100644 seleniumbase/undetected/cdp_driver/browser.py create mode 100644 seleniumbase/undetected/cdp_driver/cdp_util.py create mode 100644 seleniumbase/undetected/cdp_driver/config.py create mode 100644 seleniumbase/undetected/cdp_driver/connection.py create mode 100644 seleniumbase/undetected/cdp_driver/element.py create mode 100644 seleniumbase/undetected/cdp_driver/tab.py diff --git a/help_docs/method_summary.md b/help_docs/method_summary.md index fdce9beeecc..d79ba678b36 100644 --- a/help_docs/method_summary.md +++ b/help_docs/method_summary.md @@ -125,6 +125,8 @@ self.remove_attribute(selector, attribute, by="css selector", timeout=None) self.remove_attributes(selector, attribute, by="css selector") +self.internalize_links() + self.get_property(selector, property, by="css selector", timeout=None) self.get_text_content(selector="html", by="css selector", timeout=None) @@ -134,6 +136,8 @@ self.get_property_value(selector, property, by="css selector", timeout=None) self.get_image_url(selector, by="css selector", timeout=None) self.find_elements(selector, by="css selector", limit=0) +# Duplicates: +# self.select_all(selector, by="css selector", limit=0) self.find_visible_elements(selector, by="css selector", limit=0) diff --git a/seleniumbase/common/decorators.py b/seleniumbase/common/decorators.py index 6c5a3221c69..0a1850716cc 100644 --- a/seleniumbase/common/decorators.py +++ b/seleniumbase/common/decorators.py @@ -43,23 +43,24 @@ def my_method(): finally: end_time = time.time() run_time = end_time - start_time + name = description # Print times with a statistically significant number of decimal places if run_time < 0.0001: - print(" {%s} ran for %.7f seconds." % (description, run_time)) + print(" - {%s} ran for %.7f seconds." % (name, run_time)) elif run_time < 0.001: - print(" {%s} ran for %.6f seconds." % (description, run_time)) + print(" - {%s} ran for %.6f seconds." % (name, run_time)) elif run_time < 0.01: - print(" {%s} ran for %.5f seconds." % (description, run_time)) + print(" - {%s} ran for %.5f seconds." % (name, run_time)) elif run_time < 0.1: - print(" {%s} ran for %.4f seconds." % (description, run_time)) + print(" - {%s} ran for %.4f seconds." % (name, run_time)) elif run_time < 1: - print(" {%s} ran for %.3f seconds." % (description, run_time)) + print(" - {%s} ran for %.3f seconds." % (name, run_time)) else: - print(" {%s} ran for %.2f seconds." % (description, run_time)) + print(" - {%s} ran for %.2f seconds." % (name, run_time)) if limit and limit> 0 and run_time> limit: message = ( "\n {%s} duration of %.2fs exceeded the time limit of %.2fs!" - % (description, run_time, limit) + % (name, run_time, limit) ) if exception: message = exception.msg + "\nAND " + message diff --git a/seleniumbase/console_scripts/sb_install.py b/seleniumbase/console_scripts/sb_install.py index cc529097898..8d55ac87bb1 100644 --- a/seleniumbase/console_scripts/sb_install.py +++ b/seleniumbase/console_scripts/sb_install.py @@ -811,7 +811,7 @@ def main(override=None, intel_for_uc=None, force_uc=None): zip_ref.extractall(downloads_folder) zip_ref.close() os.remove(zip_file_path) - shutil.copyfile(driver_path, os.path.join(downloads_folder, filename)) + shutil.copy3(driver_path, os.path.join(downloads_folder, filename)) log_d("%sUnzip Complete!%s\n" % (c2, cr)) to_remove = [ "%s/%s/ruby_example/Gemfile" % (downloads_folder, h_ie_fn), @@ -953,7 +953,7 @@ def main(override=None, intel_for_uc=None, force_uc=None): ) if copy_to_path and os.path.exists(LOCAL_PATH): path_file = LOCAL_PATH + f_name - shutil.copyfile(new_file, path_file) + shutil.copy2(new_file, path_file) make_executable(path_file) log_d("Also copied to: %s%s%s" % (c3, path_file, cr)) log_d("") @@ -1042,7 +1042,7 @@ def main(override=None, intel_for_uc=None, force_uc=None): ) if copy_to_path and os.path.exists(LOCAL_PATH): path_file = LOCAL_PATH + f_name - shutil.copyfile(new_file, path_file) + shutil.copy2(new_file, path_file) make_executable(path_file) log_d("Also copied to: %s%s%s" % (c3, path_file, cr)) log_d("") @@ -1078,7 +1078,7 @@ def main(override=None, intel_for_uc=None, force_uc=None): ) if copy_to_path and os.path.exists(LOCAL_PATH): path_file = LOCAL_PATH + f_name - shutil.copyfile(new_file, path_file) + shutil.copy2(new_file, path_file) make_executable(path_file) log_d("Also copied to: %s%s%s" % (c3, path_file, cr)) log_d("") diff --git a/seleniumbase/core/browser_launcher.py b/seleniumbase/core/browser_launcher.py index 01cba62c226..f92a0a7acf4 100644 --- a/seleniumbase/core/browser_launcher.py +++ b/seleniumbase/core/browser_launcher.py @@ -29,6 +29,7 @@ from seleniumbase.core import download_helper from seleniumbase.core import proxy_helper from seleniumbase.core import sb_driver +from seleniumbase.core import sb_cdp from seleniumbase.fixtures import constants from seleniumbase.fixtures import js_utils from seleniumbase.fixtures import page_actions @@ -153,8 +154,10 @@ def extend_driver(driver): page.find_element = DM.find_element page.find_elements = DM.find_elements page.locator = DM.locator + page.get_current_url = DM.get_current_url page.get_page_source = DM.get_page_source page.get_title = DM.get_title + page.get_page_title = DM.get_title page.switch_to_default_window = DM.switch_to_default_window page.switch_to_newest_window = DM.switch_to_newest_window page.open_new_window = DM.open_new_window @@ -206,6 +209,9 @@ def extend_driver(driver): driver.is_valid_url = DM.is_valid_url driver.is_alert_present = DM.is_alert_present driver.is_online = DM.is_online + driver.is_connected = DM.is_connected + driver.is_uc_mode_active = DM.is_uc_mode_active + driver.is_cdp_mode_active = DM.is_cdp_mode_active driver.js_click = DM.js_click driver.get_text = DM.get_text driver.get_active_element_css = DM.get_active_element_css @@ -217,8 +223,10 @@ def extend_driver(driver): driver.highlight_if_visible = DM.highlight_if_visible driver.sleep = time.sleep driver.get_attribute = DM.get_attribute + driver.get_current_url = DM.get_current_url driver.get_page_source = DM.get_page_source driver.get_title = DM.get_title + driver.get_page_title = DM.get_title driver.switch_to_default_window = DM.switch_to_default_window driver.switch_to_newest_window = DM.switch_to_newest_window driver.open_new_window = DM.open_new_window @@ -362,6 +370,11 @@ def has_captcha(text): return False +def __is_cdp_swap_needed(driver): + """If the driver is disconnected, use a CDP method when available.""" + return shared_utils.is_cdp_swap_needed(driver) + + def uc_special_open_if_cf( driver, url, @@ -432,10 +445,11 @@ def uc_special_open_if_cf( def uc_open(driver, url): - if url.startswith("//"): - url = "https:" + url - elif ":" not in url: - url = "https://" + url + url = shared_utils.fix_url_as_needed(url) + if __is_cdp_swap_needed(driver): + driver.cdp.get(url) + time.sleep(0.3) + return if (url.startswith("http:") or url.startswith("https:")): with driver: script = 'window.location.href = "%s";' % url @@ -446,10 +460,11 @@ def uc_open(driver, url): def uc_open_with_tab(driver, url): - if url.startswith("//"): - url = "https:" + url - elif ":" not in url: - url = "https://" + url + url = shared_utils.fix_url_as_needed(url) + if __is_cdp_swap_needed(driver): + driver.cdp.get(url) + time.sleep(0.3) + return if (url.startswith("http:") or url.startswith("https:")): with driver: driver.execute_script('window.open("%s","_blank");' % url) @@ -462,12 +477,13 @@ def uc_open_with_tab(driver, url): def uc_open_with_reconnect(driver, url, reconnect_time=None): """Open a url, disconnect chromedriver, wait, and reconnect.""" + url = shared_utils.fix_url_as_needed(url) + if __is_cdp_swap_needed(driver): + driver.cdp.get(url) + time.sleep(0.3) + return if not reconnect_time: reconnect_time = constants.UC.RECONNECT_TIME - if url.startswith("//"): - url = "https:" + url - elif ":" not in url: - url = "https://" + url if (url.startswith("http:") or url.startswith("https:")): script = 'window.open("%s","_blank");' % url driver.execute_script(script) @@ -493,15 +509,157 @@ def uc_open_with_reconnect(driver, url, reconnect_time=None): return None +def uc_open_with_cdp_mode(driver, url=None): + import asyncio + from seleniumbase.undetected.cdp_driver import cdp_util + + current_url = None + try: + current_url = driver.current_url + except Exception: + driver.connect() + current_url = driver.current_url + url_protocol = current_url.split(":")[0] + if url_protocol not in ["about", "data", "chrome"]: + script = 'window.open("data:,","_blank");' + js_utils.call_me_later(driver, script, 3) + time.sleep(0.012) + driver.close() + driver.clear_cdp_listeners() + driver.delete_all_cookies() + driver.delete_network_conditions() + driver.disconnect() + + cdp_details = driver._get_cdp_details() + cdp_host = cdp_details[1].split("://")[1].split(":")[0] + cdp_port = int(cdp_details[1].split("://")[1].split(":")[1].split("/")[0]) + + url = shared_utils.fix_url_as_needed(url) + url_protocol = url.split(":")[0] + safe_url = True + if url_protocol not in ["about", "data", "chrome"]: + safe_url = False + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + driver.cdp_base = loop.run_until_complete( + cdp_util.start(host=cdp_host, port=cdp_port) + ) + page = loop.run_until_complete(driver.cdp_base.get(url)) + if not safe_url: + time.sleep(constants.UC.CDP_MODE_OPEN_WAIT) + cdp = types.SimpleNamespace() + CDPM = sb_cdp.CDPMethods(loop, page, driver) + cdp.get = CDPM.get + cdp.open = CDPM.get + cdp.reload = CDPM.reload + cdp.refresh = CDPM.refresh + cdp.add_handler = CDPM.add_handler + cdp.get_event_loop = CDPM.get_event_loop + cdp.find_element = CDPM.find_element + cdp.find = CDPM.find_element + cdp.locator = CDPM.find_element + cdp.find_all = CDPM.find_all + cdp.find_elements_by_text = CDPM.find_elements_by_text + cdp.select = CDPM.select + cdp.select_all = CDPM.select_all + cdp.click_link = CDPM.click_link + cdp.tile_windows = CDPM.tile_windows + cdp.get_all_cookies = CDPM.get_all_cookies + cdp.set_all_cookies = CDPM.set_all_cookies + cdp.save_cookies = CDPM.save_cookies + cdp.load_cookies = CDPM.load_cookies + cdp.clear_cookies = CDPM.clear_cookies + cdp.bring_active_window_to_front = CDPM.bring_active_window_to_front + cdp.bring_to_front = CDPM.bring_active_window_to_front + cdp.get_active_element = CDPM.get_active_element + cdp.get_active_element_css = CDPM.get_active_element_css + cdp.click = CDPM.click + cdp.click_active_element = CDPM.click_active_element + cdp.click_if_visible = CDPM.click_if_visible + cdp.mouse_click = CDPM.mouse_click + cdp.remove_element = CDPM.remove_element + cdp.remove_from_dom = CDPM.remove_from_dom + cdp.remove_elements = CDPM.remove_elements + cdp.scroll_into_view = CDPM.scroll_into_view + cdp.send_keys = CDPM.send_keys + cdp.press_keys = CDPM.press_keys + cdp.type = CDPM.type + cdp.evaluate = CDPM.evaluate + cdp.js_dumps = CDPM.js_dumps + cdp.maximize = CDPM.maximize + cdp.minimize = CDPM.minimize + cdp.medimize = CDPM.medimize + cdp.set_window_rect = CDPM.set_window_rect + cdp.reset_window_size = CDPM.reset_window_size + cdp.set_attributes = CDPM.set_attributes + cdp.internalize_links = CDPM.internalize_links + cdp.get_window = CDPM.get_window + cdp.get_element_attributes = CDPM.get_element_attributes + cdp.get_element_html = CDPM.get_element_html + cdp.get_element_rect = CDPM.get_element_rect + cdp.get_element_size = CDPM.get_element_size + cdp.get_element_position = CDPM.get_element_position + cdp.get_gui_element_rect = CDPM.get_gui_element_rect + cdp.get_gui_element_center = CDPM.get_gui_element_center + cdp.get_page_source = CDPM.get_page_source + cdp.get_user_agent = CDPM.get_user_agent + cdp.get_cookie_string = CDPM.get_cookie_string + cdp.get_locale_code = CDPM.get_locale_code + cdp.get_text = CDPM.get_text + cdp.get_title = CDPM.get_title + cdp.get_page_title = CDPM.get_title + cdp.get_current_url = CDPM.get_current_url + cdp.get_origin = CDPM.get_origin + cdp.get_nested_element = CDPM.get_nested_element + cdp.get_document = CDPM.get_document + cdp.get_flattened_document = CDPM.get_flattened_document + cdp.get_screen_rect = CDPM.get_screen_rect + cdp.get_window_rect = CDPM.get_window_rect + cdp.get_window_size = CDPM.get_window_size + cdp.nested_click = CDPM.nested_click + cdp.flash = CDPM.flash + cdp.focus = CDPM.focus + cdp.highlight_overlay = CDPM.highlight_overlay + cdp.get_window_position = CDPM.get_window_position + cdp.is_element_present = CDPM.is_element_present + cdp.is_element_visible = CDPM.is_element_visible + cdp.assert_element_present = CDPM.assert_element_present + cdp.assert_element = CDPM.assert_element + cdp.assert_element_visible = CDPM.assert_element + cdp.assert_text = CDPM.assert_text + cdp.assert_exact_text = CDPM.assert_exact_text + cdp.save_screenshot = CDPM.save_screenshot + cdp.page = page # async world + cdp.driver = driver.cdp_base # async world + cdp.tab = cdp.page # shortcut (original) + cdp.browser = driver.cdp_base # shortcut (original) + cdp.util = cdp_util # shortcut (original) + core_items = types.SimpleNamespace() + core_items.browser = cdp.browser + core_items.tab = cdp.tab + core_items.util = cdp.util + cdp.core = core_items + driver.cdp = cdp + driver._is_using_cdp = True + + +def uc_activate_cdp_mode(driver, url=None): + uc_open_with_cdp_mode(driver, url=url) + + def uc_open_with_disconnect(driver, url, timeout=None): """Open a url and disconnect chromedriver. Then waits for the duration of the timeout. Note: You can't perform Selenium actions again until after you've called driver.connect().""" - if url.startswith("//"): - url = "https:" + url - elif ":" not in url: - url = "https://" + url + url = shared_utils.fix_url_as_needed(url) + if __is_cdp_swap_needed(driver): + driver.cdp.get(url) + time.sleep(0.3) + return + if not driver.is_connected(): + driver.connect() if (url.startswith("http:") or url.startswith("https:")): script = 'window.open("%s","_blank");' % url driver.execute_script(script) @@ -685,6 +843,9 @@ def uc_gui_write(driver, text): def get_gui_element_position(driver, selector): + if __is_cdp_swap_needed(driver): + element_rect = driver.cdp.get_gui_element_rect(selector) + return (element_rect["x"], element_rect["y"]) element = driver.wait_for_element_present(selector, timeout=3) element_rect = element.rect window_rect = driver.get_window_rect() @@ -823,6 +984,7 @@ def _uc_gui_click_captcha( blind=False, ctype=None, ): + cdp_mode_on_at_start = __is_cdp_swap_needed(driver) _on_a_captcha_page = None if ctype == "cf_t": if not _on_a_cf_turnstile_page(driver): @@ -864,10 +1026,13 @@ def _uc_gui_click_captcha( is_in_frame = js_utils.is_in_frame(driver) if not is_in_frame: # Make sure the window is on top - page_actions.switch_to_window( - driver, driver.current_window_handle, 2, uc_lock=False - ) - if IS_WINDOWS: + if __is_cdp_swap_needed(driver): + driver.cdp.bring_active_window_to_front() + else: + page_actions.switch_to_window( + driver, driver.current_window_handle, 2, uc_lock=False + ) + if IS_WINDOWS and not __is_cdp_swap_needed(driver): window_rect = driver.get_window_rect() width = window_rect["width"] height = window_rect["height"] @@ -950,7 +1115,10 @@ def _uc_gui_click_captcha( new_class = the_class.replaceAll('center', 'left'); $elements[index].setAttribute('class', new_class);}""" ) - driver.execute_script(script) + if __is_cdp_swap_needed(driver): + driver.cdp.evaluate(script) + else: + driver.execute_script(script) if not is_in_frame or needs_switch: # Currently not in frame (or nested frame outside CF one) try: @@ -961,16 +1129,22 @@ def _uc_gui_click_captcha( if visible_iframe: if driver.is_element_present("iframe"): i_x, i_y = get_gui_element_position(driver, "iframe") - driver.switch_to_frame("iframe") + if driver.is_connected(): + driver.switch_to_frame("iframe") else: return if not i_x or not i_y: return try: - if visible_iframe: + if ctype == "g_rc" and not driver.is_connected(): + x = (i_x + 32) * width_ratio + y = (i_y + 34) * width_ratio + elif visible_iframe: selector = "span" if ctype == "g_rc": selector = "span.recaptcha-checkbox" + if not driver.is_connected(): + selector = "iframe" element = driver.wait_for_element_present( selector, timeout=2.5 ) @@ -981,16 +1155,18 @@ def _uc_gui_click_captcha( else: x = (i_x + 34) * width_ratio y = (i_y + 34) * width_ratio - driver.switch_to.default_content() - except Exception: - try: + if driver.is_connected(): driver.switch_to.default_content() - except Exception: - return + except Exception: + if driver.is_connected(): + try: + driver.switch_to.default_content() + except Exception: + return if x and y: sb_config._saved_cf_x_y = (x, y) if driver.is_element_present(".footer .clearfix .ray-id"): - driver.uc_open_with_disconnect(driver.current_url, 3.8) + driver.uc_open_with_disconnect(driver.get_current_url(), 3.8) else: driver.disconnect() with suppress(Exception): @@ -1013,9 +1189,12 @@ def _uc_gui_click_captcha( if retry and x and y and (caught or _on_a_captcha_page(driver)): with gui_lock: # Prevent issues with multiple processes # Make sure the window is on top - page_actions.switch_to_window( - driver, driver.current_window_handle, 2, uc_lock=False - ) + if __is_cdp_swap_needed(driver): + driver.cdp.bring_active_window_to_front() + else: + page_actions.switch_to_window( + driver, driver.current_window_handle, 2, uc_lock=False + ) if driver.is_element_present("iframe"): try: driver.switch_to_frame(frame) @@ -1035,14 +1214,18 @@ def _uc_gui_click_captcha( driver.switch_to.parent_frame(checkbox_success) return if blind: - driver.uc_open_with_disconnect(driver.current_url, 3.8) - _uc_gui_click_x_y(driver, x, y, timeframe=0.32) + driver.uc_open_with_disconnect(driver.get_current_url(), 3.8) + if __is_cdp_swap_needed(driver) and _on_a_captcha_page(driver): + _uc_gui_click_x_y(driver, x, y, timeframe=0.32) + else: + time.sleep(0.1) else: - driver.uc_open_with_reconnect(driver.current_url, 3.8) + driver.uc_open_with_reconnect(driver.get_current_url(), 3.8) if _on_a_captcha_page(driver): driver.disconnect() _uc_gui_click_x_y(driver, x, y, timeframe=0.32) - driver.reconnect(reconnect_time) + if not cdp_mode_on_at_start: + driver.reconnect(reconnect_time) def uc_gui_click_captcha(driver, frame="iframe", retry=False, blind=False): @@ -1089,6 +1272,9 @@ def _uc_gui_handle_captcha_(driver, frame="iframe", ctype=None): ctype = "cf_t" else: return + if not driver.is_connected(): + driver.connect() + time.sleep(2) install_pyautogui_if_missing(driver) import pyautogui pyautogui = get_configured_pyautogui(pyautogui) @@ -1111,9 +1297,12 @@ def _uc_gui_handle_captcha_(driver, frame="iframe", ctype=None): page_actions.switch_to_window( driver, driver.current_window_handle, 2, uc_lock=False ) - if IS_WINDOWS and hasattr(pyautogui, "getActiveWindowTitle"): + if ( + IS_WINDOWS + and hasattr(pyautogui, "getActiveWindowTitle") + ): py_a_g_title = pyautogui.getActiveWindowTitle() - window_title = driver.title + window_title = driver.get_title() if not py_a_g_title.startswith(window_title): window_rect = driver.get_window_rect() width = window_rect["width"] @@ -1614,6 +1803,7 @@ def _set_chrome_options( prefs["default_content_settings.popups"] = 0 prefs["managed_default_content_settings.popups"] = 0 prefs["profile.password_manager_enabled"] = False + prefs["profile.password_manager_leak_detection"] = False prefs["profile.default_content_setting_values.notifications"] = 2 prefs["profile.default_content_settings.popups"] = 0 prefs["profile.managed_default_content_settings.popups"] = 0 @@ -3271,6 +3461,7 @@ def get_local_driver( "default_content_settings.popups": 0, "managed_default_content_settings.popups": 0, "profile.password_manager_enabled": False, + "profile.password_manager_leak_detection": False, "profile.default_content_setting_values.notifications": 2, "profile.default_content_settings.popups": 0, "profile.managed_default_content_settings.popups": 0, @@ -4244,13 +4435,17 @@ def get_local_driver( with uc_lock: # Avoid multithreaded issues if make_uc_driver_from_chromedriver: if os.path.exists(LOCAL_CHROMEDRIVER): - shutil.copyfile( - LOCAL_CHROMEDRIVER, LOCAL_UC_DRIVER - ) + with suppress(Exception): + make_driver_executable_if_not( + LOCAL_CHROMEDRIVER + ) + shutil.copy2(LOCAL_CHROMEDRIVER, LOCAL_UC_DRIVER) elif os.path.exists(path_chromedriver): - shutil.copyfile( - path_chromedriver, LOCAL_UC_DRIVER - ) + with suppress(Exception): + make_driver_executable_if_not( + path_chromedriver + ) + shutil.copy2(path_chromedriver, LOCAL_UC_DRIVER) try: make_driver_executable_if_not(LOCAL_UC_DRIVER) except Exception as e: @@ -4670,6 +4865,9 @@ def get_local_driver( options=chrome_options, ) driver.default_get = driver.get # Save copy of original + driver.cdp = None # Set a placeholder + driver._is_using_cdp = False + driver._is_connected = True if uc_activated: driver.get = lambda url: uc_special_open_if_cf( driver, @@ -4697,6 +4895,16 @@ def get_local_driver( driver.uc_click = lambda *args, **kwargs: uc_click( driver, *args, **kwargs ) + driver.uc_activate_cdp_mode = ( + lambda *args, **kwargs: uc_activate_cdp_mode( + driver, *args, **kwargs + ) + ) + driver.uc_open_with_cdp_mode = ( + lambda *args, **kwargs: uc_open_with_cdp_mode( + driver, *args, **kwargs + ) + ) driver.uc_gui_press_key = ( lambda *args, **kwargs: uc_gui_press_key( driver, *args, **kwargs diff --git a/seleniumbase/core/sb_cdp.py b/seleniumbase/core/sb_cdp.py new file mode 100644 index 00000000000..44aa07237d6 --- /dev/null +++ b/seleniumbase/core/sb_cdp.py @@ -0,0 +1,898 @@ +"""Add CDP methods to extend the driver""" +import math +import os +import re +import time +from contextlib import suppress +from seleniumbase import config as sb_config +from seleniumbase.config import settings +from seleniumbase.fixtures import constants +from seleniumbase.fixtures import js_utils +from seleniumbase.fixtures import page_utils +from seleniumbase.fixtures import shared_utils + + +class CDPMethods(): + def __init__(self, loop, page, driver): + self.loop = loop + self.page = page + self.driver = driver + + def __slow_mode_pause_if_set(self): + if hasattr(sb_config, "slow_mode") and sb_config.slow_mode: + time.sleep(0.16) + + def __add_light_pause(self): + time.sleep(0.007) + + def __convert_to_css_if_xpath(self, selector): + if page_utils.is_xpath_selector(selector): + with suppress(Exception): + css = js_utils.convert_to_css_selector(selector, "xpath") + if css: + return css + return selector + + def __add_sync_methods(self, element): + if not element: + return element + element.clear_input = lambda: self.__clear_input(element) + element.click = lambda: self.__click(element) + element.flash = lambda: self.__flash(element) + element.focus = lambda: self.__focus(element) + element.highlight_overlay = lambda: self.__highlight_overlay(element) + element.mouse_click = lambda: self.__mouse_click(element) + element.mouse_drag = ( + lambda destination: self.__mouse_drag(element, destination) + ) + element.mouse_move = lambda: self.__mouse_move(element) + element.query_selector = ( + lambda selector: self.__query_selector(element, selector) + ) + element.querySelector = element.query_selector + element.query_selector_all = ( + lambda selector: self.__query_selector_all(element, selector) + ) + element.querySelectorAll = element.query_selector_all + element.remove_from_dom = lambda: self.__remove_from_dom(element) + element.save_screenshot = ( + lambda *args, **kwargs: self.__save_screenshot( + element, *args, **kwargs) + ) + element.save_to_dom = lambda: self.__save_to_dom(element) + element.scroll_into_view = lambda: self.__scroll_into_view(element) + element.select_option = lambda: self.__select_option(element) + element.send_file = ( + lambda *file_paths: self.__send_file(element, *file_paths) + ) + element.send_keys = lambda text: self.__send_keys(element, text) + element.set_text = lambda value: self.__set_text(element, value) + element.set_value = lambda value: self.__set_value(element, value) + element.type = lambda text: self.__type(element, text) + element.get_position = lambda: self.__get_position(element) + element.get_html = lambda: self.__get_html(element) + element.get_js_attributes = lambda: self.__get_js_attributes(element) + return element + + def get(self, url): + url = shared_utils.fix_url_as_needed(url) + self.page = self.loop.run_until_complete(self.driver.cdp_base.get(url)) + url_protocol = url.split(":")[0] + safe_url = True + if url_protocol not in ["about", "data", "chrome"]: + safe_url = False + if not safe_url: + time.sleep(constants.UC.CDP_MODE_OPEN_WAIT) + + def reload(self, ignore_cache=True, script_to_evaluate_on_load=None): + self.loop.run_until_complete( + self.page.reload( + ignore_cache=ignore_cache, + script_to_evaluate_on_load=script_to_evaluate_on_load, + ) + ) + + def refresh(self, *args, **kwargs): + self.reload(*args, **kwargs) + + def get_event_loop(self): + return self.loop + + def add_handler(self, event, handler): + self.page.add_handler(event, handler) + + def find_element( + self, selector, best_match=False, timeout=settings.SMALL_TIMEOUT + ): + """Similar to select(), but also finds elements by text content. + When using text-based searches, if best_match=False, then will + find the first element with the text. If best_match=True, then + if multiple elements have that text, then will use the element + with the closest text-length to the text being searched for.""" + self.__add_light_pause() + selector = self.__convert_to_css_if_xpath(selector) + if (":contains(" in selector): + tag_name = selector.split(":contains(")[0].split(" ")[-1] + text = selector.split(":contains(")[1].split(")")[0][1:-1] + with suppress(Exception): + self.loop.run_until_complete( + self.page.select(tag_name, timeout=3) + ) + self.loop.run_until_complete(self.page.find(text, timeout=3)) + element = self.find_elements_by_text(text, tag_name=tag_name)[0] + return self.__add_sync_methods(element) + failure = False + try: + element = self.loop.run_until_complete( + self.page.find( + selector, best_match=best_match, timeout=timeout + ) + ) + except Exception: + failure = True + plural = "s" + if timeout == 1: + plural = "" + message = "\n Element {%s} was not found after %s second%s!" % ( + selector, + timeout, + plural, + ) + if failure: + raise Exception(message) + element = self.__add_sync_methods(element) + self.__slow_mode_pause_if_set() + return element + + def find_all(self, selector, timeout=settings.SMALL_TIMEOUT): + self.__add_light_pause() + selector = self.__convert_to_css_if_xpath(selector) + elements = self.loop.run_until_complete( + self.page.find_all(selector, timeout=timeout) + ) + updated_elements = [] + for element in elements: + element = self.__add_sync_methods(element) + updated_elements.append(element) + self.__slow_mode_pause_if_set() + return updated_elements + + def find_elements_by_text(self, text, tag_name=None): + """Returns a list of elements by matching text. + Optionally, provide a tag_name to narrow down the search + to only elements with the given tag. (Eg: a, div, script, span)""" + self.__add_light_pause() + elements = self.loop.run_until_complete( + self.page.find_elements_by_text(text=text) + ) + updated_elements = [] + for element in elements: + if not tag_name or tag_name.lower() == element.tag_name.lower(): + element = self.__add_sync_methods(element) + updated_elements.append(element) + self.__slow_mode_pause_if_set() + return updated_elements + + def select(self, selector, timeout=settings.SMALL_TIMEOUT): + """Similar to find_element(), but without text-based search.""" + self.__add_light_pause() + selector = self.__convert_to_css_if_xpath(selector) + if (":contains(" in selector): + tag_name = selector.split(":contains(")[0].split(" ")[-1] + text = selector.split(":contains(")[1].split(")")[0][1:-1] + with suppress(Exception): + self.loop.run_until_complete( + self.page.select(tag_name, timeout=5) + ) + self.loop.run_until_complete(self.page.find(text, timeout=5)) + element = self.find_elements_by_text(text, tag_name=tag_name)[0] + return self.__add_sync_methods(element) + failure = False + try: + element = self.loop.run_until_complete( + self.page.select(selector, timeout=timeout) + ) + except Exception: + failure = True + plural = "s" + if timeout == 1: + plural = "" + message = "\n Element {%s} was not found after %s second%s!" % ( + selector, + timeout, + plural, + ) + if failure: + raise Exception(message) + element = self.__add_sync_methods(element) + self.__slow_mode_pause_if_set() + return element + + def select_all(self, selector, timeout=settings.SMALL_TIMEOUT): + self.__add_light_pause() + selector = self.__convert_to_css_if_xpath(selector) + elements = self.loop.run_until_complete( + self.page.select_all(selector, timeout=timeout) + ) + updated_elements = [] + for element in elements: + element = self.__add_sync_methods(element) + updated_elements.append(element) + self.__slow_mode_pause_if_set() + return updated_elements + + def click_link(self, link_text): + self.find_elements_by_text(link_text, "a")[0].click() + + def __clear_input(self, element): + return ( + self.loop.run_until_complete(element.clear_input_async()) + ) + + def __click(self, element): + return ( + self.loop.run_until_complete(element.click_async()) + ) + + def __flash(self, element): + return ( + self.loop.run_until_complete(element.flash_async()) + ) + + def __focus(self, element): + return ( + self.loop.run_until_complete(element.focus_async()) + ) + + def __highlight_overlay(self, element): + return ( + self.loop.run_until_complete(element.highlight_overlay_async()) + ) + + def __mouse_click(self, element): + return ( + self.loop.run_until_complete(element.mouse_click_async()) + ) + + def __mouse_drag(self, element, destination): + return ( + self.loop.run_until_complete(element.mouse_drag_async(destination)) + ) + + def __mouse_move(self, element): + return ( + self.loop.run_until_complete(element.mouse_move_async()) + ) + + def __query_selector(self, element, selector): + selector = self.__convert_to_css_if_xpath(selector) + element = self.loop.run_until_complete( + element.query_selector_async(selector) + ) + element = self.__add_sync_methods(element) + return element + + def __query_selector_all(self, element, selector): + selector = self.__convert_to_css_if_xpath(selector) + elements = self.loop.run_until_complete( + element.query_selector_all_async(selector) + ) + updated_elements = [] + for element in elements: + element = self.__add_sync_methods(element) + updated_elements.append(element) + self.__slow_mode_pause_if_set() + return updated_elements + + def __remove_from_dom(self, element): + return ( + self.loop.run_until_complete(element.remove_from_dom_async()) + ) + + def __save_screenshot(self, element, *args, **kwargs): + return ( + self.loop.run_until_complete( + element.save_screenshot_async(*args, **kwargs) + ) + ) + + def __save_to_dom(self, element): + return ( + self.loop.run_until_complete(element.save_to_dom_async()) + ) + + def __scroll_into_view(self, element): + return ( + self.loop.run_until_complete(element.scroll_into_view_async()) + ) + + def __select_option(self, element): + return ( + self.loop.run_until_complete(element.select_option_async()) + ) + + def __send_file(self, element, *file_paths): + return ( + self.loop.run_until_complete(element.send_file_async(*file_paths)) + ) + + def __send_keys(self, element, text): + return ( + self.loop.run_until_complete(element.send_keys_async(text)) + ) + + def __set_text(self, element, value): + return ( + self.loop.run_until_complete(element.set_text_async(value)) + ) + + def __set_value(self, element, value): + return ( + self.loop.run_until_complete(element.set_value_async(value)) + ) + + def __type(self, element, text): + with suppress(Exception): + element.clear_input() + element.send_keys(text) + + def __get_position(self, element): + return ( + self.loop.run_until_complete(element.get_position_async()) + ) + + def __get_html(self, element): + return ( + self.loop.run_until_complete(element.get_html_async()) + ) + + def __get_js_attributes(self, element): + return ( + self.loop.run_until_complete(element.get_js_attributes_async()) + ) + + def tile_windows(self, windows=None, max_columns=0): + """Tile windows and return the grid of tiled windows.""" + return self.loop.run_until_complete( + self.driver.cdp_base.tile_windows(windows, max_columns) + ) + + def get_all_cookies(self, *args, **kwargs): + return self.loop.run_until_complete( + self.driver.cdp_base.cookies.get_all(*args, **kwargs) + ) + + def set_all_cookies(self, *args, **kwargs): + return self.loop.run_until_complete( + self.driver.cdp_base.cookies.set_all(*args, **kwargs) + ) + + def save_cookies(self, *args, **kwargs): + return self.loop.run_until_complete( + self.driver.cdp_base.cookies.save(*args, **kwargs) + ) + + def load_cookies(self, *args, **kwargs): + return self.loop.run_until_complete( + self.driver.cdp_base.cookies.load(*args, **kwargs) + ) + + def clear_cookies(self, *args, **kwargs): + return self.loop.run_until_complete( + self.driver.cdp_base.cookies.clear(*args, **kwargs) + ) + + def sleep(self, seconds): + time.sleep(seconds) + + def bring_active_window_to_front(self): + self.loop.run_until_complete(self.page.bring_to_front()) + + def get_active_element(self): + return self.loop.run_until_complete( + self.page.js_dumps("document.activeElement") + ) + + def get_active_element_css(self): + from seleniumbase.js_code import active_css_js + + js_code = active_css_js.get_active_element_css + js_code = js_code.replace("return getBestSelector", "getBestSelector") + return self.loop.run_until_complete( + self.page.evaluate(js_code) + ) + + def click(self, selector, timeout=settings.SMALL_TIMEOUT): + self.__slow_mode_pause_if_set() + element = self.find_element(selector, timeout=timeout) + self.__add_light_pause() + element.click() + self.__slow_mode_pause_if_set() + + def click_active_element(self): + self.loop.run_until_complete( + self.page.evaluate("document.activeElement.click()") + ) + self.__slow_mode_pause_if_set() + + def click_if_visible(self, selector): + if self.is_element_visible(selector): + self.find_element(selector).click() + self.__slow_mode_pause_if_set() + + def mouse_click(self, selector, timeout=settings.SMALL_TIMEOUT): + """(Attempt simulating a mouse click)""" + self.__slow_mode_pause_if_set() + element = self.find_element(selector, timeout=timeout) + self.__add_light_pause() + element.mouse_click() + self.__slow_mode_pause_if_set() + + def nested_click(self, parent_selector, selector): + """ + Find parent element and click on child element inside it. + (This can be used to click on elements inside an iframe.) + """ + element = self.find_element(parent_selector) + element.query_selector(selector).mouse_click() + self.__slow_mode_pause_if_set() + + def get_nested_element(self, parent_selector, selector): + """(Can be used to find an element inside an iframe)""" + element = self.find_element(parent_selector) + return element.query_selector(selector) + + def flash(self, selector): + """Paint a quickly-vanishing dot over an element.""" + self.find_element(selector).flash() + + def focus(self, selector): + self.find_element(selector).focus() + + def highlight_overlay(self, selector): + self.find_element(selector).highlight_overlay() + + def remove_element(self, selector): + self.select(selector).remove_from_dom() + + def remove_from_dom(self, selector): + self.select(selector).remove_from_dom() + + def remove_elements(self, selector): + """Remove all elements on the page that match the selector.""" + css_selector = self.__convert_to_css_if_xpath(selector) + css_selector = re.escape(css_selector) # Add "\\" to special chars + css_selector = js_utils.escape_quotes_if_needed(css_selector) + js_code = ( + """var $elements = document.querySelectorAll('%s'); + var index = 0, length = $elements.length; + for(; index < length; index++){ + $elements[index].remove();}""" + % css_selector + ) + with suppress(Exception): + self.loop.run_until_complete(self.page.evaluate(js_code)) + + def scroll_into_view(self, selector): + self.find_element(selector).scroll_into_view() + + def send_keys(self, selector, text, timeout=settings.SMALL_TIMEOUT): + element = self.select(selector) + self.__slow_mode_pause_if_set() + if text.endswith("\n") or text.endswith("\r"): + text = text[:-1] + "\r\n" + element.send_keys(text) + self.__slow_mode_pause_if_set() + + def press_keys(self, selector, text, timeout=settings.SMALL_TIMEOUT): + """Similar to send_keys(), but presses keys at human speed.""" + element = self.select(selector) + self.__slow_mode_pause_if_set() + submit = False + if text.endswith("\n") or text.endswith("\r"): + submit = True + text = text[:-1] + for key in text: + element.send_keys(key) + time.sleep(0.0375) + if submit: + element.send_keys("\r\n") + time.sleep(0.0375) + self.__slow_mode_pause_if_set() + + def type(self, selector, text, timeout=settings.SMALL_TIMEOUT): + """Similar to send_keys(), but clears the text field first.""" + element = self.select(selector) + self.__slow_mode_pause_if_set() + with suppress(Exception): + element.clear_input() + if text.endswith("\n") or text.endswith("\r"): + text = text[:-1] + "\r\n" + element.send_keys(text) + self.__slow_mode_pause_if_set() + + def evaluate(self, expression): + """Run a JavaScript expression and return the result.""" + return self.loop.run_until_complete( + self.page.evaluate(expression) + ) + + def js_dumps(self, obj_name): + """Similar to evaluate(), but for dictionary results.""" + return self.loop.run_until_complete( + self.page.js_dumps(obj_name) + ) + + def maximize(self): + return self.loop.run_until_complete( + self.page.maximize() + ) + + def minimize(self): + return self.loop.run_until_complete( + self.page.minimize() + ) + + def medimize(self): + return self.loop.run_until_complete( + self.page.medimize() + ) + + def set_window_rect(self, x, y, width, height): + return self.loop.run_until_complete( + self.page.set_window_size( + left=x, top=y, height="height)" + ) + + def reset_window_size(self): + x = settings.WINDOW_START_X + y = settings.WINDOW_START_Y + width = settings.CHROME_START_WIDTH + height = settings.CHROME_START_HEIGHT + self.set_window_rect(x, y, width, height) + + def get_window(self): + return self.loop.run_until_complete( + self.page.get_window() + ) + + def get_text(self, selector): + return self.find_element(selector).text_all + + def get_title(self): + return self.loop.run_until_complete( + self.page.evaluate("document.title") + ) + + def get_current_url(self): + return self.loop.run_until_complete( + self.page.evaluate("window.location.href") + ) + + def get_origin(self): + return self.loop.run_until_complete( + self.page.evaluate("window.location.origin") + ) + + def get_page_source(self): + try: + source = self.loop.run_until_complete( + self.page.evaluate("document.documentElement.outerHTML") + ) + except Exception: + time.sleep(constants.UC.CDP_MODE_OPEN_WAIT) + source = self.loop.run_until_complete( + self.page.evaluate("document.documentElement.outerHTML") + ) + return source + + def get_user_agent(self): + return self.loop.run_until_complete( + self.page.evaluate("navigator.userAgent") + ) + + def get_cookie_string(self): + return self.loop.run_until_complete( + self.page.evaluate("document.cookie") + ) + + def get_locale_code(self): + return self.loop.run_until_complete( + self.page.evaluate("navigator.language || navigator.languages[0]") + ) + + def get_screen_rect(self): + coordinates = self.loop.run_until_complete( + self.page.js_dumps("window.screen") + ) + return coordinates + + def get_window_rect(self): + coordinates = {} + innerWidth = self.loop.run_until_complete( + self.page.evaluate("window.innerWidth") + ) + innerHeight = self.loop.run_until_complete( + self.page.evaluate("window.innerHeight") + ) + outerWidth = self.loop.run_until_complete( + self.page.evaluate("window.outerWidth") + ) + outerHeight = self.loop.run_until_complete( + self.page.evaluate("window.outerHeight") + ) + pageXOffset = self.loop.run_until_complete( + self.page.evaluate("window.pageXOffset") + ) + pageYOffset = self.loop.run_until_complete( + self.page.evaluate("window.pageYOffset") + ) + scrollX = self.loop.run_until_complete( + self.page.evaluate("window.scrollX") + ) + scrollY = self.loop.run_until_complete( + self.page.evaluate("window.scrollY") + ) + screenLeft = self.loop.run_until_complete( + self.page.evaluate("window.screenLeft") + ) + screenTop = self.loop.run_until_complete( + self.page.evaluate("window.screenTop") + ) + x = self.loop.run_until_complete( + self.page.evaluate("window.screenX") + ) + y = self.loop.run_until_complete( + self.page.evaluate("window.screenY") + ) + coordinates["innerWidth"] = innerWidth + coordinates["innerHeight"] = innerHeight + coordinates["outerWidth"] = outerWidth + coordinates["outerHeight"] = outerHeight + coordinates["width"] = outerWidth + coordinates["height"] = outerHeight + coordinates["pageXOffset"] = pageXOffset if pageXOffset else 0 + coordinates["pageYOffset"] = pageYOffset if pageYOffset else 0 + coordinates["scrollX"] = scrollX if scrollX else 0 + coordinates["scrollY"] = scrollY if scrollY else 0 + coordinates["screenLeft"] = screenLeft if screenLeft else 0 + coordinates["screenTop"] = screenTop if screenTop else 0 + coordinates["x"] = x if x else 0 + coordinates["y"] = y if y else 0 + return coordinates + + def get_window_size(self): + coordinates = {} + outerWidth = self.loop.run_until_complete( + self.page.evaluate("window.outerWidth") + ) + outerHeight = self.loop.run_until_complete( + self.page.evaluate("window.outerHeight") + ) + coordinates["width"] = outerWidth + coordinates["height"] = outerHeight + return coordinates + + def get_window_position(self): + coordinates = {} + x = self.loop.run_until_complete( + self.page.evaluate("window.screenX") + ) + y = self.loop.run_until_complete( + self.page.evaluate("window.screenY") + ) + coordinates["x"] = x if x else 0 + coordinates["y"] = y if y else 0 + return coordinates + + def get_element_rect(self, selector): + selector = self.__convert_to_css_if_xpath(selector) + coordinates = self.loop.run_until_complete( + self.page.js_dumps( + """document.querySelector""" + """('%s').getBoundingClientRect()""" + % js_utils.escape_quotes_if_needed(re.escape(selector)) + ) + ) + return coordinates + + def get_element_size(self, selector): + element_rect = self.get_element_rect(selector) + coordinates = {} + coordinates["width"] = element_rect["width"] + coordinates["height"] = element_rect["height"] + return coordinates + + def get_element_position(self, selector): + element_rect = self.get_element_rect(selector) + coordinates = {} + coordinates["x"] = element_rect["x"] + coordinates["y"] = element_rect["y"] + return coordinates + + def get_gui_element_rect(self, selector): + """(Coordinates are relative to the screen. Not the window.)""" + element_rect = self.get_element_rect(selector) + e_width = element_rect["width"] + e_height = element_rect["height"] + window_rect = self.get_window_rect() + w_bottom_y = window_rect["y"] + window_rect["height"] + viewport_height = window_rect["innerHeight"] + x = math.ceil(window_rect["x"] + element_rect["x"]) + y = math.ceil(w_bottom_y - viewport_height + element_rect["y"]) + y_scroll_offset = window_rect["pageYOffset"] + y = int(y - y_scroll_offset) + return ({"height": e_height, "width": e_width, "x": x, "y": y}) + + def get_gui_element_center(self, selector): + """(Coordinates are relative to the screen. Not the window.)""" + element_rect = self.get_gui_element_rect(selector) + e_width = element_rect["width"] + e_height = element_rect["height"] + e_x = element_rect["x"] + e_y = element_rect["y"] + return ((e_x + e_width / 2), (e_y + e_height / 2)) + + def get_document(self): + return self.loop.run_until_complete( + self.page.get_document() + ) + + def get_flattened_document(self): + return self.loop.run_until_complete( + self.page.get_flattened_document() + ) + + def get_element_attributes(self, selector): + return self.loop.run_until_complete( + self.page.js_dumps( + """document.querySelector('%s')""" + % js_utils.escape_quotes_if_needed(re.escape(selector)) + ) + ) + + def get_element_html(self, selector): + selector = self.__convert_to_css_if_xpath(selector) + return self.loop.run_until_complete( + self.page.evaluate( + """document.querySelector('%s').outerHTML""" + % js_utils.escape_quotes_if_needed(re.escape(selector)) + ) + ) + + def set_attributes(self, selector, attribute, value): + """This method uses JavaScript to set/update a common attribute. + All matching selectors from querySelectorAll() are used. + Example => (Make all links on a website redirect to Google): + self.set_attributes("a", "href", "https://google.com")""" + attribute = re.escape(attribute) + attribute = js_utils.escape_quotes_if_needed(attribute) + value = re.escape(value) + value = js_utils.escape_quotes_if_needed(value) + css_selector = self.__convert_to_css_if_xpath(selector) + css_selector = re.escape(css_selector) # Add "\\" to special chars + css_selector = js_utils.escape_quotes_if_needed(css_selector) + js_code = """var $elements = document.querySelectorAll('%s'); + var index = 0, length = $elements.length; + for(; index < length; index++){ + $elements[index].setAttribute('%s','%s');}""" % ( + css_selector, + attribute, + value, + ) + with suppress(Exception): + self.loop.run_until_complete(self.page.evaluate(js_code)) + + def internalize_links(self): + """All `` links become ``. + This prevents those links from opening in a new tab.""" + self.set_attributes('[]', "target", "_self") + + def is_element_present(self, selector): + try: + self.select(selector, timeout=0.01) + return True + except Exception: + return False + selector = self.__convert_to_css_if_xpath(selector) + element = self.loop.run_until_complete( + self.page.js_dumps( + """document.querySelector('%s')""" + % js_utils.escape_quotes_if_needed(re.escape(selector)) + ) + ) + return element is not None + + def is_element_visible(self, selector): + selector = self.__convert_to_css_if_xpath(selector) + element = None + if ":contains(" not in selector: + try: + element = self.loop.run_until_complete( + self.page.js_dumps( + """window.getComputedStyle(document.querySelector""" + """('%s'))""" + % js_utils.escape_quotes_if_needed(re.escape(selector)) + ) + ) + except Exception: + return False + if not element: + return False + return element.get("display") != "none" + else: + with suppress(Exception): + tag_name = selector.split(":contains(")[0].split(" ")[-1] + text = selector.split(":contains(")[1].split(")")[0][1:-1] + self.loop.run_until_complete( + self.page.select(tag_name, timeout=0.1) + ) + self.loop.run_until_complete(self.page.find(text, timeout=0.1)) + return True + return False + + def assert_element(self, selector, timeout=settings.SMALL_TIMEOUT): + try: + self.select(selector, timeout=timeout) + except Exception: + raise Exception("Element {%s} not found!" % selector) + for i in range(30): + if self.is_element_visible(selector): + return True + time.sleep(0.1) + raise Exception("Element {%s} not visible!" % selector) + + def assert_element_present(self, selector, timeout=settings.SMALL_TIMEOUT): + try: + self.select(selector, timeout=timeout) + except Exception: + raise Exception("Element {%s} not found!" % selector) + return True + + def assert_text( + self, text, selector="html", timeout=settings.SMALL_TIMEOUT + ): + element = None + try: + element = self.select(selector, timeout=timeout) + except Exception: + raise Exception("Element {%s} not found!" % selector) + for i in range(30): + if self.is_element_visible(selector) and text in element.text_all: + return True + time.sleep(0.1) + raise Exception( + "Text {%s} not found in {%s}! Actual text: {%s}" + % (text, selector, element.text_all) + ) + + def assert_exact_text( + self, text, selector="html", timeout=settings.SMALL_TIMEOUT + ): + element = None + try: + element = self.select(selector, timeout=timeout) + except Exception: + raise Exception("Element {%s} not found!" % selector) + for i in range(30): + if ( + self.is_element_visible(selector) + and text.strip() == element.text_all.strip() + ): + return True + time.sleep(0.1) + raise Exception( + "Expected Text {%s}, is not equal to {%s} in {%s}!" + % (text, element.text_all, selector) + ) + + def save_screenshot(self, name, folder=None, selector=None): + filename = name + if folder: + filename = os.path.join(folder, name) + if not selector: + self.loop.run_until_complete( + self.page.save_screenshot(filename) + ) + else: + self.select(selector).save_screenshot(filename) diff --git a/seleniumbase/core/sb_driver.py b/seleniumbase/core/sb_driver.py index 6fc9dea1730..d4baab0b4be 100644 --- a/seleniumbase/core/sb_driver.py +++ b/seleniumbase/core/sb_driver.py @@ -4,12 +4,17 @@ from seleniumbase.fixtures import js_utils from seleniumbase.fixtures import page_actions from seleniumbase.fixtures import page_utils +from seleniumbase.fixtures import shared_utils class DriverMethods(): def __init__(self, driver): self.driver = driver + def __is_cdp_swap_needed(self): + """If the driver is disconnected, use a CDP method when available.""" + return shared_utils.is_cdp_swap_needed(self.driver) + def find_element(self, by=None, value=None): if not value: value = by @@ -45,10 +50,21 @@ def get_attribute(self, selector, attribute, by="css selector"): element = self.locator(selector, by=by) return element.get_attribute(attribute) + def get_current_url(self): + if self.__is_cdp_swap_needed(): + current_url = self.driver.cdp.get_current_url() + else: + current_url = self.driver.current_url + return current_url + def get_page_source(self): + if self.__is_cdp_swap_needed(): + return self.driver.cdp.get_page_source() return self.driver.page_source def get_title(self): + if self.__is_cdp_swap_needed(): + return self.driver.cdp.get_title() return self.driver.title def open_url(self, *args, **kwargs): @@ -175,6 +191,34 @@ def is_alert_present(self): def is_online(self): return self.driver.execute_script("return navigator.onLine;") + def is_connected(self): + """ + Return True if WebDriver is connected to the browser. + Note that the stealthy CDP-Driver isn't a WebDriver. + In CDP Mode, the CDP-Driver controls the web browser. + The CDP-Driver can be connected while WebDriver isn't. + """ + try: + self.driver.window_handles + return True + except Exception: + return False + + def is_uc_mode_active(self): + """Return True if the driver is using UC Mode. False otherwise.""" + return ( + hasattr(self.driver, "_is_using_uc") + and self.driver._is_using_uc + ) + + def is_cdp_mode_active(self): + """CDP Mode is a special mode within UC Mode. Activated separately. + Return True if CDP Mode is loaded in the driver. False otherwise.""" + return ( + hasattr(self.driver, "_is_using_cdp") + and self.driver._is_using_cdp + ) + def js_click(self, *args, **kwargs): return page_actions.js_click(self.driver, *args, **kwargs) diff --git a/seleniumbase/fixtures/base_case.py b/seleniumbase/fixtures/base_case.py index f97a1cc5aff..4007c3fe606 100644 --- a/seleniumbase/fixtures/base_case.py +++ b/seleniumbase/fixtures/base_case.py @@ -223,6 +223,9 @@ def test_example(self): def open(self, url): """Navigates the current browser window to the specified page.""" self.__check_scope() + if self.__is_cdp_swap_needed(): + self.cdp.open(url) + return self._check_browser() if self.__needs_minimum_wait(): time.sleep(0.04) @@ -388,6 +391,9 @@ def click( original_selector = selector original_by = by selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + self.cdp.click(selector) + return if delay and (type(delay) in [int, float]) and delay> 0: time.sleep(delay) if page_utils.is_link_text_selector(selector) or by == By.LINK_TEXT: @@ -878,6 +884,9 @@ def update_text( if self.timeout_multiplier and timeout == settings.LARGE_TIMEOUT: timeout = self.__get_new_timeout(timeout) selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + self.cdp.type(selector, text) + return if self.__is_shadow_selector(selector): self.__shadow_type(selector, text, timeout) return @@ -991,6 +1000,9 @@ def add_text(self, selector, text, by="css selector", timeout=None): if self.timeout_multiplier and timeout == settings.LARGE_TIMEOUT: timeout = self.__get_new_timeout(timeout) selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + self.cdp.send_keys(selector, text) + return if self.__is_shadow_selector(selector): self.__shadow_type(selector, text, timeout, clear_first=False) return @@ -1099,6 +1111,9 @@ def send_keys(self, selector, text, by="css selector", timeout=None): def press_keys(self, selector, text, by="css selector", timeout=None): """Use send_keys() to press one key at a time.""" + if self.__is_cdp_swap_needed(): + self.cdp.press_keys(selector, text) + return self.wait_for_ready_state_complete() element = self.wait_for_element_present( selector, by=by, timeout=timeout @@ -1207,6 +1222,9 @@ def focus(self, selector, by="css selector", timeout=None): if self.timeout_multiplier and timeout == settings.LARGE_TIMEOUT: timeout = self.__get_new_timeout(timeout) selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + self.cdp.focus(selector) + return element = self.wait_for_element_present( selector, by=by, timeout=timeout ) @@ -1237,6 +1255,9 @@ def focus(self, selector, by="css selector", timeout=None): def refresh_page(self): self.__check_scope() self.__last_page_load_url = None + if self.__is_cdp_swap_needed(): + self.cdp.reload() + return js_utils.clear_out_console_logs(self.driver) self.driver.refresh() self.wait_for_ready_state_complete() @@ -1247,7 +1268,11 @@ def refresh(self): def get_current_url(self): self.__check_scope() - current_url = self.driver.current_url + current_url = None + if self.__is_cdp_swap_needed(): + current_url = self.cdp.get_current_url() + else: + current_url = self.driver.current_url if "%" in current_url: try: from urllib.parse import unquote @@ -1262,15 +1287,22 @@ def get_origin(self): return self.execute_script("return window.location.origin;") def get_page_source(self): + if self.__is_cdp_swap_needed(): + return self.cdp.get_page_source() self.wait_for_ready_state_complete() if self.__needs_minimum_wait: - time.sleep(0.02) + time.sleep(0.025) return self.driver.page_source def get_page_title(self): + if self.__is_cdp_swap_needed(): + return self.cdp.get_title() self.wait_for_ready_state_complete() - self.wait_for_element_present("title", timeout=settings.SMALL_TIMEOUT) - time.sleep(0.03) + with suppress(Exception): + self.wait_for_element_present( + "title", by="css selector", timeout=settings.MINI_TIMEOUT + ) + time.sleep(0.025) return self.driver.title def get_title(self): @@ -1365,7 +1397,7 @@ def open_if_not_url(self, url): to convert the open() action into open_if_not_url() so that the same page isn't opened again if the user is already on the page.""" self.__check_scope() - current_url = self.driver.current_url + current_url = self.get_current_url() if current_url != url: if ( "?q=" not in current_url @@ -1377,6 +1409,8 @@ def open_if_not_url(self, url): def is_element_present(self, selector, by="css selector"): """Returns whether the element exists in the HTML.""" + if self.__is_cdp_swap_needed(): + return self.cdp.is_element_present(selector) self.wait_for_ready_state_complete() selector, by = self.__recalculate_selector(selector, by) if self.__is_shadow_selector(selector): @@ -1385,6 +1419,8 @@ def is_element_present(self, selector, by="css selector"): def is_element_visible(self, selector, by="css selector"): """Returns whether the element is visible on the page.""" + if self.__is_cdp_swap_needed(): + return self.cdp.is_element_visible(selector) self.wait_for_ready_state_complete() selector, by = self.__recalculate_selector(selector, by) if self.__is_shadow_selector(selector): @@ -1562,6 +1598,9 @@ def click_link_text(self, link_text, timeout=None): if self.timeout_multiplier and timeout == settings.SMALL_TIMEOUT: timeout = self.__get_new_timeout(timeout) link_text = self.__get_type_checked_text(link_text) + if self.__is_cdp_swap_needed(): + self.cdp.click_link(link_text) + return if self.browser == "safari": if self.demo_mode: self.wait_for_link_text_present(link_text, timeout=timeout) @@ -1794,6 +1833,8 @@ def get_text(self, selector="html", by="css selector", timeout=None): if self.timeout_multiplier and timeout == settings.LARGE_TIMEOUT: timeout = self.__get_new_timeout(timeout) selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + return self.cdp.get_text(selector) if self.__is_shadow_selector(selector): return self.__get_shadow_text(selector, timeout) self.wait_for_ready_state_complete() @@ -1921,6 +1962,9 @@ def set_attributes(self, selector, attribute, value, by="css selector"): self.set_attributes("a", "href", "https://google.com")""" self.__check_scope() selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + self.cdp.set_attributes(selector, attribute, value) + return original_attribute = attribute original_value = value attribute = re.escape(attribute) @@ -2001,6 +2045,14 @@ def remove_attributes(self, selector, attribute, by="css selector"): with suppress(Exception): self.execute_script(script) + def internalize_links(self): + """All `target="_blank"` links become `target="_self"`. + This prevents those links from opening in a new tab.""" + if self.__is_cdp_swap_needed(): + self.cdp.internalize_links() + return + self.set_attributes('[target="_blank"]', "target", "_self") + def get_property( self, selector, property, by="css selector", timeout=None ): @@ -2104,6 +2156,12 @@ def find_elements(self, selector, by="css selector", limit=0): Elements could be either hidden or visible on the page. If "limit" is set and> 0, will only return that many elements.""" selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + elements = self.cdp.select_all(selector) + if limit and limit> 0 and len(elements)> limit: + elements = elements[:limit] + return elements + self.wait_for_ready_state_complete() time.sleep(0.05) elements = self.driver.find_elements(by=by, value=selector) @@ -2277,6 +2335,9 @@ def click_if_visible(self, selector, by="css selector", timeout=0): Use click_visible_elements() to click all matching elements. If a "timeout" is provided, waits that long for the element to appear before giving up and returning without a click().""" + if self.__is_cdp_swap_needed(): + self.cdp.click_if_visible(selector) + return self.wait_for_ready_state_complete() if self.is_element_visible(selector, by=by): self.click(selector, by=by) @@ -2289,6 +2350,9 @@ def click_if_visible(self, selector, by="css selector", timeout=0): self.click(selector, by=by) def click_active_element(self): + if self.__is_cdp_swap_needed(): + self.cdp.click_active_element() + return self.wait_for_ready_state_complete() pre_action_url = None with suppress(Exception): @@ -3311,6 +3375,8 @@ def get_gui_element_rect(self, selector, by="css selector"): relative to the entire screen, rather than the browser window. This is specifically for PyAutoGUI actions on the full screen. (Note: There may be complications if iframes are involved.)""" + if self.__is_cdp_swap_needed(): + return self.cdp.get_gui_element_rect(selector) element = self.wait_for_element_present(selector, by=by, timeout=1) element_rect = element.rect e_width = element_rect["width"] @@ -3344,6 +3410,8 @@ def get_gui_element_center(self, selector, by="css selector"): on the entire GUI / screen, rather than on the browser window. This is specifically for PyAutoGUI actions on the full screen. (Note: There may be complications if iframes are involved.)""" + if self.__is_cdp_swap_needed(): + return self.cdp.get_gui_element_center(selector) element_rect = self.get_gui_element_rect(selector, by=by) x = int(element_rect["x"]) + int(element_rect["width"] / 2) + 1 y = int(element_rect["y"]) + int(element_rect["height"] / 2) + 1 @@ -3351,16 +3419,22 @@ def get_gui_element_center(self, selector, by="css selector"): def get_window_rect(self): self.__check_scope() + if self.__is_cdp_swap_needed(): + return self.cdp.get_window_rect() self._check_browser() return self.driver.get_window_rect() def get_window_size(self): self.__check_scope() + if self.__is_cdp_swap_needed(): + return self.cdp.get_window_size() self._check_browser() return self.driver.get_window_size() def get_window_position(self): self.__check_scope() + if self.__is_cdp_swap_needed(): + return self.cdp.get_window_position() self._check_browser() return self.driver.get_window_position() @@ -4138,12 +4212,16 @@ def get_new_driver( self.open(new_start_page) self.__dont_record_open = False if undetectable: + if hasattr(new_driver, "cdp"): + self.cdp = new_driver.cdp if hasattr(new_driver, "uc_open"): self.uc_open = new_driver.uc_open if hasattr(new_driver, "uc_open_with_tab"): self.uc_open_with_tab = new_driver.uc_open_with_tab if hasattr(new_driver, "uc_open_with_reconnect"): self.uc_open_with_reconnect = new_driver.uc_open_with_reconnect + if hasattr(new_driver, "uc_open_with_cdp_mode"): + self.uc_open_with_cdp_mode = new_driver.uc_open_with_cdp_mode if hasattr(new_driver, "uc_open_with_disconnect"): self.uc_open_with_disconnect = ( new_driver.uc_open_with_disconnect @@ -4207,6 +4285,9 @@ def save_screenshot( If a provided selector is not found, then takes a full-page screenshot. If the folder provided doesn't exist, it will get created. The screenshot will be in PNG format: (*.png)""" + if self.__is_cdp_swap_needed(): + self.cdp.save_screenshot(name, folder=folder, selector=selector) + return self.wait_for_ready_state_complete() if selector and by: selector, by = self.__recalculate_selector(selector, by) @@ -4565,12 +4646,44 @@ def activate_design_mode(self): script = """document.designMode = 'on';""" self.execute_script(script) - def deactivate_design_mode(self): + def deactivate_design_mode(self, url=None): # Deactivate Chrome's Design Mode. self.wait_for_ready_state_complete() script = """document.designMode = 'off';""" self.execute_script(script) + def activate_cdp_mode(self, url=None): + if hasattr(self.driver, "_is_using_uc") and self.driver._is_using_uc: + self.driver.uc_open_with_cdp_mode(url) + else: + # Fix Chrome-130 issues by creating a user-data-dir in advance + if ( + ( + not self.user_data_dir + or not os.path.exists(self.user_data_dir) + ) + and self.browser == "chrome" + ): + import tempfile + user_data_dir = os.path.normpath(tempfile.mkdtemp()) + self.user_data_dir = user_data_dir + sb_config.user_data_dir = user_data_dir + try: + driver = self.get_new_driver( + user_data_dir=user_data_dir, + undetectable=True, + headless2=True, + ) + time.sleep(0.555) + except Exception: + pass + finally: + with suppress(Exception): + driver.quit() + self.get_new_driver(undetectable=True) + self.driver.uc_open_with_cdp_mode(url) + self.cdp = self.driver.cdp + def activate_recorder(self): from seleniumbase.js_code.recorder_js import recorder_js @@ -5589,6 +5702,9 @@ def bring_active_window_to_front(self): """Brings the active browser window to the front (on top). Useful when multiple drivers are being used at the same time.""" self.__check_scope() + if self.__is_cdp_swap_needed(): + self.cdp.bring_active_window_to_front() + return with suppress(Exception): if not self.__is_in_frame(): # Only bring the window to the front if not in a frame @@ -5804,6 +5920,7 @@ def highlight( scroll - the option to scroll to the element first (Default: True) timeout - the time to wait for the element to appear """ self.__check_scope() + self._check_browser() self.__skip_if_esc() if isinstance(selector, WebElement): self.__highlight_element(selector, loops=loops, scroll=scroll) @@ -6004,6 +6121,9 @@ def scroll_into_view(self, selector, by="css selector", timeout=None): timeout = settings.SMALL_TIMEOUT if self.timeout_multiplier and timeout == settings.SMALL_TIMEOUT: timeout = self.__get_new_timeout(timeout) + if self.__is_cdp_swap_needed(): + self.cdp.scroll_into_view(selector) + return element = self.wait_for_element_visible(selector, by, timeout=timeout) self.execute_script("arguments[0].scrollIntoView();", element) @@ -6046,6 +6166,9 @@ def js_click( Can be used to click hidden / invisible elements. If "all_matches" is False, only the first match is clicked. If "scroll" is False, won't scroll unless running in Demo Mode.""" + if self.__is_cdp_swap_needed(): + self.cdp.click(selector) + return self.wait_for_ready_state_complete() if not timeout or timeout is True: timeout = settings.SMALL_TIMEOUT @@ -6414,6 +6537,9 @@ def show_elements(self, selector, by="css selector"): def remove_element(self, selector, by="css selector"): """Remove the first element on the page that matches the selector.""" self.__check_scope() + if self.__is_cdp_swap_needed(): + self.cdp.remove_element(selector) + return element = None with suppress(Exception): self.wait_for_element_visible("body", timeout=1.5) @@ -6445,6 +6571,9 @@ def remove_element(self, selector, by="css selector"): def remove_elements(self, selector, by="css selector"): """Remove all elements on the page that match the selector.""" self.__check_scope() + if self.__is_cdp_swap_needed(): + self.cdp.remove_elements(selector) + return with suppress(Exception): self.wait_for_element_visible("body", timeout=1.5) selector, by = self.__recalculate_selector(selector, by) @@ -7829,6 +7958,15 @@ def is_online(self): """Return True if connected to the Internet.""" return self.execute_script("return navigator.onLine;") + def is_connected(self): + """ + Return True if WebDriver is connected to the browser. + Note that the stealthy CDP-Driver isn't a WebDriver. + In CDP Mode, the CDP-Driver controls the web browser. + The CDP-Driver can be connected while WebDriver isn't. + """ + return self.driver.is_connected() + def is_chromium(self): """Return True if the browser is Chrome or Edge.""" self.__check_scope() @@ -7944,6 +8082,10 @@ def enter_mfa_code( self.__check_scope() if not timeout: timeout = settings.SMALL_TIMEOUT + if self.__is_cdp_swap_needed(): + mfa_code = self.get_mfa_code(totp_key) + self.cdp.type(selector, mfa_code + "\n") + return self.wait_for_element_visible(selector, by=by, timeout=timeout) if self.recorder_mode and self.__current_url_is_recordable(): if self.get_session_storage_item("pause_recorder") == "no": @@ -7981,6 +8123,9 @@ def set_value( if self.timeout_multiplier and timeout == settings.LARGE_TIMEOUT: timeout = self.__get_new_timeout(timeout) selector, by = self.__recalculate_selector(selector, by, xp_ok=False) + if self.__is_cdp_swap_needed(): + self.cdp.type(selector, text) + return self.wait_for_ready_state_complete() self.wait_for_element_present(selector, by=by, timeout=timeout) original_selector = selector @@ -8696,6 +8841,8 @@ def wait_for_element_visible( timeout = self.__get_new_timeout(timeout) original_selector = selector selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + return self.cdp.select(selector) if self.__is_shadow_selector(selector): return self.__get_shadow_element(selector, timeout) return page_actions.wait_for_element_visible( @@ -8751,6 +8898,9 @@ def wait_for_element_not_present( original_selector=original_selector, ) + def select_all(self, selector, by="css selector", limit=0): + return self.find_elements(selector, by=by, limit=limit) + def assert_link(self, link_text, timeout=None): """Same as self.assert_link_text()""" self.__check_scope() @@ -8825,6 +8975,7 @@ def block_ads(self): def _check_browser(self): """This method raises an exception if the active window is closed. (This provides a much cleaner exception message in this situation.)""" + page_actions._reconnect_if_disconnected(self.driver) active_window = None with suppress(Exception): active_window = self.driver.current_window_handle # Fails if None @@ -9109,6 +9260,8 @@ def wait_for_element_present( timeout = self.__get_new_timeout(timeout) original_selector = selector selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + return self.cdp.select(selector) if self.__is_shadow_selector(selector): return self.__wait_for_shadow_element_present(selector, timeout) return page_actions.wait_for_element_present( @@ -9129,6 +9282,8 @@ def wait_for_element(self, selector, by="css selector", timeout=None): timeout = self.__get_new_timeout(timeout) original_selector = selector selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + return self.cdp.select(selector) if self.recorder_mode and self.__current_url_is_recordable(): if self.get_session_storage_item("pause_recorder") == "no": if by == By.XPATH: @@ -9189,6 +9344,9 @@ def assert_element_present( if isinstance(selector, list): self.assert_elements_present(selector, by=by, timeout=timeout) return True + if self.__is_cdp_swap_needed(): + self.cdp.assert_element_present(selector) + return True if self.__is_shadow_selector(selector): self.__assert_shadow_element_present(selector) return True @@ -9263,6 +9421,9 @@ def assert_element(self, selector, by="css selector", timeout=None): timeout = settings.SMALL_TIMEOUT if self.timeout_multiplier and timeout == settings.SMALL_TIMEOUT: timeout = self.__get_new_timeout(timeout) + if self.__is_cdp_swap_needed(): + self.cdp.assert_element(selector) + return True if isinstance(selector, list): self.assert_elements(selector, by=by, timeout=timeout) return True @@ -9383,6 +9544,8 @@ def wait_for_text_visible( timeout = self.__get_new_timeout(timeout) text = self.__get_type_checked_text(text) selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + return self.cdp.find_element(selector) if self.__is_shadow_selector(selector): return self.__wait_for_shadow_text_visible(text, selector, timeout) return page_actions.wait_for_text_visible( @@ -9551,6 +9714,11 @@ def assert_text( self.__highlight_with_assert_success( messenger_post, selector, by ) + elif self.__is_cdp_swap_needed(): + self.cdp.assert_text(text, selector) + return True + elif not self.is_connected(): + self.connect() elif self.__is_shadow_selector(selector): self.__assert_shadow_text_visible(text, selector, timeout) return True @@ -9596,6 +9764,9 @@ def assert_exact_text( timeout = self.__get_new_timeout(timeout) original_selector = selector selector, by = self.__recalculate_selector(selector, by) + if self.__is_cdp_swap_needed(): + self.cdp.assert_exact_text(text, selector) + return True if self.__is_shadow_selector(selector): self.__assert_exact_shadow_text_visible(text, selector, timeout) return True @@ -10578,6 +10749,12 @@ def __get_new_timeout(self, timeout): ############ + def __is_cdp_swap_needed(self): + """If the driver is disconnected, use a CDP method when available.""" + return shared_utils.is_cdp_swap_needed(self.driver) + + ############ + def __check_scope(self): if hasattr(self, "browser"): # self.browser stores the type of browser return # All good: setUp() already initialized variables in "self" @@ -14764,6 +14941,31 @@ def setUp(self, masterqa_mode=False): self.__js_start_time = int(time.time() * 1000.0) else: # Launch WebDriver for both pytest and pynose + + # Fix Chrome-130 issues by creating a user-data-dir in advance + if ( + self.undetectable + and ( + not self.user_data_dir + or not os.path.exists(self.user_data_dir) + ) + and self.browser == "chrome" + ): + import tempfile + user_data_dir = os.path.normpath(tempfile.mkdtemp()) + self.user_data_dir = user_data_dir + sb_config.user_data_dir = user_data_dir + try: + driver = self.get_new_driver( + user_data_dir=user_data_dir, + headless2=True, + ) + time.sleep(0.555) + except Exception: + pass + finally: + with suppress(Exception): + driver.quit() self.driver = self.get_new_driver( browser=self.browser, headless=self.headless, diff --git a/seleniumbase/fixtures/constants.py b/seleniumbase/fixtures/constants.py index 94c8c19068a..145bd5b8cf5 100644 --- a/seleniumbase/fixtures/constants.py +++ b/seleniumbase/fixtures/constants.py @@ -375,6 +375,7 @@ class Mobile: class UC: RECONNECT_TIME = 2.4 # Seconds + CDP_MODE_OPEN_WAIT = 0.9 # Seconds class ValidBrowsers: diff --git a/seleniumbase/fixtures/js_utils.py b/seleniumbase/fixtures/js_utils.py index ff85ed57f32..52bf9154eac 100644 --- a/seleniumbase/fixtures/js_utils.py +++ b/seleniumbase/fixtures/js_utils.py @@ -243,6 +243,10 @@ def escape_quotes_if_needed(string): def is_in_frame(driver): # Returns True if the driver has switched to a frame. # Returns False if the driver was on default content. + from seleniumbase.fixtures import shared_utils + + if shared_utils.is_cdp_swap_needed(driver): + return False in_basic_frame = driver.execute_script( """ var frame = window.frameElement; diff --git a/seleniumbase/fixtures/page_actions.py b/seleniumbase/fixtures/page_actions.py index 3e0b1506cd1..b3d9a206f37 100644 --- a/seleniumbase/fixtures/page_actions.py +++ b/seleniumbase/fixtures/page_actions.py @@ -49,6 +49,8 @@ def is_element_present(driver, selector, by="css selector"): @Returns Boolean (is element present) """ + if __is_cdp_swap_needed(driver): + return driver.cdp.is_element_present(selector) selector, by = page_utils.swap_selector_and_by_if_reversed(selector, by) try: driver.find_element(by=by, value=selector) @@ -67,6 +69,8 @@ def is_element_visible(driver, selector, by="css selector"): @Returns Boolean (is element visible) """ + if __is_cdp_swap_needed(driver): + return driver.cdp.is_element_visible(selector) selector, by = page_utils.swap_selector_and_by_if_reversed(selector, by) try: element = driver.find_element(by=by, value=selector) @@ -85,6 +89,7 @@ def is_element_clickable(driver, selector, by="css selector"): @Returns Boolean (is element clickable) """ + _reconnect_if_disconnected(driver) try: element = driver.find_element(by=by, value=selector) if element.is_displayed() and element.is_enabled(): @@ -104,6 +109,7 @@ def is_element_enabled(driver, selector, by="css selector"): @Returns Boolean (is element enabled) """ + _reconnect_if_disconnected(driver) try: element = driver.find_element(by=by, value=selector) return element.is_enabled() @@ -122,6 +128,7 @@ def is_text_visible(driver, text, selector="html", by="css selector"): @Returns Boolean (is text visible) """ + _reconnect_if_disconnected(driver) selector, by = page_utils.swap_selector_and_by_if_reversed(selector, by) text = str(text) try: @@ -151,6 +158,7 @@ def is_exact_text_visible(driver, text, selector, by="css selector"): @Returns Boolean (is text visible) """ + _reconnect_if_disconnected(driver) selector, by = page_utils.swap_selector_and_by_if_reversed(selector, by) text = str(text) try: @@ -185,6 +193,7 @@ def is_attribute_present( @Returns Boolean (is attribute present) """ + _reconnect_if_disconnected(driver) try: element = driver.find_element(by=by, value=selector) found_value = element.get_attribute(attribute) @@ -211,6 +220,7 @@ def is_non_empty_text_visible(driver, selector, by="css selector"): @Returns Boolean (is any text visible in the element with the selector) """ + _reconnect_if_disconnected(driver) try: element = driver.find_element(by=by, value=selector) element_text = element.text @@ -235,6 +245,7 @@ def hover_on_element(driver, selector, by="css selector"): selector - the locator for identifying the page element (required) by - the type of selector being used (Default: "css selector") """ + _reconnect_if_disconnected(driver) element = driver.find_element(by=by, value=selector) hover = ActionChains(driver).move_to_element(element) hover.perform() @@ -245,6 +256,7 @@ def hover_element(driver, element): """ Similar to hover_on_element(), but uses found element, not a selector. """ + _reconnect_if_disconnected(driver) hover = ActionChains(driver).move_to_element(element) hover.perform() return element @@ -276,6 +288,7 @@ def hover_and_click( timeout - number of seconds to wait for click element to appear after hover js_click - the option to use js_click() instead of click() on the last part """ + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) element = driver.find_element(by=hover_by, value=hover_selector) @@ -315,6 +328,7 @@ def hover_element_and_click( """ Similar to hover_and_click(), but assumes top element is already found. """ + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) hover = ActionChains(driver).move_to_element(element) @@ -347,6 +361,7 @@ def hover_element_and_double_click( click_by="css selector", timeout=settings.SMALL_TIMEOUT, ): + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) hover = ActionChains(driver).move_to_element(element) @@ -398,6 +413,7 @@ def wait_for_element_present( @Returns A web element object """ + _reconnect_if_disconnected(driver) element = None start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) @@ -457,6 +473,7 @@ def wait_for_element_visible( @Returns A web element object """ + _reconnect_if_disconnected(driver) element = None is_present = False start_ms = time.time() * 1000.0 @@ -538,6 +555,7 @@ def wait_for_text_visible( @Returns A web element object that contains the text searched for """ + _reconnect_if_disconnected(driver) element = None is_present = False full_text = None @@ -648,6 +666,7 @@ def wait_for_exact_text_visible( @Returns A web element object that contains the text searched for """ + _reconnect_if_disconnected(driver) element = None is_present = False actual_text = None @@ -758,6 +777,7 @@ def wait_for_attribute( @Returns A web element object that contains the expected attribute/value """ + _reconnect_if_disconnected(driver) element = None element_present = False attribute_present = False @@ -844,6 +864,7 @@ def wait_for_element_clickable( @Returns A web element object """ + _reconnect_if_disconnected(driver) element = None is_present = False is_visible = False @@ -938,6 +959,7 @@ def wait_for_element_absent( timeout - the time to wait for elements in seconds original_selector - handle pre-converted ":contains(TEXT)" selector """ + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) for x in range(int(timeout * 10)): @@ -985,6 +1007,7 @@ def wait_for_element_not_visible( timeout - the time to wait for the element in seconds original_selector - handle pre-converted ":contains(TEXT)" selector """ + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) for x in range(int(timeout * 10)): @@ -1037,6 +1060,7 @@ def wait_for_text_not_visible( @Returns A web element object that contains the text searched for """ + _reconnect_if_disconnected(driver) text = str(text) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) @@ -1080,6 +1104,7 @@ def wait_for_exact_text_not_visible( @Returns A web element object that contains the text searched for """ + _reconnect_if_disconnected(driver) text = str(text) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) @@ -1122,6 +1147,7 @@ def wait_for_non_empty_text_visible( @Returns The web element object that has text """ + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) element = None @@ -1239,6 +1265,7 @@ def find_visible_elements(driver, selector, by="css selector", limit=0): by - the type of selector being used (Default: "css selector") limit - the maximum number of elements to return if> 0. """ + _reconnect_if_disconnected(driver) elements = driver.find_elements(by=by, value=selector) if limit and limit> 0 and len(elements)> limit: elements = elements[:limit] @@ -1276,6 +1303,7 @@ def save_screenshot( If the folder provided doesn't exist, it will get created. The screenshot will be in PNG format: (*.png) """ + _reconnect_if_disconnected(driver) if not name.endswith(".png"): name = name + ".png" if folder: @@ -1314,6 +1342,7 @@ def save_page_source(driver, name, folder=None): """ from seleniumbase.core import log_helper + _reconnect_if_disconnected(driver) if not name.endswith(".html"): name = name + ".html" if folder: @@ -1340,6 +1369,7 @@ def wait_for_and_accept_alert(driver, timeout=settings.LARGE_TIMEOUT): driver - the webdriver object (required) timeout - the time to wait for the alert in seconds """ + _reconnect_if_disconnected(driver) alert = wait_for_and_switch_to_alert(driver, timeout) alert_text = alert.text alert.accept() @@ -1353,6 +1383,7 @@ def wait_for_and_dismiss_alert(driver, timeout=settings.LARGE_TIMEOUT): driver - the webdriver object (required) timeout - the time to wait for the alert in seconds """ + _reconnect_if_disconnected(driver) alert = wait_for_and_switch_to_alert(driver, timeout) alert_text = alert.text alert.dismiss() @@ -1368,6 +1399,7 @@ def wait_for_and_switch_to_alert(driver, timeout=settings.LARGE_TIMEOUT): driver - the webdriver object (required) timeout - the time to wait for the alert in seconds """ + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) for x in range(int(timeout * 10)): @@ -1395,6 +1427,7 @@ def switch_to_frame(driver, frame, timeout=settings.SMALL_TIMEOUT): frame - the frame element, name, id, index, or selector timeout - the time to wait for the alert in seconds """ + _reconnect_if_disconnected(driver) start_ms = time.time() * 1000.0 stop_ms = start_ms + (timeout * 1000.0) for x in range(int(timeout * 10)): @@ -1460,6 +1493,7 @@ def switch_to_window( timeout - the time to wait for the window in seconds uc_lock - if UC Mode and True, switch_to_window() uses thread-locking """ + _reconnect_if_disconnected(driver) if window == -1: window = len(driver.window_handles) - 1 start_ms = time.time() * 1000.0 @@ -1513,11 +1547,36 @@ def switch_to_window( timeout_exception(Exception, message) +############ + +# Special methods for use with UC Mode + +def _reconnect_if_disconnected(driver): + if ( + hasattr(driver, "_is_using_uc") + and driver._is_using_uc + and hasattr(driver, "_is_connected") + and not driver._is_connected + and hasattr(driver, "is_connected") + and not driver.is_connected() + ): + with suppress(Exception): + driver.connect() + + +def __is_cdp_swap_needed(driver): + """If the driver is disconnected, use a CDP method when available.""" + return shared_utils.is_cdp_swap_needed(driver) + + ############ # Support methods for direct use from driver def open_url(driver, url): + if __is_cdp_swap_needed(driver): + driver.cdp.open(url) + return url = str(url).strip() # Remove leading and trailing whitespace if not page_utils.looks_like_a_page_url(url): if page_utils.is_valid_url("https://" + url): @@ -1527,6 +1586,9 @@ def open_url(driver, url): def click(driver, selector, by="css selector", timeout=settings.SMALL_TIMEOUT): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.click(selector) + return element = wait_for_element_clickable( driver, selector, by=by, timeout=timeout ) @@ -1534,6 +1596,9 @@ def click(driver, selector, by="css selector", timeout=settings.SMALL_TIMEOUT): def click_link(driver, link_text, timeout=settings.SMALL_TIMEOUT): + if __is_cdp_swap_needed(driver): + driver.cdp.click_link(link_text) + return element = wait_for_element_clickable( driver, link_text, by="link text", timeout=timeout ) @@ -1544,6 +1609,9 @@ def click_if_visible( driver, selector, by="css selector", timeout=0 ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.click_if_visible(selector) + return if is_element_visible(driver, selector, by=by): click(driver, selector, by=by, timeout=1) elif timeout> 0: @@ -1556,6 +1624,9 @@ def click_if_visible( def click_active_element(driver): + if __is_cdp_swap_needed(driver): + driver.cdp.click_active_element() + return driver.execute_script("document.activeElement.click();") @@ -1563,6 +1634,9 @@ def js_click( driver, selector, by="css selector", timeout=settings.SMALL_TIMEOUT ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.click(selector) + return element = wait_for_element_present( driver, selector, by=by, timeout=timeout ) @@ -1588,6 +1662,9 @@ def send_keys( driver, selector, text, by="css selector", timeout=settings.LARGE_TIMEOUT ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.send_keys(selector, text) + return element = wait_for_element_present( driver, selector, by=by, timeout=timeout ) @@ -1602,6 +1679,9 @@ def press_keys( driver, selector, text, by="css selector", timeout=settings.LARGE_TIMEOUT ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.press_keys(selector, text) + return element = wait_for_element_present( driver, selector, by=by, timeout=timeout ) @@ -1618,6 +1698,9 @@ def update_text( driver, selector, text, by="css selector", timeout=settings.LARGE_TIMEOUT ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.type(selector, text) + return element = wait_for_element_clickable( driver, selector, by=by, timeout=timeout ) @@ -1631,6 +1714,9 @@ def update_text( def submit(driver, selector, by="css selector"): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.send_keys(selector, "\r\n") + return element = wait_for_element_clickable( driver, selector, by=by, timeout=settings.SMALL_TIMEOUT ) @@ -1655,6 +1741,9 @@ def assert_element_visible( elif page_utils.is_valid_by(selector): original_selector = by selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.assert_element(selector) + return True wait_for_element_visible( driver, selector, @@ -1673,6 +1762,9 @@ def assert_element_present( elif page_utils.is_valid_by(selector): original_selector = by selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.assert_element_present(selector) + return True wait_for_element_present( driver, selector, @@ -1708,6 +1800,9 @@ def assert_text( timeout=settings.SMALL_TIMEOUT, ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.assert_text(text, selector) + return True wait_for_text_visible( driver, text.strip(), selector, by=by, timeout=timeout ) @@ -1721,6 +1816,9 @@ def assert_exact_text( timeout=settings.SMALL_TIMEOUT, ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + driver.cdp.assert_exact_text(text, selector) + return True wait_for_exact_text_visible( driver, text.strip(), selector, by=by, timeout=timeout ) @@ -1763,6 +1861,8 @@ def wait_for_element( elif page_utils.is_valid_by(selector): original_selector = by selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + return driver.cdp.select(selector) return wait_for_element_visible( driver=driver, selector=selector, @@ -1784,6 +1884,8 @@ def wait_for_selector( elif page_utils.is_valid_by(selector): original_selector = by selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + return driver.cdp.select(selector) return wait_for_element_present( driver=driver, selector=selector, @@ -1801,6 +1903,8 @@ def wait_for_text( timeout=settings.LARGE_TIMEOUT, ): selector, by = page_utils.recalculate_selector(selector, by) + if __is_cdp_swap_needed(driver): + return driver.cdp.find_element(selector) return wait_for_text_visible( driver=driver, text=text, @@ -1848,6 +1952,8 @@ def get_text( by="css selector", timeout=settings.LARGE_TIMEOUT ): + if __is_cdp_swap_needed(driver): + return driver.cdp.get_text(selector) element = wait_for_element( driver=driver, selector=selector, diff --git a/seleniumbase/fixtures/shared_utils.py b/seleniumbase/fixtures/shared_utils.py index 98092da4c1a..d7e85f77e89 100644 --- a/seleniumbase/fixtures/shared_utils.py +++ b/seleniumbase/fixtures/shared_utils.py @@ -73,6 +73,32 @@ def fix_colorama_if_windows(): colorama.just_fix_windows_console() +def fix_url_as_needed(url): + if not url: + url = "data:," + elif url.startswith("//"): + url = "https:" + url + elif ":" not in url: + url = "https://" + url + return url + + +def is_cdp_swap_needed(driver): + """ + When someone is using CDP Mode with a disconnected webdriver, + but they forget to reconnect before calling a webdriver method, + this method is used to substitute the webdriver method for a + CDP Mode method instead, which keeps CDP Stealth Mode enabled. + For other webdriver methods, SeleniumBase will reconnect first. + """ + return ( + driver.is_cdp_mode_active() + # and hasattr(driver, "_is_connected") + # and not driver._is_connected + and not driver.is_connected() + ) + + def format_exc(exception, message): """Formats an exception message to make the output cleaner.""" from selenium.common.exceptions import ElementNotVisibleException diff --git a/seleniumbase/plugins/driver_manager.py b/seleniumbase/plugins/driver_manager.py index 0041da03dd4..64f9250ecd0 100644 --- a/seleniumbase/plugins/driver_manager.py +++ b/seleniumbase/plugins/driver_manager.py @@ -124,10 +124,13 @@ def Driver( d_width=None, # Set device width d_height=None, # Set device height d_p_r=None, # Set device pixel ratio + position=None, # Shortcut / Duplicate of "window_position". + size=None, # Shortcut / Duplicate of "window_size". uc=None, # Shortcut / Duplicate of "undetectable". undetected=None, # Shortcut / Duplicate of "undetectable". uc_cdp=None, # Shortcut / Duplicate of "uc_cdp_events". uc_sub=None, # Shortcut / Duplicate of "uc_subprocess". + locale=None, # Shortcut / Duplicate of "locale_code". log_cdp=None, # Shortcut / Duplicate of "log_cdp_events". ad_block=None, # Shortcut / Duplicate of "ad_block_on". server=None, # Shortcut / Duplicate of "servername". @@ -216,10 +219,13 @@ def Driver( d_width (int): Set device width d_height (int): Set device height d_p_r (float): Set device pixel ratio + position (x,y): Shortcut / Duplicate of "window_position". + size (w,h): Shortcut / Duplicate of "window_size". uc (bool): Shortcut / Duplicate of "undetectable". undetected (bool): Shortcut / Duplicate of "undetectable". uc_cdp (bool): Shortcut / Duplicate of "uc_cdp_events". uc_sub (bool): Shortcut / Duplicate of "uc_subprocess". + locale (str): Shortcut / Duplicate of "locale_code". log_cdp (bool): Shortcut / Duplicate of "log_cdp_events". ad_block (bool): Shortcut / Duplicate of "ad_block_on". server (str): Shortcut / Duplicate of "servername". @@ -433,6 +439,8 @@ def Driver( break count += 1 disable_features = d_f + if window_position is None and position is not None: + window_position = position w_p = window_position if w_p is None and "--window-position" in arg_join: count = 0 @@ -446,29 +454,31 @@ def Driver( w_p = None break count += 1 - window_position = w_p - if window_position: - if window_position.count(",") != 1: - message = ( - '\n\n window_position expects an "x,y" string!' - '\n (Your input was: "%s")\n' % window_position - ) - raise Exception(message) - window_position = window_position.replace(" ", "") - win_x = None - win_y = None - try: - win_x = int(window_position.split(",")[0]) - win_y = int(window_position.split(",")[1]) - except Exception: - message = ( - '\n\n Expecting integer values for "x,y"!' - '\n (window_position input was: "%s")\n' - % window_position - ) - raise Exception(message) - settings.WINDOW_START_X = win_x - settings.WINDOW_START_Y = win_y + window_position = w_p + if window_position: + if window_position.count(",") != 1: + message = ( + '\n\n window_position expects an "x,y" string!' + '\n (Your input was: "%s")\n' % window_position + ) + raise Exception(message) + window_position = window_position.replace(" ", "") + win_x = None + win_y = None + try: + win_x = int(window_position.split(",")[0]) + win_y = int(window_position.split(",")[1]) + except Exception: + message = ( + '\n\n Expecting integer values for "x,y"!' + '\n (window_position input was: "%s")\n' + % window_position + ) + raise Exception(message) + settings.WINDOW_START_X = win_x + settings.WINDOW_START_Y = win_y + if window_size is None and size is not None: + window_size = size w_s = window_size if w_s is None and "--window-size" in arg_join: count = 0 @@ -482,30 +492,30 @@ def Driver( w_s = None break count += 1 - window_size = w_s - if window_size: - if window_size.count(",") != 1: - message = ( - '\n\n window_size expects a "width,height" string!' - '\n (Your input was: "%s")\n' % window_size - ) - raise Exception(message) - window_size = window_size.replace(" ", "") - width = None - height = None - try: - width = int(window_size.split(",")[0]) - height = int(window_size.split(",")[1]) - except Exception: - message = ( - '\n\n Expecting integer values for "width,height"!' - '\n (window_size input was: "%s")\n' % window_size - ) - raise Exception(message) - settings.CHROME_START_WIDTH = width - settings.CHROME_START_HEIGHT = height - settings.HEADLESS_START_WIDTH = width - settings.HEADLESS_START_HEIGHT = height + window_size = w_s + if window_size: + if window_size.count(",") != 1: + message = ( + '\n\n window_size expects a "width,height" string!' + '\n (Your input was: "%s")\n' % window_size + ) + raise Exception(message) + window_size = window_size.replace(" ", "") + width = None + height = None + try: + width = int(window_size.split(",")[0]) + height = int(window_size.split(",")[1]) + except Exception: + message = ( + '\n\n Expecting integer values for "width,height"!' + '\n (window_size input was: "%s")\n' % window_size + ) + raise Exception(message) + settings.CHROME_START_WIDTH = width + settings.CHROME_START_HEIGHT = height + settings.HEADLESS_START_WIDTH = width + settings.HEADLESS_START_HEIGHT = height if agent is None and "--agent" in arg_join: count = 0 for arg in sys_argv: @@ -734,6 +744,8 @@ def Driver( swiftshader = True else: swiftshader = False + if locale is not None and locale_code is None: + locale_code = locale if ad_block is not None and ad_block_on is None: ad_block_on = ad_block if ad_block_on is None: @@ -779,6 +791,83 @@ def Driver( # Launch a web browser from seleniumbase.core import browser_launcher + # Fix Chrome-130 issues by creating a user-data-dir in advance + if undetectable and not user_data_dir and browser == "chrome": + import tempfile + import time + user_data_dir = ( + os.path.normpath(tempfile.mkdtemp()) + ) + try: + decoy_driver = browser_launcher.get_driver( + browser_name=browser_name, + headless=False, + locale_code=locale_code, + use_grid=use_grid, + protocol=protocol, + servername=servername, + port=port, + proxy_string=proxy_string, + proxy_bypass_list=proxy_bypass_list, + proxy_pac_url=proxy_pac_url, + multi_proxy=multi_proxy, + user_agent=user_agent, + cap_file=cap_file, + cap_string=cap_string, + recorder_ext=recorder_ext, + disable_cookies=disable_cookies, + disable_js=disable_js, + disable_csp=disable_csp, + enable_ws=enable_ws, + enable_sync=enable_sync, + use_auto_ext=use_auto_ext, + undetectable=undetectable, + uc_cdp_events=uc_cdp_events, + uc_subprocess=uc_subprocess, + log_cdp_events=log_cdp_events, + no_sandbox=no_sandbox, + disable_gpu=disable_gpu, + headless1=False, + headless2=True, + incognito=incognito, + guest_mode=guest_mode, + dark_mode=dark_mode, + devtools=devtools, + remote_debug=remote_debug, + enable_3d_apis=enable_3d_apis, + swiftshader=swiftshader, + ad_block_on=ad_block_on, + host_resolver_rules=host_resolver_rules, + block_images=block_images, + do_not_track=do_not_track, + chromium_arg=chromium_arg, + firefox_arg=firefox_arg, + firefox_pref=firefox_pref, + user_data_dir=user_data_dir, + extension_zip=extension_zip, + extension_dir=extension_dir, + disable_features=disable_features, + binary_location=binary_location, + driver_version=driver_version, + page_load_strategy=page_load_strategy, + use_wire=use_wire, + external_pdf=external_pdf, + test_id=test_id, + mobile_emulator=is_mobile, + device_width=d_width, + device_height=d_height, + device_pixel_ratio=d_p_r, + browser=browser_name, + ) + time.sleep(0.555) + except Exception: + pass + finally: + try: + decoy_driver.quit() + except Exception: + pass + driver = browser_launcher.get_driver( browser_name=browser_name, headless=headless, diff --git a/seleniumbase/plugins/sb_manager.py b/seleniumbase/plugins/sb_manager.py index d35e2806ce1..3f00d3f7f84 100644 --- a/seleniumbase/plugins/sb_manager.py +++ b/seleniumbase/plugins/sb_manager.py @@ -104,10 +104,13 @@ def SB( disable_ws=None, # Reverse of "enable_ws". (None and False are different) disable_beforeunload=None, # Disable the "beforeunload" event on Chromium. settings_file=None, # A file for overriding default SeleniumBase settings. + position=None, # Shortcut / Duplicate of "window_position". + size=None, # Shortcut / Duplicate of "window_size". uc=None, # Shortcut / Duplicate of "undetectable". undetected=None, # Shortcut / Duplicate of "undetectable". uc_cdp=None, # Shortcut / Duplicate of "uc_cdp_events". uc_sub=None, # Shortcut / Duplicate of "uc_subprocess". + locale=None, # Shortcut / Duplicate of "locale_code". log_cdp=None, # Shortcut / Duplicate of "log_cdp_events". ad_block=None, # Shortcut / Duplicate of "ad_block_on". server=None, # Shortcut / Duplicate of "servername". @@ -224,10 +227,13 @@ def SB( disable_ws (bool): Reverse of "enable_ws". (None and False are different) disable_beforeunload (bool): Disable the "beforeunload" event on Chromium. settings_file (str): A file for overriding default SeleniumBase settings. + position (x,y): Shortcut / Duplicate of "window_position". + size (w,h): Shortcut / Duplicate of "window_size". uc (bool): Shortcut / Duplicate of "undetectable". undetected (bool): Shortcut / Duplicate of "undetectable". uc_cdp (bool): Shortcut / Duplicate of "uc_cdp_events". uc_sub (bool): Shortcut / Duplicate of "uc_subprocess". + locale (str): Shortcut / Duplicate of "locale_code". log_cdp (bool): Shortcut / Duplicate of "log_cdp_events". ad_block (bool): Shortcut / Duplicate of "ad_block_on". server (str): Shortcut / Duplicate of "servername". @@ -497,8 +503,13 @@ def SB( break count += 1 disable_features = d_f + if window_position is None and position is not None: + window_position = position w_p = window_position - if w_p is None and "--window-position" in arg_join: + if ( + w_p is None + and ("--window-position" in arg_join or "--position" in arg_join) + ): count = 0 for arg in sys_argv: if arg.startswith("--window-position="): @@ -511,6 +522,8 @@ def SB( break count += 1 window_position = w_p + if window_size is None and size is not None: + window_size = size w_s = window_size if w_s is None and "--window-size" in arg_join: count = 0 @@ -890,6 +903,8 @@ def SB( swiftshader = True else: swiftshader = False + if locale is not None and locale_code is None: + locale_code = locale if ad_block is not None and ad_block_on is None: ad_block_on = ad_block if ad_block_on is None: @@ -1197,6 +1212,30 @@ def SB( if not sb_config.multi_proxy: proxy_helper.remove_proxy_zip_if_present() start_time = time.time() + saved_headless2 = headless2 + + # Fix Chrome-130 issues by creating a user-data-dir in advance + if undetectable and not user_data_dir and browser == "chrome": + import tempfile + user_data_dir = ( + os.path.normpath(tempfile.mkdtemp()) + ) + sb.user_data_dir = user_data_dir + sb_config.user_data_dir = user_data_dir + try: + decoy = sb + decoy.headless2 = True + decoy.setUp() + decoy.sleep(0.555) + except Exception: + pass + finally: + try: + decoy.tearDown() + except Exception: + pass + sb.headless2 = saved_headless2 + sb.setUp() test_passed = True # This can change later teardown_exception = None diff --git a/seleniumbase/undetected/__init__.py b/seleniumbase/undetected/__init__.py index 36d6d3a5977..f9b0b44845b 100644 --- a/seleniumbase/undetected/__init__.py +++ b/seleniumbase/undetected/__init__.py @@ -441,6 +441,7 @@ def disconnect(self): if hasattr(self, "service"): with suppress(Exception): self.service.stop() + self._is_connected = False time.sleep(0.012) def connect(self): @@ -452,6 +453,7 @@ def connect(self): time.sleep(0.012) with suppress(Exception): self.start_session() + self._is_connected = True time.sleep(0.012) def start_session(self, capabilities=None): diff --git a/seleniumbase/undetected/cdp_driver/__init__.py b/seleniumbase/undetected/cdp_driver/__init__.py new file mode 100644 index 00000000000..8cc0fcf4a08 --- /dev/null +++ b/seleniumbase/undetected/cdp_driver/__init__.py @@ -0,0 +1 @@ +from seleniumbase.undetected.cdp_driver import cdp_util # noqa diff --git a/seleniumbase/undetected/cdp_driver/_contradict.py b/seleniumbase/undetected/cdp_driver/_contradict.py new file mode 100644 index 00000000000..e087e4cf07e --- /dev/null +++ b/seleniumbase/undetected/cdp_driver/_contradict.py @@ -0,0 +1,110 @@ +import warnings as _warnings +from collections.abc import Mapping as _Mapping, Sequence as _Sequence +import logging + +__logger__ = logging.getLogger(__name__) +__all__ = ["cdict", "ContraDict"] + + +def cdict(*args, **kwargs): + """Factory function""" + return ContraDict(*args, **kwargs) + + +class ContraDict(dict): + """ + Directly inherited from dict. + Accessible by attribute. o.x == o['x'] + This works also for all corner cases. + Native json.dumps and json.loads work with it. + + Names like "keys", "update", "values" etc won't overwrite the methods, + but will just be available using dict lookup notation obj['items'] + instead of obj.items. + + All key names are converted to snake_case. + Hyphen's (-), dot's (.) or whitespaces are replaced by underscore (_). + Autocomplete works even if the objects comes from a list. + Recursive action. Dict assignments will be converted too. + """ + __module__ = None + + def __init__(self, *args, **kwargs): + super().__init__() + silent = kwargs.pop("silent", False) + _ = dict(*args, **kwargs) + + super().__setattr__("__dict__", self) + for k, v in _.items(): + _check_key(k, self, False, silent) + super().__setitem__(k, _wrap(self.__class__, v)) + + def __setitem__(self, key, value): + super().__setitem__(key, _wrap(self.__class__, value)) + + def __setattr__(self, key, value): + super().__setitem__(key, _wrap(self.__class__, value)) + + def __getattribute__(self, attribute): + if attribute in self: + return self[attribute] + if not _check_key(attribute, self, True, silent=True): + return getattr(super(), attribute) + return object.__getattribute__(self, attribute) + + +def _wrap(cls, v): + if isinstance(v, _Mapping): + v = cls(v) + elif isinstance(v, _Sequence) and not isinstance( + v, (str, bytes, bytearray, set, tuple) + ): + v = list([_wrap(cls, x) for x in v]) + return v + + +_warning_names = ( + "items", + "keys", + "values", + "update", + "clear", + "copy", + "fromkeys", + "get", + "items", + "keys", + "pop", + "popitem", + "setdefault", + "update", + "values", + "class", +) + +_warning_names_message = """\n\ + While creating a ContraDict object, a key offending key name '{0}' + has been found, which might behave unexpected. + You will only be able to look it up using key, + eg. myobject['{0}']. myobject.{0} will not work with that name.""" + + +def _check_key( + key: str, mapping: _Mapping, boolean: bool = False, silent=False +): + """Checks `key` and warns if needed. + :param key: + :param boolean: return True or False instead of passthrough + """ + e = None + if not isinstance(key, (str,)): + if boolean: + return True + return key + if key.lower() in _warning_names or any(_ in key for _ in ("-", ".")): + if not silent: + _warnings.warn(_warning_names_message.format(key)) + e = True + if not boolean: + return key + return not e diff --git a/seleniumbase/undetected/cdp_driver/browser.py b/seleniumbase/undetected/cdp_driver/browser.py new file mode 100644 index 00000000000..7571cb87d2b --- /dev/null +++ b/seleniumbase/undetected/cdp_driver/browser.py @@ -0,0 +1,830 @@ +"""CDP-Driver is based on NoDriver""" +from __future__ import annotations +import asyncio +import atexit +import http.cookiejar +import json +import logging +import os +import pickle +import pathlib +import shutil +import urllib.parse +import urllib.request +import warnings +from collections import defaultdict +from typing import List, Set, Tuple, Union +import mycdp as cdp +from . import cdp_util as util +from . import tab +from ._contradict import ContraDict +from .config import PathLike, Config, is_posix +from .connection import Connection + +logger = logging.getLogger(__name__) + + +def get_registered_instances(): + return __registered__instances__ + + +def deconstruct_browser(): + import time + + for _ in __registered__instances__: + if not _.stopped: + _.stop() + for attempt in range(5): + try: + if _.config and not _.config.uses_custom_data_dir: + shutil.rmtree(_.config.user_data_dir, ignore_errors=False) + except FileNotFoundError: + break + except (PermissionError, OSError) as e: + if attempt == 4: + logger.debug( + "Problem removing data dir %s\n" + "Consider checking whether it's there " + "and remove it by hand\nerror: %s", + _.config.user_data_dir, + e, + ) + break + time.sleep(0.15) + continue + logging.debug("Temp profile %s was removed." % _.config.user_data_dir) + + +class Browser: + """ + The Browser object is the "root" of the hierarchy + and contains a reference to the browser parent process. + There should usually be only 1 instance of this. + All opened tabs, extra browser screens, + and resources will not cause a new Browser process, + but rather create additional :class:`Tab` objects. + So, besides starting your instance and first/additional tabs, + you don't actively use it a lot under normal conditions. + Tab objects will represent and control: + - tabs (as you know them) + - browser windows (new window) + - iframe + - background processes + Note: + The Browser object is not instantiated by __init__ + but using the asynchronous :meth:`Browser.create` method. + Note: + In Chromium based browsers, there is a parent process which keeps + running all the time, even if there are no visible browser windows. + Sometimes it's stubborn to close it, so make sure that after using + this library, the browser is correctly and fully closed/exited/killed. + """ + _process: asyncio.subprocess.Process + _process_pid: int + _http: HTTPApi = None + _cookies: CookieJar = None + config: Config + connection: Connection + + @classmethod + async def create( + cls, + config: Config = None, + *, + user_data_dir: PathLike = None, + headless: bool = False, + incognito: bool = False, + guest: bool = False, + browser_executable_path: PathLike = None, + browser_args: List[str] = None, + sandbox: bool = True, + host: str = None, + port: int = None, + **kwargs, + ) -> Browser: + """Entry point for creating an instance.""" + if not config: + config = Config( + user_data_dir=user_data_dir, + headless=headless, + incognito=incognito, + guest=guest, + browser_executable_path=browser_executable_path, + browser_args=browser_args or [], + sandbox=sandbox, + host=host, + port=port, + **kwargs, + ) + instance = cls(config) + await instance.start() + return instance + + def __init__(self, config: Config, **kwargs): + """ + Constructor. To create a instance, use :py:meth:`Browser.create(...)` + :param config: + """ + try: + asyncio.get_running_loop() + except RuntimeError: + raise RuntimeError( + "{0} objects of this class are created " + "using await {0}.create()".format( + self.__class__.__name__ + ) + ) + self.config = config + self.targets: List = [] + self.info = None + self._target = None + self._process = None + self._process_pid = None + self._keep_user_data_dir = None + self._is_updating = asyncio.Event() + self.connection: Connection = None + logger.debug("Session object initialized: %s" % vars(self)) + + @property + def websocket_url(self): + return self.info.webSocketDebuggerUrl + + @property + def main_tab(self) -> tab.Tab: + """Returns the target which was launched with the browser.""" + return sorted( + self.targets, key=lambda x: x.type_ == "page", reverse=True + )[0] + + @property + def tabs(self) -> List[tab.Tab]: + """Returns the current targets which are of type "page".""" + tabs = filter(lambda item: item.type_ == "page", self.targets) + return list(tabs) + + @property + def cookies(self) -> CookieJar: + if not self._cookies: + self._cookies = CookieJar(self) + return self._cookies + + @property + def stopped(self): + if self._process and self._process.returncode is None: + return False + return True + # return (self._process and self._process.returncode) or False + + async def wait(self, time: Union[float, int] = 1) -> Browser: + """Wait for