A few months ago I put up a port scanner for review. Now, I've updated it with some new Python knowledge and integraing the feedback I got.
Some things I specifically think might be wrong with it or feel clunky to me:
- Default argument values for the
scan_ports
function. I set the default values, but then I have to check forNone
again when I enter the block. I was actually kind of surprised and disappointed that, if the arg wasNone
, it didn't just automatically take the default. Any better way to handle this? - The
is_address_valid
function. I'm not entirely sure I did this the proper way, with catching multiple exception types. Not sure if I'm catching things I shouldn't be, or if there's a more elegant way to approach it. - Detecting the operating system. Not sure if the standard is to use
os.name
orplatform.system()
. - When is it acceptable to actually add line-breaks into your function bodies? Most of the scripts I've seen are really, really loathe to do so, and I kind of get it since everything in Python is space-based instead of bracket-based. But sometimes the function bodies seem to be a bit too long and it starts to look bad.
- (More of a StackOverflow question, but still, if someone wants to throw it in as a tidbit...) Why do the arguments in
socket.socket().connect((host, 80))
have to be wrapped in another set of parentheses? Found that this was the way to do it, but couldn't find in the API why it has to be that way.
Any tips or feedback are greatly appreciated!
Also, I think @GarethRees was skeptical in the other question that I might be using this for malicious purposes. I'm actually just learning from a book called Violent Python, and a port scanner is the first thing it goes over in the book. The book is a bit more interesting than a normal textbook that goes over different variations of "Hello World!", haha.
#!/usr/bin/env python3
import argparse
import errno
import functools
import multiprocessing
import os
import platform
import socket
from concurrent.futures import ThreadPoolExecutor
DEFAULT_HOST = '127.0.0.1'
DEFAULT_TIMEOUT = 2
DEFAULT_CORES = 4
DEFAULT_THREADS = 512
PORT_RANGE = range(1, 65536)
def ping(host, port):
try:
socket.socket().connect((host, port))
print(str(port) + " Open")
return port
except socket.timeout:
return False
except socket.error as socket_error:
if socket_error.errno == errno.ECONNREFUSED:
return False
raise
def thread_scan(host, threads):
print('Scanning ' + host + ' with threads ...')
with ThreadPoolExecutor(max_workers = threads) as executor:
ping_partial = functools.partial(ping, host)
return list(filter(bool, executor.map(ping_partial, PORT_RANGE)))
def fork_scan(host, cores):
print('Scanning ' + host + ' with forks ...')
pool = multiprocessing.Pool(cores)
ping_partial = functools.partial(ping, host)
return list(filter(bool, pool.map(ping_partial, PORT_RANGE)))
def is_address_valid(host):
try:
socket.socket().connect((host, 80))
return True
except socket.gaierror:
return False
except (socket.timeout, socket.error):
return True
def get_os_info():
return os.name + ' [' + platform.system() + ' ' + platform.version() + ']'
def scan_ports(host = DEFAULT_HOST,
timeout = DEFAULT_TIMEOUT,
threads = DEFAULT_THREADS,
cores = DEFAULT_CORES):
if host is None:
host = DEFAULT_HOST
if timeout is None:
timeout = DEFAULT_TIMEOUT
if threads is None:
threads = DEFAULT_THREADS
if cores is None:
cores = DEFAULT_CORES
available_ports = []
socket.setdefaulttimeout(timeout)
if not is_address_valid(host):
print('DNS lookup for \'' + host + '\' failed.')
return
if os.name == 'nt':
print('Windows OS detected.')
available_ports = thread_scan(host, threads)
elif os.name == 'posix':
print('*Nix OS detected.')
available_ports = fork_scan(host, cores)
else:
print('Unidentified operating system: ' + get_os_info())
available_ports = fork_scan(host, cores)
print('Done.')
available_ports.sort()
print()
print(str(len(available_ports)) + ' ports available.')
print(available_ports)
def main():
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-ip', '--host', help = 'IP address/host to scan')
arg_parser.add_argument('-to', '--timeout', help = 'Connection timeout in seconds', type = int)
arg_parser.add_argument('-t', '--threads', help = 'Maximum number of threads to spawn (Windows only)', type = int)
arg_parser.add_argument('-c', '--cores', help = 'Number of cores to utilize (*Nix only)', type = int)
args = arg_parser.parse_args()
scan_ports(args.host, args.timeout, args.threads, args.cores)
if __name__ == "__main__":
main()
-
1\$\begingroup\$ Follow-up question here \$\endgroup\$200_success– 200_success2014年04月11日 06:28:07 +00:00Commented Apr 11, 2014 at 6:28
1 Answer 1
You're opening a lot of sockets but not closing them. Those are likely file descriptor leaks. (Whether or not it actually leaks depends on the garbage collector's behaviour.)
If you're using concurrent.futures.ThreadPoolExecutor
for the multithreaded version, then you should also use concurrent.futures.ProcessPoolExecutor
for the multiprocess version.
ping
usually implies an ICMP echo. If you're actually doing a TCP scan, call it tcp_ping
or something.
The status messages are misleading. "Windows OS detected" sounds like you performed OS fingerprinting on the remote peer, but you actually mean that the port scanner is running on a Windows host.
-
\$\begingroup\$ Thanks for your comments. How does one close the socket appropriately? \$\endgroup\$asteri– asteri2014年04月10日 20:14:56 +00:00Commented Apr 10, 2014 at 20:14
-
\$\begingroup\$ I guess just storing the reference to
socket.socket()
and then calling.close()
on it later. Thanks. I'll try it. \$\endgroup\$asteri– asteri2014年04月10日 20:23:19 +00:00Commented Apr 10, 2014 at 20:23 -
2\$\begingroup\$ Actually, you should prefer Python's
with
construct if you can. \$\endgroup\$Toby Speight– Toby Speight2016年11月09日 17:23:23 +00:00Commented Nov 9, 2016 at 17:23
Explore related questions
See similar questions with these tags.