The full repo is here, but I'll post the most important extracts below.
Background
I'm trying to get the hang of blockchain and public key cryptography, so I thought it would be fun to create a program for drawing up and promulgating government decrees - the government in question being one that I've just made up.
The system is intended to work like this:
- A government official in possession of the necessary passwords draws up a decree, stamps it with a digital stamp, and then adds it to the register.
- A private citizen then encounters decrees while going about his business:
  - If he encounters a decree in isolation, he can judge its authenticity using its stamp and the public key.
  - If he has access to the register, he can check that the government hasn't tried to hide an earlier decree by checking the chain of hashes.
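As a rough illustration of the second check, here is a minimal sketch of walking a chain of hashes. It is not the code from the repo: the field names (`ordinal`, `body`, `prev`, `hash`) and the helper functions are simplified stand-ins, and only the "genesis" placeholder matches the real thing.

```python
import hashlib

GENESIS = "genesis"  # placeholder "prev" value for the first block


def block_hash(ordinal, body, prev):
    """Hash a block's fields together with the previous block's hash."""
    m = hashlib.sha256()
    for field in (str(ordinal), body, prev):
        # Length-prefix each field so that field boundaries are unambiguous.
        encoded = field.encode("utf-8")
        m.update(len(encoded).to_bytes(8, "big"))
        m.update(encoded)
    return m.hexdigest()


def make_chain(bodies):
    """Build a list of linked blocks (dicts) from a list of body strings."""
    blocks, prev = [], GENESIS
    for ordinal, body in enumerate(bodies, start=1):
        digest = block_hash(ordinal, body, prev)
        blocks.append({"ordinal": ordinal, "body": body,
                       "prev": prev, "hash": digest})
        prev = digest
    return blocks


def chain_is_intact(blocks):
    """Return True if the ordinals are consecutive, every block points at its
    predecessor's hash, and every stored hash matches the block's contents."""
    expected_prev = GENESIS
    for expected_ordinal, block in enumerate(blocks, start=1):
        if block["ordinal"] != expected_ordinal:
            return False
        if block["prev"] != expected_prev:
            return False
        recomputed = block_hash(block["ordinal"], block["body"], block["prev"])
        if block["hash"] != recomputed:
            return False
        expected_prev = block["hash"]
    return True
```

With this shape, quietly dropping or editing an earlier decree breaks either the run of ordinals or one of the `prev` links, which is what the citizen with access to the register would notice.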
The Code
This is the code in which I make and verify my stamps, the certificates which authenticate decrees:
### This code defines two classes: one of which produces a digital stamp for
### documents issued by the Chancellor of Cyprus, and the other of which
### verifies the same.
# Imports.
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from pathlib import Path
import datetime, getpass, os
# Local constants.
path_to_digistamp = str(Path.home())+"/chancery-b/digistamp/"
path_to_private_key = path_to_digistamp+"stamp_private_key.pem"
path_to_public_key = path_to_digistamp+"stamp_public_key.pem"
public_exponent = 65537
key_size = 2048
encoding = "utf-8"
################
# MAIN CLASSES #
################
# A class which produces a string which testifies as to the authenticity of
# a given document.
class Stamp_Machine:
def __init__(self, data):
if os.path.exists(path_to_private_key) == False:
raise Exception("No private key on disk.")
self.private_key = load_private_key()
self.data = data
# Ronseal.
def make_stamp(self):
data_bytes = bytes(self.data, encoding)
result_bytes = self.private_key.sign(
data_bytes,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256())
result = result_bytes.hex()
return result
# A class which allows the user to verify a stamp produced as above.
class Verifier:
def __init__(self, stamp, data):
if isinstance(stamp, str) == False:
raise Exception("")
self.stamp_str = stamp
self.stamp_bytes = bytes.fromhex(stamp)
self.public_key = load_public_key()
self.data = data
# Decide whether the stamp in question is authentic or not.
def verify(self):
data_bytes = bytes(self.data, encoding)
try:
self.public_key.verify(
self.stamp_bytes,
data_bytes,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256())
except InvalidSignature:
return False
else:
return True
####################
# HELPER FUNCTIONS #
####################
# Get a password from the user, and convert it into bytes.
def get_bytes_password():
password = getpass.getpass(prompt="Digistamp password: ")
result = bytes(password, encoding)
return result
# Get a NEW password from the user, and convert it into bytes.
def get_bytes_password_new():
password = getpass.getpass(prompt="Digistamp password: ")
password_ = getpass.getpass(prompt="Confirm password: ")
if password != password_:
raise Exception("Passwords do not match.")
result = bytes(password, encoding)
return result
# Generate a public key from a private key object.
def generate_public_key(private_key):
public_key = private_key.public_key()
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
f = open(path_to_public_key, "wb")
f.write(pem)
f.close()
# Generate a new private and public key.
def generate_keys():
if os.path.exists(path_to_private_key):
raise Exception("Private key file already exists.")
bpw = get_bytes_password_new()
private_key = rsa.generate_private_key(public_exponent=public_exponent,
key_size=key_size,
backend=default_backend())
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=
serialization.BestAvailableEncryption(bpw))
f = open(path_to_private_key, "wb")
f.write(pem)
f.close()
generate_public_key(private_key)
# Load the private key from its file.
def load_private_key():
bpw = get_bytes_password()
key_file = open(path_to_private_key, "rb")
result = serialization.load_pem_private_key(key_file.read(),
password=bpw,
backend=default_backend())
return result
# Load the public key from its file.
def load_public_key():
key_file = open(path_to_public_key, "rb")
result = serialization.load_pem_public_key(key_file.read(),
backend=default_backend())
return result
###########
# TESTING #
###########
# Run the unit tests.
def test():
stamp = Stamp_Machine("123").make_stamp()
assert(Verifier(stamp, "123").verify())
assert(Verifier(stamp, "abc").verify() == False)
print("Tests passed!")
###################
# RUN AND WRAP UP #
###################
def run():
test()
if __name__ == "__main__":
run()
This is the code in which a decree-in-the-making is uploaded to the register:
### This code defines a class, which uploads a record to the ledger.
# Imports.
import datetime, hashlib, os, sqlite3
# Local imports.
from digistamp.digistamp import Stamp_Machine
import ordinance_inputs
# Constants.
encoding = "utf-8"
##############
# MAIN CLASS #
##############
# The class in question.
class Uploader:
def __init__(self):
self.connection = None
self.c = None
self.block = Block_of_Ledger()
# Ronseal.
def make_connection(self):
self.connection = sqlite3.connect("ledger.db")
self.connection.row_factory = dict_factory
self.c = self.connection.cursor()
# Ronseal.
def close_connection(self):
self.connection.close()
# Add the ordinal and the previous block's hash to the block.
def add_ordinal_and_prev(self):
self.make_connection()
query = "SELECT * FROM Block ORDER BY ordinal DESC;"
self.c.execute(query)
result = self.c.fetchone()
self.close_connection()
if result is None:
self.block.set_ordinal(1)
self.block.set_prev("genesis")
else:
self.block.set_ordinal(result["ordinal"]+1)
self.block.set_prev(result["hash"])
# Add the hash to the present block.
def add_hash(self):
m = hashlib.sha256()
m.update(bytes(self.block.ordinal))
m.update(bytes(self.block.ordinance_type, encoding))
m.update(bytes(self.block.latex, encoding))
m.update(bytes(self.block.year))
m.update(bytes(self.block.month))
m.update(bytes(self.block.day))
if self.block.annexe:
m.update(self.block.annexe)
m.update(bytes(self.block.prev, encoding))
self.block.set_the_hash(m.hexdigest())
# Add a new block to the legder.
def add_new_block(self):
new_block_tuple = (self.block.ordinal, self.block.ordinance_type,
self.block.latex, self.block.year,
self.block.month, self.block.day,
self.block.stamp, self.block.annexe,
self.block.prev, self.block.the_hash)
query = ("INSERT INTO Block (ordinal, ordinanceType, latex, year, "+
" month, day, stamp, annexe, prev, "+
" hash) "+
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);")
self.make_connection()
self.c.execute(query, new_block_tuple)
self.connection.commit()
self.close_connection()
# Construct a new block and add it to the chain.
def upload(self):
self.add_ordinal_and_prev()
self.add_hash()
self.add_new_block()
################################
# HELPER CLASSES AND FUNCTIONS #
################################
# A class to hold the properties of a block of the ledger.
class Block_of_Ledger:
def __init__(self):
dt = datetime.datetime.now()
self.ordinal = None
self.ordinance_type = ordinance_inputs.ordinance_type
self.latex = ordinance_inputs.latex
self.year = dt.year
self.month = dt.month
self.day = dt.day
self.annexe = annexe_to_bytes()
self.prev = None
self.the_hash = None
self.stamp = None
# Assign a value to the "ordinal" field of this object.
def set_ordinal(self, ordinal):
self.ordinal = ordinal
# Assign a value to the "prev" field of this object.
def set_prev(self, prev):
self.prev = prev
# Assign a value to the "the_hash" field of this object.
def set_the_hash(self, the_hash):
self.the_hash = the_hash
self.stamp = Stamp_Machine(self.the_hash).make_stamp()
# A function which allows queries to return dictionaries, rather than the
# default tuples.
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
# Convert the annexe folder to a zip, load the bytes thereof into memory,
# and then delete the zip.
def annexe_to_bytes():
if len(os.listdir("annexe/")) == 0:
return None
os.system("zip -r annexe.zip annexe/")
f = open("annexe.zip", "rb")
result = f.read()
f.close()
os.system("rm annexe.zip")
return result
###################
# RUN AND WRAP UP #
###################
def run():
Uploader().upload()
if __name__ == "__main__":
run()
And this is the code in which decrees are extracted from the register and put into something more human-readable, i.e. a PDF:
### This code defines a class which takes a given record in the ledger and
### converts it into a directory.
# Imports.
from uploader import dict_factory
import os, sqlite3, sys
# Local imports.
from digistamp.digistamp import Verifier
# Constants.
month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug",
"Sep", "Oct", "Nov", "Dec"]
##############
# MAIN CLASS #
##############
# The class in question.
class Extractor:
def __init__(self, ordinal):
self.ordinal = ordinal
self.block = self.fetch_block()
self.main_tex = self.make_main_tex()
# Fetch the block matching this object's ordinal from the ledger.
def fetch_block(self):
connection = sqlite3.connect("ledger.db")
connection.row_factory = dict_factory
c = connection.cursor()
query = "SELECT * FROM Block WHERE ordinal = ?;"
c.execute(query, (self.ordinal,))
result = c.fetchone()
connection.close()
if result is None:
raise Exception("No block with ordinal "+str(self.ordinal)+
" in the ledger.")
return result
# Get the base for main.tex, given the type of the Ordinance.
def get_base(self):
if self.block["ordinanceType"] == "Decleration":
path_to_base = "latexery/base_decleration.tex"
elif self.block["ordinanceType"] == "Order":
path_to_base = "latexery/base_order.tex"
else:
raise Exception("Invalid ordinanceType: "+
self.block["ordinanceType"])
f = open(path_to_base, "r")
result = f.read()
f.close()
return result
# Make the code for main.tex, which will then be used build our PDF.
def make_main_tex(self):
day_str = str(self.block["day"])
if len(day_str) == 0:
day_str = "0"+day_str
packed_ordinal = str(self.ordinal)
while len(packed_ordinal) < 3:
packed_ordinal = "0"+packed_ordinal
month_str = month_names[self.block["month"]-1]
result = self.get_base()
result = result.replace("#BODY", self.block["latex"])
result = result.replace("#DAY_STR", day_str)
result = result.replace("#MONTH_STR", month_str)
result = result.replace("#YEAR", str(self.block["year"]))
result = result.replace("#PACKED_ORDINAL", packed_ordinal)
result = result.replace("#DIGISTAMP", self.block["stamp"])
return result
# Check that the block isn't a forgery.
def authenticate(self):
self.compare_hashes()
self.verify_stamp()
# Compare the "prev" field of this block with the hash of the previous.
def compare_hashes(self):
if self.ordinal == 1:
if self.block["prev"] != "genesis":
raise Exception("Block with ordinal=1 should be the "+
"genesis block.")
else:
return
prev_ordinal = self.ordinal-1
connection = sqlite3.connect("ledger.db")
c = connection.cursor()
query = "SELECT hash FROM Block WHERE ordinal = ?;"
c.execute(query, (prev_ordinal,))
extract = c.fetchone()
connection.close()
prev_hash = extract["0"]
if prev_hash != self.block["prev"]:
raise Exception("Block with ordinal="+str(self.ordinal)+" is "+
"not authentic: \"prev\" does not match "+
"previous \"hash\".")
# Check that this block's stamp is in order.
def verify_stamp(self):
v = Verifier(self.block["stamp"], self.block["hash"])
if v.verify() == False:
raise Exception("Block with ordinal="+str(self.ordinal)+" is "+
"not authentic: \"prev\" does not match "+
"previous \"hash\".")
# Ronseal.
def write_main_tex(self):
f = open("latexery/main.tex", "w")
f.write(self.main_tex)
f.close()
# Compile the PDF.
def compile_main_tex(self):
script = ("cd latexery/\n"+
"pdflatex main.tex")
os.system(script)
# Create the directory, and copy the PDF into it.
def create_and_copy(self):
script1 = ("cd extracts/\n"+
"mkdir "+str(self.ordinal)+"/")
script2 = "cp latexery/main.pdf extracts/"+str(self.ordinal)+"/"
if os.path.isdir("extracts/"+str(self.ordinal)+"/"):
os.system("rm -r extracts/"+str(self.ordinal)+"/")
os.system(script1)
os.system(script2)
# Write annexe to a file in the directory.
def write_annexe_zip(self):
if self.block["annexe"] is None:
return
f = open("extracts/"+str(self.ordinal)+"/annexe.zip", "wb")
f.write(self.block["annexe"])
f.close()
# Do the thing.
def extract(self):
self.authenticate()
self.write_main_tex()
self.compile_main_tex()
self.create_and_copy()
self.write_annexe_zip()
# Ronseal.
def zip_and_delete(self):
script = ("cd extracts/\n"+
"zip -r ordinance_"+str(self.ordinal)+".zip "+
str(self.ordinal)+"/\n"+
"rm -r "+str(self.ordinal)+"/")
if os.path.exists("extracts/ordinance_"+str(self.ordinal)+".zip"):
os.system("rm extracts/ordinance_"+str(self.ordinal)+".zip")
os.system(script)
###########
# TESTING #
###########
# Run a demonstration.
def demo():
e = Extractor(1)
e.extract()
#e.zip_and_delete()
###################
# RUN AND WRAP UP #
###################
def run():
if len(sys.argv) == 2:
e = Extractor(int(sys.argv[1]))
e.extract()
else:
print("Please run me with exactly one argument, the number of the "+
"Ordinance you wish to extract.")
if __name__ == "__main__":
run()
What I'd Like to Know
- Am I using blockchain and public key cryptography in a sane fashion? Or have I misunderstood some of the fundamental concepts involved?
- Are there any glaring security issues? Is there any obvious exploit in which someone could start manufacturing convincing forgeries?
- How's my Python?
1 Answer
Neat question! I'm afraid I ran out of steam before reaching anything too crypto-ish, though. There's a lot of Python to unpack here.
The repeated comment "Ronseal." is cute if you know the reference but I guess you know it doesn't help the reader; I'd rather save a line of screen real estate.
Re Python style, I recommend running your code through flake8 and fixing everything it complains about (with the possible exception of line-length warnings). It'll rightly complain about commented-out code like
#e.zip_and_delete()
def write_annexe_zip(self):
    if self.block["annexe"] is None:
        return
    f = open("extracts/"+str(self.ordinal)+"/annexe.zip", "wb")
    f.write(self.block["annexe"])
    f.close()
Either this is another in-joke that I don't get, or you misspelled "annex." (You certainly misspelled "declaration" and "ledger" at least once each.) I also looked for anywhere you might have explained what an annex[e] actually is in this context, without success.
If f.write throws an exception, then f.close() won't happen and you'll have leaked a file handle. It's always best to use RAII-style constructions for this kind of thing (in Python, a with block): i.e.,
def write_annexe_zip(self):
    if self.block["annexe"] is not None:
        with open("extracts/%d/annexe.zip" % self.ordinal, "wb") as f:
            f.write(self.block["annexe"])
Looks like you haven't learned printf format strings yet. It's a really good investment in your career! Consider:
day_str = str(self.block["day"])
if len(day_str) == 0:  ## I'm guessing this was a typo?!
    day_str = "0"+day_str
packed_ordinal = str(self.ordinal)
while len(packed_ordinal) < 3:
    packed_ordinal = "0"+packed_ordinal
versus what you could have written:
day_str = "%02d" % self.block["day"]
packed_ordinal = "%03d" % self.ordinal
Anything you can do to eliminate mutation (repeated overwriting) of your variables is a good thing for maintainability. In this case it's also shorter and clearer.
script1 = ("cd extracts/\n"+
           "mkdir "+str(self.ordinal)+"/")
I'm surprised that os.system() allows you to pass multiple commands separated by newlines. However, here it would be clearer to do it in a single command. There's no reason for you to change working directories before creating the extracts/foo directory.
def create_and_copy(self):
    mypath = "extracts/%d" % self.ordinal
    if os.path.isdir(mypath):
        os.system("rm -r %s" % mypath)
    os.system("mkdir -p %s" % mypath)
    os.system("cp latexery/main.pdf %s" % mypath)
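If you'd rather not shell out at all, the same three steps can be done with the standard library's os and shutil modules. This is only a sketch: the parameters `pdf_path` and `extracts_dir` are my own additions so that the function can be exercised against a temporary directory, and it's written as a free function rather than a method for the same reason.

```python
import os
import shutil


def create_and_copy(ordinal, pdf_path="latexery/main.pdf",
                    extracts_dir="extracts"):
    """Recreate <extracts_dir>/<ordinal>/ and copy the compiled PDF into it."""
    target = os.path.join(extracts_dir, str(ordinal))
    # Remove any stale copy; ignore_errors covers "directory doesn't exist".
    shutil.rmtree(target, ignore_errors=True)
    # Creates intermediate directories too, like "mkdir -p".
    os.makedirs(target)
    shutil.copy(pdf_path, target)
    return target
```

A side benefit is that a failed copy raises a Python exception, whereas os.system() only hands back an exit status that the original code never looks at.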
class Stamp_Machine:
    def __init__(self, data):
        if os.path.exists(path_to_private_key) == False:
            raise Exception("No private key on disk.")
        self.private_key = load_private_key()
        self.data = data
- Why is path_to_private_key not a function parameter to load_private_key?
- Why is load_private_key a free function? Shouldn't it be a private implementation detail (i.e., a member) of the Stamp_Machine class?
- The construction if f() == False would be better expressed as if not f().
- Are you at all concerned that someone might remove the file in between the time you check for its existence and the time you read it? (This is a "TOCTTOU" vulnerability.)
- Are you at all concerned that someone might replace the file with another file, such as a private key under their personal control?
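On the check-then-read race specifically, one common way around it is to drop the existence check and simply attempt the open, handling the failure if it comes. A minimal sketch, assuming a hypothetical helper `read_key_bytes` that takes the path as a parameter (it is not part of your code or of the cryptography library):

```python
def read_key_bytes(path):
    """Return the raw contents of the key file at `path`.

    There is no separate os.path.exists() call, so there is no window
    between "check" and "use" in which the file can disappear.
    """
    try:
        with open(path, "rb") as key_file:
            return key_file.read()
    except FileNotFoundError:
        raise RuntimeError("No private key on disk: %s" % path) from None
```

The returned bytes would then go to serialization.load_pem_private_key exactly as before. Note that this does nothing about the second worry, a file that has been swapped for a different key; that needs file permissions or some other safeguard.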
if isinstance(stamp, str) == False:
    raise Exception("")
self.stamp_str = stamp
This should at least raise RuntimeError("expected a string"). And in reality, you shouldn't manually raise an exception at all; any operation that requires a string will eventually raise TypeError, which is plenty self-descriptive, and if no such operation ever takes place, then maybe you didn't need to raise an exception in the first place.
You never use the member variable self.stamp_str, so you shouldn't be creating it.
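To make the TypeError point concrete: the very next thing your constructor does with the stamp is call bytes.fromhex on it, and that call already rejects non-strings by itself. A small sketch, where `hex_to_bytes` is just a hypothetical wrapper for illustration:

```python
def hex_to_bytes(stamp):
    """Decode a hex-encoded stamp; bytes.fromhex does its own type checking."""
    return bytes.fromhex(stamp)


try:
    hex_to_bytes(12345)  # not a string
except TypeError as error:
    print("TypeError:", error)
```

A string that isn't valid hex raises ValueError from the same call, so both kinds of bad input are covered without the isinstance check.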
You repeatedly pass backend=default_backend() to crypto functions. That's redundant; by definition, the backend that will be used by default is the default backend.
password = getpass.getpass(prompt="Digistamp password: ")
password_ = getpass.getpass(prompt="Confirm password: ")
if password != password_:
    raise Exception("Passwords do not match.")
This seems to be conflating two different kinds of "error handling." Superficially, this function's job seems to be to interact with the user, present a prompt to the user, and wait for the user to type in their new password. But then when the user makes a typo, you don't ask the user to fix their typo; you raise a programming exception indicating an unexpected failure to be handled by code in your calling routine. What's the calling routine supposed to do about it?
This should just be a loop:
while True:
    password = getpass.getpass(prompt="Digistamp password: ")
    password_ = getpass.getpass(prompt="Confirm password: ")
    if password == password_:
        break
    print("Your two inputs did not match. Please try again.")
# Add the hash to the present block.
def add_hash(self):
This function operates only on self.block. Shouldn't it be a member function of Block_of_Ledger instead?
Incidentally, Python style would be CamelCase, e.g. BlockOfLedger or simply LedgerBlock.
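A minimal sketch of what moving the hashing onto the block could look like, using a cut-down, hypothetical LedgerBlock with only a few of the fields. One assumption worth flagging: the integers are passed through str() before encoding, because calling bytes(n) on an integer produces n zero bytes rather than an encoding of the number.

```python
import hashlib


class LedgerBlock:
    """Hypothetical, cut-down block holding only the fields needed here."""

    def __init__(self, ordinal, ordinance_type, latex, prev):
        self.ordinal = ordinal
        self.ordinance_type = ordinance_type
        self.latex = latex
        self.prev = prev

    def compute_hash(self):
        """Return the SHA-256 hex digest of this block's fields."""
        m = hashlib.sha256()
        # bytes(5) is five zero bytes, not an encoding of the number 5,
        # so the integer is converted with str() before encoding.
        m.update(str(self.ordinal).encode("utf-8"))
        m.update(self.ordinance_type.encode("utf-8"))
        m.update(self.latex.encode("utf-8"))
        m.update(self.prev.encode("utf-8"))
        return m.hexdigest()
```

The uploader would then only need to call block.compute_hash() rather than reaching into the block's fields one by one.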
It is absolutely insane that creating a Block_of_Ledger object in Python, i.e., the single line
foo = Block_of_Ledger()
causes the program to irretrievably delete files from disk. That just shouldn't happen. You should write a conspicuously named function that deletes files, and then use its result as a parameter to the constructor. E.g.:
annex = collect_annex_files(delete_from_disk=True)
foo = Block_of_Ledger(annex)
Similarly, dt = datetime.datetime.now() should be a parameter to the constructor. Everything you write should be unit-testable, and that means no hard dependencies on external realities like "time" or "disk" or "network" or "database." Look up dependency injection and/or watch this excellent talk on unit-testing.
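Here is roughly what I mean, as a sketch rather than a drop-in replacement. The class name LedgerBlock and the `annexe` and `today` parameters are my own choices for illustration, and the stamp, ordinal and hash fields are left out for brevity.

```python
import datetime


class LedgerBlock:
    """Hypothetical block whose external inputs are passed in by the caller."""

    def __init__(self, ordinance_type, latex, annexe=None, today=None):
        # Fall back to the real clock only when the caller supplies no date.
        if today is None:
            today = datetime.date.today()
        self.ordinance_type = ordinance_type
        self.latex = latex
        self.annexe = annexe
        self.year = today.year
        self.month = today.month
        self.day = today.day


# In a unit test, the date and annexe are fixed values: no disk, no clock.
block = LedgerBlock("Order", "Some text.", annexe=b"zip bytes",
                    today=datetime.date(2020, 3, 29))
```

The production code path still gets today's date and the zipped annexe, but it gets them from the caller, so a test can construct a block without touching the filesystem or depending on when it is run.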
- Thank you for such a thoughtful and useful review. Inevitably, there are a few suggestions which I've decided not to carry out - although I've carried out all the others - and which I detail below... – Tom Hosker, Mar 29, 2020 at 15:52
- 1. The spelling annexe is British English, like axe and programme (although not even I use the latter). It pains me every time Gmail tries to nudge me into using an American spelling, and this is my way of fighting back! – Tom Hosker, Mar 29, 2020 at 15:57
- 2. I'm aware of printf-ery. (Would you believe that C was my first language?) And I like what you've done here. However, I'm going to keep things as they are because: (A) in Python, I'll always prefer something verbose but clear over something terse, and (B) I still have PTSD from my days as a C programmer! – Tom Hosker, Mar 29, 2020 at 16:06
- 3. So backend=default_backend() is redundant? Ah, that'll be why it throws an exception when I remove that snippet! – Tom Hosker, Mar 29, 2020 at 16:10
- 4. Yes, the constructor of Block_of_Ledger calls a function which deletes a file, but that same function creates the file to be deleted only 4 lines up. I really don't find that insane - either absolutely or relatively. – Tom Hosker, Mar 29, 2020 at 16:29