# Manage Flows

> Describes how to manage flows in GreptimeDB, including creating, updating, and deleting flows. It explains the syntax for creating flows, the importance of sink tables, and how to use the EXPIRE AFTER clause. Examples of SQL queries for managing flows are provided.

Each `flow` is a continuous aggregation query in GreptimeDB.
It continuously updates the aggregated data based on the incoming data.
This document describes how to create, flush, and delete a flow.

## Create a Source Table

Before creating a flow, you need to create a source table to store the raw data. For example:

```sql
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
);
```

If you don't want to store the raw data, you can use a temporary table as the source table by creating it with the `WITH ('ttl' = 'instant')` table option:

```sql
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
) WITH ('ttl' = 'instant');
```

Setting `'ttl'` to `'instant'` makes the table a temporary table: it sends inserted data to the flow task for computation and then discards it automatically, so the table itself is always empty.
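
As a rough illustration of this behavior, the sketch below writes one row into the `'instant'` source table and then counts the stored rows. The inserted values are made up for the example; based on the description above, the count is expected to stay at zero while the row is still forwarded to any flow that reads from the table.

```sql
-- Illustrative values only: one reading written to the temporary source table.
INSERT INTO temp_sensor_data VALUES
  (1, 'room1', 98.5, '2022-01-01 00:00:00');

-- The table discards inserted rows, so nothing should be retained here.
SELECT count(*) FROM temp_sensor_data;
```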

## Create a Sink Table

Before creating a flow, you need a sink table to store the aggregated data generated by the flow.
While it is the same as a regular time series table, there are a few important considerations:

- **Column order and type**: Ensure the order and type of the columns in the sink table match the query result of the flow.
- **Time index**: Specify the `TIME INDEX` for the sink table, typically using the time window column generated by the time window function.
- **Update time**: The Flow engine automatically appends the update time to the end of each computation result row. This update time is stored in the `update_at` column. Ensure that this column is included in the sink table schema.
- **Tags**: Use `PRIMARY KEY` to specify tags, which together with the time index serve as a unique identifier for row data and optimize query performance.

For example:

```sql
/* Create sink table */
CREATE TABLE temp_alerts (
  sensor_id INT,
  loc STRING,
  max_temp DOUBLE,
  time_window TIMESTAMP TIME INDEX,
  update_at TIMESTAMP,
  PRIMARY KEY(sensor_id, loc)
);

CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
  sensor_id,
  loc,
  max(temperature) AS max_temp,
  date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
  sensor_id,
  loc,
  time_window
HAVING max_temp > 100;
```

The sink table has the columns `sensor_id`, `loc`, `max_temp`, `time_window`, and `update_at`.

- The first four columns correspond to the query result columns of the flow: `sensor_id`, `loc`, `max(temperature)`, and `date_bin('10 seconds'::INTERVAL, ts)` respectively.
- The `time_window` column is specified as the `TIME INDEX` for the sink table.
- The `update_at` column is the last one in the schema to store the update time of the data.
- The `PRIMARY KEY` at the end of the schema definition specifies `sensor_id` and `loc` as the tag columns.
  This means the flow will insert or update data based on the tags `sensor_id` and `loc` along with the time index `time_window`.
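
To see the pieces working together, a minimal sketch is to write a few readings into the source table and then read the sink table. The rows below are invented for illustration. Given the `HAVING max_temp > 100` filter, only the 10-second window for sensor `2` would be expected to reach `temp_alerts`, and the result may take a few seconds to appear.

```sql
-- Illustrative readings: sensor 1 stays below the threshold, sensor 2 exceeds it.
INSERT INTO temp_sensor_data VALUES
  (1, 'room1', 98.5,  '2022-01-01 00:00:00'),
  (2, 'room2', 101.5, '2022-01-01 00:00:01'),
  (2, 'room2', 102.0, '2022-01-01 00:00:05');

-- Read the aggregated rows written by the temp_monitoring flow.
SELECT * FROM temp_alerts;
```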

## Create a flow

The grammar to create a flow is:

```sql
CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
AS 
<SQL>;
```

When `OR REPLACE` is specified, any existing flow with the same name will be updated to the new version. It's important to note that this only affects the flow task itself; the source and sink tables will remain unchanged.

Conversely, when `IF NOT EXISTS` is specified, the command will have no effect if the flow already exists, rather than reporting an error. Additionally, please note that `OR REPLACE` cannot be used in conjunction with `IF NOT EXISTS`.

- `flow-name` is a unique identifier at the catalog level.
- `sink-table-name` is the name of the table where the materialized aggregated data is stored.
  It can be an existing table or a new one. The flow creates the sink table if it doesn't exist.
  <!-- If the table already exists, its schema must match the schema of the query result. -->
- `EXPIRE AFTER` is an optional interval after which data expires from the Flow engine.
  For more details, please refer to the [`EXPIRE AFTER`](#expire-after) section.
- `COMMENT` is the description of the flow.
- The `SQL` part defines the continuous aggregation query.
  It defines the source tables that provide data for the flow.
  Each flow can have multiple source tables.
  Please refer to [Write a SQL query](#write-a-sql-query) for details.

A simple example to create a flow:

```sql
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER '1 hour'::INTERVAL
COMMENT 'My first flow in GreptimeDB'
AS
SELECT
    max(temperature) as max_temp,
    date_bin('10 seconds'::INTERVAL, ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;
```

The created flow computes `max(temperature)` for every 10-second window and stores the result in `my_sink_table`. Only data with a timestamp within the last hour is used in the flow.
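
If the flow later needs a different definition, `OR REPLACE` can be used instead of `IF NOT EXISTS`. The following sketch reuses the same hypothetical `my_flow` and only changes the expiration interval and comment; the new interval is an arbitrary value chosen for illustration. The source and sink tables are left as they are.

```sql
-- Replace the existing flow definition; source and sink tables are not modified.
CREATE OR REPLACE FLOW my_flow
SINK TO my_sink_table
EXPIRE AFTER '2 hours'::INTERVAL
COMMENT 'My first flow, now with a longer expiration'
AS
SELECT
    max(temperature) AS max_temp,
    date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY time_window;
```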

### EXPIRE AFTER

The `EXPIRE AFTER` clause specifies the interval after which data will expire from the flow engine. 

Data in the source table that exceeds the specified expiration time will no longer be included in the flow's calculations.
Similarly, data in the sink table that is older than the expiration time will not be updated. 
This means the flow engine will ignore data older than the specified interval during aggregation.
This mechanism helps to manage the state size for stateful queries, such as those involving `GROUP BY`.

It is important to note that the `EXPIRE AFTER` clause does not delete data from either the source table or the sink table.
It only controls how the flow engine processes the data.
If you want to delete data from the source or sink table, please [set the `TTL` option](/user-guide/manage-data/overview.md#manage-data-retention-with-ttl-policies) when creating tables.
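
For comparison, a table-level retention policy is declared on the table itself rather than on the flow. The sketch below assumes a retention period of `'7d'`, which is only an example value; see the linked TTL documentation for the accepted formats.

```sql
-- Assumption: '7d' is an illustrative retention period, not a recommended value.
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
) WITH ('ttl' = '7d');
```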

Setting a reasonable time interval for `EXPIRE AFTER` helps limit the state size and avoid running out of memory. This is somewhat similar to the ["Watermarks"](https://docs.risingwave.com/processing/watermarks) concept in stream processing.

For example, if the flow engine processes the aggregation at 10:00:00 and `EXPIRE AFTER` is set to `'1 hour'::INTERVAL`,
any input data that arrives at that moment with a time index more than 1 hour old (before 09:00:00) expires and is ignored.
Only data timestamped from 09:00:00 onwards is used in the aggregation and written to the sink table.
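
The scenario can be sketched with two rows written to the source table of a flow that uses `EXPIRE AFTER '1 hour'::INTERVAL`. The date and readings are invented, and the comments assume the wall-clock time is 10:00:00 on that date when the rows arrive. Both rows are still stored in the source table; only the flow's treatment of them differs.

```sql
-- Assumption: the rows arrive when the current time is 2025-01-01 10:00:00.
INSERT INTO temp_sensor_data VALUES
  (1, 'room1', 25.0, '2025-01-01 09:30:00'),  -- within the last hour: used by the flow
  (1, 'room1', 99.0, '2025-01-01 08:30:00');  -- before 09:00:00: ignored by the flow
```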

### Write a SQL query

The `SQL` part of the flow is similar to a standard `SELECT` clause with a few differences. The syntax of the query is as follows:

```sql
SELECT AGGR_FUNCTION(column1, column2,..) [, TIME_WINDOW_FUNCTION() as time_window] FROM <source_table> GROUP BY {time_window | column1, column2,.. };
```

Only the following types of expressions are allowed after the `SELECT` keyword:
- Aggregate functions: Refer to the [Expressions](expressions.md) documentation for details.
- Time window functions: Refer to the [define time window](#define-time-window) section for details.
- Scalar functions: Such as `col`, `to_lowercase(col)`, `col + 1`, etc. This part is the same as in a standard `SELECT` clause in GreptimeDB.

The following points should be noted about the rest of the query syntax:
- The query must include a `FROM` clause to specify the source table.
  As join clauses are currently not supported,
  the query can only aggregate columns from a single table.
- `WHERE` and `HAVING` clauses are supported.
  The `WHERE` clause filters data before aggregation,
  while the `HAVING` clause filters data after aggregation.
- `DISTINCT` currently only works with the `SELECT DISTINCT column1 ..` syntax.
  It is used to remove duplicate rows from the result set.
  Support for `SELECT count(DISTINCT column1) ...` is not available yet but will be added in the future.
- The `GROUP BY` clause works the same as in a standard query,
  grouping data by the specified columns.
  The time window column in the `GROUP BY` clause is crucial for continuous aggregation scenarios.
  Other expressions in `GROUP BY` can include literals, columns, or scalar expressions.
- `ORDER BY`, `LIMIT`, and `OFFSET` are not supported.
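
Putting these rules together, a flow query might look like the sketch below. The flow name `room1_hot_sensors`, the sink table `room1_hot_sensors_sink`, the `'room1'` filter value, and the threshold are hypothetical and chosen only to show where `WHERE`, `GROUP BY`, and `HAVING` fit. It reads from a single source table and uses no `ORDER BY`, `LIMIT`, or `OFFSET`.

```sql
CREATE FLOW IF NOT EXISTS room1_hot_sensors
SINK TO room1_hot_sensors_sink
AS
SELECT
    sensor_id,
    max(temperature) AS max_temp,
    date_bin('1 minute'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
WHERE loc = 'room1'        -- filters rows before aggregation
GROUP BY
    sensor_id,
    time_window
HAVING max_temp > 100;     -- filters groups after aggregation
```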

Refer to [Continuous Aggregation](continuous-aggregation.md) for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboards.

### Define time window

A time window is a crucial attribute of your continuous aggregation query.
It determines how data is aggregated within the flow.
These time windows are left-closed and right-open intervals.

A time window represents a specific range of time.
Data from the source table is mapped to the corresponding window based on the time index column.
The time window also defines the scope for each calculation of an aggregation expression,
resulting in one row per time window in the result table.

You can use `date_bin()` after the `SELECT` keyword to define fixed time windows.
For example:

```sql
SELECT
    max(temperature) as max_temp,
    date_bin('10 seconds'::INTERVAL, ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;
```

In this example, the `date_bin('10 seconds'::INTERVAL, ts)` function creates 10-second time windows starting from UTC 00:00:00.
The `max(temperature)` function calculates the maximum temperature value within each time window.

For more details on the behavior of this function,
please refer to [`date_bin`](/reference/sql/functions/df-functions.md#date_bin).
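
To check how individual timestamps map to windows, `date_bin()` can also be evaluated on literal values. The sketch below assumes that a `::TIMESTAMP` cast on a string literal is accepted in the same way as the `::INTERVAL` cast used above. With 10-second windows, `00:00:17` belongs to the window starting at `00:00:10`, while `00:00:20` starts the next window because the intervals are left-closed and right-open.

```sql
SELECT
    -- 00:00:17 falls in [00:00:10, 00:00:20), so the window start is 00:00:10
    date_bin('10 seconds'::INTERVAL, '2024-01-01 00:00:17'::TIMESTAMP) AS window_a,
    -- 00:00:20 is the left edge of [00:00:20, 00:00:30), so it maps to itself
    date_bin('10 seconds'::INTERVAL, '2024-01-01 00:00:20'::TIMESTAMP) AS window_b;
```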

:::tip NOTE
Currently, flows rely on the time window expression to determine how to incrementally update the result, so it's better to use a relatively small time window when possible.
:::

## Flush a flow

The flow engine automatically processes aggregation operations within a short period (i.e., a few seconds) after new data arrives in the source table.
However, you can manually trigger the flow engine to process the aggregation operation immediately using the `ADMIN FLUSH_FLOW` command.

```sql
ADMIN FLUSH_FLOW('<flow-name>')
```
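
For instance, with the `my_flow` flow and `my_sink_table` sink table used earlier in this document, a flush followed by a query on the sink table could look like this sketch:

```sql
-- Ask the flow engine to process pending data for my_flow right away.
ADMIN FLUSH_FLOW('my_flow');

-- Then read the freshly updated aggregation results.
SELECT * FROM my_sink_table;
```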

## Delete a flow

To delete a flow, use the `DROP FLOW` statement:

```sql
DROP FLOW [ IF EXISTS ] <flow-name>
```

For example:

```sql
DROP FLOW IF EXISTS my_flow;
```
