Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit cbccb7b

Browse files
docs:专栏更新
1 parent 92136ec commit cbccb7b

8 files changed

+258
-26
lines changed

‎docs/.vuepress/config.js‎

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ module.exports = {
164164
text: '数据中台',
165165
items: [{
166166
text: '00-新一代数据栈将逐步替代国内单一"数据中台"',
167-
link: '/md/biz-arch/00-新一代数据栈将逐步替代国内单一"数据中台".md'
167+
link: '/md/bigdata/00-新一代数据栈将逐步替代国内单一"数据中台".md'
168168
}, ]
169169
},
170170

@@ -468,6 +468,14 @@ module.exports = {
468468
link: '/md/ck/clickhouse概述.md'
469469
}]
470470
},
471+
472+
{
473+
text: 'HBase',
474+
items: [{
475+
text: 'HBase概述',
476+
link: '/md/hbase/hbase-scan.md'
477+
}]
478+
},
471479

472480
{
473481
text: 'Neo4j',
@@ -503,6 +511,14 @@ module.exports = {
503511
]
504512
},
505513

514+
{
515+
text: '数据中台',
516+
items: [{
517+
text: '01-大数据的尽头是数据中台吗?',
518+
link: '/md/bigdata/01-大数据的尽头是数据中台吗?.md'
519+
}]
520+
},
521+
506522
{
507523
text: 'Hadoop',
508524
items: [{
@@ -847,17 +863,6 @@ module.exports = {
847863
"打造一个高并发的十万用户 IM 聊天系统,你需要了解这些架构设计技巧!",
848864
]
849865
},
850-
{
851-
title: "数据中台",
852-
collapsable: false,
853-
sidebarDepth: 0,
854-
children: [
855-
"00-新一代数据栈将逐步替代国内单一"数据中台"",
856-
"01-大数据的尽头是数据中台吗?",
857-
"03-构建数据中台的三要素:方法论、组织和技术",
858-
"05-如何统一管理纷繁杂乱的数据指标?",
859-
]
860-
},
861866

862867
{
863868
title: "用户画像",
@@ -923,6 +928,17 @@ module.exports = {
923928
"02-分布式对象存储设计原理",
924929
]
925930
},
931+
{
932+
title: "数据中台",
933+
collapsable: false,
934+
sidebarDepth: 0,
935+
children: [
936+
"00-新一代数据栈将逐步替代国内单一"数据中台"",
937+
"01-大数据的尽头是数据中台吗?",
938+
"03-构建数据中台的三要素:方法论、组织和技术",
939+
"05-如何统一管理纷繁杂乱的数据指标?",
940+
]
941+
},
926942
{
927943
title: "Hadoop",
928944
collapsable: false,
@@ -1834,6 +1850,7 @@ module.exports = {
18341850
"flink-architecture",
18351851
"flink-state-management",
18361852
"05-Flink实战DataStream API编程",
1853+
"streaming-connectors-programming",
18371854
"flink-data-latency-solution",
18381855
"flink-cep",
18391856
"flink-checkpoint",
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

‎docs/md/flink/05-Flink实战DataStream API编程.md‎

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,6 @@ public static void map(StreamExecutionEnvironment env) {
560560
return new Access(time, domain, traffic);
561561
}
562562
});
563-
564563
mapStream.print();
565564
}
566565
```
@@ -630,14 +629,14 @@ public class JavaDataStreamTransformationApp {
630629

631630
### 5.3 split拆分
632631

633-
DataStream→SplitStream
634-
根据某些标准将流拆分为两个或更多个流。
635-
![](https://img-blog.csdnimg.cn/2019072023312897.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_SmF2YUVkZ2U=,size_16,color_FFFFFF,t_70)
632+
DataStream→SplitStream。按标准将流拆分为两个或更多个流:
633+
634+
![](https://my-img.javaedge.com.cn/javaedge-blog/2024/08/024431d0d5640d04b46a5d1799540e68.png)
636635

637636
### 5.4 select
638637

639-
SplitStream→DataStream
640-
从拆分流中选择一个或多个流。
638+
SplitStream→DataStream。从拆分流中选择一个或多个流:
639+
641640
![](https://img-blog.csdnimg.cn/20190720234320281.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_SmF2YUVkZ2U=,size_16,color_FFFFFF,t_70)
642641

643642
```scala
@@ -670,6 +669,24 @@ public static void splitSelectFunction(StreamExecutionEnvironment env) {
670669
...
671670
```
672671

672+
### FlatMap
673+
674+
DataStream → DataStream。接收一个元素并产生0、1或多个元素。 将句子分割成单词的平面映射函数:
675+
676+
```java
677+
dataStream.flatMap(new FlatMapFunction<String, String>() {
678+
@Override
679+
public void flatMap(String value, Collector<String> out)
680+
throws Exception {
681+
for(String word: value.split(" ")){
682+
out.collect(word);
683+
}
684+
}
685+
});
686+
```
687+
688+
689+
673690
## 6 Data Sinks
674691

675692
Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。Flink 自带多种内置的输出格式,封装在 DataStreams 的算子:
@@ -760,15 +777,19 @@ public class JavaCustomSinkToMySQL {
760777

761778
### 自定义Sink总结
762779

763-
- RichSinkFunction<T>
764-
T就是你想要写入对象的类型
765-
- 重写方法
766-
open/ close
767-
生命周期方法
768-
invoke
769-
每条记录执行一次
770-
数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。Flink带有各种内置输出格式,这些格式封装在DataStreams上的 算子操作后面:
780+
```java
781+
RichSinkFunction<T>
782+
```
783+
784+
T就是你想要写入对象的类型
771785

786+
重写方法:open/ close 生命周期方法
787+
788+
invoke:每条记录执行一次
789+
790+
数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。Flink带有各种内置输出格式,这些格式封装在DataStreams上的 算子操作后面:
791+
792+
```
772793
writeAsText()/ TextOutputFormat- 按字符串顺序写入元素。通过调用每个元素的toString()方法获得字符串。
773794
774795
writeAsCsv(...)/ CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
@@ -782,5 +803,6 @@ writeToSocket - 根据a将元素写入套接字 SerializationSchema
782803
addSink - 调用自定义接收器函数。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系统实现为接收器函数。
783804
784805
write*()方法DataStream主要用于调试目的。他们没有参与Flink的检查点,这意味着这些函数通常具有至少一次的语义。刷新到目标系统的数据取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的数据元都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。
806+
```
785807

786808
要将流可靠,准确地一次传送到文件系统,请使用flink-connector-filesystem。此外,通过该.addSink(...)方法的自定义实现可以参与Flink的精确一次语义检查点。
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Streaming Connectors 编程
2+
3+
Flink已内置很多Source、Sink。
4+
5+
## 2 附带的连接器(Flink Project Connectors)
6+
7+
连接器可和第三方系统交互,目前支持:
8+
9+
- 文件
10+
- 目录
11+
- socket
12+
- 从集合和迭代器摄取数据
13+
14+
预定义的数据接收器支持写入:
15+
16+
- 文件
17+
- 标准输入输出
18+
- socket
19+
20+
### 1.2 绑定连接器
21+
22+
连接器提供用于与各种第三方系统连接的代码。目前支持:
23+
24+
- [Apache Kafka](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/) (source/sink)
25+
- [Elasticsearch](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/elasticsearch/) (sink)
26+
- [Apache Pulsar](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/pulsar/) (source)
27+
- [JDBC](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/jdbc/) (sink)
28+
29+
使用一种连接器时,通常需额外的第三方组件,如数据存储服务器或MQ。 这些列举的连接器是 Flink 工程的一部分,包含在发布的源码中,但不包含在二进制发行版中。
30+
31+
## 3 Apache Bahir中的连接器
32+
33+
Flink额外的连接器,通过 [Apache Bahir](https://bahir.apache.org/) 发布,:
34+
35+
- [Redis](https://bahir.apache.org/docs/flink/current/flink-streaming-redis/) (sink)
36+
- [Akka](https://bahir.apache.org/docs/flink/current/flink-streaming-akka/) (sink)
37+
- [Netty](https://bahir.apache.org/docs/flink/current/flink-streaming-netty/) (source)
38+
39+
## 4 连接Flink的其它方法
40+
41+
### 4.1 异步I / O
42+
43+
使用connector不是将数据输入和输出Flink的唯一方法。一种常见的模式:从外部DB或 Web 服务查询数据得到初始数据流,然后 `Map``FlatMap` 对初始数据流进行丰富和增强。
44+
45+
Flink 提供[异步 I/O](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/asyncio/) API 让这过程更简单、高效和稳定。
46+
47+
### 4.2 可查询状态
48+
49+
当 Flink 应用程序需向外部存储推送大量数据时会导致 I/O 瓶颈问题出现。此时,若对数据的读操作<<写操作,则让外部应用从 Flink 拉取所需的数据更好。 [可查询状态](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/fault-tolerance/queryable_state/) 接口可以实现这个功能,该接口允许被 Flink 托管的状态可以被按需查询。
50+
51+
看几个具体的连接器。
52+
53+
参考
54+
55+
- [Streaming Connectors](https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html)
56+
- [Kafka官方文档](

‎docs/md/hbase/hbase-scan.md‎

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# 轻松应对亿级数据,HBase Scan读取速度翻倍!
2+
3+
基于Hadoop的分布式列存储数据库,支持大规模结构化数据的存储和随机访问。
4+
5+
## 1 概念
6+
7+
扫描是一种读取表中数据的方式,它可以按照一定的条件过滤出表中符合条件的一部分或全部数据,并返回给用户。
8+
9+
HBase的扫描是基于rowkey的顺序扫描,可设置startRow、stopRow限制扫描范围,还可设置过滤器来进一步过滤数据。
10+
11+
## 2 扫描的使用
12+
13+
可通过HBase Shell、Java API和REST API操作,本文以Java API为例介绍。
14+
15+
### 2.1 创建扫描对象
16+
17+
```java
18+
// 创建一个Scan对象用于设置扫描的参数
19+
Scan scan = new Scan();
20+
```
21+
22+
### 2.2 设置扫描的范围
23+
24+
```java
25+
scan.setStartRow(Bytes.toBytes("startRow"));
26+
scan.setStopRow(Bytes.toBytes("stopRow"));
27+
```
28+
29+
设置扫描的起始行和结束行,可通过Bytes.toBytes方法将字符串转换为字节数组。
30+
31+
### 设置过滤器
32+
33+
```java
34+
Filter filter = new SingleColumnValueFilter(Bytes.toBytes("cf"), Bytes.toBytes("col"), CompareOperator.EQUAL, Bytes.toBytes("value"));
35+
scan.setFilter(filter);
36+
```
37+
38+
设置过滤器,可以通过SingleColumnValueFilter等过滤器来过滤数据。
39+
40+
### 执行扫描
41+
42+
```java
43+
ResultScanner scanner = table.getScanner(scan);
44+
for (Result result : scanner) {
45+
// 处理扫描结果
46+
}
47+
scanner.close();
48+
```
49+
50+
执行扫描并遍历结果,可以通过ResultScanner获取扫描结果,并通过for循环遍历结果。最后记得关闭ResultScanner。
51+
52+
## 3 性能优化
53+
54+
### 3.1 设置缓存和批量大小
55+
56+
```java
57+
scan.setCaching(100);
58+
scan.setBatch(10);
59+
```
60+
61+
设置扫描的缓存大小和批量大小,可有效减少RPC调用次数。
62+
63+
### 3.2 避免全表扫描
64+
65+
设置扫描的范围和过滤器等方式来限制扫描的范围。
66+
67+
### 3.3 使用扫描缓存
68+
69+
```java
70+
scan.setScanMetricsEnabled(true);
71+
```
72+
73+
设置扫描缓存,可以获取扫描的性能指标,如扫描的时间、扫描的行数等。通过这些指标可以优化扫描的性能,例如调整缓存大小、批量大小等。
74+
75+
使用扫描缓存可以有效地提高扫描的性能,因为它可以减少RPC调用次数,从而降低了网络开销和延迟。在HBase中,扫描缓存是通过设置scan.setCaching()方法来实现的。
76+
77+
#### 设置扫描缓存大小
78+
79+
```java
80+
scan.setCaching(100);
81+
```
82+
83+
设置扫描缓存大小,可以控制每次RPC调用返回的行数。缓存大小越大,网络开销就越小,但是内存开销就越大。通常情况下,扫描缓存大小的设置应该在100到1000之间,根据具体情况来调整。
84+
85+
### 3.4 设置批量大小
86+
87+
```java
88+
scan.setBatch(10);
89+
```
90+
91+
设置批量大小,可以控制每次RPC调用的数据量。批量大小越大,网络开销就越小,但是RPC调用的延迟就越大。通常情况下,批量大小的设置应该在5到10之间,根据具体情况来调整。
92+
93+
### 3.5 获取扫描指标
94+
95+
```java
96+
scan.setScanMetricsEnabled(true);
97+
```
98+
99+
设置扫描指标,可以获取扫描的性能指标,如扫描的时间、扫描的行数等。通过这些指标可以优化扫描的性能,例如调整缓存大小、批量大小等。
100+
101+
```java
102+
ScanResultCache resultCache = new ScanResultCache(cacheSize);
103+
ResultScanner scanner = table.getScanner(scan);
104+
List<Result> results = resultCache.cache(scanner);
105+
for (Result result : results) {
106+
// 处理扫描结果
107+
}
108+
scanner.close();
109+
```
110+
111+
使用ScanResultCache可以缓存扫描结果,从而减少RPC调用次数,提高扫描的性能。ScanResultCache是一个开源的扫描缓存库,可以在GitHub上找到。
112+
113+
### 3.6 异步扫描
114+
115+
可提高扫描的并发度,从而提高扫描的性能。可以通过使用CompletableFuture等方式来实现异步扫描。
116+
117+
```java
118+
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
119+
try {
120+
ResultScanner scanner = table.getScanner(scan);
121+
for (Result result : scanner) {
122+
// 处理扫描结果
123+
}
124+
scanner.close();
125+
} catch (IOException e) {
126+
e.printStackTrace();
127+
}
128+
});
129+
future.join();
130+
```
131+
132+
使用CompletableFuture实现异步扫描,可以在主线程中启动异步任务,并在异步任务执行完毕后等待结果。
133+
134+
## 4 总结
135+
136+
本文介绍了HBase中扫描的概念、使用方法和性能优化。扫描是HBase中常见的数据读取方式,通过设置扫描的参数、过滤器等方式可以实现灵活的数据查询。在实际应用中,我们需要根据数据的特点和查询需求来选择合适的扫描方法,并结合缓存、批量处理、异步等方式来优化扫描的性能。
137+

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /