Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 8d1ef20

Browse files
committed
feat: ES6 java 客户端示例
1 parent 2f93f74 commit 8d1ef20

File tree

9 files changed

+581
-0
lines changed

9 files changed

+581
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>io.github.dunwu</groupId>
7+
<artifactId>javadb-elasticsearch6</artifactId>
8+
<version>1.0.0</version>
9+
<packaging>jar</packaging>
10+
11+
<properties>
12+
<java.version>1.8</java.version>
13+
<maven.compiler.source>${java.version}</maven.compiler.source>
14+
<maven.compiler.target>${java.version}</maven.compiler.target>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.elasticsearch.client</groupId>
22+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
23+
<version>6.4.3</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>org.projectlombok</groupId>
27+
<artifactId>lombok</artifactId>
28+
<version>1.18.22</version>
29+
</dependency>
30+
<dependency>
31+
<groupId>cn.hutool</groupId>
32+
<artifactId>hutool-all</artifactId>
33+
<version>5.7.20</version>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>ch.qos.logback</groupId>
38+
<artifactId>logback-classic</artifactId>
39+
<version>1.2.10</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.logging.log4j</groupId>
43+
<artifactId>log4j-to-slf4j</artifactId>
44+
<version>2.17.1</version>
45+
</dependency>
46+
</dependencies>
47+
48+
</project>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.github.dunwu.javadb.elasticsearch;
2+
3+
import io.github.dunwu.javadb.elasticsearch.entity.User;
4+
import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper;
5+
6+
import java.io.IOException;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
10+
11+
public class Demo {
12+
public static void main(String[] args) throws IOException, InterruptedException {
13+
UserEsMapper mapper = new UserEsMapper();
14+
15+
System.out.println("索引是否存在:" + mapper.isIndexExists());
16+
17+
User jack = User.builder().id(1L).username("jack").age(18).build();
18+
User tom = User.builder().id(2L).username("tom").age(20).build();
19+
List<User> users = Arrays.asList(jack, tom);
20+
21+
System.out.println("批量插入:" + mapper.batchInsert(users));
22+
System.out.println("根据ID查询:" + mapper.getById("1").toString());
23+
System.out.println("根据ID查询:" + mapper.pojoById("2").toString());
24+
System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString());
25+
26+
Thread.sleep(1000);
27+
System.out.println("根据ID批量删除:" + mapper.deleteByIds(Arrays.asList("1", "2")));
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.github.dunwu.javadb.elasticsearch.entity;
2+
3+
/**
4+
* ES 实体接口
5+
*
6+
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
7+
* @since 2023年06月28日
8+
*/
9+
public interface EsEntity {
10+
Long getId();
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.github.dunwu.javadb.elasticsearch.entity;
2+
3+
import lombok.Builder;
4+
import lombok.Data;
5+
6+
/**
7+
* 用户实体
8+
*
9+
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
10+
* @since 2023年06月28日
11+
*/
12+
@Data
13+
@Builder
14+
public class User implements EsEntity {
15+
private Long id;
16+
private String username;
17+
private String password;
18+
private Integer age;
19+
private String email;
20+
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package io.github.dunwu.javadb.elasticsearch.mapper;
2+
3+
import cn.hutool.core.bean.BeanUtil;
4+
import cn.hutool.core.bean.copier.CopyOptions;
5+
import cn.hutool.core.collection.CollectionUtil;
6+
import cn.hutool.core.io.IoUtil;
7+
import cn.hutool.core.lang.Assert;
8+
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
9+
import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
13+
import org.elasticsearch.action.bulk.BackoffPolicy;
14+
import org.elasticsearch.action.bulk.BulkProcessor;
15+
import org.elasticsearch.action.bulk.BulkRequest;
16+
import org.elasticsearch.action.bulk.BulkResponse;
17+
import org.elasticsearch.action.delete.DeleteRequest;
18+
import org.elasticsearch.action.get.MultiGetItemResponse;
19+
import org.elasticsearch.action.get.MultiGetRequest;
20+
import org.elasticsearch.action.get.MultiGetResponse;
21+
import org.elasticsearch.action.index.IndexRequest;
22+
import org.elasticsearch.action.index.IndexResponse;
23+
import org.elasticsearch.action.search.SearchRequest;
24+
import org.elasticsearch.action.search.SearchResponse;
25+
import org.elasticsearch.client.IndicesClient;
26+
import org.elasticsearch.client.RequestOptions;
27+
import org.elasticsearch.client.Requests;
28+
import org.elasticsearch.client.RestHighLevelClient;
29+
import org.elasticsearch.common.unit.ByteSizeUnit;
30+
import org.elasticsearch.common.unit.ByteSizeValue;
31+
import org.elasticsearch.common.unit.TimeValue;
32+
import org.elasticsearch.common.xcontent.XContentBuilder;
33+
import org.elasticsearch.common.xcontent.XContentFactory;
34+
import org.elasticsearch.index.query.QueryBuilder;
35+
import org.elasticsearch.index.query.QueryBuilders;
36+
import org.elasticsearch.search.builder.SearchSourceBuilder;
37+
38+
import java.io.Closeable;
39+
import java.io.IOException;
40+
import java.util.*;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.function.BiConsumer;
43+
44+
/**
45+
* ES Mapper 基础类
46+
*
47+
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
48+
* @date 2023年06月27日
49+
*/
50+
@Slf4j
51+
public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T>, Closeable {
52+
53+
public static final String HOSTS = "127.0.0.1:9200";
54+
55+
private BulkProcessor bulkProcessor;
56+
57+
private final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS);
58+
59+
@Override
60+
public RestHighLevelClient getClient() throws IOException {
61+
Assert.notNull(restHighLevelClient, () -> new IOException("【ES】not connected."));
62+
return restHighLevelClient;
63+
}
64+
65+
66+
@Override
67+
public synchronized BulkProcessor getBulkProcessor() {
68+
if (bulkProcessor == null) {
69+
bulkProcessor = newAsyncBulkProcessor();
70+
}
71+
return bulkProcessor;
72+
}
73+
74+
@Override
75+
public boolean isIndexExists() throws IOException {
76+
IndicesClient indicesClient = getClient().indices();
77+
GetIndexRequest request = new GetIndexRequest();
78+
request.indices(getIndexAlias());
79+
return indicesClient.exists(request, RequestOptions.DEFAULT);
80+
}
81+
82+
@Override
83+
public SearchResponse getById(String id) throws IOException {
84+
SearchRequest searchRequest = Requests.searchRequest(getIndexAlias());
85+
QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id);
86+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
87+
sourceBuilder.query(queryBuilder);
88+
searchRequest.source(sourceBuilder);
89+
return getClient().search(searchRequest, RequestOptions.DEFAULT);
90+
}
91+
92+
@Override
93+
public T pojoById(String id) throws IOException {
94+
SearchResponse response = getById(id);
95+
if (response == null) {
96+
return null;
97+
}
98+
List<T> list = ElasticsearchUtil.toPojoList(response, getEntityClass());
99+
if (CollectionUtil.isEmpty(list)) {
100+
return null;
101+
}
102+
return list.get(0);
103+
}
104+
105+
@Override
106+
public List<T> pojoListByIds(Collection<String> ids) throws IOException {
107+
108+
if (CollectionUtil.isEmpty(ids)) {
109+
return null;
110+
}
111+
112+
MultiGetRequest request = new MultiGetRequest();
113+
for (String id : ids) {
114+
request.add(new MultiGetRequest.Item(getIndexAlias(), getIndexType(), id));
115+
}
116+
117+
MultiGetResponse multiGetResponse = getClient().mget(request, RequestOptions.DEFAULT);
118+
if (null == multiGetResponse || multiGetResponse.getResponses() == null || multiGetResponse.getResponses().length <= 0) {
119+
return new ArrayList<>();
120+
}
121+
122+
List<T> list = new ArrayList<>();
123+
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
124+
if (itemResponse.isFailed()) {
125+
log.error("通过id获取文档失败", itemResponse.getFailure().getFailure());
126+
} else {
127+
T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), getEntityClass());
128+
if (entity != null) {
129+
list.add(entity);
130+
}
131+
}
132+
}
133+
return list;
134+
}
135+
136+
@Override
137+
public String insert(T entity) throws IOException {
138+
Map<String, Object> map = new HashMap<>();
139+
BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError());
140+
XContentBuilder builder = XContentFactory.jsonBuilder();
141+
builder.startObject();
142+
for (Map.Entry<String, Object> entry : map.entrySet()) {
143+
String key = entry.getKey();
144+
Object value = entry.getValue();
145+
builder.field(key, value);
146+
}
147+
builder.endObject();
148+
149+
IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(builder);
150+
if (entity.getId() != null) {
151+
request.id(entity.getId().toString());
152+
}
153+
154+
IndexResponse response = getClient().index(request, RequestOptions.DEFAULT);
155+
if (response == null) {
156+
return null;
157+
}
158+
return response.getId();
159+
}
160+
161+
@Override
162+
public boolean batchInsert(Collection<T> list) throws IOException {
163+
164+
if (CollectionUtil.isEmpty(list)) {
165+
return true;
166+
}
167+
168+
BulkRequest bulkRequest = new BulkRequest();
169+
for (T entity : list) {
170+
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
171+
IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(map);
172+
if (entity.getId() != null) {
173+
request.id(entity.getId().toString());
174+
}
175+
bulkRequest.add(request);
176+
}
177+
178+
BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
179+
return !(response == null || response.hasFailures());
180+
}
181+
182+
@Override
183+
public boolean deleteById(String id) throws IOException {
184+
return deleteByIds(Collections.singleton(id));
185+
}
186+
187+
@Override
188+
public boolean deleteByIds(Collection<String> ids) throws IOException {
189+
190+
if (CollectionUtil.isEmpty(ids)) {
191+
return true;
192+
}
193+
194+
BulkRequest bulkRequest = new BulkRequest();
195+
ids.forEach(id -> {
196+
DeleteRequest deleteRequest = Requests.deleteRequest(getIndexAlias()).type(getIndexType()).id(id);
197+
bulkRequest.add(deleteRequest);
198+
});
199+
200+
BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
201+
return response != null && !response.hasFailures();
202+
}
203+
204+
private BulkProcessor newAsyncBulkProcessor() {
205+
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
206+
@Override
207+
public void beforeBulk(long executionId, BulkRequest request) {
208+
}
209+
210+
@Override
211+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
212+
if (response.hasFailures()) {
213+
log.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
214+
}
215+
}
216+
217+
@Override
218+
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
219+
}
220+
};
221+
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
222+
bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
223+
// 1000条数据请求执行一次bulk
224+
.setBulkActions(1000)
225+
// 5mb的数据刷新一次bulk
226+
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
227+
// 并发请求数量, 0不并发, 1并发允许执行
228+
.setConcurrentRequests(2)
229+
// 固定1s必须刷新一次
230+
.setFlushInterval(TimeValue.timeValueMillis(1000L))
231+
// 重试3次,间隔100ms
232+
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L), 3)).build();
233+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
234+
try {
235+
bulkProcessor.flush();
236+
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
237+
} catch (Exception e) {
238+
log.error("Failed to close bulkProcessor", e);
239+
}
240+
log.info("bulkProcessor closed!");
241+
}));
242+
return bulkProcessor;
243+
}
244+
245+
@Override
246+
public void close() {
247+
IoUtil.close(restHighLevelClient);
248+
}
249+
250+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /