跳到主要内容
版本:nightly

Kafka

如果你使用 Kafka 或兼容 Kafka 的消息队列进行可观测性数据传输,可以直接将数据写入到 GreptimeDB 中。

这里我们使用 Vector 作为工具将数据从 Kafka 传输到 GreptimeDB。

指标

从 Kafka 写入指标到 GreptimeDB 时,消息应采用 InfluxDB 行协议格式。例如:

census,location=klamath,scientist=anderson bees=23 1566086400000000000

然后配置 Vector 使用 influxdb 解码器来处理这些消息。

[sources.metrics_mq]
# 指定源类型为 Kafka
type = "kafka"
# Kafka 的消费者组 ID
group_id = "vector0"
# 要消费消息的 Kafka 主题列表
topics = ["test_metric_topic"]
# 要连接的 Kafka 地址
bootstrap_servers = "kafka:9092"
# `influxdb` 表示消息应采用 InfluxDB 行协议格式
decoding.codec = "influxdb"

[sinks.metrics_in]
inputs = ["metrics_mq"]
# 指定接收器类型为 `greptimedb_metrics`
type = "greptimedb_metrics"
# GreptimeDB 服务器的端点
# 将 <host> 替换为实际的主机名或 IP 地址
endpoint = "<host>:4001"
dbname = "<dbname>"
username = "<username>"
password = "<password>"
tls = {}

有关 InfluxDB 行协议指标如何映射到 GreptimeDB 数据的详细信息,请参阅 InfluxDB 行协议文档中的数据模型部分。

日志

开发人员通常处理两种类型的日志:JSON 日志和纯文本日志。 例如以下从 Kafka 发送的日志示例。

纯文本日志:

127.0.0.1 - - [25/May/2024:20:16:37 +0000] "GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

或 JSON 日志:

{
"timestamp": "2024-12-23T10:00:00Z",
"level": "INFO",
"message": "Service started"
}

GreptimeDB 将这些日志转换为具有多个列的结构化数据,并自动创建必要的表。 Pipeline 在写入到 GreptimeDB 之前将日志处理为结构化数据。 不同的日志格式需要不同的 Pipeline 来解析,详情请继续阅读下面的内容。

JSON 格式的日志

对于 JSON 格式的日志(例如 {"timestamp": "2024-12-23T10:00:00Z", "level": "INFO", "message": "Service started"}), 你可以使用内置的 greptime_identity pipeline 直接写入日志。 此 pipeline 根据 JSON 日志消息中的字段自动创建列。

你只需要配置 Vector 的 transforms 设置以解析 JSON 消息,并使用 greptime_identity pipeline,如以下示例所示:

[sources.logs_in]
type = "kafka"
# Kafka 的消费者组 ID
group_id = "vector0"
# 要消费消息的 Kafka 主题列表
topics = ["test_log_topic"]
# 要连接的 Kafka 代理地址
bootstrap_servers = "kafka:9092"

# 将日志转换为 JSON 格式
[transforms.logs_json]
type = "remap"
inputs = ["logs_in"]
source = '''
. = parse_json!(.message)
'''

[sinks.logs_out]
# 指定此接收器将接收来自 `logs_json` 源的数据
inputs = ["logs_json"]
# 指定接收器类型为 `greptimedb_logs`
type = "greptimedb_logs"
# GreptimeDB 服务器的端点
endpoint = "http://<host>:4000"
compression = "gzip"
# 将 <dbname>、<username> 和 <password> 替换为实际值
dbname = "<dbname>"
username = "<username>"
password = "<password>"
# GreptimeDB 中的表名,如果不存在,将自动创建
table = "demo_logs"
# 使用内置的 `greptime_identity` 管道
pipeline_name = "greptime_identity"

文本格式的日志

对于文本格式的日志,例如下面的访问日志格式,你需要创建自定义 pipeline 来解析它们:

127.0.0.1 - - [25/May/2024:20:16:37 +0000] "GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

创建 pipeline

要创建自定义 pipeline, 请参阅创建 pipelinepipeline 配置文档获取详细说明。

写入数据

创建 pipeline 后,将其配置到 Vector 配置文件中的 pipeline_name 字段。

# sample.toml
[sources.log_mq]
# 指定源类型为 Kafka
type = "kafka"
# Kafka 的消费者组 ID
group_id = "vector0"
# 要消费消息的 Kafka 主题列表
topics = ["test_log_topic"]
# 要连接的 Kafka 地址
bootstrap_servers = "kafka:9092"

[sinks.sink_greptime_logs]
# 指定接收器类型为 `greptimedb_logs`
type = "greptimedb_logs"
# 指定此接收器将接收来自 `log_mq` 源的数据
inputs = [ "log_mq" ]
# 使用 `gzip` 压缩以节省带宽
compression = "gzip"
# GreptimeDB 服务器的端点
# 将 <host> 替换为实际的主机名或 IP 地址
endpoint = "http://<host>:4000"
dbname = "<dbname>"
username = "<username>"
password = "<password>"
# GreptimeDB 中的表名,如果不存在,将自动创建
table = "demo_logs"
# 你创建的自定义管道名称
pipeline_name = "your_custom_pipeline"

Demo

有关数据转换和写入的可运行演示,请参阅 Kafka Ingestion Demo