
I have written this backup utility to keep incremental backups by copying new and modified files, and hard-linking unchanged or simply moved files. To speed up the comparisons, I save a record of the file stats from the previous backup so I can avoid iterating over the old backup directory. The utility is called from the command line, passing the destination folder followed by the folder to be backed up. Configuration options are taken from text files in the same folder as the backup destination. I have done some testing for each of my "# TODO"s, but not enough yet to feel confident it's particularly robust (particularly not on OSes other than Windows 10). No backup pruning is performed or intended as of yet. A good place to start is by calling the help from the command line: >python backup_utility.py -h
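A typical run then passes the destination first, followed by the source (the paths here are only illustrative):

    > python backup_utility.py D:\Backups C:\Users\uname\Documents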

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 11 13:20:15 2022
@author: Aaron Thompson
@license: CC BY 3.0
@license-url: https://creativecommons.org/licenses/by/3.0/
"""
# main imports
import argparse
from collections.abc import Iterable
from datetime import datetime
from inspect import cleandoc
import logging
from logging.handlers import MemoryHandler, RotatingFileHandler
import os
from os import stat_result
from pathlib import Path
import pickle
import re
import shutil
import stat
import sys
__version__ = "2022-05-03"
# TODO test logging cases
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
# handler for buffering logging messages before log file is defined
memory_handler = MemoryHandler(1e6)
logger.addHandler(memory_handler)
# DEFAULT OPTIONS
options_template = cleandoc(r"""
    #Backup job options
    #
    #lines starting with "#" are ignored
    #lines of the form "key = value" are added to the options dictionary
    #backup folder naming convention based on python datetime formatting
    #https://docs.python.org/3/library/datetime.html
    format = {format}
    #skip backup if no files are changed? True, False
    skip = {skip}
    #follow symbolic links?
    symlinks = {symlinks}
    #file operation error behavior: [Ignore, Warn, Fail]
    errors = {errors}
    #log file location (leaving this empty disables logging to file)
    logfile =
    #log file verbosity: [DEBUG, INFO, WARNING, ERROR, CRITICAL]
    loglevel = INFO
""")
default_options = {"format": "%Y%m%d-%H%M%S",
                   "skip": "True",
                   "symlinks": "True",
                   "errors": "Warn",
                   "logfile": "",
                   "loglevel": "INFO"}
# DEFAULT FILTERS
filter_default = cleandoc(r"""
    #Backup file/folder configuration:
    # blacklist file includes filters for files/folders to be skipped
    # whitelist file includes filters for files/folders which should
    # be included, overriding the blacklist.
    #
    # Blank lines and lines starting with "#" are skipped
    # One filter per line: exact file or folder matches
    # Lines starting with ^ are python style regex filters
    #
    # Example: filter a specific file
    # C:\Users\uname\Documents\temporary.txt
    # Example: filter an entire folder (and subfolders)
    # C:\Users\uname\AppData\
    # Example: regex filter for selecting .log files from a project folder
    # ^C:\\Users\\uname\\project\\.*\.log$
""")
# TODO test robustness
def get_config(dest: Path) -> tuple[dict[str, str], list[str], list[str]]:
    op = (dest / "BackupOptions.txt")
    wl = (dest / "Whitelist.txt")
    bl = (dest / "Blacklist.txt")
    if op.exists() and op.is_file():
        logger.debug("reading config")
        with open(op) as f:
            options = list(f)
        options = [s.strip() for s in options]  # strip whitespace
        options = [s for s in options if s and not s.startswith("#")]  # strip empty and comments
        options = {line.split("=")[0].strip(): line.split("=")[1].strip() for line in options if '=' in line}
        for option in default_options:
            if option not in options:
                logger.warning(f"option:{option} missing from BackupOptions.txt: using default: {default_options[option]}")
                options[option] = default_options[option]
        # setup logger file handler options here and append buffered logs
        if options['logfile']:
            logger.debug("setting up rotating log file handler")
            # TODO make log file size and number of logs configurable? or default is good enough for anyone?
            file_handler = RotatingFileHandler(options['logfile'], maxBytes=2**20, backupCount=10)
            try:
                level = {"DEBUG": logging.DEBUG,
                         "INFO": logging.INFO,
                         "WARNING": logging.WARNING,
                         "ERROR": logging.ERROR,
                         "CRITICAL": logging.CRITICAL}[options["loglevel"]]
            except KeyError:
                logger.warning(f"{options['loglevel']} is not a valid 'loglevel': defaulting to INFO")
                level = logging.INFO
            file_handler.setLevel(level)
            file_handler.addFilter(lambda record: record.levelno >= level)
            file_handler.setFormatter(formatter)
            logger.debug("swapping out memory handler for file handler")
            logger.addHandler(file_handler)
            logger.removeHandler(memory_handler)
            memory_handler.setTarget(file_handler)
            memory_handler.flush()
        logger.debug(f"config={options}")
    else:
        logger.info("creating default config file")
        with open(op, "w") as f:
            f.write(options_template.format(**default_options))
        return get_config(dest)  # recursing is easier so default config can just be `options_default`
    if wl.exists() and wl.is_file():
        logger.debug("reading whitelist")
        with open(wl) as f:
            whitelist = list(f)
        whitelist = [s.strip() for s in whitelist]  # strip whitespace
        whitelist = [s for s in whitelist if s and not s.startswith("#")]  # strip empty and comments
    else:
        logger.info("creating default whitelist file")
        with open(wl, "w") as f:
            f.write(filter_default)
        whitelist = []
    if bl.exists() and bl.is_file():
        logger.debug("reading blacklist")
        with open(bl) as f:
            blacklist = list(f)
        blacklist = [s.strip() for s in blacklist]  # strip whitespace
        blacklist = [s for s in blacklist if s and not s.startswith("#")]  # strip empty and comments
    else:
        logger.info("creating default blacklist file")
        with open(bl, "w") as f:
            f.write(filter_default)
        blacklist = []
    return options, whitelist, blacklist
def match_filter(file: str, pattern: str, src: Path) -> bool:
    if pattern.startswith("^"):
        return bool(re.match(pattern, file))
    file = Path(file)
    pattern = Path(pattern)
    if not pattern.is_absolute():  # assume relative to src
        pattern = src / pattern
    if pattern.exists():
        if pattern.is_dir():
            return file.is_relative_to(pattern)
        elif pattern.is_file():
            return pattern.samefile(file)
    else:
        return False
# TODO Test file filtering
def filter_files(files: dict[str, stat_result],
                 src: Path,
                 blacklist: Iterable[str],
                 whitelist: Iterable[str]) -> dict[str, stat_result]:
    names = set(files.keys())
    filtered = {}
    for file in names:
        if not any(match_filter(file, pattern, src) for pattern in blacklist):
            filtered[file] = files[file]
        else:
            logger.debug(f"blacklisted: {file}")
    for file in names:
        if any(match_filter(file, pattern, src) for pattern in whitelist):
            filtered[file] = files[file]
            logger.debug(f"whitelisted: {file}")
    return filtered
# TODO testing robustness
def get_prior_backup(dest: Path, format: str) -> tuple[dict[str, stat_result], Path]:
    most_recent_dt = None
    most_recent_dir = None
    most_recent_stats = None
    dt = datetime(1970, 1, 1)
    for path in dest.iterdir():
        # only look at folders of the correct name format
        if not path.is_dir():
            continue
        # stats file must also exist
        stats_file = (path.parent / (path.name + ".stats"))
        if not stats_file.is_file():
            continue
        try:
            dt = datetime.strptime(path.name, format)
        except ValueError:
            pass
        else:
            if not most_recent_dt:
                most_recent_dt = dt
                most_recent_stats = stats_file
                most_recent_dir = path
            else:
                if dt > most_recent_dt:
                    most_recent_dt = dt
                    most_recent_stats = stats_file
                    most_recent_dir = path
    if most_recent_stats is not None:
        logger.debug(f"opening prior backup stats: {most_recent_stats}")
        with open(most_recent_stats, 'rb') as f:
            return pickle.load(f), most_recent_dir
    else:
        return {}, Path()
def compare_stat_result(a: stat_result, b: stat_result) -> bool:  # ignore things like access time and metadata change time
    return all([
        a.st_ino == b.st_ino,
        a.st_dev == b.st_dev,
        a.st_mtime == b.st_mtime
    ])
# TODO testing accuracy and robustness (multiarch)
def compare_stats(new: dict[str, stat_result], old: dict[str, stat_result]) -> tuple[bool, list[str], list[str], list[str]]:
    is_modified = False  # is there any change at all from the old backup
    dirs = []     # create all (src)  # dirs can't be linked so just copy all
    do_link = []  # (src, dst)  # for unchanged and moved files
    do_copy = []  # (src)  # dst is always same as src  # for new and modified files
    # reverse mapping to find renamed (moved) files
    old_names_by_ino = {}
    for k, v in old.items():
        if v.st_ino in old_names_by_ino:
            old_names_by_ino[v.st_ino].append(k)
        else:
            old_names_by_ino[v.st_ino] = [k]
    # walk the new items
    for k, v in new.items():
        if stat.S_ISDIR(v.st_mode):
            dirs.append(k)
        elif v.st_ino in old_names_by_ino:  # inode existed previously
            if compare_stat_result(old[old_names_by_ino[v.st_ino][0]], v):  # stat unchanged (unmodified)
                if k in old_names_by_ino[v.st_ino]:  # name unchanged
                    do_link.append((k, k))  # (src, dst)
                else:  # name changed (moved)
                    do_link.append((old_names_by_ino[v.st_ino][0], k))  # (src, dst)
                    is_modified = True
            else:  # file modified (stat changed)
                do_copy.append(k)
                is_modified = True
        else:  # inode did not previously exist (new file)
            do_copy.append(k)
            is_modified = True
    return (is_modified, dirs, do_link, do_copy)
def do_backup(src: Path, dest: Path) -> None:
    logger.info("Starting backup")
    logger.debug("ensuring destination path exists")
    if not dest.is_dir():
        logger.critical("destination path given is not a valid directory")
        raise RuntimeError
    options, whitelist, blacklist = get_config(dest)
    follow_symlinks = options["symlinks"].lower() in ("true", "yes", "y")

    def handle_error(e: Exception) -> None:
        if options['errors'].lower() == "ignore":
            pass
        elif options['errors'].lower() == "warn":
            logger.exception(e, exc_info=True)
        elif options['errors'].lower() == "fail":
            logger.critical(e, exc_info=True)
            raise e

    logger.debug("walking source directory")
    # get target dir stats
    target_stats = {}
    # XXX better file stats scan than recursive glob?
    #   query journal for file modifications?
    #   options to throttle file operations to prevent system slowdown with disk usage?
    #   os.walk is not faster.
    #   os.scandir produces DirEntry objects without the needed stats,
    #   requiring an extra stat() call anyway. Not faster.
    for i in src.rglob('*'):
        try:
            if follow_symlinks:
                target_stats[str(i)] = i.stat()
            else:
                target_stats[str(i)] = i.lstat()
        except Exception as e:
            handle_error(e)
    logger.debug("filtering target files")
    # filter stats
    new_stats = filter_files(target_stats, src, blacklist, whitelist)
    # don't try to backup recursively  # TODO test this
    for file in new_stats.keys():
        if Path(file).is_relative_to(dest):
            raise Exception(f"Backed up files cannot contain backup destination\n\tsrc:{file}\n\tdst:{dest}")
    # convert absolute to relative path for processing
    new_stats = {str(Path(k).relative_to(src)): v for k, v in new_stats.items()}
    logger.debug("comparing source directory to old backups")
    # get old backup
    old_stats, old_backup = get_prior_backup(dest, options["format"])
    # compare old - new
    is_modified, dirs, do_link, do_copy = compare_stats(new_stats, old_stats)
    # optionally skip this backup
    if options["skip"].lower() in ("true", "yes", "y") and not is_modified:
        logger.info("Skipping backup: directory is unchanged")
        return  # did_backup=False
    # new folder
    this_backup = (dest / datetime.now().strftime(options['format']))
    this_backup.mkdir(parents=True, exist_ok=False)
    logger.info(f"Creating new backup: {this_backup}")
    logger.debug("creating dir structure")
    # build the structure
    for d in dirs:
        (this_backup / d).mkdir(parents=True, exist_ok=True)
    # copy files
    for i in sorted(do_copy):  # sorted() makes finding a specific file in debug output easier
        logger.debug(f"copying {i}")
        try:
            shutil.copy2(src / i, this_backup / i, follow_symlinks=follow_symlinks)
        except Exception as e:
            handle_error(e)
            del new_stats[i]  # delete from stats to indicate file is not present in this backup
    for s, d in sorted(do_link):
        logger.debug(f"linking {d}")
        try:
            os.link(old_backup / s, this_backup / d, follow_symlinks=follow_symlinks)
        except Exception as e:
            handle_error(e)
            del new_stats[d]  # delete from stats to indicate file is not present in this backup
    logger.debug("writing backup stats")
    with open(this_backup.parent / (this_backup.name + ".stats"), "wb") as f:
        pickle.dump(new_stats, f)
    logger.info("Backup complete")
    return  # did_backup=True
def main():
    parser = argparse.ArgumentParser(description=f"A single-file zero-dependency python backup utility. version: {__version__}")
    parser.add_argument('Destination', type=Path, help="Destination for backup files including backup config files")
    parser.add_argument('Source', nargs="?", type=Path, help="Path to directory which will be backed up. Omit this to generate default config files in the destination directory without performing a backup.")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-v', '--verbose', action="store_true", help="set console logging verbosity to DEBUG")
    group.add_argument('-q', '--quiet', action="store_true", help="set console logging verbosity to ERROR")
    args = parser.parse_args()
    if args.quiet:
        console_handler.setLevel(logging.ERROR)
    elif args.verbose:
        console_handler.setLevel(logging.DEBUG)
    logger.info("backup_utility.main")
    logger.debug(f"got args: {args}")
    if args.Source is None:
        logger.info("no backup source given: ensuring config files exist in destination directory.")
        get_config(args.Destination)
    else:
        do_backup(args.Source, args.Destination)

if __name__ == "__main__":
    main()
asked May 3, 2022 at 18:27

2 Answers


The overall code reads well; however, there are some issues.

Missing module docstring

Your module has a docstring, but it only conveys its author and license, not its purpose. It should contain something along the lines of your question's title.
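For instance (the wording is only a suggestion, taken from your own description of the program):

    """Incremental backup utility.

    Copies new and modified files, and hard-links unchanged or moved files
    against the most recent previous backup.
    """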

Long functions

The function get_config() is currently undocumented and quite long. Also, the return type hint does not help anybody without scrolling to the end of that mega-function, where the purpose of its return values can be inferred from the variable names. I suggest you use a config object, such as a NamedTuple, to contain the relevant configuration and return that from get_config() -> Configuration. Also consider building the allow and deny lists, as they are more politically correctly called, in separate functions. The fact that you did not include those in the config object in the first place suggests that they are not related to it anyway.
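A minimal sketch of that idea, assuming your current option names (Configuration and to_configuration are illustrative names, not part of your code):

    from typing import NamedTuple

    class Configuration(NamedTuple):
        format: str
        skip: bool
        symlinks: bool
        errors: str
        logfile: str
        loglevel: str

    def to_configuration(options: dict[str, str]) -> Configuration:
        # Hypothetical helper: convert the parsed "key = value" pairs into
        # a typed config object, coercing the boolean options once, up front.
        truthy = ("true", "yes", "y")
        return Configuration(
            format=options["format"],
            skip=options["skip"].lower() in truthy,
            symlinks=options["symlinks"].lower() in truthy,
            errors=options["errors"],
            logfile=options["logfile"],
            loglevel=options["loglevel"],
        )

do_backup() could then read config.symlinks directly instead of re-parsing the string at the call site.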

Use return early

IMO it makes for easier reading of the code. E.g. consider converting this:

    if pattern.exists():
        if pattern.is_dir():
            return file.is_relative_to(pattern)
        elif pattern.is_file():
            return pattern.samefile(file)
    else:
        return False

into this:

    if pattern.is_dir():
        return file.is_relative_to(pattern)
    if pattern.is_file():
        return pattern.samefile(file)
    return False

Also note that your current implementation of the above function may implicitly return None in the case that a file exists but is neither a directory nor a regular file (e.g. a block device). That gap would be apparent when using the return-early pattern. Note also that the check for pattern.exists() is redundant, since it is implicitly done by is_file() and is_dir() respectively (see the docs).

answered May 4, 2022 at 7:20

This code is not really ready for review, based on the inline comments you include. Bring us a finished piece of code please, not one that is in progress.

I have quickly skimmed the code without understanding it; the review below is based on only that.

You say: "I have done some amount of testing for all my #TODO's, but not enough yet to feel confident it's particularly robust". My suggestion based on this: add automated tests.
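One possible starting point (assuming the script is importable as backup_utility; the make_stat helper is illustrative) is a couple of pytest cases for the pure function compare_stats, exploiting the fact that os.stat_result can be constructed from a 10-tuple:

    import os
    import stat

    from backup_utility import compare_stats  # assumed module name

    def make_stat(ino: int, mtime: int, mode: int = stat.S_IFREG) -> os.stat_result:
        # Order: (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
        return os.stat_result((mode, ino, 0, 1, 0, 0, 0, 0, mtime, 0))

    def test_unchanged_file_is_linked():
        old = {"a.txt": make_stat(ino=1, mtime=100)}
        new = {"a.txt": make_stat(ino=1, mtime=100)}
        is_modified, dirs, do_link, do_copy = compare_stats(new, old)
        assert not is_modified
        assert do_link == [("a.txt", "a.txt")]
        assert do_copy == []

    def test_modified_file_is_copied():
        old = {"a.txt": make_stat(ino=1, mtime=100)}
        new = {"a.txt": make_stat(ino=1, mtime=200)}
        is_modified, dirs, do_link, do_copy = compare_stats(new, old)
        assert is_modified
        assert do_copy == ["a.txt"]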

Add docstrings explaining what each function does or returns. Rename the functions.

My impression is that the code as written is meant for you alone. If not, add a simple explanation of what the program does and how to use it for users. Realistically most people write backup programs for themselves only, and that's okay.

Deal with corner cases, if you haven't already:

  • Read up on case-sensitivity in Windows, Linux, and OS X.
  • Think about what happens if the config files exist in the source directory.
  • Think about what happens if you're copying from one filesystem to another (I saw a mention of inodes, which are only unique within a filesystem); see the sketch after this list.
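For that last point, a minimal sketch of one possible fix (old_names_by_key is an illustrative name) would be to key the reverse mapping in compare_stats on the (st_dev, st_ino) pair rather than st_ino alone, as a drop-in replacement for the existing loop over old.items():

    # Identify files by (device, inode) so inode numbers from different
    # filesystems cannot collide in the reverse mapping.
    old_names_by_key: dict[tuple[int, int], list[str]] = {}
    for name, st in old.items():
        old_names_by_key.setdefault((st.st_dev, st.st_ino), []).append(name)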
answered May 3, 2022 at 19:53
  • Case-sensitivity: That's a good thought that I hadn't really considered! Regarding having default config in source: Is that a bad thing? I was trying to keep it to one file for simplicity. Regarding copying from one FS to another: Do you mean if Src and Dst are not the same FS? If so, I only ever store/compare against the source directory. stat() is never called in the destination directory. – Commented May 3, 2022 at 21:08
  • By "source" I mean "the source copy directory". You try to read dest/config.ini, but you would also copy src/config.ini to dest/config.ini. Which one wins? Is that what you want? – Commented May 5, 2022 at 21:37
  • I see now what you're getting at, but this cannot happen given how the backup is performed. – Commented May 11, 2022 at 0:22
