【Furion】Elasticsearch的数据读写
发布日期:2021-06-30 21:36:05 浏览次数:2 分类:技术文章

本文共 7423 字,大约阅读时间需要 24 分钟。

背景

性能平台Furion在做请求数据报告聚合时,初步聚合后的数据大概每条数据1M每秒100条数据,将数据写入mysql数据库时瞬间将数据库打爆

接下来尝试将聚合后的数据在ES中进行读写

什么是Elasticsearch

Elasticsearch 是一个高扩展、开源的全文检索和分析引擎,它可以准实时地快速存储、搜索、分析海量的数据

全文检索

全文检索是指计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置

当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式
这个过程类似于通过字典中的检索字表查字的过程

Elasticsearch版本

目前使用的版本:6.8.4

ElasticsearchRepository

使用ElasticsearchRepository进行ES数据写入

一、添加依赖

org.springframework.boot
spring-boot-starter-data-elasticsearch

二、添加配置

在apollo中配置elasticsearch的配置信息

# ESelasticsearch.index = furion-streamingelasticsearch.urls = http://es.lluozh.cnelasticsearch.username = furion-streamingelasticsearch.password = furion-streaming

三、配置连接池

package com.lluozh.streaming.es.config;/** * @author lluozh * @Description: * @date 2021/1/25 */import org.apache.http.HttpHost;import org.apache.http.auth.AuthScope;import org.apache.http.auth.UsernamePasswordCredentials;import org.apache.http.client.CredentialsProvider;import org.apache.http.impl.client.BasicCredentialsProvider;import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestClientBuilder;import org.elasticsearch.client.RestHighLevelClient;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;import org.springframework.util.StringUtils;@Configurationpublic class RestClientConfig extends AbstractElasticsearchConfiguration {
@Value("${elasticsearch.username}") private String USERNAME; @Value("${elasticsearch.password}") private String PASSWORD; @Value("${elasticsearch.urls}") private String URLS; @Override @Bean public RestHighLevelClient elasticsearchClient() {
if (StringUtils.isEmpty(URLS)) {
throw new RuntimeException("配置有问题,elasticsearch.urls为空"); } String[] urls = URLS.split(","); HttpHost[] httpHostArr = new HttpHost[urls.length]; for (int i=0; i

四、创建实体类

package com.lluozh.streaming.es.domain;import lombok.Data;import org.springframework.data.annotation.Id;import org.springframework.data.elasticsearch.annotations.Document;import java.io.Serializable;/** * @author lluozh * @Description: * @date 2021/1/26 */@Data// 索引名称@Document(indexName = "furion-streaming")public class ESTestReportDetail implements Serializable {
@Id private String indexId; private String reportId; private long part; private String content; private static final long serialVersionUID = 1L;}

五、创建数据连接层

package com.cvte.streaming.es.dao;import com.cvte.streaming.es.domain.ESTestReportDetail;import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;public interface EsReportDetailRepository extends ElasticsearchRepository
{
}

六、插入数据

import com.lluozh.streaming.es.dao.EsReportDetailRepository;import com.lluozh.streaming.es.domain.ESTestReportDetail;import org.springframework.stereotype.Service;import javax.annotation.Resource;/** * @author lluozh * @Description: * @date 2021/1/26 */@Servicepublic class EsService {
@Resource private EsReportDetailRepository esReportDetailRepository; public void insert(String reportId, long part, String content) {
ESTestReportDetail esTestReportDetail = new ESTestReportDetail(); esTestReportDetail.setIndexId(reportId+"_"+(part + 1)); esTestReportDetail.setReportId(reportId); esTestReportDetail.setPart(part + 1); esTestReportDetail.setContent(content); esReportDetailRepository.save(esTestReportDetail); }}

可以看到已经成功插入数据

RestHighLevelClient

但是在做数据查询计算时,发现可以使用另外一个库进行操作

一、添加依赖

org.elasticsearch.client
elasticsearch-rest-high-level-client
6.8.4

需要注意的是,一定要使用与你的elasticsearch版本一致的依赖,否则可能会出错

添加配置和配置连接池等方式保持不变

二、查询数据

package com.lluozh.streaming.es.service;import com.lluozh.streaming.es.dao.EsReportDetailRepository;import com.lluozh.streaming.es.domain.ESTestReportDetail;import com.google.gson.Gson;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.elasticsearch.search.sort.SortBuilders;import org.elasticsearch.search.sort.SortOrder;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.io.IOException;import java.util.Arrays;import java.util.Collections;import java.util.List;import java.util.stream.Collectors;/** * @author lluozh * @Description: * @date 2021/1/26 */@Servicepublic class EsService {
@Resource private RestHighLevelClient restHighLevelClient; public List
queryByExample(String reportId, int partFrom){
SearchRequest searchRequest = new SearchRequest("furion-streaming"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.boolQuery().must((QueryBuilders.matchQuery("reportId", reportId))) .must(QueryBuilders.rangeQuery("part").gt(partFrom).lte(partFrom+20))) .sort(SortBuilders.fieldSort("part").order(SortOrder.ASC)); searchSourceBuilder.fetchSource("content", ""); searchRequest.source(searchSourceBuilder); try {
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT.toBuilder().build()); List
responseJsonList = getResponseList(response); return responseJsonList.stream().map(responseJson -> new Gson().fromJson(responseJson, ESTestReportDetail.class).getContent()).collect(Collectors.toList()); } catch (IOException e) {
throw new RuntimeException(e); } } public List
getResponseList(SearchResponse searchResponse) {
SearchHit[] searchHits = searchResponse.getHits().getHits(); if (searchHits.length > 0) {
return Arrays.asList(searchHits).stream().map(searchHit -> searchHit.getSourceAsString()).collect(Collectors.toList()); } else {
return Collections.emptyList(); } }}
  • QueryBuilder

    主要用来构建查询条件、过滤条件
    要构建QueryBuilder,我们可以使用工具类QueryBuilders,里面有大量的方法用来完成各种各样的QueryBuilder的构建,字符串的、Boolean型的、match的、地理范围的等等

  • SortBuilder

    主要是构建排序
    要构建SortBuilder,可以使用SortBuilders来完成各种排序

然后就可以通过SearchSourceBuilder来组合这些QueryBuilder和SortBuilder,再组合分页的参数等等,最终就能得到一个SearchQuery

三、查询数量

public int countByExample(String reportId) {
SearchRequest searchRequest = new SearchRequest("furion-streaming"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("reportId", reportId)); searchRequest.source(searchSourceBuilder); SearchResponse response = null; try {
return (int) restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT.toBuilder().build()).getHits().totalHits; } catch (IOException e) {
throw new RuntimeException(e); } }

转载地址:https://lluozh.blog.csdn.net/article/details/113306969 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:【并发编程的艺术】并发编程的挑战
下一篇:【软件性能测试与调优实践】分析调优案例

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2024年05月04日 17时07分56秒