I'm working on a use case in a hobby project where I merge data from an external data source with an internal one and export the result to a downloadable file. The use case is exposed in an `ExportService` class through the public method `export`:
```
class ExportService:
    def __init__(self):
        self.external_export_generator = ExternalExportGenerator()
        self.external_data_reader = ExternalDataReader()
        self.internal_data_reader = InternalDataReader()
        self.configuration = ImportConfigurationParser()
        self.import_audit_service = ImportAuditService()
        self.storage_provider = S3Storage()
        self.department_repo = DjangoDepartmentRepository()

    def export(self, department_id: uuid.UUID) -> str:
        """
        Generates a download URL for an excel sheet containing the originally imported data
        merged with the current data for a workspace
        """
        (
            first_import,
            all_import_audits,
        ) = self.import_audit_service.get_first_and_all_import_audits(
            department_id=department_id
        )
        self.configuration.parse(first_import.configuration)
        internal_data = self.internal_data_reader.read(
            configuration=self.configuration.config(), department_id=department_id
        )
        external_data = self.external_data_reader.read(
            import_audits=all_import_audits,
            parsed_configuration=self.configuration,
        )
        merged_data = self._merge_data(
            external_data=external_data, internal_data=internal_data
        )
        bytes = self.external_export_generator.generate(rows=merged_data)
        uploaded_filename = self._persist_export(byte_stream=bytes)
        client_download_name = self._get_download_name(department_id=department_id)
        return self._generate_download_url(
            filename=uploaded_filename, download_name=client_download_name
        )

    def _merge_data(self, external_data: Dict, internal_data: Dict) -> List[Dict]:
        """
        Merges an external dataset with an internal one based on a shared ID.
        The internal dataset is northarc data.
        The list will maintain properties from the external dataset that are not present in the internal one
        """
        merged_data_list = []
        for task in internal_data:
            ext_id = self.configuration.extract_id(row=task)
            # per design it should always exist in the original dataset
            new_obj = {**external_data[ext_id], **task}
            merged_data_list.append(new_obj)
        return merged_data_list

    def _persist_export(self, byte_stream: BytesIO) -> str:
        """
        Persists a byte_stream to external storage and returns the filename
        """
        folder = settings.S3["FOLDERS"]["EXPORTS"]
        file_id = uuid.uuid4()
        filename = f"{folder}{file_id}.xlsx"
        self.storage_provider.upload_in_memory_file(file=byte_stream, filename=filename)
        return filename

    def _get_download_name(self, department_id: uuid.UUID):
        """
        Constructs a download name based on the name of the workspace
        """
        current_dep = self.department_repo.get(id=department_id)
        return f"{current_dep.name}_ekstern_eksport.xlsx"

    def _generate_download_url(self, filename: str, download_name: str) -> str:
        """
        Generates a url for the persisted resource. The link will expire after 1 hour
        """
        return self.storage_provider.get_presigned_url(
            key=filename,
            download_name=download_name,
            expiration=3600,
        )
```
I have identified the following responsibilities:

- Read the internal data
- Read the external data
- Merge the two datasets
- Generate the Excel file from the merged dataset
- Persist the Excel file to storage (S3)
- Generate a download link to return to the user

I have extracted reading of internal/external data into two separate classes that share the same interface. The interaction with S3 is handled in the base class `S3Storage`, and the generation of the Excel file happens in `ExternalExportGenerator`.

My concern is really whether my `ExportService` is doing "too much" or whether I should refactor some of the responsibilities into separate classes, maybe one for each responsibility I have identified above. I'm struggling to pick the correct abstractions.
The entry point is the Django API controller:

```
class TaskExportAPI(DjangoAPIView):
    def post(self, request):
        export_service = ExportService()
        download_url = export_service.export(department_id=request.department_id)
        return Response(status=status.HTTP_200_OK, data=download_url)
```
```
class ExternalExportGenerator(ExcelGenerator):
    def __init__(self):
        self.output = BytesIO()
        self.workbook = xlsxwriter.Workbook(self.output)
        self.worksheet = self.workbook.add_worksheet()
        self.headers = []
        # TODO find more elegant solution to generate excel column headers
        self.cols = [
            "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M",
            "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z",
            "AA", "AB", "AC", "AD", "AE", "AF", "AG", "AH", "AI", "AJ", "AK",
            "AL", "AM", "AN", "AO", "AP", "AQ", "AR", "AS", "AT", "AU", "AV",
            "AW", "AX", "AY", "AZ",
        ]

    def generate(self, rows: List[Dict]):
        """
        Generate the xlsx file for external data export
        """
        logger.info("Generating new external export")
        self.build_headers(rows=rows)
        self.add_headers()
        for index, row in enumerate(rows, start=2):
            self.add_row(row_no=index, row=row)
        self.workbook.close()
        self.output.seek(0)
        logger.info("Successfully generated external export. Returning byte stream")
        return self.output

    def add_row(self, row_no: int, row: Dict):
        for index, value in enumerate(row.values()):
            str_val = str(value)
            str_val = "" if value is None else str_val
            self.worksheet.write(f"{self.cols[index]}{row_no}", str_val)

    def add_headers(self):
        bold = self.workbook.add_format({"bold": True})
        for header in self.headers:
            self.worksheet.write(f"{header.col}1", header.title, bold)
            self.worksheet.set_column(f"{header.col}:{header.col}", header.width)

    def build_headers(self, rows: List[Dict]):
        if not rows:
            return
        header = namedtuple("Header", ["col", "title", "width"])
        for index, key in enumerate(rows[0].keys()):
            self.headers.append(header(self.cols[index], key, 30))
```
```
class ExternalDataReader:
    def read(
        self,
        import_audits: List[ImportAuditModel],
        parsed_configuration: ImportConfigurationParser,
    ):
        """
        Reads external data from an S3 bucket based on the previous imports in the workspace
        """
        external_data = {}

        def pass_generator(generator):
            for row in generator:
                ext_id = parsed_configuration.extract_id(row=row)
                external_data[ext_id] = row

        # the import_audits must be sorted to ensure the newest entry will override older ones for multiple imports
        for import_audit in import_audits:
            pass_generator(generator=self.get_object_dict(import_audit=import_audit))
        return external_data

    @staticmethod
    def get_object_dict(import_audit: ImportAuditModel) -> Generator:
        """
        Reads a csv file from S3 and returns a Generator object with a CSV reader
        """
        storage_provider = S3Storage()
        logger.info("Fetching bytes from filename %s", import_audit.filename)
        byte_stream = storage_provider.get_bytes(filename=import_audit.filename)
        csv_util = CsvUtil()
        dict_reader = csv_util.get_dict_reader(byte_stream=byte_stream)
        return (row for row in dict_reader)
```
```
class InternalDataReader:
    def __init__(self):
        self.tasks_repo = DjangoTasksRepository()

    def read(self, configuration: Dict, department_id: uuid.UUID):
        """
        Reads a northarc dataset for a workspace
        The configuration is used to transform the object fields to the ones they were originally imported from
        """
        return self.tasks_repo.get_all_tasks_with_related(department_id)
```
```
class ImportAuditService:
    def __init__(self):
        self.import_audit_repo = DjangoImportAuditRepository()

    def create(self, configuration: any, department_id: uuid.UUID, filename: str):
        return self.import_audit_repo.create(
            configuration=configuration, department_id=department_id, filename=filename
        )

    def get_latest(self, department_id: uuid.UUID):
        return self.import_audit_repo.get_latest(department_id=department_id)

    def get_all_by_dep(self, department_id: uuid.UUID) -> List[ImportAuditModel]:
        return self.import_audit_repo.get_all(department_id=department_id)

    def get_first_and_all_import_audits(
        self, department_id: uuid.UUID
    ) -> (ImportAuditModel, List[ImportAuditModel]):
        """
        Fetches the import audits for the workspace
        Returns a tuple of the first import followed by all imports. The first import will also be
        included in the full list
        Raises a validation error if no imports have happened in the workspace
        """
        import_audits = self.import_audit_repo.get_all(department_id=department_id)
        if not import_audits:
            raise ValidationError(detail={"error": "No data has been imported"})
        first_import = import_audits[0]
        return first_import, import_audits
```
When the user originally imported the dataset, a mapping of CSV column headers to the internal model was used. This mapping is dumped as a JSON object in the database and read from the `ImportAuditModel`. To identify a CSV row, the user can either select a single column (`left_field`) or generate a composite key from `left_field` and `right_field`.
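For illustration only (the field/header values below are made up; the shape is inferred from how `parse()` consumes the object), the stored configuration presumably looks something like:

```
configuration = [
    {"field": "leftIDValue", "header": "order_id"},   # column used as the (left part of the) row ID
    {"field": "rightIDValue", "header": "line_no"},   # optional right part of a composite ID
    {"field": "title", "header": "task_title"},       # remaining field -> header mappings
]
```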
```
class ImportConfigurationParser:
    _left_field: str
    _right_field: str
    _passed_config: Dict
    _has_parsed: bool = False

    def parse(self, configuration: List[Dict]):
        try:
            self._passed_config = {key["field"]: key["header"] for key in configuration}
        except KeyError as exc:
            logger.error(
                "Invalid configuration. One of the entries had no field/header key"
            )
            raise exc
        id_dict = {
            self._passed_config["leftIDValue"],
            self._passed_config["rightIDValue"],
        }
        # check if both id keys are present (<= means is subset of)
        if id_dict <= self._passed_config.keys():
            logger.error(
                "Invalid configuration. Both leftIDValue and rightIDValue keys must be present"
            )
            raise KeyError(
                "leftIDValue or rightIDValue is missing in %s",
                json.dumps(configuration),
            )
        self._left_field = self._passed_config["leftIDValue"]
        self._right_field = self._passed_config["rightIDValue"]
        self._has_parsed = True

    def extract_id(self, row: Dict):
        """
        Extract ID based on the config
        left_field is always present
        """
        self.raise_if_not_parsed()
        if self._left_field not in row:
            logger.error(
                "Header %s must exist in row %s", self._left_field, json.dumps(row)
            )
            raise KeyError("Left field not found in row while extracting id")
        left_value = row[self._left_field]
        if self._right_field:
            right_value = row[self._right_field]
            return f"{left_value}_{right_value}"
        return left_value

    def config(self):
        self.raise_if_not_parsed()
        return self._passed_config

    def raise_if_not_parsed(self):
        if not self._has_parsed:
            raise ValueError(
                "Configuration must be parsed before calling helper methods"
            )
```
```
class S3Storage:
    bucket_name: str

    def __init__(self):
        self.bucket_name = settings.S3["BUCKET"]

    def upload_in_memory_file(self, file, filename: str):
        s3 = boto3.client("s3")
        try:
            s3.upload_fileobj(file, self.bucket_name, filename)
            logger.info(
                "Uploaded file to S3 BUCKET %s with filename %s",
                self.bucket_name,
                filename,
            )
        except Exception as exc:
            logger.error("Failed to upload file with filename %s", filename)
            sentry_sdk.capture_exception(exc)
            raise exc

    def get_bytes(self, filename: str):
        s3 = boto3.client("s3")
        try:
            bytes_buffer = io.BytesIO()
            s3.download_fileobj(
                Bucket=self.bucket_name, Key=filename, Fileobj=bytes_buffer
            )
            # ensure any callers read the buffer from the beginning
            bytes_buffer.seek(0)
            logger.info(
                "Retrieved file from S3 BUCKET %s with filename %s",
                self.bucket_name,
                filename,
            )
            return bytes_buffer
        except Exception as exc:
            logger.error(
                "Failed to retrieve from S3 BUCKET %s with filename %s",
                self.bucket_name,
                filename,
            )
            sentry_sdk.capture_exception(exc)
            raise exc

    def get_presigned_url(self, key: str, download_name: str, expiration: int = 10):
        """Generate a presigned URL to share an S3 object

        :param key: string
        :param download_name: The name of the file when downloaded
        :param expiration: Time in seconds for the presigned URL to remain valid
        :return: Presigned URL as string. If error, throws
        """
        s3 = boto3.client("s3")
        try:
            presigned_url = s3.generate_presigned_url(
                "get_object",
                ExpiresIn=expiration,
                Params={
                    "ResponseContentDisposition": f"attachment; filename = {download_name}",
                    "Bucket": self.bucket_name,
                    "Key": key,
                },
            )
            return presigned_url
        except ClientError as exc:
            logger.error(
                "Failed to generate pre-signed url for S3 BUCKET %s with filename %s",
                self.bucket_name,
                download_name,
            )
            sentry_sdk.capture_exception(exc)
            raise exc
```
1 Answer
OK, well. There are still many unexplained dependencies, like `DjangoDepartmentRepository`, `settings`, `DjangoAPIView`, etc.; but it may be difficult to draw a line between what is and isn't included in this question, so for now I'll attempt to work with what you've shown. Since these dependencies are so deeply tangled with your code, none of my suggestions have been tested.
Avoid calling a variable `bytes`, as in

```
bytes = self.external_export_generator.generate(rows=merged_data)
```

since that shadows a built-in.
Don't hint with bare `Dict`, as in `external_data: Dict`. If you really don't have any idea as to the contents, hint `dict[Any, Any]`; but hopefully you can be more narrow than that.
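For instance, judging from how `_merge_data` consumes the data (the exact row types are an assumption on my part, since they aren't shown), the external data maps an extracted ID to a row dict, so something narrower along these lines would read better:

```
from typing import Any

Row = dict[str, Any]  # assumed: a CSV/task record keyed by column name

# on ExportService
def _merge_data(self, external_data: dict[str, Row], internal_data: list[Row]) -> list[Row]:
    ...
```

Note also that `internal_data` is iterated like a list of rows even though it's hinted `Dict`, which is one more reason to tighten these hints.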
`_merge_data` is a good candidate for removing the constructed list and instead `yield`ing a generator.
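A minimal sketch of that, keeping your merge semantics (the internal row's keys override the external one's):

```
from collections.abc import Iterator
from typing import Any

# on ExportService
def _merge_data(
    self, external_data: dict[str, dict[str, Any]], internal_data: list[dict[str, Any]]
) -> Iterator[dict[str, Any]]:
    for task in internal_data:
        ext_id = self.configuration.extract_id(row=task)
        # per design the ID should always exist in the original dataset
        yield {**external_data[ext_id], **task}
```

One caveat: `ExternalExportGenerator.build_headers` indexes `rows[0]`, which a generator can't do, so `generate` would need a small adjustment (e.g. materialise the first row, or build the headers from the configuration) before this swap.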
The formation of `filename = f"{folder}{file_id}.xlsx"` should not naively prepend a folder; instead use `pathlib`.
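For instance, assuming `settings.S3["FOLDERS"]["EXPORTS"]` is a plain prefix such as `"exports/"`:

```
from pathlib import PurePosixPath
import uuid

folder = PurePosixPath(settings.S3["FOLDERS"]["EXPORTS"])
filename = str(folder / f"{uuid.uuid4()}.xlsx")
```

`PurePosixPath` rather than `Path`, because an S3 key is not a local filesystem path; it just gives you safe joining and forward slashes regardless of the host OS.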
`_get_download_name` is missing a return typehint of `str`.
`TaskExportAPI.post()` is missing both parameter and return hints.
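Assuming this is Django REST Framework (the `Response` and `status` usage suggests it), that would be:

```
from rest_framework.request import Request
from rest_framework.response import Response

class TaskExportAPI(DjangoAPIView):
    def post(self, request: Request) -> Response:
        ...
```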
As you've already indicated, you should avoid hard-coding `ExternalExportGenerator.cols`. Your use of `self.worksheet.write(f"{self.cols[index]}{row_no}", str_val)` can be trivially replaced with a coordinate-based write.
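`xlsxwriter` accepts zero-indexed `(row, col)` coordinates directly, so the column-letter table can disappear entirely:

```
# on ExternalExportGenerator
def add_row(self, row_no: int, row: Dict):
    # worksheet.write(row, col, value) is zero-indexed, so A1-notation row 2 is index 1 here
    for col, value in enumerate(row.values()):
        self.worksheet.write(row_no - 1, col, "" if value is None else str(value))
```

`set_column` has an equivalent `(first_col, last_col, width)` form, and `xlsxwriter.utility.xl_col_to_name` exists for the rare case where you genuinely need the letter.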
`ExternalExportGenerator.generate` is missing a return typehint. It also has side-effect problems. It seems that upon entry, you assume `self.workbook` is open, but then you close it within the function. This is asymmetric and also not exception-safe. It needs to be rewritten so that the workbook is opened and closed in the same scope, ideally with context management.
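A rough sketch of that shape, leaving the header/row helpers out; `xlsxwriter.Workbook` supports the context-manager protocol and closes itself on exit:

```
def generate(self, rows: List[Dict]) -> BytesIO:
    output = BytesIO()
    with xlsxwriter.Workbook(output) as workbook:
        worksheet = workbook.add_worksheet()
        # ... write headers and rows against this local worksheet ...
    output.seek(0)
    return output
```

Keeping the workbook's whole lifetime in one scope also means the generator instance is no longer single-use.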
Don't bare-hint `Generator`; it needs arguments.
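For example, assuming the CSV reader yields plain string dicts:

```
from collections.abc import Iterator

def get_object_dict(import_audit: ImportAuditModel) -> Iterator[dict[str, str]]:
    ...
```

`typing.Generator[dict[str, str], None, None]` says the same thing; since nothing is sent into or returned from the generator, `Iterator` is the simpler spelling.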
This generator wrapper:

```
return (row for row in dict_reader)
```

can probably be replaced with `return dict_reader`, though it's impossible to say for sure since you haven't shown `CsvUtil`.
`get_first_and_all_import_audits` is strange. Why return the first element separately from the returned list? Surely a caller can figure this out?
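That is, a caller holding the list can do this itself, using the `get_all_by_dep` you already have:

```
import_audits = self.import_audit_service.get_all_by_dep(department_id=department_id)
first_import = import_audits[0]
```

(The empty-list `ValidationError` would then need to live either in `get_all_by_dep` or in the caller; where that check belongs is a judgment call.)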
`ImportConfigurationParser` has hinted members as if it's a `NamedTuple` or `@dataclass`, but it has no parent, no decorator and no `__init__`. Those fields aren't set in the constructor and are instead set in `parse()`. Linters will tell you that this is a bad idea: classes should not be setting brand-new members outside of the constructor. Renaming `parse` to `__init__` would bring this class closer to reality. Said another way, `self.raise_if_not_parsed()` should go away, and the class instance should not exist at all if parsing has not occurred.
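A minimal sketch of that direction, keeping your field names and eliding the key-validation details:

```
class ImportConfigurationParser:
    def __init__(self, configuration: list[dict[str, str]]):
        # parsing happens in the constructor, so an instance can only exist fully parsed
        self._passed_config = {entry["field"]: entry["header"] for entry in configuration}
        self._left_field = self._passed_config["leftIDValue"]
        self._right_field = self._passed_config.get("rightIDValue")

    def extract_id(self, row: dict) -> str:
        left_value = row[self._left_field]
        if self._right_field:
            return f"{left_value}_{row[self._right_field]}"
        return left_value

    def config(self) -> dict:
        return self._passed_config
```

`ExportService.export` would then build it where it's needed, e.g. `configuration = ImportConfigurationParser(first_import.configuration)`, instead of holding a half-initialised instance as an attribute.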
This `except`:

```
except Exception as exc:
    logger.error("Failed to upload file with filename %s", filename)
    sentry_sdk.capture_exception(exc)
    raise exc
```

should first of all bare `raise` instead of `raise exc`; and it also may benefit from passing `exc_info` to `logger`.
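That is, something like:

```
except Exception as exc:
    # exc_info attaches the traceback to the log record; a bare raise preserves the original traceback
    logger.error("Failed to upload file with filename %s", filename, exc_info=True)
    sentry_sdk.capture_exception(exc)
    raise
```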
Comments like `:param key: string` are deeply non-useful.