docs: add flow user guide #954

Merged · 21 commits · May 20, 2024
6 changes: 6 additions & 0 deletions docs/nightly/en/summary.yml
@@ -52,6 +52,12 @@
- sql
- promql
- query-external-data
- Continuous-Aggregation:
- overview
- manage-flow
- define-time-window
- query
- expression
- Client-Libraries:
- overview
- go
@@ -0,0 +1,33 @@
# Define Time Window

Time window is an important attribute of a continuous aggregation query. It defines how data is aggregated in the flow. GreptimeDB provides two types of time windows: `hop` and `tumble`, also known as the "sliding window" and the "fixed window" respectively. You can specify the time window in the `GROUP BY` clause using the `hop()` or `tumble()` function. These two functions are supported only in the `GROUP BY` position of continuous aggregate queries.

The following figure illustrates how the `hop()` and `tumble()` functions work:

![Time Window](/time-window.svg)

# Tumble

`tumble()` defines fixed windows that do not overlap. Its signature is as follows:

```
tumble(col, interval, <start_time>)
```

Here `col` specifies the column used to compute the time window. The provided column must have a timestamp type. `interval` specifies the size of the window. The `tumble()` function divides the time column into fixed-size windows and aggregates the data in each window.

`start_time` is an optional parameter that specifies the start time of the first window. If it is not provided, the start time is aligned to the calendar.
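For example, a tumble window in a flow query's `GROUP BY` might look like the following sketch (the `access_log` table, its `url` column, and its `ts` time index column are hypothetical names, and the explicit start time is an illustrative assumption):

```sql
-- Hypothetical sketch: count rows per 5-minute fixed window,
-- with the first window aligned to midnight on 2024-01-01.
SELECT count(url) FROM access_log
GROUP BY tumble(ts, INTERVAL '5 minutes', '2024-01-01 00:00:00');
```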

# Hop (not supported yet)

`hop()` defines sliding windows that move forward by a fixed interval. Its signature is as follows:

```
hop(col, size_interval, hop_interval, <start_time>)
```

Here `col` specifies the column used to compute the time window. The provided column must have a timestamp type.

`size_interval` specifies the size of each window, while `hop_interval` specifies the delta between the start timestamps of two consecutive windows. You can think of the `tumble()` function as a special case of the `hop()` function where `size_interval` and `hop_interval` are equal.

`start_time` is an optional parameter that specifies the start time of the first window. If it is not provided, the start time is aligned to the calendar.
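Since `hop()` is not supported yet, the following is only an illustrative sketch of the documented signature (the `metrics` table and its `cpu` and `ts` columns are hypothetical names):

```sql
-- Hypothetical sketch: a 10-minute window sliding forward every 5 minutes,
-- so consecutive windows overlap by 5 minutes.
SELECT avg(cpu) FROM metrics
GROUP BY hop(ts, INTERVAL '10 minutes', INTERVAL '5 minutes');
```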
@@ -0,0 +1,9 @@
# Expression

This section lists all supported aggregate functions.

- `count(column)`: count the number of rows.
- `sum(column)`: sum the values of the column.
- `avg(column)`: calculate the average value of the column.
- `min(column)`: find the minimum value of the column.
- `max(column)`: find the maximum value of the column.
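Several of these aggregate functions can be combined in one flow query over a time window, as in this sketch (the `sensor_data` table and its `temp` and `ts` columns are hypothetical names):

```sql
-- Hypothetical sketch: compute several aggregates per 1-minute fixed window.
SELECT min(temp), max(temp), avg(temp), count(temp)
FROM sensor_data
GROUP BY tumble(ts, INTERVAL '1 minute');
```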
@@ -0,0 +1,57 @@
# Manage Flows

Each `flow` is a continuous aggregation query in GreptimeDB. It is a query that continuously updates the aggregated data based on the incoming data and materializes the result. This document describes how to create, update, and delete a flow.

A `flow` has the following attributes:
- `name`: the name of the flow. It is a unique identifier at the catalog level.
- `source tables`: the tables that provide data for the flow. Each flow can have multiple source tables.
- `sink table`: the table that stores the materialized aggregated data.
- `expire after`: the interval after which data from the source table expires. Expired data is not used in the flow.
- `comment`: the description of the flow.
- `SQL`: the continuous aggregation query that defines the flow. Refer to [Expression](./expression.md) for the available expressions.

# Create or update a flow

The grammar to create a flow is:

```sql
CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS
<SQL>;
```

When `OR REPLACE` is specified and a flow with the same name already exists, the flow is replaced by the new definition.

`sink-table-name` is the name of the table that stores the materialized aggregated data. It can be an existing table or a new one; the flow creates the sink table if it doesn't exist. If the table already exists, its schema must match the schema of the query result.
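As a sketch modeled on the overview example, a sink table for a query that sums a column over a tumble window might look like the following (the table and column names are assumptions and must match your own query's output schema):

```sql
-- Hypothetical sink table: one column for the aggregate value,
-- plus columns for the window bounds and the update timestamp.
CREATE TABLE my_sink_table (
    sum_number BIGINT,
    start_window TIMESTAMP TIME INDEX,
    end_window TIMESTAMP,
    update_at TIMESTAMP
);
```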

The `SQL` part defines the continuous aggregation query. Refer to [Query](./query.md) for details. Generally speaking, the `SQL` part is like a normal `SELECT` clause with a few differences.

A simple example to create a flow:

```sql
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, INTERVAL '5 minutes');
```

The created flow computes `count(item)` for every 5-minute window and stores the result in `my_sink_table`. Only data that arrives within 1 hour is used in the flow. For the `tumble()` function, refer to the [define time window](./define-time-window.md) part.

# Delete a flow

To delete a flow, use the following `DROP FLOW` clause:

```sql
DROP FLOW [IF EXISTS] <name>
```

For example:

```sql
DROP FLOW IF EXISTS my_flow;
```
88 changes: 88 additions & 0 deletions docs/nightly/en/user-guide/continuous-aggregation/overview.md
@@ -0,0 +1,88 @@
# Overview

GreptimeDB provides a continuous aggregation feature that allows you to aggregate data in real time. This feature is useful when you need to calculate and query the sum, average, or other aggregations on the fly. Continuous aggregation is provided by our `Flow` engine, which continuously updates the aggregated data based on the incoming data and materializes the result.

- [Manage Flow](./manage-flow.md) describes how to create, update, and delete a flow. Each of your continuous aggregation queries is a flow.
- [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. Time window is an important attribute of your continuous aggregation query. It defines the time interval for the aggregation.
- [Query](./query.md) describes how to write a continuous aggregation query.
- [Expression](./expression.md) is a reference of available expressions in the continuous aggregation query.

![Continuous Aggregation](/flow-ani.svg)

## Example

Here is a complete example of what a continuous aggregation query looks like.

First, we create a source table `numbers_input` and a sink table `out_num_cnt` with the following statements:

```sql
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
```

```sql
CREATE TABLE out_num_cnt (
sum_number BIGINT,
start_window TIMESTAMP TIME INDEX,
end_window TIMESTAMP,
update_at TIMESTAMP
);
```

Then create our flow `test_numbers` to aggregate the sum of the `number` column in the `numbers_input` table. The aggregation is computed in 1-second fixed windows.

```sql
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second');
```

Now we can insert some data into the `numbers_input` table and see the result in the `out_num_cnt` table.

```sql
INSERT INTO numbers_input
VALUES
(20,1625097600000),
(22,1625097600500);
```

The output table `out_num_cnt` should have the following data:

```sql
SELECT * FROM out_num_cnt;
```

```sql
sum_number | start_window | end_window | update_at
------------+----------------------------+----------------------------+----------------------------
42 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:01.000000 | 2024-05-17 08:32:56.026000
(1 row)
```

Let's try to insert more data into `numbers_input` table.

```sql
INSERT INTO numbers_input
VALUES
(23,1625097601000),
(24,1625097601500);
```

Now the output table `out_num_cnt` should have the following data:

```sql
SELECT * FROM out_num_cnt;
```

```sql
sum_number | start_window | end_window | update_at
------------+----------------------------+----------------------------+----------------------------
42 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:01.000000 | 2024-05-17 08:32:56.026000
47 | 2021-07-01 00:00:01.000000 | 2021-07-01 00:00:02.000000 | 2024-05-17 08:33:10.048000
(2 rows)
```
18 changes: 18 additions & 0 deletions docs/nightly/en/user-guide/continuous-aggregation/query.md
@@ -0,0 +1,18 @@
# Write a Query

This chapter describes how to write a continuous aggregation query in GreptimeDB. The query should be a `SELECT` statement with either aggregate functions or non-aggregating (i.e., scalar) functions.

Only two kinds of expressions are allowed after the `SELECT` keyword:
- Aggregate functions: see the reference in [Expression](./expression.md) for detail.
- Scalar functions: like `col`, `to_lowercase(col)`, `col + 1`, etc. This part is the same as the normal `SELECT` clause in GreptimeDB.

Each query should then have a `FROM` clause to specify the source table. The referenced table must be one of the flow's source tables listed in the `CREATE FLOW` clause. Joins are currently not supported, so each query can reference only one table.

The `GROUP BY` clause works as in a normal query: it groups the data by the specified columns. One special case is the time window functions `hop()` and `tumble()` described in the [Define Time Window](./define-time-window.md) part, which are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be literals, columns, or scalar expressions.

Note that the two time window functions add several columns to the output schema. The columns are:
- `window_start`: the start time of the window.
- `window_end`: the end time of the window.
- `updated_at`: the time when the window is updated.
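As a sketch (reusing the hypothetical `my_source_table` and `time_index` names from the manage-flow example), a query like the following would therefore produce an output schema containing the aggregate column plus the three window columns:

```sql
-- Hypothetical sketch: besides the count, the output schema also carries
-- the window start, window end, and update time columns described above.
SELECT count(item) FROM my_source_table
GROUP BY tumble(time_index, INTERVAL '5 minutes');
```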

Other clauses such as `ORDER BY`, `LIMIT`, and `OFFSET` are not supported in continuous aggregation queries.
3 changes: 3 additions & 0 deletions docs/public/flow-ani.svg