Question
I love using NMAP and other related tools to scan networks really fast.
But using a tool and understanding how it works are 2 different things, thus I tried creating a program where I can scan a subnet for open TCP-Ports
. I am using scapy
on a *NIX
machine in python2.7
It works, but is rather slow. Will be working on Threading
later on, so that is not under review.
Before I continue with coding, I'll want to know a few things:
- I'm interested in how scalable my approach is.
- Any stylistic errors.
- Or is there a better approach in general, maybe
scapy
is outdated.
Code
from scapy.all import *
import argparse
TIME_OUT = 2
def arp_ping(subnet):
"""ARP Pings entire subnet returns found in subnet."""
conf.verb = 0
answered, unanswered = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=subnet), timeout=TIME_OUT, verbose=False, inter=0.1)
return [rcv.sprintf(r"%Ether.src% - %ARP.psrc%") for snd, rcv in answered]
def tcp_scan(dst_ip, stealth=False):
"""Scans TCP ports for availability."""
system_ports = {20: 'FTP',
21: 'FTP Control',
22: 'SSH',
23: 'Telnet',
25: 'SMPT',
53: 'DNS',
67: 'DHCP Server',
68: 'DHCP Client',
69: 'TFTP',
80: 'HTTP',
110: 'POP3',
119: 'NNTP',
139: 'NetBIOS',
143: 'IMAP',
389: 'LDAP',
443: 'HTTPS',
445: 'SMB',
465: 'SMTP',
569: 'MSN',
587: 'SMTP',
990: 'FTPS',
993: 'IMAP',
995: 'POP3'}
user_ports = {1080: 'SOCKS',
1194: 'OpenVPN',
3306: 'MySQL',
3389: 'RDP',
3689: 'DAAP',
5432: 'PostGreSQL',
5800: 'VNC',
5900: 'VNC',
6346: 'Grutella',
8080: 'HTTP'}
def tcp_default(dst_port, src_port):
"""Default TCP Scan."""
default_scan = sr1(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="S"), timeout=TIME_OUT)
if default_scan is not None:
if(default_scan.getlayer(TCP).flags == 0x12):
send_rst = sr(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="AR"), timeout=TIME_OUT)
return "Open"
return "Closed"
def tcp_stealth(dst_port, src_port):
"""Stealthy TCP Scan"""
stealth_scan = sr1(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="S"), timeout=TIME_OUT)
if stealth_scan is not None:
if stealth_scan.getlayer(TCP).flags == 0x12:
send_rst = sr(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="R"), timeout=TIME_OUT)
return "Open"
elif stealth_scan.getlayer(TCP).flags == 0x14:
return "Closed"
return "Filtered"
def log_ports(ports, stealth=False):
"""Logs status and info of ports."""
log_ports = []
for dst_port, nme_port in ports.iteritems():
src_port = RandShort()
if default:
stat = tcp_default(dst_port, src_port)
log_ports.append('[*] TCP default scan: dest_ip=%s port=%d, service=%s, status=%s' % (dst_ip, dst_port, nme_port, stat))
else:
stat = tcp_stealth(dst_port, src_port)
log_ports.append('[*] TCP stealth scan: dest_ip=%s port=%d, service=%s, status=%s' % (dst_ip, dst_port, nme_port, stat))
return log_ports
ports = []
ports += ['[!] User Ports']
ports += log_ports(user_ports, stealth)
ports += ['[!] System Ports']
ports += log_ports(system_ports, stealth)
return ports
def parse_arguments():
"""Arguments parser."""
parser = argparse.ArgumentParser(usage='%(prog)s [options] <subnet>',
description='port scanning tool @Ludisposed',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=
'''
Examples:
python port_scan.py "192.168.1.0/24" -s
python port_scan.py "192.168.1.0/24" --timeout 10
''')
parser.add_argument('-s', '--stealth', default=False, action="store_true", help='Stealthy TCP scan')
parser.add_argument('--timeout', type=int, help='Timeout parameter of scans')
if args.timeout is not None:
global TIME_OUT
TIME_OUT = args.timeout
return parser.parse_args()
if __name__ == '__main__':
args = argparse.ArgumentsParser()
conf.verb = 0
network = arp_ping(args.subnet)
for connection in network:
mac, ip = connection.split(' - ')
print '\n\n[!] Trying port scan of current connection with mac=%s and ip=%s' % (mac, ip)
for result in tcp_scan(ip, args.stealth):
print result
1 Answer 1
- How do system and user ports have anything to do with scanning? I'd remove that code from how you scan.
- I think
TIMEOUT
should be part of an instance of a class. This is as you may want to have two scanners with different timeouts. - Why force this to be Python 2 only?
print('Hello')
works in both Python 2 and 3. You're just making more effort in the long-run. If you Scapy updates to Python 3, or you migrate to something else that uses Python 3. printf
style formatting is somewhat recommended against.- You don't use
parse_arguments
. It doesn't make sense for
system_ports
to be constantly redefined, nor does it make sense fortcp_default
to be either. Closures are good, but you should note that each call to the function re-creates everything inside it.Raw information is better than a list of strings. Allow the user to format the information from the scanner however they wish. They may not even want formatting, and just want to know statuses to then perform certain commands.
- You don't follow the order outlined in PEP 8 for imports.
And so without changing your scapy
code, I'd use something like:
import argparse
from textwrap import dedent
from scapy.all import *
class ScannerStatus(object):
OPEN = "Open"
CLOSED = "Closed"
FILTERED = "Filtered"
class Scanner(object):
def __init__(self, timeout):
self.timeout = timeout
def arp_ping(self, subnet):
"""ARP Pings entire subnet returns found in subnet."""
conf.verb = 0
answered, unanswered = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=subnet), timeout=self.timeout, verbose=False, inter=0.1)
return [rcv.sprintf(r"%Ether.src% - %ARP.psrc%") for snd, rcv in answered]
def _tcp_default(self, dst_ip, dst_port, src_port):
"""Default TCP Scan."""
default_scan = sr1(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="S"), timeout=self.timeout)
if default_scan is not None:
if default_scan.getlayer(TCP).flags == 0x12:
send_rst = sr(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="AR"), timeout=self.timeout)
return ScannerStatus.OPEN
return ScannerStatus.CLOSED
def _tcp_stealth(self, dst_ip, dst_port, src_port):
"""Stealthy TCP Scan"""
stealth_scan = sr1(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="S"), timeout=self.timeout)
if stealth_scan is not None:
if stealth_scan.getlayer(TCP).flags == 0x12:
send_rst = sr(IP(dst=dst_ip)/TCP(sport=src_port, dport=dst_port, flags="R"), timeout=self.timeout)
return ScannerStatus.OPEN
elif stealth_scan.getlayer(TCP).flags == 0x14:
return ScannerStatus.CLOSED
return ScannerStatus.FILTERED
def tcp(self, dst_ip, dst_port, stealth=False):
src_port = RandShort()
fn = self._tcp_stealth if stealth else self._tcp_default
return fn(dst_ip, dst_port, src_port)
def tcp_scan(self, dst_ip, ports, stealth=False):
"""Scans TCP ports for availability."""
for dst_port in ports:
yield port, self.tcp(dst_ip, dst_port, stealth)
def parse_arguments():
"""Arguments parser."""
parser = argparse.ArgumentParser(usage='%(prog)s [options] <subnet>',
description='port scanning tool @Ludisposed',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=dedent('''\
Examples:
python port_scan.py "192.168.1.0/24" -s
python port_scan.py "192.168.1.0/24" --timeout 10'''))
parser.add_argument('-s', '--stealth', default=False, action="store_true", help='Stealthy TCP scan')
parser.add_argument('--timeout', type=int, default=2, help='Timeout parameter of scans')
parser.add_argument('subnet', type=str, help='Subnet in from of [ip/bitmask]')
return parser.parse_args()
if __name__ == '__main__':
args = parse_arguments()
scanner = Scanner(args.timeout)
system_ports = {20: 'FTP',
21: 'FTP Control',
22: 'SSH',
23: 'Telnet',
25: 'SMPT',
53: 'DNS',
67: 'DHCP Server',
68: 'DHCP Client',
69: 'TFTP',
80: 'HTTP',
110: 'POP3',
119: 'NNTP',
139: 'NetBIOS',
143: 'IMAP',
389: 'LDAP',
443: 'HTTPS',
445: 'SMB',
465: 'SMTP',
569: 'MSN',
587: 'SMTP',
990: 'FTPS',
993: 'IMAP',
995: 'POP3'}
user_ports = {1080: 'SOCKS',
1194: 'OpenVPN',
3306: 'MySQL',
3389: 'RDP',
3689: 'DAAP',
5432: 'PostGreSQL',
5800: 'VNC',
5900: 'VNC',
6346: 'Grutella',
8080: 'HTTP'}
conf.verb = 0
network = scanner.arp_ping(args.subnet)
scan_type = 'stealth' if args.stealth else 'default'
for connection in network:
mac, ip = connection.split(' - ')
print('\n\n[!] Trying port scan of current connection with mac={} and ip={}'.format(mac, ip))
print('[!] User Ports')
for port, status in scanner.tcp_scan(ip, user_ports.keys(), args.stealth):
print('[*] TCP {} scan: dest_ip={} port={}, service={}, status={}'
.format(scan_type, ip, port, user_ports[port], status))
print('[!] System Ports')
for port, status in scanner.tcp_scan(ip, system_ports.keys(), args.stealth):
print('[*] TCP {} scan: dest_ip={} port={}, service={}, status={}'
.format(scan_type, ip, port, system_ports[port], status))
-
\$\begingroup\$ "Why force this to be Python 2?" I guess habits die slowly, keep making this mistake over and over. About the
pars_arguments
, I think this is my mistake, withcopy/paste
. Good review overalll, I agree on all points. \$\endgroup\$Ludisposed– Ludisposed2017年10月12日 10:23:10 +00:00Commented Oct 12, 2017 at 10:23 -
\$\begingroup\$ I love the way this scales, I can easily implement more
scanner
functions! \$\endgroup\$Ludisposed– Ludisposed2017年10月12日 10:30:17 +00:00Commented Oct 12, 2017 at 10:30 -
1\$\begingroup\$ @Ludisposed To be fair, making code work in both Python 2 and 3 is a hard skill, :) That's good, I look forward to rev 2 of this question then, :) \$\endgroup\$2017年10月12日 10:45:41 +00:00Commented Oct 12, 2017 at 10:45
-
1\$\begingroup\$ Finally got the time to play around with your code, it works perfectly! I had an error though, but easily fixed. Would it be fine, if I'd edit your code to fix the error? Or is it unnecessary? \$\endgroup\$Ludisposed– Ludisposed2017年10月12日 18:51:27 +00:00Commented Oct 12, 2017 at 18:51
-
1\$\begingroup\$ @Ludisposed unnecessary, but feel free to if you wish, :) \$\endgroup\$2017年10月12日 18:55:33 +00:00Commented Oct 12, 2017 at 18:55
/etc/services
? \$\endgroup\$/etc/services
with dictionary literals. \$\endgroup\$