4
\$\begingroup\$

I wrote this for a class project, the backend for this dog voting website. I noticed duplicate code among multiple functions I was writing to be deployed as a cloud function: they all were wrapped in try/except blocks that returned either 200 and JSON or 500 and a traceback. (I understand that it would be better to return an informative, structured error code, but this helped with debugging and we didn't have extensive error handling on the front-end: this was a loss I was willing to take).

The decorator gives each function a connection pool, uses jsonschema to validate input and output, responds to CORS OPTIONS from anywhere, and uses print statements for logging. This isn't great, but it was far easier to set up than the Google Cloud Function logging library, and everything printed to stdout during function execution is logged to GCP Stackdriver.

Here is the decorator itself:

import functools
import json
import traceback
import jsonschema
from util.get_pool import get_pool
pg_pool = None
def cloudfunction(in_schema=None, out_schema=None):
 """
 :param in_schema: the schema for the input, or a falsy value if there is no input
 :param out_schema: the schema for the output, or a falsy value if there is no output
 :return: the cloudfunction wrapped function
 """
 # Both schemas must be valid according to jsonschema draft 7, if they are provided.
 if in_schema:
 jsonschema.Draft7Validator.check_schema(in_schema)
 if out_schema:
 jsonschema.Draft7Validator.check_schema(out_schema)
 def cloudfunction_decorator(f):
 """ Wraps a function with two arguments, the first of which is a json object that it expects to be sent with the
 request, and the second is a postgresql pool. It modifies it by:
 - setting CORS headers and responding to OPTIONS requests with `Allow-Origin *`
 - passing a connection from a global postgres connection pool
 - adding logging, of all inputs as well as error tracebacks.
 :param f: A function that takes a `request` and a `pgpool` and returns a json-serializable object
 :return: a function that accepts one argument, a Flask request, and calls f with the modifications listed
 """
 @functools.wraps(f)
 def wrapped(request):
 global pg_pool
 if request.method == 'OPTIONS':
 return cors_options()
 # If it's not a CORS OPTIONS request, still include the base header.
 headers = {'Access-Control-Allow-Origin': '*'}
 if not pg_pool:
 pg_pool = get_pool()
 try:
 conn = pg_pool.getconn()
 if in_schema:
 request_json = request.get_json()
 print(repr({"request_json": request_json}))
 jsonschema.validate(request_json, in_schema)
 function_output = f(request_json, conn)
 else:
 function_output = f(conn)
 if out_schema:
 jsonschema.validate(function_output, out_schema)
 conn.commit()
 print(repr({"response_json": function_output}))
 response_json = json.dumps(function_output)
 # TODO allow functions to specify return codes in non-exceptional cases
 return (response_json, 200, headers)
 except:
 print("Error: Exception traceback: " + repr(traceback.format_exc()))
 return (traceback.format_exc(), 500, headers)
 finally:
 # Make sure to put the connection back in the pool, even if there has been an exception
 try:
 pg_pool.putconn(conn)
 except NameError: # conn might not be defined, depending on where the error happens above
 pass
 return wrapped
 return cloudfunction_decorator
# If given an OPTIONS request, tell the requester that we allow all CORS requests (pre-flight stage)
def cors_options():
 # Allows GET and POST requests from any origin with the Content-Type
 # header and caches preflight response for an 3600s
 headers = {
 'Access-Control-Allow-Origin': '*',
 'Access-Control-Allow-Headers': 'Content-Type',
 'Access-Control-Max-Age': '3600'
 }
 return ('', 204, headers)

get_pool is here:

from os import getenv
from psycopg2 import OperationalError, connect
from psycopg2.pool import SimpleConnectionPool
INSTANCE_CONNECTION_NAME = getenv('INSTANCE_CONNECTION_NAME', "")
POSTGRES_USER = getenv('POSTGRES_USER', "")
POSTGRES_PASSWORD = getenv('POSTGRES_PASSWORD', "")
POSTGRES_NAME = getenv('POSTGRES_DATABASE', "postgres")
pg_config = {
 'user': POSTGRES_USER,
 'password': POSTGRES_PASSWORD,
 'dbname': POSTGRES_NAME
}
def get_pool():
 try:
 return __connect(f'/cloudsql/{INSTANCE_CONNECTION_NAME}')
 except OperationalError:
 # If production settings fail, use local development ones
 return __connect('localhost')
def __connect(host):
 """
 Helper functions to connect to Postgres
 """
 pg_config['host'] = host
 return SimpleConnectionPool(1, 1, **pg_config)
def get_connection(host="localhost"):
 pg_config["host"] = host
 return connect(**pg_config)

And an example of usage:


@cloudfunction(
 in_schema={"type": "string"},
 out_schema={
 "anyOf": [{
 "type": "object",
 "properties": {
 "dog1": {"type": "integer"},
 "dog2": {"type": "integer"}
 },
 "additionalProperties": False,
 "minProperties": 2,
 }, {
 "type": "null"
 }]
 })
def get_dog_pair(request_json, conn):
 [function body elided]
asked Apr 12, 2019 at 2:23
\$\endgroup\$

1 Answer 1

2
\$\begingroup\$

Decorator Function -> Decorator class

IMO, this decorator function should be a class. The use of global pool hints at this, the decorator needs to maintain some sort of state. It's not too bad to build, it requires __init__ and __call__ methods:

class CloudFunction:
 def __init__(self, pool=None, in_schema=None, out_schema=None):
 for schema in (in_schema, out_schema):
 if not schema:
 continue
 jsonschema.Draft7Validator.check_schema(schema)
 self.in_schema = in_schema
 self.out_schema = out_schema
 
 self.pool = pool if pool is not None else get_pool()
 def __call__(self, func):
 @functools.wraps(func)
 def wrapper(request, *args, **kwargs):
 if request.method == 'OPTIONS':
 return cors_options()
 
 headers = {'Access-Control-Allow-Origin': '*'}
 ...

get_conn as a contextmanager

This is pretty much implemented as one already, since you've wrapped the creation and management of the connection in a try/finally

from contextlib import contextmanager 
class CloudFunction:
 ~snip~
 @contextmanager
 def get_connection(self):
 conn = self.pool.getconn()
 try:
 yield conn
 finally:
 self.pool.putconn(conn)
 def use_conn(self):
 with self.get_connection() as conn:
 # use conn

However, there's a difference in the implementation. I wouldn't wrap the self.pool.getconn here, because I'd want that error to be raised if no connection is returned from the pool. This way you don't have to deal with the NameError, which feels like a hack.

A helper function

I'd rather not have deep nesting of a try/except and with all defined within __call__. It's too many levels of indentation to keep track of and the code turns into a bit of a wall-of-text, so a quick handle_request helper function does nicely:

class CloudFunction:
 def handle_request(self, request, func, *args, **kwargs):
 with self.get_connection() as conn:
 if self.in_schema is not None:
 request_json = request.get_json()
 print(repr({"request_json": request_json}))
 jsonschema.validate(request_json, self.in_schema)
 function_output = func(request_json, conn, *args, **kwargs)
 else:
 function_output = func(conn, *args, **kwargs)
 if self.out_schema:
 jsonschema.validate(function_output, self.out_schema)
 conn.commit()
 
 response_json = json.dumps(function_output)
 return response_json
 # Wrap all of this in a try/except in here
 def __call__(self, func):
 @functools.wraps(func)
 def wrapper(request, *args, **kwargs):
 if request.method == 'OPTIONS':
 return cors_options()
 
 headers = {'Access-Control-Allow-Origin': '*'}
 try:
 resp = self.handle_request(request, func, *args, **kwargs)
 print(f"Success: {resp!r}")
 output = (resp, 200, headers)
 except Exception:
 tb = traceback.format_exc()
 print(f"Error: {tb!r}")
 output = (tb, 500, headers)
 return output 
 return wrapper

Putting Everything Together

class CloudFunction:
 def __init__(self, pool=None, in_schema=None, out_schema=None):
 for schema in (in_schema, out_schema):
 if not schema:
 continue
 jsonschema.Draft7Validator.check_schema(schema)
 self.in_schema = in_schema
 self.out_schema = out_schema
 
 self.pool = pool if pool is not None else get_pool()
 def __call__(self, func):
 @functools.wraps(func)
 def wrapper(request, *args, **kwargs):
 if request.method == 'OPTIONS':
 return cors_options()
 
 headers = {'Access-Control-Allow-Origin': '*'}
 try:
 resp = self.handle_request(request, func, *args, **kwargs)
 print(f"Success: {resp!r}")
 output = (resp, 200, headers)
 except Exception:
 tb = traceback.format_exc()
 print(f"Error: {tb!r}")
 output = (tb, 500, headers)
 return output 
 return wrapper
 
 def handle_request(self, request, func, *args, **kwargs):
 with self.get_connection() as conn:
 if self.in_schema is not None:
 request_json = request.get_json()
 print(repr({"request_json": request_json}))
 jsonschema.validate(request_json, self.in_schema)
 function_output = func(request_json, conn, *args, **kwargs)
 else:
 function_output = func(conn, *args, **kwargs)
 if self.out_schema:
 jsonschema.validate(function_output, self.out_schema)
 conn.commit()
 
 response_json = json.dumps(function_output)
 return response_json
 
 @contextmanager
 def get_connection(self):
 conn = self.pool.getconn()
 try:
 yield conn
 finally:
 self.pool.putconn(conn)

Example Usage

It should now look pretty similar:

# Nice argument formatting here, by the way
@CloudFunction(
 in_schema={"type": "string"},
 out_schema={
 "anyOf": [{
 "type": "object",
 "properties": {
 "dog1": {"type": "integer"},
 "dog2": {"type": "integer"}
 },
 "additionalProperties": False,
 "minProperties": 2,
 }, {
 "type": "null"
 }]
 })
def get_dog_pair(request_json, conn):
 [function body elided]
answered Aug 8, 2023 at 16:08
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.