| Documentation | Tutorials | Chat with us on slack! |
|---|---|---|
Fugue is a unified interface for distributed computing that lets users execute Python, pandas, and SQL code on Spark, Dask and Ray without rewrites.
The most common use cases are:
For a more comprehensive overview of Fugue, read this article.
Fugue can be installed through pip or conda. For example:
```bash
pip install fugue
```
It also has the following extras:
For example a common use case is:
```bash
pip install fugue[duckdb,spark]
```
Note that installing extras may not be necessary. For example, if you have already installed Spark or DuckDB independently, Fugue will automatically enable support for them.
The best way to get started with Fugue is to work through the 10 minute tutorials:
The tutorials can also be run in an interactive notebook environment through binder or Docker:
Note that it runs slowly on binder because the machine on binder isn't powerful enough for a distributed framework such as Spark. Parallel executions can become sequential, so some of the performance comparison examples will not give accurate numbers.
Alternatively, you should get decent performance by running this Docker image on your own machine:
```bash
docker run -p 8888:8888 fugueproject/tutorials:latest
```
For the API docs, click here
The simplest way to use Fugue is the transform() function. It lets users parallelize the execution of a single function by bringing it to Spark, Dask, or Ray. In the example below, the map_letter_to_food() function takes in a mapping and applies it to a column. This is just pandas and Python so far (without Fugue).
```python
import pandas as pd
from typing import Dict

input_df = pd.DataFrame({"id": [0, 1, 2], "value": ["A", "B", "C"]})
map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}

def map_letter_to_food(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    df["value"] = df["value"].map(mapping)
    return df
```
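Since nothing Fugue-specific is involved yet, the function can be sanity-checked locally as plain pandas before it is brought to a distributed engine (the definitions are repeated so the snippet stands alone):

```python
import pandas as pd
from typing import Dict

input_df = pd.DataFrame({"id": [0, 1, 2], "value": ["A", "B", "C"]})
map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}

def map_letter_to_food(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    df["value"] = df["value"].map(mapping)
    return df

# run locally on a copy so input_df stays unchanged
out = map_letter_to_food(input_df.copy(), map_dict)
print(out["value"].tolist())  # ['Apple', 'Banana', 'Carrot']
```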
Now, the map_letter_to_food() function is brought to the Spark execution engine by invoking the transform function of Fugue. The output schema, params and engine are passed to the transform() call. The schema is needed because it's a requirement on Spark. A schema of "*" below means all input columns are in the output.
```python
from pyspark.sql import SparkSession
from fugue import transform

spark = SparkSession.builder.getOrCreate()
df = transform(
    input_df,
    map_letter_to_food,
    schema="*",
    params=dict(mapping=map_dict),
    engine=spark,
)
df.show()
```
```
+---+------+
| id| value|
+---+------+
|  0| Apple|
|  1|Banana|
|  2|Carrot|
+---+------+
```
PySpark equivalent of Fugue transform
```python
import pandas as pd
from typing import Iterator, Union
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType

spark_session = SparkSession.builder.getOrCreate()

def mapping_wrapper(dfs: Iterator[pd.DataFrame], mapping):
    for df in dfs:
        yield map_letter_to_food(df, mapping)

def run_map_letter_to_food(input_df: Union[DataFrame, pd.DataFrame], mapping):
    # convert a pandas DataFrame to Spark if needed; Spark DataFrames
    # are immutable, so no copy is required
    if isinstance(input_df, pd.DataFrame):
        sdf = spark_session.createDataFrame(input_df.copy())
    else:
        sdf = input_df
    schema = StructType(list(sdf.schema.fields))
    return sdf.mapInPandas(lambda dfs: mapping_wrapper(dfs, mapping),
                           schema=schema)

result = run_map_letter_to_food(input_df, map_dict)
result.show()
```
This syntax is simpler, cleaner, and more maintainable than the PySpark equivalent. At the same time, no edits were made to the original pandas-based function to bring it to Spark. It is still usable on pandas DataFrames. Because the Spark execution engine was used, the returned df is now a Spark DataFrame. Fugue transform() also supports Dask, Ray and pandas as execution engines.
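To see why a plain pandas function distributes so cleanly, here is a pandas-only sketch of the partition-wise model: an engine splits the data into chunks, applies the function to each chunk independently, and concatenates the results. The two-chunk split below is purely illustrative, not Fugue's actual partitioning logic:

```python
import pandas as pd
from typing import Dict

input_df = pd.DataFrame({"id": [0, 1, 2], "value": ["A", "B", "C"]})
map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}

def map_letter_to_food(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    df["value"] = df["value"].map(mapping)
    return df

# split into "partitions", apply the function to each independently,
# then recombine -- conceptually what an engine does across workers
partitions = [input_df.iloc[:2].copy(), input_df.iloc[2:].copy()]
result = pd.concat([map_letter_to_food(p, map_dict) for p in partitions])
print(result["value"].tolist())  # ['Apple', 'Banana', 'Carrot']
```

Because the function never sees more than one chunk at a time, it works unchanged whether the chunks live on one machine or many.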
FugueSQL is a SQL-based language capable of expressing end-to-end workflows. The map_letter_to_food() function above is used in the SQL expression below. This is how to use a Python-defined transformer along with the standard SQL SELECT statement.
```python
import json
from fugue_sql import fsql

query = """
SELECT id, value FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
PRINT
"""
map_dict_str = json.dumps(map_dict)
fsql(query, mapping=map_dict_str).run()
```
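The {{mapping}} template parameter above is filled with a JSON string; json.dumps produces that string from the plain Python dict:

```python
import json

map_dict = {"A": "Apple", "B": "Banana", "C": "Carrot"}
map_dict_str = json.dumps(map_dict)
print(map_dict_str)  # {"A": "Apple", "B": "Banana", "C": "Carrot"}
```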
For FugueSQL, we can change the engine by passing it to the run() method: `fsql(query, mapping=map_dict_str).run(spark)`.
There is an accompanying notebook extension for FugueSQL that lets users use the %%fsql cell magic. The extension also provides syntax highlighting for FugueSQL cells. It works for both classic notebook and Jupyter Lab. More details can be found in the installation instructions.
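With the extension installed, a cell can run FugueSQL directly; a sketch, assuming the same query as above and an engine name after the magic:

```
%%fsql spark
SELECT id, value FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
PRINT
```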
By being an abstraction layer, Fugue can be used with a lot of other open-source projects seamlessly.
Fugue can use the following projects as backends:
Fugue is available as a backend or can integrate with the following projects:
View some of our latest conferences presentations and content. For a more complete list, check the Resources page in the tutorials.
Feel free to message us on Slack. We also have [contributing instructions](CONTRIBUTING.md).