docs: add flow user guide (#954)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yiran <cuiyiran3@gmail.com>
Co-authored-by: Jeremyhi <jiachun_feng@proton.me>
3 people authored May 20, 2024
1 parent f7b806b commit 700cfe6
Showing 14 changed files with 555 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/nightly/en/summary.yml
@@ -52,6 +52,12 @@
- sql
- promql
- query-external-data
- Continuous-Aggregation:
- overview
- manage-flow
- query
- define-time-window
- expression
- Client-Libraries:
- overview
- go
@@ -0,0 +1,40 @@
# Define Time Window

A time window is an important attribute of your continuous aggregation query. It defines how data is aggregated in the flow.

A time window corresponds to a range of time. Data from the source table is mapped to the corresponding window based on the time index column. A time window is also the scope of one computation of an aggregation expression, so each time window produces one row in the result table.

GreptimeDB provides two types of time windows: `hop` and `tumble`, also known as the sliding window and the fixed window. You can specify the time window in the `GROUP BY` clause using the `hop()` or `tumble()` function. These two functions are only supported in the `GROUP BY` position of continuous aggregation queries.

The following figure illustrates how the `hop()` and `tumble()` functions work:

![Time Window](/time-window.svg)

## Tumble

`tumble()` defines fixed windows that do not overlap.

```
tumble(col, interval, start_time)
```

- `col` specifies the column used to compute the time window. The provided column must have a timestamp type.
- `interval` specifies the size of the window. The `tumble` function divides the time column into fixed-size windows and aggregates the data in each window.
- `start_time` specifies the start time of the first window.
<!-- - `start_time` is an optional parameter to specify the start time of the first window. If not provided, the start time will be aligned to calender. -->

## Hop (not supported yet)

`hop` defines a sliding window that moves forward by a fixed interval. This feature is not supported yet and is expected to be available in the near future.

<!-- `hop` defines sliding window that moves forward by a fixed interval. It signature is like the following:
```
hop(col, size_interval, hop_interval, <start_time>)
```
Where `col` specifies use which column to compute the time window. The provided column must have a timestamp type.
`size_interval` specifies the size of each window, while `hop_interval` specifies the delta between two windows' start timestamp. You can think the `tumble()` function as a special case of `hop()` function where the `size_interval` and `hop_interval` are the same.
`start_time` is an optional parameter to specify the start time of the first window. If not provided, the start time will be aligned to calender. -->
@@ -0,0 +1,9 @@
# Expression

This section lists all supported aggregate functions.

- `count(column)`: count the number of rows.
- `sum(column)`: sum the values of the column.
- `avg(column)`: calculate the average value of the column.
- `min(column)`: find the minimum value of the column.
- `max(column)`: find the maximum value of the column.
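
As a sketch of how these functions appear in a flow query, the following hypothetical example (the `sensor_readings` table and its columns are invented for illustration) combines several of them over one time window:

```sql
SELECT
    max(temperature),
    min(temperature),
    avg(temperature)
FROM sensor_readings
GROUP BY tumble(ts, INTERVAL '10 minutes', '2024-01-01 00:00:00');
```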
80 changes: 80 additions & 0 deletions docs/nightly/en/user-guide/continuous-aggregation/manage-flow.md
@@ -0,0 +1,80 @@
# Manage Flows

Each `flow` is a continuous aggregation query in GreptimeDB.
It continuously updates the aggregated data based on the incoming data.
This document describes how to create, update, and delete a flow.

A `flow` has the following attributes:
- `name`: the name of the flow. It is a unique identifier at the catalog level.
- `source tables`: the tables that provide data for the flow. Each flow can have multiple source tables.
- `sink table`: the table to store the materialized aggregated data.
<!-- - `expire after`: the interval to expire the data from the source table. Data after the expiration time will not be used in the flow. -->
- `comment`: the description of the flow.
- `SQL`: the continuous aggregation query to define the flow. Refer to [Expression](./expression.md) for the available expressions.

## Create or update a flow

The grammar to create a flow is:

<!-- ```sql
CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <name>
OUTPUT TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS
<SQL>;
``` -->

```sql
CREATE FLOW [ IF NOT EXISTS ] <name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS
<SQL>;
```

<!-- When `OR REPLACE` is specified, if a flow with the same name already exists, it will be updated to the new one. Notice that this only affects the flow task itself, and both source and sink tables will not be changed. -->

`sink-table-name` is the name of the table that stores the materialized aggregated data. It can be an existing table or a new one; the flow creates the sink table if it does not exist. If the table already exists, its schema must match the schema of the query result.
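
For instance, for a flow whose query computes `sum(number)` over a tumble window (as in the overview's quick-start example), a pre-created sink table would need a compatible schema along these lines (a sketch; the exact column names and types follow the query result):

```sql
CREATE TABLE my_sink_table (
    sum_number BIGINT,
    start_window TIMESTAMP TIME INDEX,
    end_window TIMESTAMP,
    update_at TIMESTAMP
);
```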

<!-- `expire after` is an optional interval to expire the data from the source table. The expiration time is a relative time from the current time (by "current time" we means the physical time of the data arrive the Flow engine). For example, `INTERVAL '1 hour'` means the data **older** than 1 hour from the current time will be expired. Expired data will be dropped directly. -->

The `SQL` part defines the continuous aggregation query. Refer to [Write a Query](./query.md) for the details. Generally speaking, the `SQL` part is just like a normal `SELECT` clause with a few differences.

A simple example to create a flow:

<!-- ```sql
CREATE FLOW IF NOT EXISTS my_flow
OUTPUT TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, INTERVAL '5 minutes');
``` -->

```sql
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) FROM my_source_table GROUP BY tumble(time_index, INTERVAL '5 minutes', '2024-05-20 00:00:00');
```

The created flow computes `count(item)` for every 5-minute window and stores the result in `my_sink_table`. For the `tumble()` function, refer to the [define time window](./define-time-window.md) section.
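
Once the flow is running, the materialized result can be read from the sink table like any regular table, for example:

```sql
SELECT * FROM my_sink_table;
```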

<!-- The created flow will compute `count(item)` for every 5 minutes and store the result in `my_sink_table`. All data comes within 1 hour will be used in the flow. For the `tumble()` function, refer to [define time window](./define-time-window.md) part. -->

## Delete a flow

To delete a flow, use the following `DROP FLOW` clause:

```sql
DROP FLOW [IF EXISTS] <name>
```

For example:

```sql
DROP FLOW IF EXISTS my_flow;
```
106 changes: 106 additions & 0 deletions docs/nightly/en/user-guide/continuous-aggregation/overview.md
@@ -0,0 +1,106 @@
# Overview

GreptimeDB provides a continuous aggregation feature that allows you to aggregate data in real time. This feature is useful when you need to calculate and query the sum, average, or other aggregations on the fly. The continuous aggregation feature is provided by the Flow engine, which continuously updates the aggregated data based on the incoming data and materializes it.

When you insert data into the source table, the data is also sent to and stored in the Flow engine.
The Flow engine calculates the aggregation by time windows and stores the result in the sink table.
The entire process is illustrated in the following image:

![Continuous Aggregation](/flow-ani.svg)

## Quick start with an example

Here is a complete example of what a continuous aggregation query looks like.

First, create a source table `numbers_input` and a sink table `out_num_cnt` with the following statements:

```sql
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
```

```sql
CREATE TABLE out_num_cnt (
sum_number BIGINT,
start_window TIMESTAMP TIME INDEX,
end_window TIMESTAMP,
update_at TIMESTAMP
);
```

Then create the flow `test_numbers` to aggregate the sum of `number` column in `numbers_input` table. The aggregation is calculated in 1-second fixed windows.

```sql
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
```

To observe the outcome of the continuous aggregation in the `out_num_cnt` table, insert some data into the source table `numbers_input`.

```sql
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
```

The sum of the `number` column is 42 (20+22), so the sink table `out_num_cnt` should have the following data:

```sql
SELECT * FROM out_num_cnt;
```

```sql
sum_number | start_window | end_window | update_at
------------+----------------------------+----------------------------+----------------------------
42 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:01.000000 | 2024-05-17 08:32:56.026000
(1 row)
```

Try to insert more data into the `numbers_input` table:

```sql
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
```

The sink table `out_num_cnt` now contains two rows, holding the sums 42 and 47 (23+24) for the two respective 1-second windows.

```sql
SELECT * FROM out_num_cnt;
```

```sql
sum_number | start_window | end_window | update_at
------------+----------------------------+----------------------------+----------------------------
42 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:01.000000 | 2024-05-17 08:32:56.026000
47 | 2021-07-01 00:00:01.000000 | 2021-07-01 00:00:02.000000 | 2024-05-17 08:33:10.048000
(2 rows)
```

Here is the explanation of the columns in the `out_num_cnt` table:

- `sum_number`: the sum of the `number` column in the window.
- `start_window`: the start time of the window.
- `end_window`: the end time of the window.
- `update_at`: the time when the row data is updated.

The `start_window`, `end_window`, and `update_at` columns are automatically added by the time window functions of the Flow engine.

## Next Steps

Congratulations! You now have a preliminary understanding of the continuous aggregation feature.
Please refer to the following sections to learn more:

- [Manage Flows](./manage-flow.md) describes how to create, update, and delete a flow. Each of your continuous aggregation queries is a flow.
- [Write a Query](./query.md) describes how to write a continuous aggregation query.
- [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. The time window is an important attribute of your continuous aggregation query and defines the time interval for the aggregation.
- [Expression](./expression.md) is a reference of available expressions in the continuous aggregation query.
19 changes: 19 additions & 0 deletions docs/nightly/en/user-guide/continuous-aggregation/query.md
@@ -0,0 +1,19 @@
# Write a Query

This chapter describes how to write a continuous aggregation query in GreptimeDB. The query should be a `SELECT` statement with either aggregating or non-aggregating functions (i.e., scalar functions).

The grammar of the query is like the following:

```sql
SELECT AGGR_FUNCTION(column1, column2,..) FROM <source_table> GROUP BY TIME_WINDOW_FUNCTION();
```

Only two kinds of expressions are allowed after the `SELECT` keyword:
- Aggregate functions: see the reference in [Expression](./expression.md) for details.
- Scalar functions: like `col`, `to_lowercase(col)`, `col + 1`, etc. This part is the same as in a normal `SELECT` clause in GreptimeDB.

The query should have a `FROM` clause to identify the source table. As the join clause is currently not supported, the query can only aggregate columns from a single table.

The `GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special case is the time window functions `hop()` and `tumble()` described in the [Define Time Window](./define-time-window.md) section; they are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be literals, columns, or scalar expressions.
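
For example, a query can group by a regular column together with a time window function; the table and column names below are hypothetical:

```sql
SELECT max(temperature), region
FROM sensor_readings
GROUP BY region, tumble(ts, INTERVAL '1 hour', '2024-01-01 00:00:00');
```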

Other clauses such as `ORDER BY`, `LIMIT`, and `OFFSET` are not supported in continuous aggregation queries.
1 change: 1 addition & 0 deletions docs/nightly/zh/summary-i18n.yml
@@ -7,6 +7,7 @@ Clients: 客户端
Client-Libraries: 客户端库
Write-Data: 写入数据
Query-Data: 读取数据
Continuous-Aggregation: 持续聚合
Python-Scripts: Python 脚本
Operations: 运维操作
Prometheus: Prometheus
@@ -0,0 +1,46 @@
# Define Time Window

The time window is an important attribute of a continuous aggregation query.
It defines how data is aggregated in the flow.

A time window corresponds to a range of time.
Data from the source table is mapped to the corresponding window based on the time index column.
A time window is also the scope of one computation of an aggregation expression, so each time window produces one row in the result table.

GreptimeDB provides two types of time windows: `hop` and `tumble`, also known as the sliding window and the fixed window.
You can specify the time window in the `GROUP BY` clause using the `hop()` or `tumble()` function.
These two functions are only supported in the `GROUP BY` position of continuous aggregation queries.

The following figure illustrates how the `hop()` and `tumble()` functions work:

![Time Window](/time-window.svg)

## Tumble

`tumble()` defines fixed windows that do not overlap.

```
tumble(col, interval, start_time)
```

- `col` specifies the column used to compute the time window. The provided column must have a timestamp type.
- `interval` specifies the size of the window. The `tumble` function divides the time column into fixed-size windows and aggregates the data in each window.
- `start_time` specifies the start time of the first window.
<!-- `start_time` is an optional parameter to specify the start time of the first window. If not provided, the start time will be aligned to the calendar. -->

## Hop (not supported yet)

`hop` defines a sliding window that moves forward by a fixed interval.
This feature is not supported yet and is expected to be available in the near future.

<!-- `hop` defines sliding window that moves forward by a fixed interval. It signature is like the following:
```
hop(col, size_interval, hop_interval, <start_time>)
```
Where `col` specifies use which column to compute the time window. The provided column must have a timestamp type.
`size_interval` specifies the size of each window, while `hop_interval` specifies the delta between two windows' start timestamp. You can think the `tumble()` function as a special case of `hop()` function where the `size_interval` and `hop_interval` are the same.
`start_time` is an optional parameter to specify the start time of the first window. If not provided, the start time will be aligned to calender. -->
@@ -0,0 +1,9 @@
# Expression

This section lists all supported aggregate functions.

- `count(column)`: count the number of rows.
- `sum(column)`: sum the values of the column.
- `avg(column)`: calculate the average value of the column.
- `min(column)`: find the minimum value of the column.
- `max(column)`: find the maximum value of the column.
