Commit 9bb9cfb

[SDP] PipelinesHandler and Pipeline Commands
1 parent dbc9c67 commit 9bb9cfb

File tree

- docs/declarative-pipelines/GraphRegistrationContext.md
- docs/declarative-pipelines/PipelineExecution.md
- docs/declarative-pipelines/PipelinesHandler.md
- docs/declarative-pipelines/UnresolvedFlow.md
- docs/declarative-pipelines/index.md

5 files changed: +42 -25 lines changed

docs/declarative-pipelines/GraphRegistrationContext.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -31,7 +31,7 @@ toDataflowGraph: DataflowGraph
 
 `toDataflowGraph` is used when:
 
-* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [startRun](PipelinesHandler.md#startRun)
+* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [start a pipeline run](PipelinesHandler.md#startRun)
 
 ## Tables { #tables }
 
````
docs/declarative-pipelines/PipelineExecution.md

Lines changed: 2 additions & 2 deletions

````diff
@@ -18,13 +18,13 @@
 runPipeline(): Unit
 ```
 
-`runPipeline` [starts the pipeline](#startPipeline) and requests the [PipelineExecution](PipelineUpdateContext.md#pipelineExecution) (of this [PipelineUpdateContext](#context)) to [wait for the execution to complete](#awaitCompletion).
+`runPipeline` [starts this pipeline](#startPipeline) and requests the [PipelineExecution](PipelineUpdateContext.md#pipelineExecution) (of this [PipelineUpdateContext](#context)) to [wait for the execution to complete](#awaitCompletion).
 
 ---
 
 `runPipeline` is used when:
 
-* `PipelinesHandler` is requested to [startRun](PipelinesHandler.md#startRun) (for [Spark Connect]({{ book.spark_connect }}))
+* `PipelinesHandler` is requested to [start a pipeline run](PipelinesHandler.md#startRun)
 
 ## Start Pipeline { #startPipeline }
 
````
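Read as code, the rewritten sentence describes a two-step control flow. A minimal sketch, assuming `runPipeline` is a method of `PipelineExecution` and that `context` exposes the `pipelineExecution` (names follow the linked pages, not the actual Apache Spark sources):

```scala
// Sketch only (not the Spark sources): start the pipeline, then block
// until the pipeline execution completes.
def runPipeline(): Unit = {
  startPipeline()                               // Start Pipeline { #startPipeline }
  context.pipelineExecution.awaitCompletion()   // wait for the execution to complete
}
```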
docs/declarative-pipelines/PipelinesHandler.md

Lines changed: 35 additions & 22 deletions

````diff
@@ -2,6 +2,8 @@
 
 `PipelinesHandler` is used to [handle pipeline commands](#handlePipelinesCommand) in [Spark Connect]({{ book.spark_connect }}) ([SparkConnectPlanner]({{ book.spark_connect }}/server/SparkConnectPlanner), precisely).
 
+`PipelinesHandler` acts as a bridge between the Python and SQL "frontends" and the Spark Connect Server (where pipeline execution happens).
+
 ## Handle Pipelines Command { #handlePipelinesCommand }
 
 ```scala
@@ -14,14 +16,14 @@ handlePipelinesCommand(
 
 `handlePipelinesCommand` handles the given pipeline `cmd` command.
 
-| PipelineCommand | Description |
-|-----------------|-------------|
-| `CREATE_DATAFLOW_GRAPH` | [Creates a new Dataflow Graph](#createDataflowGraph) |
-| `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) |
-| `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) |
-| `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) |
-| `START_RUN` | [Starts a pipeline](#START_RUN) |
-| `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) |
+| PipelineCommand | Description | Initiator |
+|-----------------|-------------|-----------|
+| `CREATE_DATAFLOW_GRAPH` | [Creates a new dataflow graph](#CREATE_DATAFLOW_GRAPH) | [pyspark.pipelines.spark_connect_pipeline](#create_dataflow_graph) |
+| `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) | |
+| `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_dataset) |
+| `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_flow) |
+| `START_RUN` | [Starts a pipeline run](#START_RUN) | [pyspark.pipelines.spark_connect_pipeline](#start_run) |
+| `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_sql) |
 
 `handlePipelinesCommand` reports an `UnsupportedOperationException` for incorrect commands:
 
@@ -33,9 +35,13 @@ handlePipelinesCommand(
 
 `handlePipelinesCommand` is used when:
 
-* `SparkConnectPlanner` is requested to `handlePipelineCommand` (for `PIPELINE_COMMAND` command)
+* `SparkConnectPlanner` ([Spark Connect]({{ book.spark_connect }}/server/SparkConnectPlanner)) is requested to `handlePipelineCommand` (for `PIPELINE_COMMAND` command)
+
+### CREATE_DATAFLOW_GRAPH { #CREATE_DATAFLOW_GRAPH }
 
-### Define Dataset Command { #DEFINE_DATASET }
+`handlePipelinesCommand` [creates a dataflow graph](#createDataflowGraph) and sends the graph ID back.
+
+### DEFINE_DATASET { #DEFINE_DATASET }
 
 `handlePipelinesCommand` prints out the following INFO message to the logs:
 
@@ -45,7 +51,7 @@ Define pipelines dataset cmd received: [cmd]
 
 `handlePipelinesCommand` [defines a dataset](#defineDataset).
 
-### Define Flow Command { #DEFINE_FLOW }
+### DEFINE_FLOW { #DEFINE_FLOW }
 
 `handlePipelinesCommand` prints out the following INFO message to the logs:
 
@@ -55,7 +61,17 @@ Define pipelines flow cmd received: [cmd]
 
 `handlePipelinesCommand` [defines a flow](#defineFlow).
 
-### Start Pipeline { #startRun }
+### START_RUN { #START_RUN }
+
+`handlePipelinesCommand` prints out the following INFO message to the logs:
+
+```text
+Start pipeline cmd received: [cmd]
+```
+
+`handlePipelinesCommand` [starts a pipeline run](#startRun).
+
+## Start Pipeline Run { #startRun }
 
 ```scala
 startRun(
````
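The command table and the `CREATE_DATAFLOW_GRAPH`/`DEFINE_DATASET`/`DEFINE_FLOW`/`START_RUN` sections above all describe a single dispatch point. A hedged sketch of that shape, with hypothetical case classes standing in for the protobuf messages (the real handler matches on `proto.PipelineCommand.CommandTypeCase`; every name below is made up for illustration):

```scala
// Hypothetical stand-ins for the protobuf command messages (not Spark's API).
trait PipelineCommand
case class CreateDataflowGraph(defaultCatalog: String, defaultDatabase: String) extends PipelineCommand
case class DefineDataset(dataflowGraphId: String, name: String) extends PipelineCommand
case class DefineFlow(dataflowGraphId: String, name: String) extends PipelineCommand
case class StartRun(dataflowGraphId: String, dryRun: Boolean) extends PipelineCommand

// Dispatch as described above: known commands are routed to their handlers,
// anything else is reported as unsupported.
def handlePipelinesCommand(cmd: PipelineCommand): Unit = cmd match {
  case c: CreateDataflowGraph => println(s"created dataflow graph for $c") // would send the graph ID back
  case c: DefineDataset       => println(s"Define pipelines dataset cmd received: $c")
  case c: DefineFlow          => println(s"Define pipelines flow cmd received: $c")
  case c: StartRun            => println(s"Start pipeline cmd received: $c")
  case other                  => throw new UnsupportedOperationException(s"$other not supported")
}
```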
````diff
@@ -64,21 +80,18 @@ startRun(
   sessionHolder: SessionHolder): Unit
 ```
 
-`startRun` prints out the following INFO message to the logs:
-
-```text
-Start pipeline cmd received: [cmd]
-```
+??? note "`START_RUN` Pipeline Command"
+    `startRun` is used when `PipelinesHandler` is requested to handle [proto.PipelineCommand.CommandTypeCase.START_RUN](#START_RUN) command.
 
 `startRun` finds the [GraphRegistrationContext](GraphRegistrationContext.md) by `dataflowGraphId` in the [DataflowGraphRegistry](DataflowGraphRegistry.md) (in the given `SessionHolder`).
 
 `startRun` creates a `PipelineEventSender` to send pipeline events back to the Spark Connect client (_Python pipeline runtime_).
 
 `startRun` creates a [PipelineUpdateContextImpl](PipelineUpdateContextImpl.md) (with the `PipelineEventSender`).
 
-In the end, `startRun` requests the `PipelineUpdateContextImpl` for the [PipelineExecution](PipelineExecution.md) to [runPipeline](PipelineExecution.md#runPipeline) or [dryRunPipeline](PipelineExecution.md#dryRunPipeline) for `dry-run` or `run` command, respectively.
+In the end, `startRun` requests the `PipelineUpdateContextImpl` for the [PipelineExecution](PipelineUpdateContext.md#pipelineExecution) to [run a pipeline](PipelineExecution.md#runPipeline) or [dry-run a pipeline](PipelineExecution.md#dryRunPipeline) for a `run` or `dry-run` command, respectively.
 
-### Create Dataflow Graph { #createDataflowGraph }
+## Create Dataflow Graph { #createDataflowGraph }
 
 ```scala
 createDataflowGraph(
@@ -90,7 +103,7 @@ createDataflowGraph(
 
 `createDataflowGraph` returns the ID of the created dataflow graph.
 
-### defineSqlGraphElements { #defineSqlGraphElements }
+## defineSqlGraphElements { #defineSqlGraphElements }
 
 ```scala
 defineSqlGraphElements(
@@ -100,7 +113,7 @@ defineSqlGraphElements(
 
 `defineSqlGraphElements`...FIXME
 
-### Define Dataset (Table or View) { #defineDataset }
+## Define Dataset (Table or View) { #defineDataset }
 
 ```scala
 defineDataset(
@@ -123,7 +136,7 @@ For unknown types, `defineDataset` reports an `IllegalArgumentException`:
 Unknown dataset type: [type]
 ```
 
-### Define Flow { #defineFlow }
+## Define Flow { #defineFlow }
 
 ```scala
 defineFlow(
````
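The four `startRun` steps documented above can be summarized in one short sketch. All types and signatures below are hypothetical stubs standing in for the linked pages (`DataflowGraphRegistry`, `PipelineEventSender`, `PipelineUpdateContextImpl`); only the step order comes from this commit:

```scala
// Hypothetical stubs (not Spark's API) so the sketch is self-contained.
trait GraphRegistrationContext
trait PipelineEventSender
trait PipelineExecution { def runPipeline(): Unit; def dryRunPipeline(): Unit }
trait PipelineUpdateContext { def pipelineExecution: PipelineExecution }

def startRun(
    dataflowGraphId: String,
    dryRun: Boolean,
    findGraph: String => GraphRegistrationContext,  // 1. DataflowGraphRegistry lookup
    newEventSender: () => PipelineEventSender,      // 2. events back to the Spark Connect client
    newUpdateContext: (GraphRegistrationContext, PipelineEventSender) => PipelineUpdateContext): Unit = {
  val graph = findGraph(dataflowGraphId)
  val eventSender = newEventSender()
  val updateContext = newUpdateContext(graph, eventSender)  // 3. PipelineUpdateContextImpl
  val execution = updateContext.pipelineExecution           // 4. run or dry-run the pipeline
  if (dryRun) execution.dryRunPipeline() else execution.runPipeline()
}
```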
docs/declarative-pipelines/UnresolvedFlow.md

Lines changed: 3 additions & 0 deletions

````diff
@@ -0,0 +1,3 @@
+# UnresolvedFlow
+
+`UnresolvedFlow` is...FIXME
````

docs/declarative-pipelines/index.md

Lines changed: 1 addition & 0 deletions

````diff
@@ -152,6 +152,7 @@ Pipelines elements are defined in SQL files included as `definitions` in a [pipe
 Supported SQL statements:
 
 * [CREATE FLOW AS INSERT INTO BY NAME](../sql/SparkSqlAstBuilder.md#visitCreatePipelineInsertIntoFlow)
+* ...
 
 ## Demo: Create Virtual Environment for Python Client
 
````
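The one statement listed so far is named after the parser rule `visitCreatePipelineInsertIntoFlow`. A hedged illustration of what such a statement might look like in a pipeline's SQL `definitions` file (the flow, target, and source names are invented; verify the exact grammar against `SparkSqlAstBuilder`):

```sql
-- Hypothetical example: a named flow that inserts raw_orders into orders,
-- matching columns by name rather than by position.
CREATE FLOW append_orders AS
INSERT INTO orders BY NAME
SELECT * FROM raw_orders;
```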