Today I’d like to introduce a new library we’ve been working on - dataflows. DataFlows is part of a larger conceptual framework for data processing.
We call it ‘Data Factory’ - an open framework for building and running lightweight data processing workflows quickly and easily. LAMP for data wrangling!
Most of you already know what Data Packages are. In short, a Data Package is a portable format for packaging different resources (tabular or otherwise) in a standard way that takes care of most interoperability problems (e.g. "what’s the character encoding of the file?", "what is the data type for this column?" or "which date format are they using?"). It also provides rich and flexible metadata, which users can then use to understand what the data is about (take a look at frictionlessdata.io to learn more!).
Data Factory complements the Data Package concepts by adding dynamics to the mix.
While Data Packages are a great solution for describing data sets, these data sets are always static - located in one place. Data Factory is all about transforming Data Packages - modifying their data or meta-data and transmitting them from one location to another.
Data Factory defines standard interfaces for building processors - software modules for mutating a Data Package - and protocols for streaming the contents of a Data Package for efficient processing.
Data Factory is more pattern/convention than library.
An analogy is with web frameworks. Web frameworks were more about a core pattern plus a set of ready-to-use components than libraries in themselves: Python frameworks such as Pylons and Flask were built around WSGI, and Express plays the same role for Node.js.
In this sense, these frameworks were about convention over configuration: they decreased the number of decisions a developer had to make without necessarily losing flexibility.
Data Factory takes the same approach. By favouring convention over configuration, it aims to reduce the number of decisions a data developer has to make, again without sacrificing flexibility.
By following a standard scheme, developers are able to use a large and growing library of existing, reusable processors. This also increases readability and maintainability of data processing code.
Our focus is on:
What are we not?
This new framework is built on the foundations of the Frictionless Data project - both conceptually and technically. That project gives us the definition of Data Packages and the software to read and write them.
On top of this Frictionless Data basis, we’re introducing a few new concepts:
- the Data Stream - essentially a Data Package in transit;
- the Data Processor - a software module that manipulates a Data Package, receiving one Data Stream as its input and producing a new Data Stream as its output;
- the Data Flow - a chain of Data Processors (see the sketch just after this list).
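To make these concepts a little more concrete, here is a minimal sketch using the dataflows library introduced below. Flow, load and printer are building blocks from that library; normalize_name is a hypothetical custom processor written just for this illustration:

from dataflows import Flow, load, printer

# A Data Processor can be as simple as a function that receives each row
# of the Data Stream (as a dict) and modifies it in place.
# normalize_name is a hypothetical processor, not part of the standard library.
def normalize_name(row):
    row['Name'] = row['Name'].strip().title()

# A Data Flow chains processors together: each step consumes the Data Stream
# produced by the previous one.
Flow(
    load('https://rawgit.com/datahq/demo/_/first.csv'),  # source -> Data Stream
    normalize_name,                                      # custom processor
    printer(),                                           # sink: print the stream
).process()

The same pattern scales up: swap printer() for a processor that writes to a file, a database or a remote service, and the rest of the flow stays unchanged.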
We will be providing a library of such processors: some for loading data from various sources, some for storing data in different locations, services or databases, and some for doing common manipulation and transformation on the processed data.
On top of all that we’re building a few integrated services:
- dataflows-server (formerly known as datapackage-pipelines) - a server-side multi-processor runner for Data Flows;
- dataflows-cli - a client library for building and running Data Flows locally;
- dataflows-blueprints - ready-made flow generators for common scenarios (e.g. ‘I want to regularly pull all my analytics from these X services and dump them in a database’).
Data Factory
In our experience, data processing starts simple - downloading and inspecting a CSV, deleting a column or a row. We wanted something that was as fast as the command line to get started but would also provide a solid basis as your pipeline grows. We also wanted something that provided some standardization and conventions over completely bespoke code.
With integration in mind, DataFlows has very few environmental requirements and can be embedded in your existing data processing setup.
In short, DataFlows provides a simple, quick and easy-to-setup, and extensible way to build lightweight data processing pipelines.
The first piece of software we’re introducing today is dataflows and its standard library of processors.
dataflows introduces the concept of a Flow - a chain of data processors, reading, transforming and modifying a stream of data and writing it to any location (or loading it to memory for further analysis).
dataflows also comes with a rich set of built-in data processors, ready to do most of the heavy-lifting you’ll need to reduce boilerplate code and increase your productivity.
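For instance, instead of writing the results anywhere, a flow can hand them straight back to your Python code. This is a rough sketch, assuming Flow.results() returns a tuple of the processed data, the package descriptor and processing stats:

from dataflows import Flow, load

# Pull the demo CSV and keep the resulting Data Package in memory
# instead of dumping it to disk or a database.
data, package, stats = Flow(
    load('https://rawgit.com/datahq/demo/_/first.csv'),
).results()

# data holds one list of rows (dicts) per resource in the Data Package
print(data[0][0])  # the first row - George's record in the demo file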
Most data processing starts simple: getting a file and having a look.
With dataflows you can do this in a few seconds and you’ll have a solid basis for whatever you want to do next.
Bootstrapping a data processing script
$ pip install dataflows
$ dataflows init https://rawgit.com/datahq/demo/_/first.csv
Writing processing code into first_csv.py
Running first_csv.py
first:
#  Name      Composed   DOB
   (string)  (string)   (date)
-  --------  ---------  ----------
1  George    22         1943-02-25
2  John      90         1940-10-09
3  Richard   2          1940-07-07
4  Paul      88         1942-06-18
5  Brian     n/a        1934-09-19
Done!
dataflows init actually does 3 things:
In our case, a script named first_csv.py was created - here’s what it contains:
# ...
from dataflows import Flow, load, printer, add_metadata


def first_csv():
    flow = Flow(
        # Load inputs
        load('https://rawgit.com/datahq/demo/_/first.csv', format='csv'),
        # Process them (if necessary)
        # Save the results
        add_metadata(name='first_csv', title='first.csv'),
        printer(),
    )
    flow.process()


if __name__ == '__main__':
    first_csv()
The flow variable contains the chain of processing steps (i.e. the processors). In this simple flow, load loads the source data, add_metadata modifies the Data Package’s metadata and printer dumps the contents to standard output.
You can run this script again at any time, and it will re-run the processing flow:
$ python first_csv.py
first:
#  Name      Composed   DOB
   (string)  (string)   (date)
-  --------  ---------  ----------
1  George    22         1943-02-25
...
This is all very nice, but now it’s time for some real data wrangling. By editing the processing script we can add more functionality to the flow - dataflows provides a simple, solid basis for building up your pipeline quickly, reliably and repeatably.
Fixing some bad data
Let’s start by getting rid of that annoying n/a in the last line of the data.
We edit first_csv.py and add two more steps to the flow:
from dataflows import Flow, load, set_type, printer


def removeNA(row):
    row['Composed'] = row['Composed'].replace('n/a', '')


f = Flow(
    load('https://rawgit.com/datahq/demo/_/first.csv'),
    # custom processing added here:
    removeNA,
    # now parse the column as an integer:
    set_type('Composed', type='integer'),
    printer()
)
f.process()
removeNA is a simple function which modifies each row it sees, replacing n/a with an empty string. After it, we call set_type, which declares that the Composed column should be an integer - and verifies that it really is an integer while processing the data (had we left the n/a in place, that validation would fail).
Writing the cleaned data
Finally, let’s write the output to a file using the dump_to_path processor:
from dataflows import Flow, load, add_metadata, set_type, dump_to_path


def removeNA(row):
    row['Composed'] = row['Composed'].replace('n/a', '')


f = Flow(
    load('https://rawgit.com/datahq/demo/_/first.csv'),
    add_metadata(
        name='beatles_infoz',
        title='Beatle Member Information',
    ),
    removeNA,
    set_type('Composed', type='integer'),
    dump_to_path('first_csv/')
)
f.process()
Now, we re-run our modified processing script...
$ python first_csv.py
...
we get a valid Data Package which we can use...
$ tree
├── first_csv
│ ├── datapackage.json
│ └── first.csv
which contains a normalized and cleaned-up CSV file...
$ head first_csv/first.csv
Name,Composed,DOB
George,22,1943-02-25
John,90,1940-10-09
Richard,2,1940-07-07
Paul,88,1942-06-18
Brian,,1934-09-19
...and datapackage.json, a JSON file containing the package’s metadata...
$ cat first_csv/datapackage.json  # Edited for brevity
{
  "count_of_rows": 5,
  "name": "beatles_infoz",
  "title": "Beatle Member Information",
  "resources": [
    {
      "name": "first",
      "path": "first.csv",
      "schema": {
        "fields": [
          {"name": "Name", "type": "string"},
          {"name": "Composed", "type": "integer"},
          {"name": "DOB", "type": "date"}
        ]
      }
    }
  ]
}
and is very simple to use in Python (or JS, Ruby, PHP and many other programming languages) -
$ python
>>> from datapackage import Package
>>> p = Package('first_csv/datapackage.json')
>>> list(p.resources[0].iter())
[['George', 22, datetime.date(1943, 2, 25)],
['John', 90, datetime.date(1940, 10, 9)],
['Richard', 2, datetime.date(1940, 7, 7)],
['Paul', 88, datetime.date(1942, 6, 18)],
['Brian', None, datetime.date(1934, 9, 19)]]
>>>
Lots, lots more - there is a whole suite of processors built in, plus you can quickly add your own with a few lines of Python code.
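As a taste of how little code that takes, here is a hypothetical example - count_songs is not a built-in processor, just a plain function that aggregates values as the Data Stream passes through it:

from dataflows import Flow, load

# Plain Python state is all we need for a quick aggregation.
total = dict(songs=0)

def count_songs(row):
    composed = row['Composed']
    if composed and composed != 'n/a':
        total['songs'] += int(composed)

Flow(
    load('https://rawgit.com/datahq/demo/_/first.csv'),
    count_songs,
).process()

print('Songs composed in total:', total['songs'])  # 202 for the demo data above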
Dig in at the project’s GitHub Page or continue reading the in-depth tutorial here.