适合新手快速学习,节省你的时间,就是我的价值
1. 项目总览2. 配置pom.xml3. 配置elasticsearch.properties4. es的初始化-含权限认证5. 构建ClientBuilders6. 使用HighLevelClient客户端7. 异常处理类8. 实体bean9. 增删改查API10. 增删改查API测试11. 增值服务
1. 项目总览
2. 配置pom.xml
<?xml version
="1.0" encoding
="UTF-8"?>
<project xmlns
="http://maven.apache.org/POM/4.0.0" xmlns
:xsi
="http://www.w3.org/2001/XMLSchema-instance"
xsi
:schemaLocation
="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion
>4.0.0</modelVersion
>
<groupId
>com
.shok
</groupId
>
<artifactId
>esTest
</artifactId
>
<version
>0.0.1-SNAPSHOT</version
>
<packaging
>war
</packaging
>
<name
>esTest
</name
>
<description
>esTest
</description
>
<parent
>
<groupId
>org
.springframework
.boot
</groupId
>
<artifactId
>spring
-boot
-starter
-parent
</artifactId
>
<version
>2.3.3.RELEASE</version
>
<relativePath
/> <!-- lookup parent
from repository
-->
</parent
>
<properties
>
<project
.build
.sourceEncoding
>UTF-8</project
.build
.sourceEncoding
>
<project
.reporting
.outputEncoding
>UTF-8</project
.reporting
.outputEncoding
>
<java
.version
>1.8</java
.version
>
</properties
>
<dependencies
>
<dependency
>
<groupId
>org
.springframework
.boot
</groupId
>
<artifactId
>spring
-boot
-starter
-web
</artifactId
>
</dependency
>
<dependency
>
<groupId
>org
.elasticsearch
.client
</groupId
>
<artifactId
>elasticsearch
-rest
-high
-level
-client
</artifactId
>
<version
>7.9.0</version
>
</dependency
>
<dependency
>
<groupId
>org
.elasticsearch
</groupId
>
<artifactId
>elasticsearch
</artifactId
>
<version
>7.9.0</version
>
</dependency
>
<dependency
>
<groupId
>org
.elasticsearch
.client
</groupId
>
<artifactId
>elasticsearch
-rest
-client
</artifactId
>
<version
>7.9.0</version
>
</dependency
>
<dependency
>
<groupId
>org
.springframework
.boot
</groupId
>
<artifactId
>spring
-boot
-starter
-test
</artifactId
>
<scope
>test
</scope
>
</dependency
>
<dependency
>
<groupId
>org
.springframework
</groupId
>
<artifactId
>spring
-beans
</artifactId
>
<scope
>compile
</scope
>
</dependency
>
<dependency
>
<groupId
>commons
-lang
</groupId
>
<artifactId
>commons
-lang
</artifactId
>
<version
>2.6</version
>
<scope
>compile
</scope
>
</dependency
>
<dependency
>
<groupId
>com
.alibaba
</groupId
>
<artifactId
>fastjson
</artifactId
>
<version
>1.2.73</version
>
<scope
>compile
</scope
>
</dependency
>
</dependencies
>
<build
>
<finalName
>esTest
</finalName
>
<plugins
>
<plugin
>
<groupId
>org
.apache
.maven
.plugins
</groupId
>
<artifactId
>maven
-compiler
-plugin
</artifactId
>
<configuration
>
<source
>8</source
><!-- 源代码使用的
JDK版本
-->
<target
>8</target
><!-- 需要生成的目标
class文件的编译版本
-->
<encoding
>UTF-8</encoding
><!-- 字符集编码
-->
</configuration
>
</plugin
>
</plugins
>
</build
>
</project
>
3. 配置elasticsearch.properties
# es用户名
elastic
.username
=elastic
# es密码
elastic
.password
=kibana2020
#es集群名
elastic
.cluster
.name
=app
-es
#es集群地址列表,多个地址,用“
,”分开,地址和端口号相对应
elastic
.cluster
.discover
.hostname
=192.168.1.58:9200,192.168.1.58:9202,192.168.1.58:9203
#es集群是否加入如自动嗅探
elastic
.cluster
.clientTransportSniff
=true
4. es的初始化-含权限认证
package com
.shok
.utils
;
import java
.io
.IOException
;
import java
.io
.InputStream
;
import java
.util
.Properties
;
public class ConfigUtils {
private static String esConfigFileName
= "elasticsearch.properties";
private static String esClusterName
;
private static String esClusterDiscoverHostName
;
private static String clientTransportSniff
;
private static String esUserName
;
private static String esPassword
;
public static String
getEsUserName() {
return esUserName
;
}
public static void setEsUserName(String esUserName
) {
ConfigUtils
.esUserName
= esUserName
;
}
public static String
getEsPassword() {
return esPassword
;
}
public static void setEsPassword(String esPassword
) {
ConfigUtils
.esPassword
= esPassword
;
}
private static Properties properties
= new Properties();
static {
try {
ClassLoader classLoader
= ConfigUtils
.class.getClassLoader();
InputStream resourceAsStream
= classLoader
.getResourceAsStream(esConfigFileName
);
properties
.load(resourceAsStream
);
init();
} catch (IOException e
) {
e
.printStackTrace();
}
}
static void init() {
esUserName
= properties
.getProperty("elastic.username");
esPassword
= properties
.getProperty("elastic.password");
esClusterName
= properties
.getProperty("elastic.cluster.name");
esClusterDiscoverHostName
= properties
.getProperty("elastic.cluster.discover.hostname");
clientTransportSniff
= properties
.getProperty("elastic.cluster.clientTransportSniff");
if ("".equals(esClusterName
) || "".equals(esClusterName
) || "".equals(clientTransportSniff
)) {
throw new RuntimeException("elasticsearch 集群参数为空异常!");
}
if ("".equals(esUserName
) || "".equals(esPassword
)) {
throw new RuntimeException("elasticsearch 集群登录用户名和密码不能为空!");
}
}
public static String
getEsClusterName() {
return esClusterName
;
}
public static String
getEsClusterDiscoverHostName() {
return esClusterDiscoverHostName
;
}
public static void setEsClusterDiscoverHostName(String esClusterDiscoverHostName
) {
ConfigUtils
.esClusterDiscoverHostName
= esClusterDiscoverHostName
;
}
public static String
getClientTransportSniff() {
return clientTransportSniff
;
}
public static void setClientTransportSniff(String clientTransportSniff
) {
ConfigUtils
.clientTransportSniff
= clientTransportSniff
;
}
}
5. 构建ClientBuilders
package com
.shok
.client
;
import com
.shok
.utils
.ConfigUtils
;
import org
.apache
.http
.Header
;
import org
.apache
.http
.HttpHost
;
import org
.apache
.http
.client
.config
.RequestConfig
;
import org
.apache
.http
.impl
.nio
.client
.HttpAsyncClientBuilder
;
import org
.apache
.http
.message
.BasicHeader
;
import org
.elasticsearch
.client
.RestClient
;
import org
.elasticsearch
.client
.RestClientBuilder
;
import java
.util
.List
;
import java
.util
.stream
.Collectors
;
import java
.util
.stream
.Stream
;
public class ClientBuilders {
private static final String
CLUSTER_HOSTNAME_PORT = ConfigUtils
.getEsClusterDiscoverHostName();
public RestClientBuilder
getSimpleClientBuilder() {
String
[] ipHosts
= CLUSTER_HOSTNAME_PORT.split(",");
List
<HttpHost
> httpHostsList
= Stream
.of(ipHosts
)
.map(this::createHttpHost
)
.collect(Collectors
.toList());
HttpHost
[] httpHosts
= httpHostsList
.toArray(new HttpHost[httpHostsList
.size()]);
RestClientBuilder builder
= RestClient
.builder(httpHosts
);
return builder
;
}
private HttpHost
createHttpHost(String ip
) {
return HttpHost
.create(ip
);
}
public static RestClientBuilder
getClientBulider() {
String
[] hostNamesPort
= CLUSTER_HOSTNAME_PORT.split(",");
String host
;
int port
;
String
[] temp
;
RestClientBuilder restClientBuilder
= null;
if (0 != hostNamesPort
.length
) {
for (String hostPort
: hostNamesPort
) {
temp
= hostPort
.split(":");
host
= temp
[0].trim();
port
= Integer
.parseInt(temp
[1].trim());
restClientBuilder
= RestClient
.builder(new HttpHost(host
, port
, "http"));
}
}
Header
[] defaultHeaders
= new Header[]{
new BasicHeader("header", "value")
};
restClientBuilder
.setDefaultHeaders(defaultHeaders
);
restClientBuilder
.setFailureListener(new RestClient.FailureListener() {
public void onFailure(HttpHost host
) {
}
});
restClientBuilder
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig
.Builder
customizeRequestConfig(RequestConfig
.Builder requestConfigBuilder
) {
return requestConfigBuilder
.setSocketTimeout(10000);
}
});
restClientBuilder
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder
customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder
) {
return httpClientBuilder
.setProxy(new HttpHost("proxy", 9000, "http"));
}
});
return restClientBuilder
;
}
}
6. 使用HighLevelClient客户端
package com
.shok
.client
;
import com
.shok
.exception
.ESIoException
;
import com
.shok
.utils
.ConfigUtils
;
import org
.apache
.http
.HttpHost
;
import org
.apache
.http
.auth
.AuthScope
;
import org
.apache
.http
.auth
.UsernamePasswordCredentials
;
import org
.apache
.http
.client
.CredentialsProvider
;
import org
.apache
.http
.impl
.client
.BasicCredentialsProvider
;
import org
.elasticsearch
.client
.RestClient
;
import org
.elasticsearch
.client
.RestClientBuilder
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import java
.io
.IOException
;
public class HighLevelClient {
private static final String
CLUSTER_HOSTNAME_PORT = ConfigUtils
.getEsClusterDiscoverHostName();
private static final String
ES_USERNAME = ConfigUtils
.getEsUserName();
private static final String
ES_PASSWORK = ConfigUtils
.getEsPassword();
private static RestHighLevelClient restHighLevelClient
;
public static RestHighLevelClient
getClient() {
CredentialsProvider credentialsProvider
= new BasicCredentialsProvider();
credentialsProvider
.setCredentials(AuthScope
.ANY, new UsernamePasswordCredentials(ES_USERNAME, ES_PASSWORK));
String
[] nodes
= CLUSTER_HOSTNAME_PORT.split(",");
HttpHost
[] hosts
= new HttpHost[nodes
.length
];
for (int i
= 0, j
= nodes
.length
; i
< j
; i
++) {
String hostName
= org
.apache
.commons
.lang
.StringUtils
.substringBeforeLast(nodes
[i
], ":");
String port
= org
.apache
.commons
.lang
.StringUtils
.substringAfterLast(nodes
[i
], ":");
hosts
[i
] = new HttpHost(hostName
, Integer
.valueOf(port
));
}
RestClientBuilder builder
= RestClient
.builder(hosts
);
builder
.setHttpClientConfigCallback(f
-> f
.setDefaultCredentialsProvider(credentialsProvider
));
restHighLevelClient
= new RestHighLevelClient(builder
);
return restHighLevelClient
;
}
public static void closeRestHighLevelClient() throws ESIoException
{
if (null != restHighLevelClient
) {
try {
restHighLevelClient
.close();
} catch (IOException e
) {
throw new ESIoException("RestHighLevelClient Client close exception", e
);
}
}
}
}
7. 异常处理类
package com
.shok
.exception
;
import java
.io
.IOException
;
public class ESIoException extends IOException {
public ESIoException(String messaget
, Throwable throwable
) {
super(messaget
, throwable
);
}
public ESIoException(String messaget
) {
super(messaget
);
}
public ESIoException(Throwable throwable
) {
super(throwable
);
}
}
8. 实体bean
package com
.shok
.bean
;
public class UserBean {
public String id
;
public String name
;
public int age
;
public String addr
;
public String message
;
public UserBean(String id
) {
this.id
= id
;
}
public UserBean(String id
, String name
, int age
, String addr
, String message
) {
this.id
= id
;
this.name
= name
;
this.age
= age
;
this.addr
= addr
;
this.message
= message
;
}
public String
getId() {
return id
;
}
public void setId(String id
) {
this.id
= id
;
}
public String
getName() {
return name
;
}
public void setName(String name
) {
this.name
= name
;
}
public int
getAge() {
return age
;
}
public void setAge(int age
) {
this.age
= age
;
}
public String
getAddr() {
return addr
;
}
public void setAddr(String addr
) {
this.addr
= addr
;
}
public String
getMessage() {
return message
;
}
public void setMessage(String message
) {
this.message
= message
;
}
}
9. 增删改查API
package com
.shok
.api
;
import com
.alibaba
.fastjson
.JSONObject
;
import com
.shok
.bean
.UserBean
;
import com
.shok
.client
.HighLevelClient
;
import org
.elasticsearch
.action
.DocWriteResponse
;
import org
.elasticsearch
.action
.admin
.indices
.delete.DeleteIndexRequest
;
import org
.elasticsearch
.action
.bulk
.BulkItemResponse
;
import org
.elasticsearch
.action
.bulk
.BulkRequest
;
import org
.elasticsearch
.action
.bulk
.BulkResponse
;
import org
.elasticsearch
.action
.delete.DeleteRequest
;
import org
.elasticsearch
.action
.delete.DeleteResponse
;
import org
.elasticsearch
.action
.get.GetRequest
;
import org
.elasticsearch
.action
.get.GetResponse
;
import org
.elasticsearch
.action
.index
.IndexRequest
;
import org
.elasticsearch
.action
.index
.IndexResponse
;
import org
.elasticsearch
.action
.search
.SearchRequest
;
import org
.elasticsearch
.action
.search
.SearchResponse
;
import org
.elasticsearch
.action
.support
.master
.AcknowledgedResponse
;
import org
.elasticsearch
.action
.update
.UpdateRequest
;
import org
.elasticsearch
.action
.update
.UpdateResponse
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.client
.indices
.GetIndexRequest
;
import org
.elasticsearch
.common
.xcontent
.XContentType
;
import org
.elasticsearch
.index
.query
.QueryBuilders
;
import org
.elasticsearch
.search
.SearchHit
;
import org
.elasticsearch
.search
.builder
.SearchSourceBuilder
;
import java
.io
.IOException
;
import java
.util
.LinkedList
;
import java
.util
.List
;
import java
.util
.Map
;
public class HighLevelClientAPI {
RestHighLevelClient highLevelClient
= HighLevelClient
.getClient();
public boolean
deleteIndex(String indexName
) throws IOException
{
DeleteIndexRequest request
= new DeleteIndexRequest(indexName
);
AcknowledgedResponse respone
= highLevelClient
.indices().delete(request
, RequestOptions
.DEFAULT);
boolean isSuccess
= respone
.isAcknowledged();
return isSuccess
;
}
public boolean
existsIndex(String indexName
) throws IOException
{
GetIndexRequest request
= new GetIndexRequest(indexName
);
boolean exists
= highLevelClient
.indices().exists(request
, RequestOptions
.DEFAULT);
return exists
;
}
public String
addDoc(Map
<String
, Object
> jsonMap
, String indexName
, String rowId
) throws IOException
{
IndexRequest indexRequest
= new IndexRequest(indexName
).id(rowId
).source(jsonMap
);
IndexResponse response
= highLevelClient
.index(indexRequest
, RequestOptions
.DEFAULT);
DocWriteResponse
.Result result
= response
.getResult();
return result
.toString();
}
public String
deleteDoc(String indexName
, String id
) throws IOException
{
DeleteRequest deleteRequest
= new DeleteRequest(indexName
, id
);
DeleteResponse response
= highLevelClient
.delete(deleteRequest
, RequestOptions
.DEFAULT);
DocWriteResponse
.Result result
= response
.getResult();
return result
.toString();
}
public String
updateDoc(Map
<String
, Object
> jsonMap
, String indexName
, String rowId
) throws IOException
{
UpdateRequest request
= new UpdateRequest(indexName
, rowId
).doc(jsonMap
);
UpdateResponse response
= highLevelClient
.update(request
, RequestOptions
.DEFAULT);
DocWriteResponse
.Result result
= response
.getResult();
System
.out
.println("=====" + result
.toString());
return result
.toString();
}
public String
bulkDoc(String indexName
, List
<UserBean
> add
, List
<UserBean
> up
, List
<UserBean
> del
) throws IOException
{
BulkRequest request
= new BulkRequest();
for (UserBean user
: add
) {
request
.add(new IndexRequest(indexName
).id(user
.getId())
.source(XContentType
.JSON, "name", user
.getName(), "age", user
.getAge(), "addr", user
.getAddr(), "message", user
.getMessage()));
}
for (UserBean user
: up
) {
request
.add(new UpdateRequest(indexName
, user
.getId())
.doc(XContentType
.JSON, "name", user
.getName(), "age", user
.getAge(), "addr", user
.getAddr(), "message", user
.getMessage()));
}
for (UserBean user
: del
) {
request
.add(new DeleteRequest(indexName
, user
.getId()));
}
BulkResponse bulkResponse
= highLevelClient
.bulk(request
, RequestOptions
.DEFAULT);
if (bulkResponse
.hasFailures()) {
StringBuffer sb
= new StringBuffer("");
for (BulkItemResponse bulkItemResponse
: bulkResponse
) {
if (bulkItemResponse
.isFailed()) {
BulkItemResponse
.Failure failure
= bulkItemResponse
.getFailure();
sb
.append(failure
.toString()).append("\n");
}
}
System
.out
.println("=bulk error="+sb
.toString());
return sb
.toString();
} else {
return "SUCCESS";
}
}
public Map
<String
, Object
> getDocById(String indexName
, String rowId
) throws IOException
{
GetRequest getRequest
= new GetRequest(indexName
, rowId
);
GetResponse response
= highLevelClient
.get(getRequest
, RequestOptions
.DEFAULT);
Map
<String
, Object
> map
= response
.getSource();
map
.put("id", rowId
);
return map
;
}
public List
<UserBean
> searchMatch(String indexName
, String fieldName
, String fileValue
, int startPage
, int maxSize
) throws IOException
{
SearchRequest searchRequest
= new SearchRequest(indexName
);
SearchSourceBuilder searchSourceBuilder
= new SearchSourceBuilder();
searchSourceBuilder
.query(QueryBuilders
.matchQuery(fieldName
, fileValue
));
searchSourceBuilder
.from(startPage
);
searchSourceBuilder
.size(maxSize
);
searchRequest
.source(searchSourceBuilder
);
SearchResponse response
= highLevelClient
.search(searchRequest
, RequestOptions
.DEFAULT);
SearchHit
[] hits
= response
.getHits().getHits();
List
<UserBean
> userList
= new LinkedList<>();
for (SearchHit hit
: hits
) {
UserBean user
= JSONObject
.parseObject(hit
.getSourceAsString(), UserBean
.class);
user
.setId(hit
.getId());
userList
.add(user
);
}
return userList
;
}
}
10. 增删改查API测试
package com
.shok
;
import com
.shok
.api
.HighLevelClientAPI
;
import com
.shok
.bean
.UserBean
;
import com
.shok
.client
.HighLevelClient
;
import org
.junit
.Test
;
import java
.io
.IOException
;
import java
.util
.ArrayList
;
import java
.util
.HashMap
;
import java
.util
.List
;
import java
.util
.Map
;
import java
.util
.concurrent
.ExecutorService
;
import java
.util
.concurrent
.LinkedBlockingQueue
;
import java
.util
.concurrent
.ThreadPoolExecutor
;
import java
.util
.concurrent
.TimeUnit
;
public class MultiThreadTest {
@Test
public void testRestClient() {
System
.setProperty("es.set.netty.runtime.available.processors", "false");
ExecutorService executorService
= new ThreadPoolExecutor(10, 10,
0L
, TimeUnit
.MILLISECONDS, new LinkedBlockingQueue<Runnable
>());
for (int i
= 0; i
< 10; i
++) {
int index
= i
;
if (!executorService
.isShutdown()) {
executorService
.execute(new Runnable() {
@Override
public void run() {
System
.out
.println(("第" + index
+ "次获取到了连接对象————地址:" + HighLevelClient
.getClient()));
}
});
}
}
executorService
.shutdown();
try {
while (!executorService
.awaitTermination(10000, TimeUnit
.MILLISECONDS)) {
System
.out
.println("10秒没有执行完,强制关闭线程池");
executorService
.shutdownNow();
}
} catch (Exception e
) {
e
.printStackTrace();
}
}
@Test
public void testAddDoc() {
HighLevelClientAPI es
= new HighLevelClientAPI();
Map jsonMap
= new HashMap<String
, Object
>();
jsonMap
.put("name", "张三11");
jsonMap
.put("age", 33);
jsonMap
.put("addr", "浦东 上海11");
jsonMap
.put("message", "胜多负少的 message 多少分的13");
try {
System
.out
.println("====11111===========");
es
.addDoc(jsonMap
, "intr_index", "1");
System
.out
.println("====22222===========");
} catch (IOException e
) {
e
.printStackTrace();
}
}
@Test
public void testUpdateDoc() {
HighLevelClientAPI es
= new HighLevelClientAPI();
Map jsonMap
= new HashMap<String
, Object
>();
jsonMap
.put("name", "张三22");
jsonMap
.put("age", 11);
jsonMap
.put("addr", "上海浦东22");
jsonMap
.put("message", "胜多负少的 message 22 多少分的22");
try {
System
.out
.println("====11111===========");
es
.updateDoc(jsonMap
, "intr_index", "1");
System
.out
.println("====22222===========");
} catch (IOException e
) {
e
.printStackTrace();
}
}
@Test
public void testBulkDoc() {
List
<UserBean
> ls_add
= new ArrayList<UserBean
>();
List
<UserBean
> ls_up
= new ArrayList<UserBean
>();
List
<UserBean
> ls_del
= new ArrayList<UserBean
>();
ls_add
.add(new UserBean("1", "lisi88", 88, "上海闵行88", "message 第三方三的订单88"));
ls_add
.add(new UserBean("2", "lisi9", 99, "上海闵行88", "message 第三方三的订单88"));
ls_up
.add(new UserBean("3", "lisi2", 11, "上海闵行11", "message 第三方三的订单11"));
ls_up
.add(new UserBean("4", "lisi4", 25, "上海闵行4", "message 第三方三的订单4"));
ls_del
.add(new UserBean("4"));
ls_del
.add(new UserBean("5"));
HighLevelClientAPI es
= new HighLevelClientAPI();
try {
System
.out
.println("====testBulkDoc 11111===========");
es
.bulkDoc("intr_index", ls_add
, ls_up
, ls_del
);
System
.out
.println("====testBulkDoc 22222===========");
} catch (IOException e
) {
e
.printStackTrace();
}
}
@Test
public void testGetDocById() {
try {
HighLevelClientAPI es
= new HighLevelClientAPI();
Map
<String
, Object
> map
= es
.getDocById("intr_index", "7");
for (Map
.Entry
<String
, Object
> entry
: map
.entrySet()) {
String mapKey
= entry
.getKey();
Object mapValue
= entry
.getValue();
System
.out
.println(mapKey
+ "===:===" + mapValue
);
}
} catch (IOException e
) {
e
.printStackTrace();
}
}
@Test
public void testSearchMatch() {
try {
HighLevelClientAPI es
= new HighLevelClientAPI();
List
<UserBean
> ls
= es
.searchMatch("intr_index", "name", "张三13", 0, 10);
for (int i
= 0; i
< ls
.size(); i
++) {
UserBean u
= ls
.get(i
);
System
.out
.println("testSearchMatch==="+u
.getId() + "==" + u
.getName() + "=" + u
.getAge() + "==" + u
.getAddr() + "==" + u
.getMessage());
}
} catch (IOException e
) {
e
.printStackTrace();
}
}
}
11. 增值服务
tel
/wx
:15000227329,加好友备注:csdn领取esTest代码,既可免费领取项目代码。
https
://item
.taobao
.com
/item
.htm
?spm
=a2126o
.success
.0.0.647c4831eSR754
&id
=626561714988