环境
我的开发环境如下:
Java 8
ElasticSearch 7.13.4
Spring Boot 2.7.14
一、依赖
<!-- elasticsearch依赖 --> <dependency> <groupId>co.elastic.clients</groupId> <artifactId>elasticsearch-java</artifactId> <version>7.17.9</version> </dependency> <dependency> <groupId>org.glassfish</groupId> <artifactId>jakarta.json</artifactId> <version>2.0.1</version> </dependency>
二、项目配置
1、在application.yaml文件中添加配置
spring: elasticsearch: uris: test.xxxxxx.com:9200 #替换成实际的地址和端口 username: xxxxxx password: xxxxxx #替换成实际的账号密码 socket-timeout: 10s connection-timeout: 15s webclient: max-in-memory-size: 100MB
2、实现ElasticSearchConfig类
@Configuration public class ElasticSearchConfig { @Value("${spring.elasticsearch.uris}") private String host; @Value("${spring.elasticsearch.username}") private String name; @Value("${spring.elasticsearch.password}") private String password; @Bean public ElasticsearchClient getElasticsearchClient() { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(name, password)); var headers = Collections.singletonList(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())); HttpHost httpHost = HttpHost.create(host); RestClient client = RestClient .builder(httpHost) .setHttpClientConfigCallback(httpClientBuilder ->httpClientBuilder .setDefaultHeaders(headers) .addInterceptorLast((HttpResponseInterceptor) (response, context) -> response.addHeader("X-Elastic-Product", "Elasticsearch")) .setDefaultCredentialsProvider(credentialsProvider) .setKeepAliveStrategy((response, context) -> 180 * 1000) ) .build(); ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper()); return new ElasticsearchClient(transport); } }
三、ElasticSearch工具类
ElasticSearchHandle工具类主要封装了index和document的相关方法。
@Slf4j @Service public class ElasticSearchHandle{ private final ElasticsearchClient client; public ElasticSearchHandle(ElasticsearchClient elasticsearchClient) { this.client = elasticsearchClient; } //判断index是否存在 public boolean hasIndex(String indexName) throws IOException { var exists = client.indices().exists(d -> d.index(indexName)); return exists.value(); } //创建index public void createIndex(String indexName) { try { var response = client.indices().create(c -> c.index(indexName)); } catch (IOException e) { log.error("es create index error",e); throw new RuntimeException("es create index error"); } } //删除index public void deleteIndex(String indexName) throws IOException{ client.indices().delete(d -> d.index(indexName)); } //插入document public void insertDocument(String indexName, Object obj, String id){ try{ client.index(i -> i.index(indexName).id(id).document(obj)); } catch (IOException e) { log.error("es insert document error",e); throw new RuntimeException("es insert document error"); } } //删除document public void deleteDocument(String indexName, String id) { try { client.delete(d -> d.index(indexName).id(id)); } catch (IOException e) { log.error("es delete document error",e); throw new RuntimeException("es delete document error"); } } //查询document public <T> GetResponse<T> getDocument(String indexName, String id, Class<T> t) { try { return client.get(d -> d.index(indexName).id(id),t ); } catch (IOException e) { log.error("es get document error",e); throw new RuntimeException("es get document error"); } } //条件查询 public <T> PageData<T> searchDocumentPage(SearchRequest request, Class<T> t){ try{ var response = client.search(request, t); var totalCount = response.hits().total() == null ? 0 : response.hits().total().value(); var data = response.hits().hits().stream().map(Hit::source).collect(Collectors.toList()); return new PageData<>(data,totalCount); }catch (IOException e){ log.error("es search document error",e); throw new RuntimeException("es get document error"); } } }
条件查询时,定义返回结果:PageData
public class PageData<T>{ public List<T> data; public long total; public PageData(List<T> data, long total) { this.data = data; this.total = total; } }
以上,elasticSearch的准备工作就算完成了,后面只需要将ElasticSearchHandle注入到具体的service中,就可以实现具体的业务需求了。
四、查询示例
这是我通过elasticSearch实现的一个示例,包括按月份动态创建index、保存和分页查询。
@Service public class AlarmHistoryService{ public static final String INDEX = "alarm_history_"; private final ElasticSearchHandle elasticSearchHandle; public AlarmHistoryService(ElasticSearchHandle elasticSearchHandle) { this.elasticSearchHandle = elasticSearchHandle; } //按月份动态建立index,示例:alarm_history_2024-08 public String getIndexName(){ var now = Instant.now(); return INDEX + now.toString().substring(0,7); } //创建动态index public String createIndex() throws IOException { String indexName = getIndexName(); if(!elasticSearchHandle.hasIndex(indexName)){ elasticSearchHandle.createIndex(indexName); } return indexName; } //保存 public void commit(AlarmHistory alarmHistory) throws IOException { var indexName = createIndex(); elasticSearchHandle.insertDocument(indexName, alarmHistory, alarmHistory.getId()); } //按条件过滤 public PageData<AlarmHistory> queryPage(Integer level, Date startTime, Date endTime, Integer pageIndex, Integer pageSize){ List<Query> mustQueries = new ArrayList<>(); if(level != null){ mustQueries.add(Query.of(q -> q.term(t -> t.field("level").value(level)))); } if(startTime != null && endTime != null){ mustQueries.add(Query.of(q -> q.range(r -> r.field("alarmTime").gt(JsonData.of(startTime)).lt(JsonData.of(endTime))))); } //这里是查询所有alarm_history_开头的index,也可以替换成指定的index。 //对结果按"alarmTime"倒叙排序,并分页 SearchRequest request = SearchRequest.of(r -> r.index(AlarmHistoryService.INDEX + "*") .query(q -> q.bool(b -> b.must(mustQueries))) .sort(so -> so.field(f -> f.field("alarmTime").order(SortOrder.Desc))) .from(pageIndex) .size(pageSize) ); return elasticSearchHandle.searchDocumentPage(request, AlarmHistory.class); } }