ES

"ES"

Posted by zwt on May 24, 2021

基础知识

与关系型数据库对比

1
2
3
4
index对应数据库
type对应表  后面的版本去掉了
document对应行
filed对应列

elasticsearch是面向文档的,一切皆json

操作es高级客户端步骤

1
2
3
4
1.注入RestHighLevelClient 客户端
2.创建请求
3.构造条件
4.执行请求

基本操作

连接es

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
@Component
public class EsConfig {
    private String esUsername = "";
    private String esPwd = "";
    private String esAddress = "";
    private Integer esPort = 9200;
    @Bean
    public RestHighLevelClient esClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esUsername, esPwd));
        RestClientBuilder builder = RestClient.builder(new HttpHost(esAddress,esPort)).setRequestConfigCallback(
(RequestConfig.Builder requestConfigBuilder) ->{                         requestConfigBuilder.setConnectTimeout(-1);
requestConfigBuilder.setSocketTimeout(-1);
requestConfigBuilder.setConnectionRequestTimeout(-1);
return requestConfigBuilder;}).setHttpClientConfigCallback(
(HttpAsyncClientBuilder httpClientBuilder) -> {
httpClientBuilder.disableAuthCaching();
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);});
return new RestHighLevelClient(builder);}}

索引

建立索引

1
2
3
4
// 创建索引请求
CreateIndexRequest request = new CreateIndexRequest("title");
// 执行创建请求
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);

获取索引

1
2
GetIndexRequest request = new GetIndexRequest("demo_zhang");
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);

删除索引

1
2
DeleteIndexRequest request = new DeleteIndexRequest("demo_zhang");
AcknowledgedResponse delete =  client.indices().delete(request, RequestOptions.DEFAULT);

文档

添加文档

1
2
3
4
5
6
7
8
9
10
11
//创建对象
User user = new User("掌握是", 3);
//创建请求
IndexRequest request = new IndexRequest("demo_zhang");
//设置规则 put index/doc/id
request.id("1");
request.timeout(TimeValue.timeValueSeconds(1));
//放入数据  json
request.source(JSON.toJSONString(user), XContentType.JSON);
//客户端发送请求
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

获取文档

1
2
GetRequest getRequest = new GetRequest("demo_zhang", "1");
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

更新文档

1
2
3
4
5
UpdateRequest updateRequest =  new UpdateRequest("demo_zhang", "1");
updateRequest.timeout("1s");
User user = new User("java", 18);
updateRequest.doc(JSON.toJSONString(user), XContentType.JSON);
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);

删除文档

1
2
3
DeleteRequest deleteRequest = new DeleteRequest("demo_zhang", "1");
deleteRequest.timeout("3s");
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);

批量插入数据

1
2
3
4
5
6
7
8
9
10
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("10s");
ArrayList<User> userList = new ArrayList<>();
userList.add(new User("demo2", 2));
userList.add(new User("demo3", 3));
userList.add(new User("demo4", 4));
for (int i=0;i<userList.size();i++){
   bulkRequest.add(new IndexRequest("demo_zhang").id(""+(i+1))
   .source(JSON.toJSONString(userList.get(i)), XContentType.JSON));}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

查询

1
2
3
4
5
6
7
8
9
SearchRequest searchRequest = new SearchRequest("demo_zhang");
//构建搜索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 精确匹配
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("name", "java");
searchSourceBuilder.query(termQueryBuilder);
searchSourceBuilder.timeout(new TimeValue(10, TimeUnit.SECONDS));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

ES插件开发

ES数据迁移

1
2
3
4
5
6
7
8
POST _reindex
{
"source": {
"index": "metricbeat-*"(老的索引名)
},
"dest": {
"index": "metricbeat"(新的索引名)
}

检索

只有text类型是会被分词的,在存储的时候事先指定分词的模式(mapping)

1
2
match_phrase查询分析文本,并从分析的文本中创建短语查询,match_phrase的主要作用是用于匹配包含当前短语的文档。

Kibana

创建索引和mapping

1
2
3
4
5
PUT 索引名称
{
"mappings":{
}
}

查询所有

1
2
3
4
5
6
GET _search
{
  "query": {
    "match_all": {}
  }
}

查询语句

ELK

1
1.过滤返回字段:"_source": [ "commodity_id"]

java

1
2
3
4
// 构建搜索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 只返回Id
searchSourceBuilder.fetchSource(Id", null);

嵌套查询

ELK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{"size":50,
	"query":{
		"bool":{
			"filter":[
						{
							"terms":{
							"Id":[1],
							"boost":1.0
							}
						},
						{
							"term":{"a":						     
								{"value":1,
								"boost":1.0}
							}
						},
			{"nested":{
					"query":
						{"bool":{
							"filter":[
								{"term":
									{"c.d":
										{"value":0,"boost":1.0}
									}
								}],
								"boost":1.0
								}
						},
						"path":"c",
						"score_mode":"sum",
						"boost":1.0}
						},
			"sort":[{"c.e":
					{"order":"desc"}},
					{"c.r":																	{"order":"desc"}}]}

java

1
2
3
4
5
6
// 排序
NestedSortBuilder nestedSortBuilder = new NestedSortBuilder("a");
sortBuilders = SortBuilders.fieldSort("a.b").order(SortOrder.ASC).setNestedSort(nestedSortBuilder);
// 筛选
QueryBuilder pathQuery = QueryBuilders.nestedQuery("a",         QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("a.b", pointSearchDTO.getGroupId())), ScoreMode.Total);
boolQueryBuilder.filter(pathQuery);

更新

搜索更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
     "query": {
                "term": {
                    "goods": {
                      "value":
                    }
                } }
  ,
    "script": {
        "inline": "ctx._source.firstLevelTag = params.firstLevelTag",
        "params": {"firstLevelTag": "测试"},
        "lang": "painless"
    }
}

局部更新

1
es.update(index="test", doc_type="doc", id="4Z6XcXcBChYTHL1ZdwjL", body={"doc": {"name": "Jerry", "age": 25}})

全局更新

会触发reindex,数据量大的时候效率低

1
2
es.index(index="test", doc_type="doc", id="dfebcXcBCWwWKoXwQ2Gk", body={
"name": "Python编程实战","num": 5})

python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import elasticsearch
import json
conn = elasticsearch.Elasticsearch(
['http://username:pwd@url:9200'])
query_json = {
     "query": {
                "term": {
                    "goods": {
                      "value":
                    }
                } }
  ,
    "script": {
        "inline": "ctx._source.firstLevelTag = params.firstLevelTag",
        "params": {"firstLevelTag": "测试"},
        "lang": "painless"
    }
}
body = json.dumps(query_json)
index_name = ''
index_type = 'doc'
conn.update_by_query(index=index_name, body=body)

nest更新

1
2
3
4
5
6
7
8
9
10
{
	"script": {
	  "source": "for (item in ctx._source.3) {if(item['2']==params.2){item['1']=params.1}}",
	  "lang": "painless",
	  "params": {
	    "2":6,
	    "1": ""
	  }
	}
}

# 问题

1
1.nest查询中,如果写在一个bool中,就不可以出现多个filter,拆分开来可以。否则会出现score为0得情况
graph LR
    A[用户] --> B[表现层]
    B --> C[控制层]
    C --> D[业务层]
    D --> E[业务管理层]
    E --> F[数据访问层]
    F --> G[数据库]
    F --> H[数据对象]
    H --> I[数据传输对象]
    I --> J[业务对象]
    J --> K[查询对象]
    K --> L[显示对象]
    L --> B

参考

1.match_phrase