@@ -6,22 +6,126 @@ subtitle: ⚠️ 4.1.0-SNAPSHOT
**Spark Declarative Pipelines (SDP)** is a declarative framework for building ETL pipelines on Apache Spark using Python or SQL.
- !!! danger
+ ??? warning "Apache Spark 4.1.0-SNAPSHOT"
    The Declarative Pipelines framework is only available in the development branch of Apache Spark 4.1.0-SNAPSHOT.

    Declarative Pipelines has not been released in any Spark version yet.
- Streaming flows are backed by streaming sources, and batch flows are backed by batch sources.
+ ```console
+ ❯ $SPARK_HOME/bin/pyspark --version
+ Welcome to
+       ____              __
+      / __/__  ___ _____/ /__
+     _\ \/ _ \/ _ `/ __/ '_/
+    /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
+       /_/
+
+ Using Scala version 2.13.16, OpenJDK 64-Bit Server VM, 17.0.16
+ Branch master
+ Compiled by user jacek on 2025-08-04T11:30:08Z
+ Revision 6ef9a9d340539fc870acca042bd036f33ea995c3
+ Url https://github.com/apache/spark.git
+ Type --help for more information.
+ ```
- Declarative Pipelines uses the following [Python decorators](https://peps.python.org/pep-0318/) to describe tables and views:
+ Streaming flows are backed by streaming sources, and batch flows are backed by batch sources.
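+
+ For example (an illustrative sketch, not from the Spark sources; the `rate` source and the Parquet path are made up, and an active `spark` session is assumed), a streaming flow would read from a streaming source while a batch flow reads from a batch one:
+
+ ```py
+ # Streaming source: could back a streaming flow
+ events = spark.readStream.format("rate").load()
+
+ # Batch source: could back a batch flow
+ reference = spark.read.parquet("/tmp/reference")  # hypothetical path
+ ```
+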
- * `@sdp.materialized_view` for materialized views
- * `@sdp.table` for streaming and batch tables
+ Declarative Pipelines uses [Python decorators](#python-decorators) to declaratively describe tables, views, and flows.
[DataflowGraph](DataflowGraph.md) is the core graph structure in Declarative Pipelines.
Once described, a pipeline can be [started](PipelineExecution.md#runPipeline) (on a [PipelineExecution](PipelineExecution.md)).
+ ## Python Decorators for Datasets and Flows { #python-decorators }
+
+ Declarative Pipelines uses the following [Python decorators](https://peps.python.org/pep-0318/) to describe tables and views:
+
+ * [@sdp.materialized_view](#materialized_view) for materialized views
+ * [@sdp.table](#table) for streaming and batch tables
+
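+ A minimal sketch of both decorators (not from the Spark sources; the dataset names and source tables are made up, and an active `spark` session is assumed):
+
+ ```py
+ from pyspark import pipelines as sdp
+
+ # A materialized view defined by a batch query
+ # (the function name is assumed to become the dataset name).
+ @sdp.materialized_view
+ def daily_totals():
+     return spark.read.table("raw_orders").groupBy("order_date").count()
+
+ # A streaming table defined by a streaming query.
+ @sdp.table
+ def orders_stream():
+     return spark.readStream.table("raw_orders_stream")
+ ```
+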
+ ### pyspark.pipelines Python Module { #pyspark_pipelines }
+
+ The `pyspark.pipelines` module (in `__init__.py`) imports the `pyspark.pipelines.api` module to expose the following Python decorators to wildcard imports:
+
+ * [append_flow](#append_flow)
+ * [create_streaming_table](#create_streaming_table)
+ * [materialized_view](#materialized_view)
+ * [table](#table)
+ * [temporary_view](#temporary_view)
+
+ Use the following import in your Python code:
+
+ ```py
+ from pyspark import pipelines as sdp
+ ```
+
+ ### @sdp.append_flow { #append_flow }
+
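+ A minimal sketch of an append flow that feeds a streaming table created with [create_streaming_table](#create_streaming_table) (not from the Spark sources; the `target` keyword, dataset name and query are assumptions):
+
+ ```py
+ from pyspark import pipelines as sdp
+
+ # The streaming table the flow appends to.
+ sdp.create_streaming_table("events")
+
+ # The decorated function is assumed to return the streaming DataFrame
+ # whose rows are appended to the target table.
+ @sdp.append_flow(target="events")
+ def events_from_rate():
+     return spark.readStream.format("rate").load()
+ ```
+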
+ ### @sdp.create_streaming_table { #create_streaming_table }
+
+ ### @sdp.materialized_view { #materialized_view }
+
+ ### @sdp.table { #table }
+
+ ### @sdp.temporary_view { #temporary_view }
+
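+ A minimal sketch (assuming `temporary_view` follows the same pattern as the other decorators; the view name and query are made up):
+
+ ```py
+ from pyspark import pipelines as sdp
+
+ # A view computed from a batch query, available to other datasets
+ # in the same pipeline.
+ @sdp.temporary_view
+ def recent_orders():
+     return spark.read.table("orders").where("order_date >= date_sub(current_date(), 7)")
+ ```
+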
+ ## Demo: Python API
+
+ In a terminal, start a Spark Connect Server.
+
+ ```bash
+ ./sbin/start-connect-server.sh
+ ```
+
+ It will listen on port 15002.
+
+ ??? note "Tip"
+     Review the logs with `tail -f`.
+
+ Start a Spark Connect-enabled PySpark shell.
+
+ ```bash
+ $SPARK_HOME/bin/pyspark --remote sc://localhost:15002
+ ```
+
+ ```py
+ from pyspark.pipelines.spark_connect_pipeline import create_dataflow_graph
+ dataflow_graph_id = create_dataflow_graph(
+     spark,
+     default_catalog=None,
+     default_database=None,
+     sql_conf=None,
+ )
+
+ # >>> print(dataflow_graph_id)
+ # 3cb66d5a-0621-4f15-9920-e99020e30e48
+ ```
+
+ ```py
+ from pyspark.pipelines.spark_connect_graph_element_registry import SparkConnectGraphElementRegistry
+ registry = SparkConnectGraphElementRegistry(spark, dataflow_graph_id)
+ ```
+
+ ```py
+ from pyspark import pipelines as sdp
+ ```
+
+ ```py
+ from pyspark.pipelines.graph_element_registry import graph_element_registration_context
+ with graph_element_registration_context(registry):
+     sdp.create_streaming_table("demo_streaming_table")
+ ```
+
+ You should see the following INFO message in the logs of the Spark Connect Server:
+
+ ```text
+ INFO PipelinesHandler: Define pipelines dataset cmd received: define_dataset {
+   dataflow_graph_id: "3cb66d5a-0621-4f15-9920-e99020e30e48"
+   dataset_name: "demo_streaming_table"
+   dataset_type: TABLE
+ }
+ ```
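+
+ Datasets described with the decorators are registered the same way. A sketch (the dataset name and query are made up; it reuses `registry` and the `sdp` import from the steps above):
+
+ ```py
+ from pyspark.pipelines.graph_element_registry import graph_element_registration_context
+
+ with graph_element_registration_context(registry):
+     @sdp.materialized_view
+     def demo_materialized_view():
+         return spark.range(5)
+ ```
+
+ If the registration succeeds, another `define_dataset` command should show up in the Spark Connect Server logs for `demo_materialized_view`.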
+
## Demo: spark-pipelines CLI
``` bash