Java
GreptimeDB 提供了用于高吞吐量数据写入的 ingester 库。 它使用 gRPC 协议,支持自动生成表结构,无需在写入数据前创建表。 更多信息请参考 自动生成表结构。
GreptimeDB 提供的 Java ingester SDK 是一个轻量级库,具有以下特点:
- 基于 SPI 的可扩展网络传输层,提供了使用 gRPC 框架的默认实现。
- 非阻塞、纯异步的易于使用的 API。
- 默认情况下自动收集各种性能指标,然后可以配置并将其写入本地文件。
- 能够对关键对象进行内存快照,配置并将其写入本地文件。这对于解决复杂问题很有帮助。
快速开始 Demo
你可以通过 快速开始 Demo 来了解如何使用 GreptimeDB Java SDK。
安装
- 安装 Java 开发工具包(JDK)
确保你的系统已安装 JDK 8 或更高版本。有关如何检查 Java 版本并安装 JDK 的更多信息,请参见 Oracle JDK 安装概述文档
- 将 GreptimeDB Java SDK 添加为依赖项
如果你使用的是 Maven,请将以下内容添加到 pom.xml 的依赖项列表中:
<dependency>
<groupId>io.greptime</groupId>
<artifactId>ingester-all</artifactId>
<version>0.11.0</version>
</dependency>
最新版本可以在 这里 查看。
配置依赖项后,请确保它们对项目可用,这可能需要在 IDE 中刷新项目或运行依赖项管理器。
连接数据库
如果你在启动 GreptimeDB 时设置了 --user-provider
,
则需要提供用户名和密码才能连接到 GreptimeDB。
以下示例显示了使用 SDK 连接到 GreptimeDB 时如何设置用户名和密码。
下方的代码展示了以最简单的配置连接到 GreptimeDB 的方法。 如果想要自定义连接选项,请参考 API 文档。 请注意每个选项的注释,它们提供了对其各自角色的详细解释。
// GreptimeDB 默认 database 为 "public",默认 catalog 为 "greptime",
// 我们可以将其作为测试数据库使用
String database = "public";
// 默认情况下,GreptimeDB 使用 gRPC 协议在监听端口 4001。
// 我们可以提供多个指向同一 GreptimeDB 集群的 endpoints,
// 客户端将根据负载均衡策略调用这些 endpoints。
String[] endpoints = {"127.0.0.1:4001"};
// 设置鉴权信息
AuthInfo authInfo = new AuthInfo("username", "password");
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database)
// 如果数据库不需要鉴权,我们可以使用 AuthInfo.noAuthorization() 作为参数。
.authInfo(authInfo)
// 如果服务配置了 TLS,设置 TLS 选项来启用安全连接
//.tlsOptions(new TlsOptions())
// 好的开始 ^_^
.build();
GreptimeDB client = GreptimeDB.create(opts);
数据模型
表中的每条行数据包含三种类型的列:Tag
、Timestamp
和 Field
。更多信息请参考 数据模型。
列值的类型可以是 String
、Float
、Int
、JSON
, Timestamp
等。更多信息请参考 数据类型。
设置表选项
虽然在通过 SDK 向 GreptimeDB 写入数据时会自动创建时间序列表,但你仍然可以配置表选项。 SDK 支持以下表选项:
auto_create_table
:默认值为True
。如果设置为False
,则表示表已经存在且不需要自动创建,这可以提高写入性能。ttl
、append_mode
、merge_mode
:更多详情请参考表选项。
你可以使用 Context
设置表选项。
例如,使用以下代码设置 ttl
选项:
Context ctx = Context.newDefault();
// 添加一个 hint,使数据库创建一个具有指定 TTL (time-to-live) 的表
ctx = ctx.withHint("ttl", "3d");
// 将压缩算法设置为 Zstd
ctx = ctx.withCompression(Compression.Zstd)
// 使用 ctx 对象写入数据
// `cpuMetric` 和 `memMetric` 是定义的数据对象,之后的章节中有详细描述
CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(Arrays.asList(cpuMetric, memMetric), WriteOp.Insert, ctx);
关于如何向 GreptimeDB 写入数据,请参考以下各节。
低层级 API
GreptimeDB 的低层级 API 通过向具有预定义模式的 table
对象添加 row
来写入数据。
创建行数据
以下代码片段首先构建了一个名为 cpu_metric
的表,其中包括 host
、cpu_user
、cpu_sys
和 ts
列。
随后,它向表中插入了一行数据。
该表包含三种类型的列:
Tag
:host
列,值类型为String
。Field
:cpu_user
和cpu_sys
列,值类型为Float
。Timestamp
:ts
列,值类型为Timestamp
。
// 为 `cpu_metric` 构建表结构。
// schema 是不可变的,可以安全地在多个操作中重复使用。
// 建议使用蛇形命名法(snake_case)作为列名。
TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric")
.addTag("host", DataType.String) // 主机的标识符
.addTimestamp("ts", DataType.TimestampMillisecond) // 毫秒级的时间戳
.addField("cpu_user", DataType.Float64) // 用户进程的 CPU 使用率
.addField("cpu_sys", DataType.Float64) // 系统进程的 CPU 使用率
.build();
// 根据指定的 schema 创建一个 table
// Table 不可重复使用 - 每次写操作都必须创建一个新实例。
// 然而,在真正发起写入操作之前,我们可以向单个 table 中添加多行数据,然后一次性执行写入操作,
// 这比逐行写入更有效率。
Table cpuMetric = Table.from(cpuMetricSchema);
// 单行的示例数据
String host = "127.0.0.1"; // 主机标识符
long ts = System.currentTimeMillis(); // 当前时间戳
double cpuUser = 0.1; // 用户进程的 CPU 使用率(百分比)
double cpuSys = 0.12; // 系统进程的 CPU 使用率(百分比)
// 将一行数据插入表中
// 注意:参数必须按照定义的表结构的列顺序排列:host, ts, cpu_user, cpu_sys
cpuMetric.addRow(host, ts, cpuUser, cpuSys);
// 可以继续添加更多行数据到 table 中
// ..
// 调用 `complete()` 方法使 table 变为不可变状态,为写入操作做最后准备。
// 如果用户忘记调用此方法,系统会在实际写入数据前自动调用它。
cpuMetric.complete();
为了提高写入数据的效率,你可以一次创建多行数据以便写入到 GreptimeDB。
// 创建表结构
// schema 是不可变的,可以安全地在多个操作中重复使用。
// 建议使用蛇形命名法(snake_case)作为列名。
TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric")
.addTag("host", DataType.String)
.addTimestamp("ts", DataType.TimestampMillisecond)
.addField("cpu_user", DataType.Float64)
.addField("cpu_sys", DataType.Float64)
.build();
TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric")
.addTag("host", DataType.String)
.addTimestamp("ts", DataType.TimestampMillisecond)
.addField("mem_usage", DataType.Float64)
.build();
// Table 不可重复使用 - 每次写操作都必须创建一个新实例。
// 然而,在真正发起写入操作之前,我们可以向单个 table 中添加多行数据,然后一次性执行写入操作,
// 这比逐行写入更有效率。
Table cpuMetric = Table.from(cpuMetricSchema);
Table memMetric = Table.from(memMetricSchema);
// 添加行数据
for (int i = 0; i < 10; i++) {
String host = "127.0.0." + i;
long ts = System.currentTimeMillis();
double cpuUser = i + 0.1;
double cpuSys = i + 0.12;
// 向 `cpu_metric` 表中添加一行数据。
// 值的顺序必须与表结构定义匹配。
cpuMetric.addRow(host, ts, cpuUser, cpuSys);
}
for (int i = 0; i < 10; i++) {
String host = "127.0.0." + i;
long ts = System.currentTimeMillis();
double memUsage = i + 0.2;
// 向 `mem_metric` 表中添加一行数据。
// 值的顺序必须与表结构定义匹配。
memMetric.addRow(host, ts, memUsage);
}
// 调用 `complete()` 方法使 table 变为不可变状态。即使用户忘记调用此方法,
// 系统也会在实际写入数据前自动在内部调用它。
cpuMetric.complete();
memMetric.complete();
插入数据
下方示例展示了如何向 GreptimeDB 的表中插入行数据。
// 插入数据
// 出于性能考虑,SDK 被设计为纯异步的。
// 返回值是一个 CompletableFuture 对象。如果你想立即获取结果,
// 可以调用 `future.get()`,这将阻塞直到操作完成。
// 对于生产环境,建议使用回调或 CompletableFuture API 等非阻塞方式。
CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(cpuMetric, memMetric);
Result<WriteOk, Err> result = future.get();
if (result.isOk()) {
LOG.info("Write result: {}", result.getOk());
} else {
LOG.error("Failed to write: {}", result.getErr());
}
流式插入
当你需要插入大量数据时,例如导入历史数据,流式插入是非常有用的。
// 设置压缩算法为 Zstd。
Context ctx = Context.newDefault().withCompression(Compression.Zstd);
// 创建一个流式写入器,限制速率为每秒 100,000 个数据点。
// 这有助于控制数据流量,防止数据库过载。
StreamWriter<Table, WriteOk> writer = greptimeDB.streamWriter(100000, ctx);
// 将表数据写入流中。数据将立即被刷新到网络中。这使得数据可以高效、低延迟地传输到数据库。
// 由于这是客户端到服务端的单向流传输,我们无法立即从数据库端获取写入结果。
// 写入所有数据后,我们可以调用 `completed()` 来完成这个流,并获取结果。
writer.write(cpuMetric);
writer.write(memMetric);
// 你可以对流执行操作,例如删除前 5 行
writer.write(cpuMetric.subRange(0, 5), WriteOp.Delete);
在所有数据写入完毕后关闭流式写入。 一般情况下,连续写入数据时不需要关闭流式写入。
// 完成流式写入
CompletableFuture<WriteOk> future = writer.completed();
// 现在我们可以获取写入结果。
WriteOk result = future.get();
LOG.info("Write result: {}", result);
高层级 API
SDK 的高层级 API 使用 ORM 风格的对象写入数据, 它允许你以更面向对象的方式创建、插入和更新数据,为开发者提供了更友好的体验。 然而,高层级 API 不如低层级 API 高效。 这是因为 ORM 风格的对象在转换对象时可能会消耗更多的资源和时间。
创建行数据
GreptimeDB Java Ingester SDK 允许我们使用基本的 POJO 对象进行写入。虽然这种方法需要使用 Greptime 的注解,但它们很容易使用。
@Metric(name = "cpu_metric")
public class Cpu {
@Column(name = "host", tag = true, dataType = DataType.String)
private String host;
@Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
private long ts;
@Column(name = "cpu_user", dataType = DataType.Float64)
private double cpuUser;
@Column(name = "cpu_sys", dataType = DataType.Float64)
private double cpuSys;
// getters and setters
// ...
}
@Metric(name = "mem_metric")
public class Memory {
@Column(name = "host", tag = true, dataType = DataType.String)
private String host;
@Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
private long ts;
@Column(name = "mem_usage", dataType = DataType.Float64)
private double memUsage;
// getters and setters
// ...
}
// 添加行
List<Cpu> cpus = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Cpu c = new Cpu();
c.setHost("127.0.0." + i);
c.setTs(System.currentTimeMillis());
c.setCpuUser(i + 0.1);
c.setCpuSys(i + 0.12);
cpus.add(c);
}
List<Memory> memories = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Memory m = new Memory();
m.setHost("127.0.0." + i);
m.setTs(System.currentTimeMillis());
m.setMemUsage(i + 0.2);
memories.add(m);
}
插入数据
写入 POJO 对象:
// 插入数据
CompletableFuture<Result<WriteOk, Err>> puts = greptimeDB.writePOJOs(cpus, memories);
Result<WriteOk, Err> result = puts.get();
if (result.isOk()) {
LOG.info("Write result: {}", result.getOk());
} else {
LOG.error("Failed to write: {}", result.getErr());
}
流式插入
当你需要插入大量数据时,例如导入历史数据,流式插入是非常有用的。
StreamWriter<List<?>, WriteOk> writer = greptimeDB.streamWriterPOJOs();
// 写入数据到流中
writer.write(cpus);
writer.write(memories);
// 你可以对流执行操作,例如删除前 5 行
writer.write(cpus.subList(0, 5), WriteOp.Delete);
在所有数据写入完毕后关闭流式写入。 一般情况下,连续写入数据时不需要关闭流式写入。
// 完成流式写入
CompletableFuture<WriteOk> future = writer.completed();
WriteOk result = future.get();
LOG.info("Write result: {}", result);
插入 JSON 类型的数据
GreptimeDB 支持使用 JSON 类型数据 存储复杂的数据结构。
使用此 ingester 库,你可以通过字符串值插入 JSON 数据。
假如你有一个名为 sensor_readings
的表,并希望添加一个名为 attributes
的 JSON 列,
请参考以下代码片段。
在低层级 API 中,
你可以使用 addField
方法将列类型指定为 DataType.Json
来添加 JSON 列,
然后使用 Map 对象添加 JSON 数据。
// 为 sensor_readings 表构建表结构
TableSchema sensorReadings = TableSchema.newBuilder("sensor_readings")
// 此处省略了创建其他列的代码
// ...
// 将列类型指定为 JSON
.addField("attributes", DataType.Json)
.build();
// ...
// 使用 map 添加 JSON 数据
Map<String, Object> attr = new HashMap<>();
attr.put("location", "factory-1");
sensorReadings.addRow(<other-column-values>... , attr);
// 以下省略了写入数据的代码
// ...
在高层级 API 中,你可以在 POJO 对象中指定列类型为 DataType.Json
。
@Metric(name = "sensor_readings")
public class Sensor {
// 此处省略了创建其他列的代码
// ...
// 将列类型指定为 JSON
@Column(name = "attributes", dataType = DataType.Json)
private Map<String, Object> attributes;
// ...
}
Sensor sensor = new Sensor();
// ...
// 使用 map 添加 JSON 数据
Map<String, Object> attr = new HashMap<>();
attr.put("action", "running");
sensor.setAttributes(attr);
// 以下省略了写入数据的代码
// ...
调试日志
Java SDK 提供了用于调试的指标和日志。 请参考 Metrics & Display 和 Magic Tools 了解如何启用或禁用日志。