手记

Spring Boot集成ElasticSearch7

环境

    我的开发环境如下:

  1.     Java 8

  2.     ElasticSearch 7.13.4

  3.  Spring Boot 2.7.14

一、依赖

<!--   elasticsearch依赖     -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>7.17.9</version>
</dependency>
<dependency>
    <groupId>org.glassfish</groupId>
    <artifactId>jakarta.json</artifactId>
    <version>2.0.1</version>
</dependency>


二、项目配置

    1、在application.yaml文件中添加配置

spring:
  elasticsearch:
    uris: test.xxxxxx.com:9200  #替换成实际的地址和端口
    username: xxxxxx
    password: xxxxxx    #替换成实际的账号密码
    socket-timeout: 10s
    connection-timeout: 15s
    webclient:
      max-in-memory-size: 100MB

    2、实现ElasticSearchConfig类

@Configuration
public class ElasticSearchConfig
{
    @Value("${spring.elasticsearch.uris}")
    private String host;

    @Value("${spring.elasticsearch.username}")
    private String name;

    @Value("${spring.elasticsearch.password}")
    private String password;

    @Bean
    public ElasticsearchClient getElasticsearchClient()
    {

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(name, password));

        var headers = Collections.singletonList(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()));

        HttpHost httpHost = HttpHost.create(host);
        RestClient client = RestClient
                .builder(httpHost)
                .setHttpClientConfigCallback(httpClientBuilder
                        ->httpClientBuilder
                        .setDefaultHeaders(headers)
                        .addInterceptorLast((HttpResponseInterceptor) (response, context)
                                -> response.addHeader("X-Elastic-Product", "Elasticsearch"))
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setKeepAliveStrategy((response, context) -> 180 * 1000)
                )
                .build();

        ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }
}

三、ElasticSearch工具类

ElasticSearchHandle工具类主要封装了index和document的相关方法。

@Slf4j
@Service
public class ElasticSearchHandle{
    private final ElasticsearchClient client;

    public ElasticSearchHandle(ElasticsearchClient elasticsearchClient) {
        this.client = elasticsearchClient;
    }
    
    //判断index是否存在
    public boolean hasIndex(String indexName) throws IOException {
        var exists = client.indices().exists(d -> d.index(indexName));

        return exists.value();
    }
    
    //创建index
    public void createIndex(String indexName) {
        try {
            var response = client.indices().create(c -> c.index(indexName));
            
        } catch (IOException e) {
            log.error("es create index error",e);

            throw new RuntimeException("es create index error");
        }
    }
    
    //删除index
    public void deleteIndex(String indexName) throws IOException{
        client.indices().delete(d -> d.index(indexName));
    }
    
    //插入document
    public void insertDocument(String indexName, Object obj, String id){
        try{

            client.index(i -> i.index(indexName).id(id).document(obj));

        } catch (IOException e) {
            log.error("es insert document error",e);

            throw new RuntimeException("es insert document error");
        }

    }
    
    //删除document
    public void deleteDocument(String indexName, String id) {
        try {
            client.delete(d -> d.index(indexName).id(id));

        } catch (IOException e) {
            log.error("es delete document error",e);

            throw new RuntimeException("es delete document error");
        }
    }
    
    //查询document
    public <T> GetResponse<T> getDocument(String indexName, String id, Class<T> t) {
        try {

            return client.get(d -> d.index(indexName).id(id),t );
        } catch (IOException e) {
            log.error("es get document error",e);

            throw new RuntimeException("es get document error");
        }
    }
    
    //条件查询
    public <T> PageData<T> searchDocumentPage(SearchRequest request, Class<T> t){

        try{

            var response = client.search(request, t);

            var totalCount = response.hits().total() == null ? 0 : response.hits().total().value();

            var data = response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());

            return new PageData<>(data,totalCount);

        }catch (IOException e){
            log.error("es search document error",e);

            throw new RuntimeException("es get document error");
        }

    }
}

条件查询时,定义返回结果:PageData

public class PageData<T>{
    public List<T> data;

    public long total;

    public PageData(List<T> data, long total)
    {
        this.data = data;
        this.total = total;
    }
}

    以上,elasticSearch的准备工作就算完成了,后面只需要将ElasticSearchHandle注入到具体的service中,就可以实现具体的业务需求了。

四、查询示例

    这是我通过elasticSearch实现的一个示例,包括按月份动态创建index、保存和分页查询。

@Service
public class AlarmHistoryService{

    public static final String INDEX = "alarm_history_";
    
    private final ElasticSearchHandle elasticSearchHandle;

    public AlarmHistoryService(ElasticSearchHandle elasticSearchHandle) {
        this.elasticSearchHandle = elasticSearchHandle;
    }
    
    //按月份动态建立index,示例:alarm_history_2024-08
    public String getIndexName(){
        var now = Instant.now();
        return INDEX + now.toString().substring(0,7);
    }
    
    //创建动态index
    public String createIndex() throws IOException {

        String indexName = getIndexName();

        if(!elasticSearchHandle.hasIndex(indexName)){
            elasticSearchHandle.createIndex(indexName);
        }

        return indexName;
    }
    
    //保存
    public void commit(AlarmHistory alarmHistory) throws IOException {

        var indexName = createIndex();
        elasticSearchHandle.insertDocument(indexName, alarmHistory, alarmHistory.getId());

    }
    
    //按条件过滤
    public PageData<AlarmHistory> queryPage(Integer level, Date startTime, Date endTime, Integer pageIndex, Integer pageSize){
        List<Query> mustQueries = new ArrayList<>();
        if(level != null){
            mustQueries.add(Query.of(q -> q.term(t -> t.field("level").value(level))));
        }
        if(startTime != null && endTime != null){
            mustQueries.add(Query.of(q -> q.range(r -> r.field("alarmTime").gt(JsonData.of(startTime)).lt(JsonData.of(endTime)))));
        }

        //这里是查询所有alarm_history_开头的index,也可以替换成指定的index。
        //对结果按"alarmTime"倒叙排序,并分页
        SearchRequest request = SearchRequest.of(r -> r.index(AlarmHistoryService.INDEX + "*")
            .query(q -> q.bool(b -> b.must(mustQueries)))
            .sort(so -> so.field(f -> f.field("alarmTime").order(SortOrder.Desc)))
            .from(pageIndex)
            .size(pageSize)
        );

        return elasticSearchHandle.searchDocumentPage(request, AlarmHistory.class);
    }
}






















0人推荐
随时随地看视频
慕课网APP