手记

InfluxDB ORM API封装

InfluxDB 是一个开源的分布式时序、时间和指标数据库,用于存储和检索数据。它是一个自定义的高性能存储引擎,具有时间戳索引和SQL类似的查询语言。
influxdb-client-java 是官方提供的 Java 客户端,但是它的 API 使用起来不是很方便,所以我对其进行了二次封装,使其使用起来更加方便。详细源码

依赖

<!-- https://mvnrepository.com/artifact/com.influxdb/influxdb-client-java -->  
<dependency>  
<groupId>com.influxdb</groupId>  
<artifactId>influxdb-client-java</artifactId>  
<version>6.3.0</version>  
</dependency>  

配置

使用 InfluxDB 之前,需要先在 InfluxDB 中创建一个 bucket,然后创建一个 token,用于访问该 bucket。

spring:  
influx:  
url: http://localhost:8086  
org: xxx-org  
token: xxx  
bucket: xxx-bucket  

注入 InfluxDBClient

import com.influxdb.client.InfluxDBClient;  
import com.influxdb.client.InfluxDBClientFactory;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
@Configuration  
public class InfluxDbConfig {  
  
@Value("${spring.influx.url:''}")  
private String url;  
  
@Value("${spring.influx.org:''}")  
private String org;  
  
@Value("${spring.influx.token:''}")  
private String token;  
  
@Value("${spring.influx.bucket:''}")  
private String bucket;  
  
@Bean  
public InfluxDBClient influxDB(){  
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);  
return client;  
}  
}  

注解

为了区分不同实例的相同指标,如cpu0和cpu1等相同的指标,需要在 Measurement 上添加前、后缀,如:cpu_measure_0、cpu_measure_1,从而实现区分。
为了自动化实现这个功能,需要在 Measurement 上添加前、后缀注解,如:

  
import java.lang.annotation.ElementType;  
import java.lang.annotation.Retention;  
import java.lang.annotation.RetentionPolicy;  
import java.lang.annotation.Target;  
  
/**  
* measurement 前缀  
* 与 @Measurement 拼接 动态的 measurement*/  
@Retention(RetentionPolicy.RUNTIME)  
@Target({ElementType.FIELD})  
public @interface MeasurementPrefix {  
}  
import java.lang.annotation.ElementType;  
import java.lang.annotation.Retention;  
import java.lang.annotation.RetentionPolicy;  
import java.lang.annotation.Target;  
  
/**  
* measurement 后缀  
* 与 @Measurement 拼接 动态的 measurement*/  
@Retention(RetentionPolicy.RUNTIME)  
@Target({ElementType.FIELD})  
public @interface MeasurementSuffix {  
}  

ORM 工具类

完成POJO与InfluxDB的转换,包括前后缀的拼接,以及常用的 Query、Write 等操作。

import com.influxdb.annotations.Column;  
import com.influxdb.annotations.Measurement;  
import com.influxdb.client.InfluxDBClient;  
import com.influxdb.client.WriteApiBlocking;  
import com.influxdb.client.domain.WritePrecision;  
import com.influxdb.client.write.Point;  
import com.influxdb.query.FluxRecord;  
import com.influxdb.query.FluxTable;  
import com.sme.smebackend.influx.MeasurementPrefix;  
import com.sme.smebackend.influx.MeasurementSuffix;  
import lombok.SneakyThrows;  
import lombok.extern.slf4j.Slf4j;  
import org.jetbrains.annotations.NotNull;  
import org.jetbrains.annotations.Nullable;  
  
import java.lang.annotation.Annotation;  
import java.lang.reflect.Field;  
import java.time.Instant;  
import java.time.OffsetDateTime;  
import java.util.List;  
import java.util.Map;  
import java.util.Objects;  
  
