Manage Flows
Each flow is a continuous aggregation query in GreptimeDB.
It continuously updates the aggregated data based on the incoming data.
This document describes how to create, flush, and delete a flow.
Create a Source Table
Before creating a flow, you need to create a source table to store the raw data. For example:
CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
);
However, if you don't want to store the raw data, you can use a temporary table as the source table by creating it with the WITH ('ttl' = 'instant') table option:
CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
) WITH ('ttl' = 'instant');
Setting 'ttl' to 'instant' makes the table a temporary table: it automatically discards all inserted data and always stays empty, only forwarding the inserted rows to the flow task for computation.
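As a quick check of the behavior described above, the following sketch writes one row to the temporary source table and then reads the table back. The sample values are made up for illustration; per the description above, the query is expected to return no rows because the table itself keeps nothing.

```sql
-- Sample row for illustration only; the values are arbitrary.
INSERT INTO temp_sensor_data VALUES (1, 'room1', 98.5, '2022-01-01 00:00:00');

-- With 'ttl' = 'instant', the row is handed to any flow reading from this
-- table but is not stored, so this query should come back empty.
SELECT * FROM temp_sensor_data;
```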
Create a Sink Table
Before creating a flow, you need a sink table to store the aggregated data generated by the flow. Although it is the same as a regular time series table, there are a few important considerations:
- Column order and type: Ensure the order and type of the columns in the sink table match the query result of the flow.
- Time index: Specify the TIME INDEX for the sink table, typically using the time window column generated by the time window function.
- Update time: The Flow engine automatically appends the update time to the end of each computation result row. This update time is stored in the update_at column. Ensure that this column is included in the sink table schema.
- Tags: Use PRIMARY KEY to specify tags, which together with the time index serve as a unique identifier for row data and optimize query performance.
For example:
/* Create sink table */
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP,
PRIMARY KEY(sensor_id, loc)
);
CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
sensor_id,
loc,
max(temperature) AS max_temp,
date_bin(INTERVAL '10 seconds', ts) AS time_window
FROM temp_sensor_data
GROUP BY
sensor_id,
loc,
time_window
HAVING max_temp > 100;
The sink table has the columns sensor_id, loc, max_temp, time_window, and update_at.
- The first four columns correspond to the query result columns of the flow: sensor_id, loc, max(temperature), and date_bin(INTERVAL '10 seconds', ts), respectively.
- The time_window column is specified as the TIME INDEX for the sink table.
- The update_at column is the last one in the schema and stores the update time of the data.
- The PRIMARY KEY at the end of the schema definition specifies sensor_id and loc as the tag columns. This means the flow will insert or update data based on the tags sensor_id and loc along with the time index time_window.
Create a flow
The grammar to create a flow is:
CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
AS
<SQL>;
When OR REPLACE is specified, any existing flow with the same name will be updated to the new version. Note that this only affects the flow task itself; the source and sink tables remain unchanged.
Conversely, when IF NOT EXISTS is specified, the command has no effect if the flow already exists, rather than reporting an error. Note also that OR REPLACE cannot be used together with IF NOT EXISTS.
- flow-name is a unique identifier at the catalog level.
- sink-table-name is the name of the table where the materialized aggregated data is stored. It can be an existing table or a new one. The flow will create the sink table if it doesn't exist.
- EXPIRE AFTER is an optional interval after which data expires from the Flow engine. For more details, please refer to the EXPIRE AFTER part.
- COMMENT is the description of the flow.
- The SQL part defines the continuous aggregation query. It defines the source tables that provide data for the flow. Each flow can have multiple source tables. Please refer to Write a Query for the details.
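To illustrate OR REPLACE, here is a minimal sketch that redefines the temp_monitoring flow from the sink table example with a different alert threshold. The threshold of 150 is an arbitrary value chosen for illustration. As described above, only the flow task should be replaced, while temp_sensor_data and temp_alerts are left as they are.

```sql
-- Replace the existing flow definition; the source and sink tables are untouched.
CREATE OR REPLACE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
    sensor_id,
    loc,
    max(temperature) AS max_temp,
    date_bin(INTERVAL '10 seconds', ts) AS time_window
FROM temp_sensor_data
GROUP BY
    sensor_id,
    loc,
    time_window
HAVING max_temp > 150;  -- illustrative threshold, not from the original example
```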
A simple example to create a flow:
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT 'My first flow in GreptimeDB'
AS
SELECT
max(temperature) as max_temp,
date_bin(INTERVAL '10 seconds', ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;
The created flow computes max(temperature) for every 10-second window and stores the result in my_sink_table. Only data whose timestamp falls within the last hour is used by the flow.
EXPIRE AFTER clause
The EXPIRE AFTER clause specifies the interval after which data will expire from the flow engine.
This expiration only affects the data in the flow engine and does not impact the data in the source table.
When the flow engine processes the aggregation operation (the update_at time), data with a time index older than the specified interval will expire.
For example, if the flow engine processes the aggregation at 10:00:00 and INTERVAL '1 hour' is set, any data older than 1 hour (before 09:00:00) will expire.
Only data timestamped from 09:00:00 onwards will be used in the aggregation.
Write a SQL query
The SQL part of the flow is similar to a standard SELECT clause with a few differences. The syntax of the query is as follows:
SELECT AGGR_FUNCTION(column1, column2,..) [, TIME_WINDOW_FUNCTION() as time_window] FROM <source_table> GROUP BY {time_window | column1, column2,.. };
Only the following types of expressions are allowed after the SELECT keyword:
- Aggregate functions: Refer to the Expressions documentation for details.
- Time window functions: Refer to the define time window section for details.
- Scalar functions: Such as col, to_lowercase(col), col + 1, etc. This part is the same as in a standard SELECT clause in GreptimeDB.
The following points should be noted about the rest of the query syntax:
- The query must include a FROM clause to specify the source table. As join clauses are currently not supported, the query can only aggregate columns from a single table.
- WHERE and HAVING clauses are supported. The WHERE clause filters data before aggregation, while the HAVING clause filters data after aggregation.
- DISTINCT currently only works with the SELECT DISTINCT column1 .. syntax. It is used to remove duplicate rows from the result set. Support for SELECT count(DISTINCT column1) ... is not available yet but will be added in the future.
- The GROUP BY clause works the same as in a standard query, grouping data by the specified columns. The time window column in the GROUP BY clause is crucial for continuous aggregation scenarios. Other expressions in GROUP BY can include literals, columns, or scalar expressions.
- ORDER BY, LIMIT, and OFFSET are not supported.
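The rules above can be sketched in one flow definition that combines a WHERE filter (applied before aggregation), a HAVING filter (applied after aggregation), and a time window in GROUP BY. The flow name hot_rooms, the sink table name hot_rooms_sink, and both filter conditions are hypothetical and chosen only for illustration. The source table is temp_sensor_data from the earlier examples.

```sql
-- hot_rooms and hot_rooms_sink are hypothetical names for this sketch.
CREATE FLOW IF NOT EXISTS hot_rooms
SINK TO hot_rooms_sink
AS
SELECT
    loc,
    max(temperature) AS max_temp,
    date_bin(INTERVAL '1 minute', ts) AS time_window
FROM temp_sensor_data            -- single source table, no joins
WHERE sensor_id > 0              -- filters raw rows before aggregation
GROUP BY
    loc,
    time_window
HAVING max_temp > 100;           -- filters aggregated rows afterwards
```

The query uses no ORDER BY, LIMIT, or OFFSET, since those are not supported in flow queries.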
Refer to Continuous Aggregation for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboards.
Define time window
A time window is a crucial attribute of your continuous aggregation query. It determines how data is aggregated within the flow. These time windows are left-closed and right-open intervals.
A time window represents a specific range of time. Data from the source table is mapped to the corresponding window based on the time index column. The time window also defines the scope for each calculation of an aggregation expression, resulting in one row per time window in the result table.
You can use date_bin() after the SELECT keyword to define fixed time windows.
For example:
SELECT
max(temperature) as max_temp,
date_bin(INTERVAL '10 seconds', ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;
In this example, the date_bin(INTERVAL '10 seconds', ts) function creates 10-second time windows starting from UTC 00:00:00.
The max(temperature) function calculates the maximum temperature value within each time window.
For more details on the behavior of the function, please refer to date_bin.
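To make the left-closed, right-open window boundaries concrete, the following sketch applies date_bin to two literal timestamps outside of any flow. It assumes the session time zone is UTC and that the '...'::TIMESTAMP cast form is accepted in your deployment; the timestamps are arbitrary sample values.

```sql
SELECT
    -- 00:00:07 lies inside the window [00:00:00, 00:00:10)
    date_bin(INTERVAL '10 seconds', '2024-01-01 00:00:07'::TIMESTAMP) AS w1,
    -- 00:00:10 is excluded from that window and opens [00:00:10, 00:00:20)
    date_bin(INTERVAL '10 seconds', '2024-01-01 00:00:10'::TIMESTAMP) AS w2;
```

Under those assumptions, w1 should be the window start 2024-01-01 00:00:00 and w2 should be 2024-01-01 00:00:10.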
Currently, the internal state of the flow, such as the accumulator's value for incremental query results (e.g., the accumulator for count(col), which records the current count), is not persistently stored. To minimize data loss in case of internal state failure, it is advisable to use smaller time windows.
This internal state loss does not affect the existing data in the sink table.
Flush a flow
The flow engine automatically processes aggregation operations within 1 second when new data arrives in the source table.
However, you can manually trigger the flow engine to process the aggregation operation immediately using the ADMIN FLUSH_FLOW command.
ADMIN FLUSH_FLOW('<flow-name>')
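As a usage sketch, the statements below write two sample rows to the source table of the temp_monitoring flow defined earlier, flush the flow, and then read the sink table. The row values are made up for illustration.

```sql
-- Sample rows for illustration only.
INSERT INTO temp_sensor_data VALUES
    (1, 'room1', 98.5, '2022-01-01 00:00:00'),
    (2, 'room2', 150.0, '2022-01-01 00:00:01');

-- Process pending data now instead of waiting for the automatic run.
ADMIN FLUSH_FLOW('temp_monitoring');

-- Inspect the aggregated rows written by the flow.
SELECT * FROM temp_alerts;
```

Because that flow filters with HAVING max_temp > 100, only the room2 group would be expected to appear in temp_alerts.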
Delete a flow
To delete a flow, use the DROP FLOW statement:
DROP FLOW [IF EXISTS] <name>
For example:
DROP FLOW IF EXISTS my_flow;