I have a computer that roams, but needs to have an FQDN attached to it for certain reasons such as email system integrations or testing email via a testing SMTP server that requires valid FQDNs and HELOs.
My domain(s) are all on CloudFlare, so I wrote an adapted version of another script I had to wrap around CloudFlare's API so I can update DNS entries and such.
This has a few requirements from PyPI:
- `ipaddress`
- CloudFlare Python wrapper (`cloudflare` on PyPI)
This script also has two other requirements to really function, but I can guarantee you that both of these components work:
- WhatIsMyIP.com API key for IP lookup capabilities
- CloudFlare Account with API key
Note that any sensitive information (such as login credentials or API keys) has been obfuscated in the code below. Additional bits can be provided as needed.
(There is a known limitation that this does not work for IPv6 addresses - I'm working on adding this, but this current iteration of the script does not have IPv6 in it.)
Critiques to improve the script are welcome, but keep in mind that I follow PEP 8 with line lengths of up to 120 characters, which is permissible if the users on a team or development group agree on the longer length.
#!/usr/bin/python3
import CloudFlare
import ipaddress
import json
import shlex
import subprocess as sp
import syslog
import urllib.error
import urllib.request
from typing import AnyStr, Optional
# Used for `dig` queries because we are using CloudFlare, and we need actual DNS results, not CF results for checking
# the existence of an IP address currently. Therefore, we use Google DNS here.
DNS_NAMESERVER = "8.8.8.8"
# ZONE = root domain
# DOMAIN = hostname within root domain.
ZONE = "domain.tld"
DOMAIN = "subdomain"
# These next two are for WHATISMYIP - API Endpoints.
WHATISMYIP = "https://api.whatismyip.com/ip.php?key={key}&output=json"
API_KEYS = ['WWWWWWWWWWWWWWWWWWWWWWWWWWWWWWWW', 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', 'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY',
            'ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ']
# Wrapper function around syslog to allow default priority of INFO, but
# has the ability to change the priority if wished for a given message.
def _syslog(message: AnyStr, priority: int = syslog.LOG_INFO) -> None:
    syslog.syslog(priority, message)
# Horribly unnecessary wrapper function around `exit` which calls the
# "Process Ended" log message, and then actually exists with the given
# exit code (by default it exits on 0 - the "Success" exit code)
def _exit(code=0):
    # type: (int) -> None
    _syslog("DDNS Update Process Ended.")
    exit(code)
# Singular Error handler for EmptyDNSResponse (could probably be a bare
# LookupError, but EmptyDNSResponse is nicer...)
class EmptyDNSResponse(LookupError):
    pass  # No changes from base LookupError
# Get current public IP address with WhatIsMyIP.com API
def _get_current_ip_address():
    # type: () -> Optional[AnyStr]
    for key in API_KEYS:
        _syslog("Attempting lookup with API key {key}...".format(key=key))
        try:
            with urllib.request.urlopen(WHATISMYIP.format(key=key)) as req:
                data = json.loads(req.read().decode("UTF-8"))
                ipaddr = data['ip_address'][1]['result']
        except (urllib.error.URLError, urllib.error.HTTPError):
            _syslog("Could not look up public IP address, aborting update process.")
            _exit(1)
        try:
            # noinspection PyUnboundLocalVariable
            ipaddress.ip_address(ipaddr)
        except ValueError:
            if data is '0':
                _syslog("API key was not entered for lookup, this is a programming error.", syslog.LOG_CRIT)
                _exit(5)
            if data in ['1', '2']:
                _syslog("API Key Invalid or Inactive, attempting to try other keys...", syslog.LOG_WARNING)
            if data is '3':
                _syslog("API key lookup threshold reached, skipping to next API key...", syslog.LOG_WARNING)
            if data in ['4', '5']:
                _syslog("Query is bad, it needs 'input', which we can't do. This is a critical issue.",
                        syslog.LOG_CRIT)
                _exit(6)
            if data is '6':
                _syslog("There was an unknown error with the WhatIsMyIP API, contact their support.",
                        syslog.LOG_CRIT)
                _exit(7)
            continue  # Try next API key
        return data
# Check if the DNS entry for a given hostname differs from current IP,
# and if it has no A record or it differs, return "True". Otherwise,
# return False, and assume the IP address doesn't differ.
def _dns_ip_address_status(host: AnyStr, curip: Optional[AnyStr] = None) -> AnyStr:
    if not curip:
        raise RuntimeError("Empty IP!")
    dnsip = ""
    try:
        dnsip = sp.check_output(
            shlex.split('dig +short @{nameserver} A {hostname}'.format(nameserver=DNS_NAMESERVER, hostname=host))
        ).decode('utf-8').strip()
        if dnsip == '':
            _syslog('Current IP record for \'{hostname}\': [NXDOMAIN]'.format(hostname=host), syslog.LOG_INFO)
            raise EmptyDNSResponse
        else:
            _syslog('Current IP record for \'{hostname}\': {record}'.format(hostname=host, record=dnsip))
    except sp.CalledProcessError as err:
        syslog.syslog(syslog.LOG_CRIT, 'Subprocess error when calling `dig`, exiting.')
        print("Subprocess error when calling dig: {}".format(err))
        _exit(2)  # Exit on code 10: Can't continue if subprocess isn't working...
    except EmptyDNSResponse:
        syslog.syslog(syslog.LOG_INFO, "Empty DNS response, assuming that entry doesn't exist.")
        # Assume that the IP address differs or doesn't exist.
        return "NXDOMAIN"
    if dnsip == curip:
        return "UPTODATE"
    else:
        return "NEEDSUPDATED"
# CloudFlare has different functions for Add and Change. Determine if we exist first.
def _update_cloudflare(cf: CloudFlare.CloudFlare, domain: AnyStr = ZONE, hostname: AnyStr = DOMAIN):
    # Validate that zone exists first.
    zone_id = None
    try:
        zone = cf.zones.get(params={'name': domain})
        if len(zone) < 1:
            raise LookupError
        else:
            zone_id = zone[0]['id']
    except LookupError:
        syslog.syslog(syslog.LOG_ERR, "No valid zone data on CloudFlare, root domain zone might not exist.")
        _exit(3)
    curip = _get_current_ip_address()
    if not curip:
        syslog.syslog(syslog.LOG_ERR, "Could not find valid current IP address, aborting update process.")
        _exit(2)
    fqdn = hostname + '.' + domain
    ip_status = _dns_ip_address_status(host=fqdn, curip=curip)
    if ip_status == "NXDOMAIN":
        # Add new record: POST
        cf.zones.dns_records.post(zone_id, data={'name': hostname, 'type': 'A', 'content': curip,
                                                 'proxiable': False, 'proxied': False})
    elif ip_status == "NEEDSUPDATED":
        dns_records = cf.zones.dns_records.get(zone_id, params={'name': fqdn})
        if len(dns_records) != 1:
            syslog.syslog(syslog.LOG_ERR,
                          "Invalid number of records returned, this might be a CF DNS records issue, check it.")
            _exit(4)
        dns_record_id = dns_records[0]['id']
        cf.zones.dns_records.delete(zone_id, dns_record_id)
        cf.zones.dns_records.post(zone_id, data={'name': hostname, 'type': 'A', 'content': curip,
                                                 'proxiable': False, 'proxied': False})
    elif ip_status == "UPTODATE":
        syslog.syslog(syslog.LOG_INFO, "DNS record for {} does not need adjusted.".format(fqdn))
        pass
def execute():
    syslog.openlog(ident='py-ddns-ipupdate', logoption=syslog.LOG_PID, facility=syslog.LOG_DAEMON)
    _syslog("DDNS Update Process Started.")
    # Test if Internet is up by reaching to Google.
    try:
        req = urllib.request.urlopen('https://google.com', timeout=5)
        req.close()
    except urllib.error.URLError:
        _syslog("No Internet connection available, aborting update process.")
        _exit(1)
    # Get current public IP
    ip = _get_current_ip_address()
    if '.' not in ip and ':' not in ip:
        _syslog("Unexpected response from WhatIsMyIP.com API: {response}".format(response=ip))
        _exit(1)
    else:
        _syslog("Current Public IP: {ip}".format(ip=ip))
    _update_cloudflare(CloudFlare.CloudFlare(email='[email protected]',
                                             token='CloudFlareAPITokenKey',
                                             debug=False))
    _exit(0)
if __name__ == "__main__":
    execute()
2 Answers
Docstrings
You should really consider switching to docstrings instead of your current documentation method. (Note that a docstring gives the function a `.__doc__` property, which can be used for a variety of purposes, including documentation generation.) So for instance:
# Wrapper function around syslog to allow default priority of INFO, but
# has the ability to change the priority if wished for a given message.
def _syslog(message: AnyStr, priority: int = syslog.LOG_INFO) -> None:
    syslog.syslog(priority, message)
Would (well, almost) become:
def _syslog(message: AnyStr, priority: int = syslog.LOG_INFO) -> None:
    """
    Wrapper function around syslog to allow default priority of INFO, but
    has the ability to change the priority if wished for a given message.
    """
    syslog.syslog(priority, message)
However, there are a couple of remarks to make:
- The docstring comment I have linked tends to have the summary on the same line as the opening `"""`, but many conventions disregard this.
- You should probably mention the arguments and return values (when applicable, of course).
Wrap it in a class?
Admittedly, I don't know too much about the domain you are working in, and I don't really like promoting OOP since I find it is often overused, but here is my rationale:
- You have a lot of globals like `DNS_NAMESERVER`, `ZONE`, `DOMAIN`, etc. These could be given default values in your class and made private variables. (On the other hand, you might actually want these constant, in which case ignore this.)
- A lot of your functions have default values which could instead be omitted and placed as class variables.
On the other hand, I may not know enough about what you're doing. If you disagree with this assessment, just look at the first critique.
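If you did want to try it, one possible shape is a small settings class that holds the current globals as defaults. This is only a sketch: `DDNSConfig` and its `fqdn` property are hypothetical names, and a frozen dataclass is just one way to keep the values effectively constant:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DDNSConfig:
    """Hypothetical container for the values that are currently module globals."""
    nameserver: str = "8.8.8.8"   # was DNS_NAMESERVER
    zone: str = "domain.tld"      # was ZONE
    hostname: str = "subdomain"   # was DOMAIN

    @property
    def fqdn(self) -> str:
        # Same concatenation the script does inline in _update_cloudflare.
        return self.hostname + "." + self.zone
```

Functions would then take a `DDNSConfig` instead of repeating the defaults in their signatures, and updating a second hostname becomes `DDNSConfig(hostname="other")`.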
- Class Wrap > 1: These are constants, not a class. This is just a script which I use for my dynamic DNS updates to CloudFlare on a cronjob. Eventually there will be more than one hostname being updated, at which point #2 will have non-default inputs and I'll revise the script. I do agree with the docstring notes though, I'll work on that. – Thomas Ward, Dec 28, 2018 at 0:49
# Horribly unnecessary wrapper
You're right. Don't write your own `exit`. Since `exit` itself generates an exception to terminate the program, simply put your `_syslog` call in a `finally` at the top level.
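A minimal sketch of that arrangement follows. Here `execute` is a hypothetical stand-in for the real update logic, and `main` with its injectable `log` parameter is an illustrative name that is not in the original script:

```python
import sys
import syslog


def _syslog(message, priority=syslog.LOG_INFO):
    syslog.syslog(priority, message)


def execute():
    # Hypothetical stand-in for the real update logic: any failure path
    # simply calls sys.exit() with its own exit code.
    sys.exit(3)


def main(log=_syslog):
    log("DDNS Update Process Started.")
    try:
        execute()
    finally:
        # sys.exit() raises SystemExit, so this block runs on every exit
        # path before the interpreter actually terminates.
        log("DDNS Update Process Ended.")
```

Calling `main()` from the `if __name__ == "__main__":` block then logs the "Ended" message exactly once, and every `_exit(n)` in the script can become a plain `sys.exit(n)`.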
with urllib.request.urlopen
Unless you have a really good (and obscure) reason, never use `urllib`. Use `requests`. It's saner in every way.
if data in ['1', '2']:
Technically, since you're testing membership, make this a set:
if data in {'1', '2'}:
As for this function documentation:
# Check if the DNS entry for a given hostname differs from current IP,
# and if it has no A record or it differs, return "True". Otherwise,
# return False, and assume the IP address doesn't differ.
Fine... but this doesn't do what you say it does. It returns strings, not booleans. I'd offer that neither is appropriate, and that you should be returning an enum instead.
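As a rough illustration of the enum idea, the three string results could be modelled like this. `RecordStatus` and `compare_record` are hypothetical names, and the `dig` lookup is left out so that only the comparison logic is shown:

```python
from enum import Enum


class RecordStatus(Enum):
    MISSING = "NXDOMAIN"
    UP_TO_DATE = "UPTODATE"
    NEEDS_UPDATE = "NEEDSUPDATED"


def compare_record(dns_ip: str, current_ip: str) -> RecordStatus:
    """Classify the published A record against the current public IP."""
    if not dns_ip:
        # An empty lookup result is treated as "no record exists".
        return RecordStatus.MISSING
    if dns_ip == current_ip:
        return RecordStatus.UP_TO_DATE
    return RecordStatus.NEEDS_UPDATE
```

The caller then branches on `RecordStatus.MISSING` and friends instead of comparing against string literals, so a misspelled status fails loudly with an `AttributeError` rather than silently matching nothing.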
if len(zone) < 1:
    raise LookupError
else:
    zone_id = zone[0]['id']
Get rid of the `else`; you've previously raised.
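Pulled out into a helper, the check might look like the sketch below. `first_zone_id` is a hypothetical name, and it assumes the zone lookup returns a list of dicts with an `'id'` key, as the indexing in the original code implies:

```python
def first_zone_id(zones):
    """Return the id of the first matching zone, or raise if there is none."""
    if len(zones) < 1:
        raise LookupError("no matching zone")
    # No else needed: this line is only reached when the raise did not fire.
    return zones[0]['id']
```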
- Agreed with these suggestions, and I'll work on improving. The headache I ran into with `requests` was that this was originally written on an OLD Python 3 which didn't have `requests` available. I haven't updated it to 3.7 standards yet... or at least, the `urllib` bits yet. – Thomas Ward, Dec 28, 2018 at 0:50