I have a data processing pipeline with well-defined stages and IO boundaries. I can choose a language to suit the needs of this design. It starts with an `InputObject`. At the end of each stage, there is some additional data derived from some or all of the results of previous steps and the `InputObject`. That is, `StageN` extends `Stage` and produces an immutable `ResultsObjN`, which extends `Result`. After the pipeline is complete, the final output I'm interested in is some subset of all results from each step:
InputObject
     |
     |
     v
-----------
| Stage 1 | <-- Tunable Parameters
-----------
     |
     |
     v
InputObject
ResultsObj1
     |
     |
     v
-----------
| Stage 2 | <-- Tunable Parameters
-----------
     |
     |
     v
InputObject
ResultsObj1
ResultsObj2
     |
     |
     v
...
Currently, I'm modeling each stage as a `Stage` object. I'll explain what I mean by "tunable" parameters soon.

- Each `Stage` has its tunable parameters as attributes exposed by getters and setters.
- Each `Stage` is constructed with its input dependencies.
- Each `Stage` has a method to compute its result based on the tunable parameters.

Here it is in almost-UML:
----------------------------------------------------
|                   <<abstract>>                   |
|                      Stage                       |
|--------------------------------------------------|
|--------------------------------------------------|
| +computeResult() : Result                        |
----------------------------------------------------
For instance, `Stage3` computes `ResultObj3` using the `InputObject` and the results of `Stage2`. There are two parameters that can be set to change the results.
----------------------------------------------------
|                      Stage3                      |
|--------------------------------------------------|
| -param1 : Int                                    |
| -param2 : Float                                  |
|--------------------------------------------------|
| +Stage3(raw : InputObject, corners : ResultObj2) |
| +get/setParam1()                                 |
| +get/setParam2()                                 |
| +computeResult() : ResultObj3                    |
----------------------------------------------------
I would like to reuse this processing pipeline pattern, with different stages in different quantities with different input dependencies. The tunable parameters may be tweaked by an automated optimizer or by a human using some UI. In either case, they follow the same feedback process. Here it is tuning stage 3:
ResultObj3 performStage(InputObject raw, ResultObj2 corners):
1. Stage3 processor = new Stage3(raw, corners)
2. ResultObj3 result = processor.computeResult()
3. while (notAcceptable(result)):
4.     tuneParams(processor, result) // tune to "more acceptable" params
5.     result = processor.computeResult()
6. return result
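That loop can also be sketched with generics, so each stage keeps its concrete result type instead of a bare `Result` (a minimal sketch; the `Stage<R>`, `Tuner<R>`, and `performStage` names here are illustrative, not part of the design above):

```java
import java.util.function.Predicate;

public class TuningLoop {
    // Hypothetical generic interfaces. Stage<R> produces a result of concrete
    // type R; Tuner<R> nudges the stage's parameters based on the last result.
    interface Stage<R> {
        R computeResult();
    }

    interface Tuner<R> {
        void tuneParams(Stage<R> stage, R lastResult);
    }

    // The feedback loop above, typed so no downstream casting is needed.
    static <R> R performStage(Stage<R> stage, Predicate<R> acceptable, Tuner<R> tuner) {
        R result = stage.computeResult();
        while (!acceptable.test(result)) {
            tuner.tuneParams(stage, result); // tune to "more acceptable" params
            result = stage.computeResult();
        }
        return result;
    }

    public static void main(String[] args) {
        // Toy stage: its only "parameter" is a counter; tuning increments it.
        int[] param = {0};
        Stage<Integer> toy = () -> param[0];
        Integer out = performStage(toy, r -> r >= 3, (s, r) -> param[0]++);
        System.out.println(out); // prints 3
    }
}
```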
I feel like the tuning process should be performed for each step by a queue executor, but I'm not yet sure how to accommodate the growing set of differently typed results, or the varying construction requirements of each stage.
Sample Pipeline
The point of this pipeline is to read a set of coordinates from a CSV (into `String raw`) and pass the string through a pipeline that will parse the string, cluster the points, and plot the clusters. The cluster data and plot images are extracted from the pipeline after it is run.

The string is parsed into a collection of `Point` objects at the `"ExtractPoints"` stage. That collection is clustered/segmented into a set of `Cluster` objects at the `"ClusterPoints"` stage. The `Point` and `Cluster` data is used to plot the points visually into a `Plot` image object at the `"PlotClusters"` stage.
QueuedPipeline pipeline = new QueuedPipeline()
String raw = readFile("points.csv")
pipeline.setInput(raw)
// addStage() takes the name of the stage, the stage runner, and the names of the stages it needs results from
pipeline.addStage("ExtractPoints", new StageTuner(PointExtractor), [])
pipeline.addStage("ClusterPoints", new StageTuner(PointClusterer), ["ExtractPoints"])
pipeline.addStage("PlotClusters", new StageTuner(ClusterPlotter), ["ExtractPoints", "ClusterPoints"])
// tune and execute each stage with the StageTuners
PipelineResultList results = pipeline.run()
// pipeline is done, collect the interesting results
Result pointClusters = results.getResult("ClusterPoints")
Result clusterPlot = results.getResult("PlotClusters")
saveClusterFile(pointClusters)
saveClusterPlotImage(clusterPlot)
- The classes `PointExtractor`, `PointClusterer`, and `ClusterPlotter` all inherit from `Stage`.
- Each `StageTuner` takes the class of the `Stage` to tune and implements the above feedback loop.
- Variables `pointClusters` and `clusterPlot` are of `Result` type.
Problem
But simply passing around classes for construction is a headache in some languages. Also, I'm not really sure how to generically construct and set parameters of each `Stage` subclass because of the different number/type of parameters - perhaps each `Stage` subclass needs a corresponding `StageRunner` subclass. Finally, casting `Result` objects to the type I need within the final two functions sounds dangerous.

...but these are just symptoms of my real problem: this is all starting to get complex and fuzzy for what I expected to be a straightforward problem.
Am I defining my objects and behaviours badly? Or is this not as straightforward a problem as I thought? Something else entirely?
- We could provide better insight if you provided a concrete example that this arrangement solves. – Robert Harvey, May 20, 2015
- @RobertHarvey I'd be happy to. By concrete, do you mean something like the sample code I provided, but with motivation and a problem description? – kdbanman, May 20, 2015
- Something like that. Something with real-world names instead of someObject and someResult. – Robert Harvey, May 20, 2015
- @kdbanman A short answer is that there is essential complexity, as discussed in "No Silver Bullet", which means that because the "requirements" are complicated, you can't simplify it beyond a certain point. When designed properly, the data processing pipeline can be highly reusable and will have long-term value. To determine whether it is worth it, you have to ask: does the project, and the "stages", create sufficient intellectual property value that will pay off? If not, you can use Perl, Python, MATLAB, and other similar data-processing languages – rwong, May 20, 2015
- (continued) and in particular do not consider reusability or configurability of stages, and write "single-use code". – rwong, May 20, 2015
2 Answers
This sort of problem is well-suited to dynamic typing. That will give you the most straightforward solution, with the obvious trade-offs.

If you wish to use static typing, you'll have better luck if you don't centralize your pipeline construction. Your stages are the ones that know the most about the types of their dependencies and results, so you should give them the responsibility for managing that. I would start by making your example read something like this:
extractedPoints = new PointExtractor(raw)
clusterPoints = new PointClusterer(extractedPoints)
plot = new ClusterPlotter(extractedPoints, clusterPoints)
saveClusterFile(clusterPoints.result())
saveClusterPlotImage(plot.result())
Note this directly uses the type system, which makes for a very natural-looking and idiomatic syntax. Inside `result()` you would call common code to handle the tuning and memoize the result. Because each calculation calls `result()` on each dependency before performing its own calculation, your "queue" ends up getting naturally formed in the call stack, and memoization prevents multiple execution.
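For concreteness, that shape might look like the following sketch (class names follow the example above; the real `Point`/`Cluster` types are stubbed with strings, and the tuning loop is elided):

```java
import java.util.List;

// Each stage is constructed with its typed dependencies, and result()
// memoizes its output, so a dependency shared by two stages computes once.
class PointExtractor {
    private final String raw;
    private List<String> memo; // stand-in for the extracted Point collection

    PointExtractor(String raw) { this.raw = raw; }

    List<String> result() {
        if (memo == null) {
            // the tuning/feedback loop for this stage would run here
            memo = List.of(raw.split(","));
        }
        return memo;
    }
}

class PointClusterer {
    private final PointExtractor points;
    private List<List<String>> memo; // stand-in for the Cluster set

    PointClusterer(PointExtractor points) { this.points = points; }

    List<List<String>> result() {
        if (memo == null) {
            // asking the dependency for result() first is what forms the
            // "queue" in the call stack
            memo = List.of(points.result());
        }
        return memo;
    }
}
```

Calling `new PointClusterer(new PointExtractor(raw)).result()` runs extraction lazily on first use; a second `result()` call on either object returns the memoized value without recomputation.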
- That is the kind of simplification I'm looking for. Easy to read and understand, modular and testable... Thank you! – kdbanman, May 21, 2015
You might be able to model this using a workflow engine. You can model each of your stages as a workflow step and also parameterize each step. Depending on the engine chosen, manual action can be configured against each step.
The following post might help you decide if it is relevant to your use case: https://stackoverflow.com/questions/2353564/use-cases-of-the-workflow-engine
If your expertise is with Java and Spring then the following post will help: http://www.javaworld.com/article/2071865/web-app-frameworks/use-spring-to-create-a-simple-workflow-engine.html
- I'm not really sure what that means, and the first few Google hits aren't that helpful. Just FYI: unless you plan on fleshing this out with some definitions and/or application-specific details, this should likely be a comment, not an answer. – kdbanman, May 20, 2015
- Thanks for adding the links. After scanning them and learning a bit about workflow engines, your idea is really interesting. There's an intriguing intersection between dynamically constructed processing pipelines and workflow engines. How would you write the sample problem code from my question with a workflow-style API? – kdbanman, May 21, 2015