@Slf4j  
public class InfluxDBUtil {  
public static String measurementName(Object o) {  
Class clazz = o.getClass();  
  
// 获取 @MeasurementMeasurement meaAnno = (Measurement) clazz.getAnnotation(Measurement.class);  
if (null == meaAnno) {  
return null;  
}  
  
String measurement = meaAnno.name();  
  
// 获取 @MeasurementPrefix 字段的值  
String prefix = getFieldValueByAnno(o, MeasurementPrefix.class);  
  
// 获取 @MeasurementSuffix 字段的值  
String suffix = getFieldValueByAnno(o, MeasurementSuffix.class);  
  
if (null != prefix) {  
measurement = prefix + "_" + measurement;  
}  
  
if (null != suffix) {  
measurement = measurement + "_" + suffix;  
}  
  
return measurement;  
}  
  
@Nullable  
private static String getFieldValueByAnno(Object o, Class annoClazz) {  
Class clazz = o.getClass();  
String val = null;  
  
for (Field field : clazz.getDeclaredFields()) {  
Annotation anno = field.getAnnotation(annoClazz);  
if (null != anno) {  
try {  
field.setAccessible(true);  
val = (String) field.get(o);  
} catch (IllegalAccessException e) {  
log.error("failed to get value of field: {}", field.getName(), e);  
}  
}  
}  
  
return val;  
}  
  
/**  
* 将 @Measurement 注解的对象转换为 Influxdb Point 对象  
*/  
public static Point measurement2Point(Object o) {  
String measurement = measurementName(o);  
if (null == measurement) {  
return null;  
}  
  
Class clazz = o.getClass();  
  
// 遍历所有的 @Column 字段  
Point point = Point.measurement(measurement);  
for (Field field : clazz.getDeclaredFields()) {  
Column colAnno = field.getAnnotation(Column.class);  
MeasurementPrefix prefixAnno = field.getAnnotation(MeasurementPrefix.class);  
MeasurementSuffix suffixAnno = field.getAnnotation(MeasurementSuffix.class);  
if (null == colAnno || null != suffixAnno || null != prefixAnno) {  
continue;  
}  
  
try {  
field.setAccessible(true);  
  
String name = colAnno.name();  
boolean tag = colAnno.tag();  
if (tag) {  
point.addTag(field.getName(), (String) field.get(o));  
} else {  
if (Objects.equals("time", name)) {  
point.time((Instant) field.get(o), WritePrecision.MS);  
} else {  
point.addField(field.getName(), (long) field.get(o));  
}  
}  
} catch (IllegalAccessException e) {  
log.error("failed to get value of field: {}", field.getName(), e);  
}  
}  
  
return point;  
}  
  
/**  
* 写入  
*/  
public static void write(InfluxDBClient influxDB, Object measurement) {  
WriteApiBlocking w = influxDB.getWriteApiBlocking();  
Point point = measurement2Point(measurement);  
if (null == point) {  
return;  
}  
  
w.writePoint(point);  
}  
  
/**  
* 查询某个 measurement 的某个字段  
*/  
public static <T> List<T> query1d(InfluxDBClient influxDB,  
String bucket,  
String measurement, Class<T> clazz) {  
return query1d(influxDB, bucket, measurement, null, clazz);  
}  
  
public static <T> List<T> query1d(InfluxDBClient influxDB,  
String bucket,  
String measurement, List<String> fields, Class<T> clazz) {  
String query = baseQ(bucket, measurement, fields, "-1d");  
return doQuery(influxDB, clazz, query);  
}  
  
private static <T> List<T> doQuery(InfluxDBClient influxDB, Class<T> clazz, String query) {  
List<FluxTable> ts = influxDB.getQueryApi().query(query);  
List<T> res = InfluxDBUtil.fluxTable2Pojo(ts, clazz);  
return res;  
}  
  
@NotNull  
private static String baseQ(String bucket, String measurement, List<String> fields, String start) {  
String query = "from(bucket: \"" + bucket + "\")" +  
" |> range(start: " + start + ")" +  
" |> filter(fn: (r) => r._measurement == \"" + measurement + "\")";  
  
if (!J.empty(fields)) {  
query += " |> filter(fn: (r) => ";  
  
for (int i = 0; i < fields.size(); i++) {  
String field = fields.get(i);  
if (i == 0) {  
query += " r._field == \"" + field + "\"";  
} else {  
query += " or r._field == \"" + field + "\"";  
}  
}  
  
query += ")";  
}  
return query;  
}  
  
/**  
* 聚合查询  
*/  
public static String aggrQ(String bucket, String measurement, List<String> fields,  
String start, String every) {  
String query = baseQ(bucket, measurement, fields, start);  
query += " |> aggregateWindow(every: " + every + ", fn: mean, createEmpty: true)";  
return query;  
}  
  
public static <T> List<T> queryAggr(InfluxDBClient influxDB,  
String bucket, String measurement, Class<T> clazz,  
String start, String every) {  
String query = aggrQ(bucket, measurement, null, start, every);  
return doQuery(influxDB, clazz, query);  
}  
  
/**  
* 通用聚合查询  
*/  
public static <T> List<T> queryAggr(InfluxDBClient influxDB,  
String bucket, String measurement, List<String> fields, Class<T> clazz,  
String start, String every) {  
String query = aggrQ(bucket, measurement, fields, start, every);  
return doQuery(influxDB, clazz, query);  
}  
  
/**  
* 查询某个 measurement 的某个字段, 整 5 min 的数据,如 10:20, 10:25, 10:30*/  
public static <T> List<T> queryBy5Min(InfluxDBClient influxDB,  
String bucket,  
String measurement, Class<T> clazz) {  
return queryBy5Min(influxDB, bucket, measurement, null, clazz);  
}  
  
public static <T> List<T> queryBy5Min(InfluxDBClient influxDB,  
String bucket,  
String measurement, List<String> fields, Class<T> clazz) {  
return queryAggr(influxDB, bucket, measurement, fields, clazz, "-1d", "5m");  
}  
  
public static <T> List<T> queryBy1Min(InfluxDBClient influxDB,  
String bucket,  
String measurement, Class<T> clazz) {  
return queryBy1Min(influxDB, bucket, measurement, null, clazz);  
}  
  
public static <T> List<T> queryBy1Min(InfluxDBClient influxDB,  
String bucket,  
String measurement, List<String> fields, Class<T> clazz) {  
return queryAggr(influxDB, bucket, measurement, fields, clazz, "-1d", "1m");  
}  
  
/**  
* 查询某个 measurement 的某个字段, 整点数据,如 10:00, 11:00, 12:00*/  
public static <T> List<T> queryByHour(InfluxDBClient influxDB,  
String bucket,  
String measurement, Class<T> clazz) {  
return queryByHour(influxDB, bucket, measurement, null, clazz);  
}  
  
public static <T> List<T> queryByHour(InfluxDBClient influxDB,  
String bucket,  
String measurement, List<String> fields, Class<T> clazz) {  
return queryAggr(influxDB, bucket, measurement, fields, clazz, "-7d", "1h");  
}  
  
/**  
* 查询某个 measurement 的某个字段, 整天数据,如 2021-01-01 00:00:00, 2021-01-02 00:00:00*/  
public static <T> List<T> queryByDay(InfluxDBClient influxDB,  
String bucket,  
String measurement, Class<T> clazz) {  
return queryByDay(influxDB, bucket, measurement, null, clazz);  
}  
  
public static <T> List<T> queryByDay(InfluxDBClient influxDB,  
String bucket,  
String measurement, List<String> fields, Class<T> clazz) {  
return queryAggr(influxDB, bucket, measurement, fields, clazz, "-1M", "1d");  
}  
  
/**  
* 删除一个月前的数据  
*/  
public static void deleteBeforeOneMonth(InfluxDBClient influxDB, String bucket, String org, String measurement) {  
OffsetDateTime now = OffsetDateTime.now();  
  
influxDB.getDeleteApi().delete(  
now.minusYears(2), now.minusMonths(1),  
String.format("_measurement=\"%s\"", measurement),  
bucket, org  
);  
}  
  
/**  
* 删除所有数据  
*/  
public static void deleteAll(InfluxDBClient influxDB, String bucket, String org, String measurement) {  
OffsetDateTime now = OffsetDateTime.now();  
  
influxDB.getDeleteApi().delete(  
now.minusYears(100), now,  
String.format("_measurement=\"%s\"", measurement),  
bucket, org  
);  
}  
  
/**  
* 将 FluxTable 转化为 pojo list*/  
@SneakyThrows  
public static <T> List<T> fluxTable2Pojo(List<FluxTable> tables, Class<T> clazz) {  
List<T> ts = J.list();  
  
for (FluxTable tab : tables) {  
List<FluxRecord> recs = tab.getRecords();  
for (int i = 0; i < recs.size(); i++) {  
FluxRecord r = recs.get(i);  
  
T t = null;  
if (i > ts.size() - 1) {  
t = clazz.newInstance();  
ts.add(t);  
} else {  
t = ts.get(i);  
}  
  
record2Pojo(r, t);  
}  
}  
  
return ts;  
}  
  
/**  
* 将 {@link FluxRecord } 转化为 pojo 对象  
*/  
public static <T> T record2Pojo(FluxRecord record, T t) {  
try {  
Class clazz = t.getClass();  
  
Map<String, Object> recordValues = record.getValues();  
  
String recordField = record.getField();  
Object value = record.getValue();  
  
Field[] fileds = clazz.getDeclaredFields();  
for (Field field : fileds) {  
Column colAnno = field.getAnnotation(Column.class);  
  
MeasurementPrefix prefixAnno = field.getAnnotation(MeasurementPrefix.class);  
MeasurementSuffix suffixAnno = field.getAnnotation(MeasurementSuffix.class);  
if (null == colAnno || null != suffixAnno || null != prefixAnno) {  
continue;  
}  
  
field.setAccessible(true);  
  
String colName = colAnno.name();  
  
if (Objects.equals("time", colName)) {  
field.set(t, record.getTime());  
} else if (Objects.equals(colName, recordField)) {  
if (null == value) {  
continue;  
}  
String type = field.getType().getName();  
  
if (colName.equals("writeSectTotal")) {  
System.out.println("writeSectTotal");  
}  
  
if ((type.equals("long") || type.equals("java.lang.Long"))  
&& value instanceof Double) {  
field.set(t, ((Double) value).longValue());  
} else if ((type.equals("int") || type.equals("java.lang.Integer"))  
&& value instanceof Double) {  
field.set(t, ((Double) value).intValue());  
} else {  
field.set(t, value);  
}  
  
} else {  
Object v = recordValues.get(field.getName());  
if (null != v) {  
field.set(t, v);  
}  
}  
}  
  
return t;  
} catch (Exception e) {  
e.printStackTrace();  
}  
  
return null;  
}  
  
/**  
* 将 {@link FluxRecord } 转化为 pojo 对象  
*/  
public static <T> T record2Pojo(FluxRecord record, Class<T> clazz) {  
try {  
T t = clazz.newInstance();  
  
Map<String, Object> recordValues = record.getValues();  
  
String recordField = record.getField();  
Object value = record.getValue();  
  
for (Field field : clazz.getDeclaredFields()) {  
Column colAnno = field.getAnnotation(Column.class);  
  
MeasurementPrefix prefixAnno = field.getAnnotation(MeasurementPrefix.class);  
MeasurementSuffix suffixAnno = field.getAnnotation(MeasurementSuffix.class);  
if (null == colAnno || null != suffixAnno || null != prefixAnno) {  
continue;  
}  
  
field.setAccessible(true);  
  
String colName = colAnno.name();  
  
if (Objects.equals("time", colName)) {  
field.set(t, record.getTime());  
} else if (Objects.equals(colName, recordField)) {  
field.set(t, value);  
} else {  
Object v = recordValues.get(field.getName());  
if (null != v) {  
field.set(t, v);  
}  
}  
}  
  
return t;  
} catch (Exception e) {  
e.printStackTrace();  
}  
  
return null;  
}  
}  

以上的工具是我在使用 InfluxDB 时,总结出来的一些常用的方法,可以直接拿来使用,大大简化了 InfluxDB 的使用。

0人推荐
随时随地看视频
慕课网APP