I wrote this for a class project, the backend for this dog voting website. I noticed duplicate code among multiple functions I was writing to be deployed as a cloud function: they all were wrapped in try/except blocks that returned either 200 and JSON or 500 and a traceback. (I understand that it would be better to return an informative, structured error code, but this helped with debugging and we didn't have extensive error handling on the front-end: this was a loss I was willing to take).
The decorator gives each function a connection pool, uses jsonschema to validate input and output, responds to CORS OPTIONS from anywhere, and uses print statements for logging. This isn't great, but it was far easier to set up than the Google Cloud Function logging library, and everything printed to stdout during function execution is logged to GCP Stackdriver.
Here is the decorator itself:
import functools
import json
import traceback
import jsonschema
from util.get_pool import get_pool
pg_pool = None
def cloudfunction(in_schema=None, out_schema=None):
"""
:param in_schema: the schema for the input, or a falsy value if there is no input
:param out_schema: the schema for the output, or a falsy value if there is no output
:return: the cloudfunction wrapped function
"""
# Both schemas must be valid according to jsonschema draft 7, if they are provided.
if in_schema:
jsonschema.Draft7Validator.check_schema(in_schema)
if out_schema:
jsonschema.Draft7Validator.check_schema(out_schema)
def cloudfunction_decorator(f):
""" Wraps a function with two arguments, the first of which is a json object that it expects to be sent with the
request, and the second is a postgresql pool. It modifies it by:
- setting CORS headers and responding to OPTIONS requests with `Allow-Origin *`
- passing a connection from a global postgres connection pool
- adding logging, of all inputs as well as error tracebacks.
:param f: A function that takes a `request` and a `pgpool` and returns a json-serializable object
:return: a function that accepts one argument, a Flask request, and calls f with the modifications listed
"""
@functools.wraps(f)
def wrapped(request):
global pg_pool
if request.method == 'OPTIONS':
return cors_options()
# If it's not a CORS OPTIONS request, still include the base header.
headers = {'Access-Control-Allow-Origin': '*'}
if not pg_pool:
pg_pool = get_pool()
try:
conn = pg_pool.getconn()
if in_schema:
request_json = request.get_json()
print(repr({"request_json": request_json}))
jsonschema.validate(request_json, in_schema)
function_output = f(request_json, conn)
else:
function_output = f(conn)
if out_schema:
jsonschema.validate(function_output, out_schema)
conn.commit()
print(repr({"response_json": function_output}))
response_json = json.dumps(function_output)
# TODO allow functions to specify return codes in non-exceptional cases
return (response_json, 200, headers)
except:
print("Error: Exception traceback: " + repr(traceback.format_exc()))
return (traceback.format_exc(), 500, headers)
finally:
# Make sure to put the connection back in the pool, even if there has been an exception
try:
pg_pool.putconn(conn)
except NameError: # conn might not be defined, depending on where the error happens above
pass
return wrapped
return cloudfunction_decorator
# If given an OPTIONS request, tell the requester that we allow all CORS requests (pre-flight stage)
def cors_options():
# Allows GET and POST requests from any origin with the Content-Type
# header and caches preflight response for an 3600s
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Max-Age': '3600'
}
return ('', 204, headers)
get_pool is here:
from os import getenv
from psycopg2 import OperationalError, connect
from psycopg2.pool import SimpleConnectionPool
INSTANCE_CONNECTION_NAME = getenv('INSTANCE_CONNECTION_NAME', "")
POSTGRES_USER = getenv('POSTGRES_USER', "")
POSTGRES_PASSWORD = getenv('POSTGRES_PASSWORD', "")
POSTGRES_NAME = getenv('POSTGRES_DATABASE', "postgres")
pg_config = {
'user': POSTGRES_USER,
'password': POSTGRES_PASSWORD,
'dbname': POSTGRES_NAME
}
def get_pool():
try:
return __connect(f'/cloudsql/{INSTANCE_CONNECTION_NAME}')
except OperationalError:
# If production settings fail, use local development ones
return __connect('localhost')
def __connect(host):
"""
Helper functions to connect to Postgres
"""
pg_config['host'] = host
return SimpleConnectionPool(1, 1, **pg_config)
def get_connection(host="localhost"):
pg_config["host"] = host
return connect(**pg_config)
And an example of usage:
@cloudfunction(
in_schema={"type": "string"},
out_schema={
"anyOf": [{
"type": "object",
"properties": {
"dog1": {"type": "integer"},
"dog2": {"type": "integer"}
},
"additionalProperties": False,
"minProperties": 2,
}, {
"type": "null"
}]
})
def get_dog_pair(request_json, conn):
[function body elided]
1 Answer 1
Decorator Function -> Decorator class
IMO, this decorator function should be a class. The use of global pool
hints at this, the decorator needs to maintain some sort of state. It's not too bad to build, it requires __init__
and __call__
methods:
class CloudFunction:
def __init__(self, pool=None, in_schema=None, out_schema=None):
for schema in (in_schema, out_schema):
if not schema:
continue
jsonschema.Draft7Validator.check_schema(schema)
self.in_schema = in_schema
self.out_schema = out_schema
self.pool = pool if pool is not None else get_pool()
def __call__(self, func):
@functools.wraps(func)
def wrapper(request, *args, **kwargs):
if request.method == 'OPTIONS':
return cors_options()
headers = {'Access-Control-Allow-Origin': '*'}
...
get_conn
as a contextmanager
This is pretty much implemented as one already, since you've wrapped the creation and management of the connection in a try
/finally
from contextlib import contextmanager
class CloudFunction:
~snip~
@contextmanager
def get_connection(self):
conn = self.pool.getconn()
try:
yield conn
finally:
self.pool.putconn(conn)
def use_conn(self):
with self.get_connection() as conn:
# use conn
However, there's a difference in the implementation. I wouldn't wrap the self.pool.getconn
here, because I'd want that error to be raised if no connection is returned from the pool. This way you don't have to deal with the NameError
, which feels like a hack.
A helper function
I'd rather not have deep nesting of a try
/except
and with
all defined within __call__
. It's too many levels of indentation to keep track of and the code turns into a bit of a wall-of-text, so a quick handle_request
helper function does nicely:
class CloudFunction:
def handle_request(self, request, func, *args, **kwargs):
with self.get_connection() as conn:
if self.in_schema is not None:
request_json = request.get_json()
print(repr({"request_json": request_json}))
jsonschema.validate(request_json, self.in_schema)
function_output = func(request_json, conn, *args, **kwargs)
else:
function_output = func(conn, *args, **kwargs)
if self.out_schema:
jsonschema.validate(function_output, self.out_schema)
conn.commit()
response_json = json.dumps(function_output)
return response_json
# Wrap all of this in a try/except in here
def __call__(self, func):
@functools.wraps(func)
def wrapper(request, *args, **kwargs):
if request.method == 'OPTIONS':
return cors_options()
headers = {'Access-Control-Allow-Origin': '*'}
try:
resp = self.handle_request(request, func, *args, **kwargs)
print(f"Success: {resp!r}")
output = (resp, 200, headers)
except Exception:
tb = traceback.format_exc()
print(f"Error: {tb!r}")
output = (tb, 500, headers)
return output
return wrapper
Putting Everything Together
class CloudFunction:
def __init__(self, pool=None, in_schema=None, out_schema=None):
for schema in (in_schema, out_schema):
if not schema:
continue
jsonschema.Draft7Validator.check_schema(schema)
self.in_schema = in_schema
self.out_schema = out_schema
self.pool = pool if pool is not None else get_pool()
def __call__(self, func):
@functools.wraps(func)
def wrapper(request, *args, **kwargs):
if request.method == 'OPTIONS':
return cors_options()
headers = {'Access-Control-Allow-Origin': '*'}
try:
resp = self.handle_request(request, func, *args, **kwargs)
print(f"Success: {resp!r}")
output = (resp, 200, headers)
except Exception:
tb = traceback.format_exc()
print(f"Error: {tb!r}")
output = (tb, 500, headers)
return output
return wrapper
def handle_request(self, request, func, *args, **kwargs):
with self.get_connection() as conn:
if self.in_schema is not None:
request_json = request.get_json()
print(repr({"request_json": request_json}))
jsonschema.validate(request_json, self.in_schema)
function_output = func(request_json, conn, *args, **kwargs)
else:
function_output = func(conn, *args, **kwargs)
if self.out_schema:
jsonschema.validate(function_output, self.out_schema)
conn.commit()
response_json = json.dumps(function_output)
return response_json
@contextmanager
def get_connection(self):
conn = self.pool.getconn()
try:
yield conn
finally:
self.pool.putconn(conn)
Example Usage
It should now look pretty similar:
# Nice argument formatting here, by the way
@CloudFunction(
in_schema={"type": "string"},
out_schema={
"anyOf": [{
"type": "object",
"properties": {
"dog1": {"type": "integer"},
"dog2": {"type": "integer"}
},
"additionalProperties": False,
"minProperties": 2,
}, {
"type": "null"
}]
})
def get_dog_pair(request_json, conn):
[function body elided]
Explore related questions
See similar questions with these tags.