\$\begingroup\$

I previously got a review of something similar (not the same, but also related to proxies), which can be found at: proxy-manager-to-return-proxy-format

What I am now trying to do is lock the proxies while they are in use. While a proxy is in use we block it, and once the thread is finished with it, another thread can pick it up. I have ended up with something like this. See AvailableProxies(); ProxyManager has already been reviewed:

import random
import threading
import time
from typing import List, Dict, Optional
from loguru import logger

"""
Been reviewed: https://codereview.stackexchange.com/a/266293/223687
"""


class ProxyManager:
    def __init__(self, proxy_file_path: Optional[str] = None):
        self._proxies = self._load_proxies(proxy_file_path) if proxy_file_path else []

    @staticmethod
    def _load_proxies(proxy_file_path: str) -> List[Dict[str, str]]:
        """
        Parse proxies into the format `http://ip:port` or `http://ip:port:user@password`.
        #
        >> load_proxies(['hello:12345', 'hello:12345:apple:orange'])
        # [{'https': 'http://hello:12345'}, {'https': 'http://hello:12345:apple@orange'}]
        >> load_proxies(['test:4125', 'myserver:1337:usertest:passwordtest'])
        [{'https': 'http://test:4125'}, {'https': 'http://myserver:1337:usertest@passwordtest'}]
        """

        def _load_ip_proxy(value: str) -> str:
            """
            Transform a `{ip}:{port}` string into a `http://{ip}:{port}` string.
            """
            ip, port = value.split(":")
            return f"http://{ip}:{port}"

        def _load_ipup_proxy(value: str) -> str:
            """
            Transform a `{ip}:{port}:{user}:{password}` string into a `http://{ip}:{port}:{user}@{password}` string.
            """
            ip, port, user, password = value.split(":")
            return f"http://{ip}:{port}:{user}@{password}"

        parsing_functions = {1: _load_ip_proxy, 3: _load_ipup_proxy}
        with open(proxy_file_path) as proxy_file:
            proxies = []
            for line in proxy_file:
                proxy = line.strip('\n')
                parsing_function = parsing_functions.get(proxy.count(":"))
                if parsing_function is None:
                    continue
                proxies.append({'https': parsing_function(proxy)})
        return proxies

    @property
    def random_proxy(self) -> Dict[str, str]:
        return random.choice(self._proxies) if len(self._proxies) > 0 else {}

    @property
    def full_list(self) -> List[Dict[str, str]]:
        return self._proxies
# -------------------------------------------------------------------------
# Proxies path
# -------------------------------------------------------------------------
proxies: Dict[str, ProxyManager] = {
    "rotating": ProxyManager("./proxies/rotating.txt"),
    "test": ProxyManager("./proxies/test.txt")
}
# -------------------------------------------------------------------------
# Proxies available
# -------------------------------------------------------------------------
all_proxies = [proxy['https'] for proxy in proxies["test"].full_list]
proxy_dict = dict(zip(all_proxies, [True] * len(all_proxies)))
proxy_lock: threading = threading.Lock()


# -------------------------------------------------------------------------
# Initialize availability of proxies
# -------------------------------------------------------------------------
class AvailableProxies:
    def __enter__(self):
        proxy_lock.acquire()
        self.proxy = None
        while not self.proxy:
            if available := [att for att, value in proxy_dict.items() if value]:
                self.proxy = random.choice(available)
                proxy_dict[self.proxy] = False
            else:
                logger.info("Waiting ... no proxies available")
                time.sleep(.2)
        proxy_lock.release()
        return self.proxy

    def __exit__(self, exc_type, exc_val, exc_tb):
        proxy_dict[self.proxy] = True

To be able to test it:

import threading
import time
from lib.proxies import AvailableProxies


def main(thread_name):
    while True:
        with AvailableProxies() as proxies:
            print(f"Thread name: {thread_name} -> {proxies}")
            time.sleep(3)


if __name__ == '__main__':
    for i in range(10):
        threading.Thread(target=main, args=(i,)).start()
        time.sleep(0.3)

However, there are some scripts where I will not use AvailableProxies and will only use ProxyManager (but AvailableProxies should of course be available to import into my scripts at any time :) (if that matters).

One thing I am concerned about is that I am not sure whether this method of setting a proxy to "available" and "busy" is a valid way to do it (but hey, it works!).

I am looking forward to any code review, and I do hope that this time I will not be as shocked as I was by my previous review :)

asked Aug 22, 2021 at 21:12
\$\endgroup\$

1 Answer

\$\begingroup\$

Availability of proxies

Documentation

The threading name refers to the module, while the threading.Lock name refers to a class inside that module. Thus, the type annotation of proxy_lock is wrong: the variable holds a lock instance, not the module. You can replace it with

proxy_lock: threading.Lock = threading.Lock()

Performance

To avoid allocating a list filled with Trues, you can use dict.fromkeys and replace

proxy_dict = dict(zip(all_proxies, [True] * len(all_proxies)))

with

proxy_dict = dict.fromkeys(all_proxies, True)
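
A quick check, using made-up proxy URLs rather than the real file contents, suggests that both constructions produce the same dictionary. Because True is immutable, sharing the one value across all keys is safe here; the usual dict.fromkeys caveat only applies to mutable defaults such as lists.

```python
# Hypothetical proxy URLs, used only for illustration.
all_proxies = ["http://hello:12345", "http://test:4125"]

# Original construction: builds a temporary list of True values first.
via_zip = dict(zip(all_proxies, [True] * len(all_proxies)))

# Suggested construction: no temporary list needed.
via_fromkeys = dict.fromkeys(all_proxies, True)

print(via_zip == via_fromkeys)
```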

Logic

You can use a context manager to use your lock. There are two main advantages of using a context manager:

  1. You'll only use your proxy_lock variable once, in the with-block. Currently, you're using it twice, which is error-prone (e.g. you may change your code in the future and accidentally delete the line where you release the lock).

  2. If an error occurs between acquiring and releasing the lock, the lock is never released and every other thread blocks forever. The context manager of the threading.Lock class releases the lock for you, even when the block raises.

So you can replace

def __enter__(self):
    proxy_lock.acquire()

    # ...
    proxy_lock.release()
    return self.proxy

with

def __enter__(self):
    with proxy_lock:
        # ...
    return self.proxy
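
To illustrate the second advantage, here is a minimal sketch that is separate from the proxy code. The function name risky_section and the simulated error are assumptions made up for the demonstration; only threading.Lock and its locked() method come from the standard library.

```python
import threading

lock = threading.Lock()


def risky_section() -> None:
    # The with-block releases the lock even when the body raises.
    with lock:
        raise RuntimeError("simulated failure inside the critical section")


try:
    risky_section()
except RuntimeError:
    pass

# If the lock had leaked, locked() would still report True here.
released_after_error = not lock.locked()
print(released_after_error)
```

With a manual acquire()/release() pair and no try/finally, the same exception would leave the lock held.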

The proxy_lock variable should not be available to the outside world, since only AvailableProxies uses it. However, when you import lib.proxies, proxy_lock will come with it. You can make it a private class attribute:

class AvailableProxies:
    _proxy_lock: threading.Lock = threading.Lock()

    def __enter__(self):
        with self._proxy_lock:
            # ...
        return self.proxy

Dependency injection

You should decide whether or not to use dependency injection. This answer explains it well:

Dependency Injection is a practice where objects are designed in a manner where they receive instances of the objects from other pieces of code, instead of constructing them internally. This means that any object implementing the interface which is required by the object can be substituted in without changing the code, which simplifies testing, and improves decoupling.

Its main goal is to make unit testing easier, and one of its rules is to "avoid using mutable global variables". Your proxy_dict variable is mutable and it's declared in the outermost (global) scope, so it goes against the principles of DI. Consider whether it's worth it. If it is, here's how to fix it step by step:

  1. Note that, in your driver file, main is creating an instance of AvailableProxies. Let's use DI and pass the dependency to main instead of creating a new one:
def main(manager, thread_name):
    while True:
        with manager as proxies:
            print(f"Thread name: {thread_name} -> {proxies}")
            time.sleep(3)


if __name__ == '__main__':
    # Only one instance
    manager = AvailableProxies()
    for i in range(10):
        threading.Thread(target=main, args=(manager, i)).start()
        time.sleep(0.3)

You can try this change. You'll notice that it does not work: only threads 0-4 get a proxy, and threads 5-9 wait forever. There is now only one instance of AvailableProxies, so every thread shares the same self.proxy, and one thread's __enter__ overwrites the value that another thread's __exit__ needs. We can fix this by using contextlib.contextmanager instead of __enter__ and __exit__.

I also like using the latter, but in this case __exit__ needs to access the proxy found by __enter__, and the only way for __enter__ to pass it to __exit__ is by creating an instance variable (thus modifying the instance). However, contextlib.contextmanager allows this transfer to happen via a local variable, which does not modify the instance and removes the deadlock threat:

import contextlib


class AvailableProxies:
    # ...
    @property
    @contextlib.contextmanager
    def proxies(self):
        # Note that we can use `proxy` instead of `self.proxy`.
        # Since our goal is to use only one instance of `AvailableProxies`
        # with multiple threads, using a local variable here doesn't modify
        # the instance in any way, avoiding a possible deadlock
        proxy = None
        with self._proxy_lock:
            while not proxy:
                if available := [att for att, value in proxy_dict.items() if value]:
                    proxy = random.choice(available)
                    proxy_dict[proxy] = False
                else:
                    logger.info("Waiting ... no proxies available")
                    time.sleep(.2)
        try:
            yield proxy
        finally:
            # Mark the proxy as available again, even if the with-body raised
            proxy_dict[proxy] = True

In your driver file:

def main(manager, thread_name):
    while True:
        # Access the `proxies` property here
        with manager.proxies as proxies:
            print(f"Thread name: {thread_name} -> {proxies}")
            time.sleep(3)

This now works as expected.
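
One way to check this without real proxies is the self-contained sketch below. The proxy URLs, the worker function, and the in_use bookkeeping are all assumptions added for the test; the wait is shortened from 0.2 s so the run finishes quickly, logging is left out, and the try/finally ensures a proxy is released even if the with-body raises.

```python
import contextlib
import random
import threading
import time

# Hypothetical proxy URLs; the real ones come from ProxyManager.
proxy_dict = dict.fromkeys(["http://a:1", "http://b:2"], True)


class AvailableProxies:
    _proxy_lock = threading.Lock()

    @property
    @contextlib.contextmanager
    def proxies(self):
        proxy = None  # local variable: not shared between threads
        with self._proxy_lock:
            while not proxy:
                if available := [p for p, free in proxy_dict.items() if free]:
                    proxy = random.choice(available)
                    proxy_dict[proxy] = False
                else:
                    time.sleep(0.01)
        try:
            yield proxy
        finally:
            proxy_dict[proxy] = True


in_use = set()                     # proxies currently held by some worker
in_use_guard = threading.Lock()    # protects the bookkeeping set
collisions = []                    # filled only if two workers share a proxy
completed = []                     # names of workers that finished


def worker(manager, name):
    for _ in range(3):
        with manager.proxies as proxy:
            with in_use_guard:
                if proxy in in_use:
                    collisions.append((name, proxy))
                in_use.add(proxy)
            time.sleep(0.01)  # pretend to do some work through the proxy
            with in_use_guard:
                in_use.discard(proxy)
    completed.append(name)


manager = AvailableProxies()  # a single shared instance
threads = [threading.Thread(target=worker, args=(manager, i)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"collisions: {collisions}, workers finished: {len(completed)}")
```

Five workers compete for two proxies here, so some of them have to wait, yet each one should eventually finish without ever sharing a proxy with another worker.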

  2. The proxy_dict dictionary is still mutable and it's still global. Let's create it in the constructor of AvailableProxies as an instance variable:
class AvailableProxies:
    # ...
    # Pass the proxies as a dependency to the constructor (DI)
    def __init__(self, proxies: Dict[str, ProxyManager]):
        all_proxies = [proxy['https'] for proxy in proxies["test"].full_list]
        self._proxy_dict = dict.fromkeys(all_proxies, True)

    @property
    @contextlib.contextmanager
    def proxies(self):
        # Replace `proxy_dict` here with `self._proxy_dict`
        ...

You may ask here: when I do self._proxy_dict[proxy] = True, I'm modifying the AvailableProxies instance, so why doesn't a deadlock occur here too? The answer is that the loop condition inside the lock (while not proxy) now depends only on the local variable proxy, which no other thread can overwrite. Previously it depended on self.proxy, which every thread shared, so one thread could change another thread's loop condition and create the deadlock. self._proxy_dict is shared on purpose: marking a proxy as available again is exactly what lets a waiting thread leave the loop, so you can modify it fine.

Since now AvailableProxies expects proxies as an argument to its constructor, create proxies as a local variable instead of a global variable:

if __name__ == '__main__':
    proxies: Dict[str, ProxyManager] = {
        "rotating": ProxyManager("./proxies/rotating.txt"),
        "test": ProxyManager("./proxies/test.txt")
    }
    manager = AvailableProxies(proxies)
    for i in range(10):
        threading.Thread(target=main, args=(manager, i)).start()
        time.sleep(0.3)

Your code is fine now: no more global variables.
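
As a rough illustration of the testing benefit, the sketch below injects a stand-in for ProxyManager so that no proxy file is needed. FakeProxyManager and its single URL are hypothetical names invented for this example; AvailableProxies follows the steps above, with a try/finally around the yield and the logging left out to keep the snippet dependency-free.

```python
import contextlib
import random
import threading
import time
from typing import Dict, List


class FakeProxyManager:
    """Hypothetical stand-in for ProxyManager that needs no file on disk."""

    def __init__(self, urls: List[str]):
        self._proxies = [{"https": url} for url in urls]

    @property
    def full_list(self) -> List[Dict[str, str]]:
        return self._proxies


class AvailableProxies:
    _proxy_lock = threading.Lock()

    def __init__(self, proxies):
        # The dependency is passed in instead of read from a global.
        all_proxies = [proxy["https"] for proxy in proxies["test"].full_list]
        self._proxy_dict = dict.fromkeys(all_proxies, True)

    @property
    @contextlib.contextmanager
    def proxies(self):
        proxy = None
        with self._proxy_lock:
            while not proxy:
                if available := [p for p, free in self._proxy_dict.items() if free]:
                    proxy = random.choice(available)
                    self._proxy_dict[proxy] = False
                else:
                    time.sleep(0.01)
        try:
            yield proxy
        finally:
            self._proxy_dict[proxy] = True


manager = AvailableProxies({"test": FakeProxyManager(["http://test:4125"])})

with manager.proxies as proxy:
    held = proxy
    # While the with-block is active, the proxy is marked as busy.
    busy_while_held = manager._proxy_dict[proxy] is False

# After leaving the with-block, the proxy is available again.
free_after_exit = manager._proxy_dict[held] is True
print(held, busy_while_held, free_after_exit)
```

The test reaches into the private _proxy_dict only to observe the state; production code would not need to.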


As a final note, I would suggest adding a suffix to AvailableProxies, such as AvailableProxiesProvider or AvailableProxiesManager, but that's up to you.

answered Aug 24, 2021 at 0:45
\$\endgroup\$
  • Once again a hero! Oh wow, I honestly can't believe how you managed to completely change my code while also describing every step, why and how, which I really love. I have read the answer five times already and have adapted it into my code. It is indeed very well written, thank you so much! I do have a question: is it okay to have manager = AvailableProxies(proxies) in a utils.py lib and then just import it into the scripts where you want to use it (e.g. test1.py and test2.py do from lib.utils import manager) instead of creating it under if __name__ == '__main__':? Commented Aug 24, 2021 at 8:32
  • Yes, it's okay! In fact, the approach you mentioned is the easiest way of creating a singleton in Python. Commented Aug 24, 2021 at 15:16
  • Thank you so much Enzo! I wish you a very good day, week, month, years and all coming years ahead! You are awesome! Commented Aug 24, 2021 at 18:34
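
The singleton behaviour mentioned in the comments relies on Python caching imported modules in sys.modules. The sketch below simulates that with an in-memory module; the module name demo_utils and the placeholder object are assumptions standing in for lib.utils and AvailableProxies(proxies).

```python
import sys
import types

# Build a stand-in for lib/utils.py in memory (hypothetical module name).
source = "manager = object()  # placeholder for AvailableProxies(proxies)\n"
module = types.ModuleType("demo_utils")
exec(source, module.__dict__)
sys.modules["demo_utils"] = module

# Two separate imports, as test1.py and test2.py would do.
from demo_utils import manager as manager_in_test1
from demo_utils import manager as manager_in_test2

# Both names refer to the one object created when the module body ran.
print(manager_in_test1 is manager_in_test2)
```

Note that this holds within a single Python process; two scripts started separately each get their own manager.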
