
Commit 85fcddf

[SDP] StreamingFlowExecution and StreamingTableWrite
1 parent 5362247 commit 85fcddf

File tree

4 files changed: +106 -9 lines changed
Lines changed: 3 additions & 0 deletions

# StreamingFlow

`StreamingFlow` is...FIXME
Lines changed: 61 additions & 1 deletion

# StreamingFlowExecution

`StreamingFlowExecution` is an [extension](#contract) of the [FlowExecution](FlowExecution.md) abstraction for [streaming flow executions](#implementations) that process data statefully using [Spark Structured Streaming]({{ book.structured_streaming }}).

## Contract

### Execute Streaming Query { #startStream }

```scala
startStream(): StreamingQuery
```

See:

* [StreamingTableWrite](StreamingTableWrite.md#startStream)

Used when:

* `StreamingFlowExecution` is requested to [executeInternal](#executeInternal)

### Streaming Trigger { #trigger }

```scala
trigger: Trigger
```

See:

* [StreamingTableWrite](StreamingTableWrite.md#trigger)

Used when:

* `FlowPlanner` is requested to [plan a StreamingFlow](FlowPlanner.md#plan)
* `StreamingTableWrite` is requested to [execute the streaming query](StreamingTableWrite.md#startStream)

## Implementations

* [StreamingTableWrite](StreamingTableWrite.md)

## executeInternal { #executeInternal }

??? note "FlowExecution"

    ```scala
    executeInternal(): Future[Unit]
    ```

    `executeInternal` is part of the [FlowExecution](FlowExecution.md#executeInternal) abstraction.

`executeInternal` prints out the following INFO message to the logs:

```text
Starting [identifier] with checkpoint location [checkpointPath]
```

`executeInternal` [starts the stream](#startStream) (with this [SparkSession](FlowExecution.md#spark) and [sqlConf](#sqlConf)).

In the end, `executeInternal` awaits termination of the `StreamingQuery`.

??? note "Final Method"

    `executeInternal` is a Scala **final method** and may not be overridden in [subclasses](#implementations).

    Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#final).
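The contract-plus-final-template-method shape described above can be sketched in plain Scala. This is an illustrative stand-in only, not the actual Spark SDP source: `StreamingQueryStub`, `StreamingFlowExecutionSketch`, and `StreamingTableWriteSketch` are made-up names, and `StreamingQuery`/`Future[Unit]` are replaced with simple stubs so the sketch is self-contained.

```scala
// Illustrative sketch only: stand-in types, not the actual Spark SDP classes.
// It shows the contract (abstract startStream and trigger) and the final
// template method executeInternal that subclasses cannot override.

trait StreamingQueryStub {
  def awaitTermination(): Unit
}

abstract class StreamingFlowExecutionSketch(
    identifier: String,
    checkpointPath: String) {

  // Contract: implementations supply the streaming query and its trigger
  def startStream(): StreamingQueryStub
  def trigger: String

  // Final template method: log, start the stream, await its termination
  final def executeInternal(): String = {
    val info = s"Starting $identifier with checkpoint location $checkpointPath"
    startStream().awaitTermination()
    info
  }
}

class StreamingTableWriteSketch(id: String, checkpoint: String)
    extends StreamingFlowExecutionSketch(id, checkpoint) {

  var terminated = false

  override def trigger: String = "ProcessingTime(0)"

  override def startStream(): StreamingQueryStub =
    new StreamingQueryStub {
      def awaitTermination(): Unit = { terminated = true }
    }
}
```

Because `executeInternal` is `final`, a subclass attempting to override it fails at compile time, which is exactly the guarantee the "Final Method" note above points at.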
Lines changed: 39 additions & 8 deletions

# StreamingTableWrite

`StreamingTableWrite` is a [StreamingFlowExecution](StreamingFlowExecution.md).

When [executed](#startStream), `StreamingTableWrite` starts a streaming query to append new rows to an [output table](#destination).

## Creating Instance

`StreamingTableWrite` takes the following to be created:

* <span id="identifier"> [TableIdentifier](FlowExecution.md#identifier)
* <span id="flow"> [ResolvedFlow](StreamingFlowExecution.md#flow)
* <span id="graph"> [DataflowGraph](DataflowGraph.md)
* <span id="updateContext"> [PipelineUpdateContext](FlowExecution.md#updateContext)
* <span id="checkpointPath"> [Checkpoint Path](StreamingFlowExecution.md#checkpointPath)
* <span id="trigger"> [Streaming Trigger](StreamingFlowExecution.md#trigger)
* <span id="destination"> [Output table](FlowExecution.md#destination)
* <span id="sqlConf"> [SQL Configuration](StreamingFlowExecution.md#sqlConf)

`StreamingTableWrite` is created when:

* `FlowPlanner` is requested to [plan a StreamingFlow](FlowPlanner.md#plan)

## Execute Streaming Query { #startStream }

??? note "StreamingFlowExecution"

    ```scala
    startStream(): StreamingQuery
    ```

    `startStream` is part of the [StreamingFlowExecution](StreamingFlowExecution.md#startStream) abstraction.

`startStream` builds the logical query plan of this [flow](#flow)'s structured query (requesting the [DataflowGraph](#graph) to [reanalyze](DataflowGraph.md#reanalyzeFlow) this [flow](#flow)).

`startStream` creates a `DataStreamWriter` ([Spark Structured Streaming]({{ book.structured_streaming }}/DataStreamWriter/)) with the following:

| `DataStreamWriter`'s Property | Value |
|-------------------------------|-------|
| `queryName` | This [displayName](FlowExecution.md#displayName) |
| `checkpointLocation` option | This [checkpoint path](#checkpointPath) |
| `trigger` | This [streaming trigger](#trigger) |
| `outputMode` | [Append]({{ book.structured_streaming }}/OutputMode/#append) (always) |
| `format` | The [format](Table.md#format) of this [output table](#destination) (only when defined) |

In the end, `startStream` starts the streaming write query to this [output table](#destination).
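The property table above can be exercised with a small runnable stand-in. This is not Spark's `DataStreamWriter` API: `WriterConfig` and `configureWriter` are made-up names that merely record the settings `startStream` is described as applying.

```scala
// Illustrative only: WriterConfig and configureWriter are made-up stand-ins,
// not Spark's DataStreamWriter API. They record the settings that
// startStream applies, per the property table above.

final case class WriterConfig(
    queryName: String,
    options: Map[String, String],
    trigger: String,
    outputMode: String,
    format: Option[String])

def configureWriter(
    displayName: String,
    checkpointPath: String,
    trigger: String,
    tableFormat: Option[String]): WriterConfig =
  WriterConfig(
    queryName = displayName,                                // this displayName
    options = Map("checkpointLocation" -> checkpointPath),  // checkpoint path
    trigger = trigger,                                      // streaming trigger
    outputMode = "Append",                                  // always Append
    format = tableFormat)                                   // only when defined
```

Passing `None` for the table format leaves `format` unset, mirroring the "only when defined" condition in the last table row.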
docs/declarative-pipelines/Table.md

Lines changed: 3 additions & 0 deletions

# Table

`Table` is...FIXME
