diff --git a/docs/user-guide/continuous-aggregation/overview.md b/docs/user-guide/continuous-aggregation/overview.md index 313c0fe68..a6b801f4b 100644 --- a/docs/user-guide/continuous-aggregation/overview.md +++ b/docs/user-guide/continuous-aggregation/overview.md @@ -64,7 +64,7 @@ SELECT min(size) as min_size, max(size) as max_size, avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, + sum(case when `size` > 550 then 1 else 0 end) as high_size_count, date_bin(INTERVAL '1 minutes', access_time) as time_window, FROM ngx_access_log GROUP BY @@ -133,11 +133,15 @@ Here is the explanation of the columns in the `ngx_statistics` table: - `time_window`: The time window of the aggregation. - `update_at`: The time when the aggregation is updated. + + + ## Next Steps Congratulations you already have a preliminary understanding of the continuous aggregation feature. Please refer to the following sections to learn more: +- [Usecase Examples](./usecase-example.md) provides more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard. - [Manage Flows](./manage-flow.md) describes how to create, update, and delete a flow. Each of your continuous aggregation query is a flow. - [Write a Query](./query.md) describes how to write a continuous aggregation query. - [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. Time window is an important attribute of your continuous aggregation query. It defines the time interval for the aggregation. diff --git a/docs/user-guide/continuous-aggregation/query.md b/docs/user-guide/continuous-aggregation/query.md index b9e574f0b..f21ee2af0 100644 --- a/docs/user-guide/continuous-aggregation/query.md +++ b/docs/user-guide/continuous-aggregation/query.md @@ -15,91 +15,12 @@ Only two kinds of expression are allowed after `SELECT` keyword: The query should have a `FROM` clause to identify the source table. As the join clause is currently not supported, the query can only aggregate columns from a single table. -`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions. - -Others things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query. - -## Rewrite an existing query to a continuous aggregation query - -Some of simple existing aggregation queries can be directly used as continuous aggregation queries. For example, the example in [overview](./overview.md) can be used to query both in standard SQL and continuous aggregation query, since it's also a valid SQL query without any flow-specific syntax or functions: +`WHERE` and `HAVING` clauses are supported in the continuous aggregation query. They work as in a normal query. The `WHERE` clause filters the data before aggregation, and the `HAVING` clause filters the data after aggregation. -```sql -SELECT - status, - count(client) AS total_logs, - min(size) as min_size, - max(size) as max_size, - avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` +`DISTINCT` currently only work with `SELECT DISTINCT column1 ..` syntax. It is used to remove duplicate rows from the result set. Support for `SELECT count(DISTINCT column1) ...` is not available yet, but it will be added in the future. -However, there are other types of queries that cannot be directly used as continuous aggregation queries. -For example, a query that needs to compute percentiles would be unwise to repeatedly calculate the percentile for each time window everytime a new batch of data arrive. In this case, you can pre-aggregate the data into buckets of the desired size, and then calculate the percentile in the sink table using standard SQL when needed. The original SQL might be: -```sql -SELECT - status, - percentile_approx(size, 0.5) as median_size, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` -The above query can be rewritten to first aggregate the data into buckets of size 10, and then calculate the percentile in the sink table. -The flow query would be: -```sql -CREATE FLOW calc_ngx_distribution -SINK TO ngx_distribution -AS -SELECT - status, - trunc(size, -1) as bucket, - count(client) AS total_logs, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window, - bucket; -``` - -And then you can calculate the percentile in the sink table using standard SQL: -```sql -SELECT - outer.status, - outer.time_window, - outer.bucket, - SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile -FROM ngx_distribution AS outer -JOIN ngx_distribution AS in1 -ON in1.status = outer.status -AND in1.time_window = outer.time_window -GROUP BY - status, - time_window, - bucket -ORDER BY status, time_window, bucket; -``` - -The SQL query groups the data by status, time_window, and bucket. The percentile column calculates the percentage within each group by taking the sum of all buckets not greater than the current bucket and dividing it by the total count of all logs. The result would be something like this: +`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions. -```sql - status | time_window | bucket | percentile ---------+----------------------------+--------+------------ - 404 | 1970-01-01 00:00:00.000000 | 0 | 22 - 404 | 1970-01-01 00:00:00.000000 | 1 | 55 - 404 | 1970-01-01 00:00:00.000000 | 2 | 66 - 404 | 1970-01-01 00:00:00.000000 | 3 | 100 -(4 rows) -``` +Others things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query. - \ No newline at end of file +See [Usecase Examples](./usecase-example.md) for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard. \ No newline at end of file diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md new file mode 100644 index 000000000..d59140c74 --- /dev/null +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -0,0 +1,223 @@ +# Usecase Examples +Following are three major usecase examples for continuous aggregation: + +1. **Real-time Analytics**: A real-time analytics platform that continuously aggregates data from a stream of events, delivering immediate insights while optionally downsampling the data to a lower resolution. For instance, this system can compile data from a high-frequency stream of log events (e.g., occurring every millisecond) to provide up-to-the-minute insights such as the number of requests per minute, average response times, and error rates per minute. + +2. **Real-time Monitoring**: A real-time monitoring system that continuously aggregates data from a stream of events and provides real-time alerts based on the aggregated data. For example, a system that aggregates data from a stream of sensor events and provides real-time alerts when the temperature exceeds a certain threshold. + +3. **Real-time Dashboard**: A real-time dashboard that shows the number of requests per minute, the average response time, and the number of errors per minute. This dashboard can be used to monitor the health of the system and to detect any anomalies in the system. + +In all these usecases, the continuous aggregation system continuously aggregates data from a stream of events and provides real-time insights and alerts based on the aggregated data. The system can also downsample the data to a lower resolution to reduce the amount of data stored and processed. This allows the system to provide real-time insights and alerts while keeping the data storage and processing costs low. + +## Real-time analytics example + +See [Overview](/user-guide/continuous-aggregation/overview.md#quick-start-with-an-example) for an example of real-time analytics. Which is to calculate the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with the size greater than 550 for each status code in a 1-minute fixed window for access logs. + +Another example of real-time analytics is to get all distinct country from the `ngx_access_log` table. The query for continuous aggregation would be: + +```sql +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, + PRIMARY KEY(country) +); + +/* create flow task to calculate the distinct country */ +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, +FROM ngx_access_log; +``` + +now that we have created the flow task, we can insert some data into the source table `ngx_access_log`: + +```sql +/* insert some data */ +INSERT INTO ngx_access_log VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +``` + + +Wait for one second for the Flow to write the result to the sink table and then query: + + +```sql +select * from ngx_country; +``` + +Or if you want to group the data by time window, you can use the following query: + +```sql +/* input table create same as above */ +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + PRIMARY KEY(country) +); +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM ngx_access_log +GROUP BY + country, + time_window; +/* insert data using the same data as above */ +``` + +The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. + +Note that there is currently no persistent storage for the internal state of the flow. The internal state refers to the intermediate state used in computing incremental query results, such as the accumulator's value for an aggregation query (e.g., `count(col)`'s accumulator records the current count number). However, there is persistent storage for the data in the sink table. +Therefore, it is recommended to use an appropriate time window (e.g., hourly) to minimize data loss. This is because if the internal state is lost, the related data within that time window will also be lost. + +## Real-time monitoring example + +Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold. Then the query for continuous aggregation would be: + +```sql +/* create input table */ +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +/* create sink table */ +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + +CREATE FLOW temp_monitoring +SINK TO temp_alerts +AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING max_temp > 100; +``` + +Now that we have created the flow task, we can insert some data into the source table `temp_sensor_data`: + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); +``` +table should be empty now, but still wait at least one second for flow to update results to sink table: + +```sql +SELECT * FROM temp_alerts; +``` + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); +``` +wait at least one second for flow to update results to sink table: +```sql +/* wait at least one second for flow to update results to sink table */ +SELECT * FROM temp_alerts; +``` + +The above query continuously aggregates the data from the `temp_sensor_data` table into the `temp_alerts` table. It calculates the maximum temperature reading for each sensor and location and filters out the data where the maximum temperature exceeds 100 degrees. The `temp_alerts` table will be continuously updated with the aggregated data, providing real-time alerts (Which is a new row in the `temp_alerts` table) when the temperature exceeds the threshold. + + +## Real-time dashboard + +Consider a usecase in which you need a bar graph that show the distribution of packet sizes for each status code to monitor the health of the system. The query for continuous aggregation would be: + +```sql +/* create input table */ +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); +/* create sink table */ +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, /* auto generated column to store the last update time */ + PRIMARY KEY(stat, bucket_size) +); +/* create flow task to calculate the distribution of packet sizes for each status code */ +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1)::INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; +``` + +Now that we have created the flow task, we can insert some data into the source table `ngx_access_log`: + +```sql +INSERT INTO ngx_access_log VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 184, "2022-01-01 00:00:09"); +``` +wait at least one second for flow to update results to sink table: +```sql +SELECT * FROM ngx_distribution; +``` + +The above query puts the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket (in this case, since `trunc`'s second argument is -1, meaning a bucket size of 10) for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code. + +## Conclusion + +Continuous aggregation is a powerful tool for real-time analytics, monitoring, and dashboarding. It allows you to continuously aggregate data from a stream of events and provide real-time insights and alerts based on the aggregated data. By downsampling the data to a lower resolution, you can reduce the amount of data stored and processed, making it easier to provide real-time insights and alerts while keeping the data storage and processing costs low. Continuous aggregation is a key component of any real-time data processing system and can be used in a wide range of usecases to provide real-time insights and alerts based on streaming data. \ No newline at end of file diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md index 03794d1eb..282535fe1 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md @@ -67,7 +67,7 @@ SELECT min(size) as min_size, max(size) as max_size, avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, + sum(case when `size` > 550 then 1 else 0 end) as high_size_count, date_bin(INTERVAL '1 minutes', access_time) as time_window, FROM ngx_access_log GROUP BY @@ -141,6 +141,8 @@ SELECT * FROM ngx_statistics; 恭喜你已经初步了解了持续聚合功能。 请参考以下章节了解更多: + +- [用例](./usecase-example.md) 提供了更多关于如何在实时分析、监控和仪表板中使用持续聚合的示例。 - [管理 Flow](./manage-flow.md) 描述了如何创建、更新和删除 flow。你的每个持续聚合查询都是一个 flow。 - [编写查询语句](./query.md) 描述了如何编写持续聚合查询。 - [定义时间窗口](./define-time-window.md) 描述了如何为持续聚合定义时间窗口。时间窗口是持续聚合查询的一个重要属性,它定义了聚合的时间间隔。 diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md index 77c05ac92..2e98aec98 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md @@ -16,98 +16,14 @@ SELECT AGGR_FUNCTION(column1, column2,..) FROM GROUP BY TIME_WIND 查询应该有一个 `FROM` 子句来标识 source 表。由于不支持 join 子句,目前只能从单个表中聚合列。 -`GROUP BY` 子句与普通查询中的工作方式相同。 -它根据指定的列对数据进行分组。 -`GROUP BY` 子句中使用的时间窗口函数 `hop()` 和 `tumble()` 在 [定义时间窗口](./define-time-window.md) 部分中有描述。 +`WHERE` 和 `HAVING` 子句在持续聚合查询中是支持的。它们的工作方式与普通查询中的相同。`WHERE` 子句在聚合之前过滤数据,而 `HAVING` 子句在聚合之后过滤数据。 + +`DISTINCT` 用于从结果集中删除重复行,目前仅支持 `SELECT DISTINCT column1 ..` 语法,不支持 `SELECT count(DISTINCT column1) ...`,但将来会添加支持。 + +`GROUP BY` 子句与普通查询中的工作方式相同。它根据指定的列对数据进行分组。`GROUP BY` 子句中使用的时间窗口函数 `hop()` 和 `tumble()` 在 [定义时间窗口](./define-time-window.md) 部分中有描述。 它们用于在聚合中定义时间窗口。 `GROUP BY` 中的其他表达式可以是 literal、列名或 scalar 表达式。 持续聚合查询不支持 `ORDER BY`、`LIMIT`、`OFFSET` 等其他操作。 - -## 将现有查询重写为持续聚合查询 - -一些简单的现有聚合查询可以直接用作持续聚合查询。例如,[概述](./overview.md) 部分中的示例可以用于标准 SQL 查询和持续聚合查询,因为它也是一个有效的 SQL 查询,没有任何特定于流的语法或函数: - -```sql -SELECT - status, - count(client) AS total_logs, - min(size) as min_size, - max(size) as max_size, - avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` - -然而,还有其他类型的查询不能直接用作持续聚合查询。 -例如,需要计算百分位数的查询不应该在每次新数据到达时重复计算每个时间窗口的百分位数。 -在这种情况下,您可以将数据预聚合到所需大小的桶中,然后在需要时使用标准 SQL 在 sink 表中计算百分位数。原始 SQL 可能是: - -```sql -SELECT - status, - percentile_approx(size, 0.5) as median_size, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` -上述查询可以重写为首先将数据聚合到大小为 10 的桶中,然后在 sink 表中计算百分位数。 -流查询将是: - -```sql -CREATE FLOW calc_ngx_distribution -SINK TO ngx_distribution -AS -SELECT - status, - trunc(size, -1) as bucket, - count(client) AS total_logs, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window, - bucket; -``` -接下来,您可以使用标准 SQL 在 sink 表中计算百分位数: -```sql -SELECT - outer.status, - outer.time_window, - outer.bucket, - SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile -FROM ngx_distribution AS outer -JOIN ngx_distribution AS in1 -ON in1.status = outer.status -AND in1.time_window = outer.time_window -GROUP BY - status, - time_window, - bucket -ORDER BY status, time_window, bucket; -``` - -上述 SQL 查询按 status、time_window 和 bucket 对数据进行分组。percentile 列通过计算小于或等于当前 bucket 的所有 bucket 的总和,并将其除以所有日志的总数来计算每个组内的百分比。结果可能如下所示: - -```sql - status | time_window | bucket | percentile ---------+----------------------------+--------+------------ - 404 | 1970-01-01 00:00:00.000000 | 0 | 22 - 404 | 1970-01-01 00:00:00.000000 | 1 | 55 - 404 | 1970-01-01 00:00:00.000000 | 2 | 66 - 404 | 1970-01-01 00:00:00.000000 | 3 | 100 -(4 rows) -``` - - \ No newline at end of file +更多关于如何在实时分析、监控和仪表盘中使用持续聚合的示例,请参考 [用例](./usecase-example.md)。 \ No newline at end of file diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md new file mode 100644 index 000000000..1f739b9a4 --- /dev/null +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -0,0 +1,224 @@ +# 用例 +持续聚合的三个主要用例示例如下: + +1. **实时分析**:一个实时分析平台,不断聚合来自事件流的数据,提供即时洞察,同时可选择将数据降采样到较低分辨率。例如,此系统可以编译来自高频日志事件流(例如,每毫秒发生一次)的数据,以提供每分钟的请求数、平均响应时间和每分钟的错误率等最新洞察。 + +2. **实时监控**:一个实时监控系统,不断聚合来自事件流的数据,根据聚合数据提供实时警报。例如,此系统可以处理来自传感器事件流的数据,以提供当温度超过某个阈值时的实时警报。 + +3. **实时仪表盘**:一个实时仪表盘,显示每分钟的请求数、平均响应时间和每分钟的错误数。此仪表板可用于监控系统的健康状况,并检测系统中的任何异常。 + +在所有这些用例中,持续聚合系统不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。系统还可以将数据降采样到较低分辨率,以减少存储和处理的数据量。这使得系统能够提供实时洞察和警报,同时保持较低的数据存储和处理成本。 + +## 实时分析示例 + +请参阅[概述](/user-guide/continuous-aggregation/overview.md#快速开始示例)中的实时分析示例。该示例用于计算日志的总数、包大小的最小、最大和平均值,以及大小大于 550 的数据包数量按照每个状态码在 1 分钟固定窗口中的实时分析。 + +另外,您还可以使用持续聚合来计算其他类型的实时分析。例如,要从 `ngx_access_log` 表中获取所有不同的国家。持续聚合的查询如下: + +```sql +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, + PRIMARY KEY(country) +); + +/* create flow task to calculate the distinct country */ +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, +FROM ngx_access_log; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `ngx_access_log` 中: + +```sql +/* insert some data */ +INSERT INTO ngx_access_log VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); +``` +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql +/* check the result */ +select * from ngx_country; +``` + +或者,如果您想要按时间窗口对数据进行分组,可以使用以下查询: + +```sql +/* input table create same as above */ +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + PRIMARY KEY(country) +); +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM ngx_access_log +GROUP BY + country, + time_window; +/* insert data using the same data as above */ +``` + +上述的查询将 `ngx_access_log` 表中的数据放入 `ngx_country` 表中。它计算每个时间窗口的不同国家。`date_bin` 函数用于将数据分组为一小时的间隔。`ngx_country` 表将不断更新聚合数据,提供实时洞察,显示正在访问系统的不同国家。 + +请注意,目前 Flow 的内部状态没有持久存储。内部状态指的是用于计算增量查询结果的中间状态,例如聚合查询的累加器值(如count(col)的累加器记录了目前为止的 count 计数)。然而,Sink 表的数据是有持久存储的。因此,建议您使用适当的时间窗口(例如设置为每小时)来最小化数据丢失。因为一旦内部状态丢失,相关时间窗口的数据也将随之丢失。 + +## 实时监控示例 + +假设您希望实时监控一个来自温度传感器网络的传感器事件流。传感器事件包含传感器 ID、温度读数、读数的时间戳和传感器的位置等信息。您希望不断聚合这些数据,以在温度超过某个阈值时提供实时警报。那么持续聚合的查询将是: + +```sql +/* create input table */ +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +/* create sink table */ +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + +CREATE FLOW temp_monitoring +SINK TO temp_alerts +AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING max_temp > 100; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `temp_sensor_data` 中: + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); + +/* You may want to flush the flow task to see the result */ +ADMIN FLUSH_FLOW('temp_monitoring'); +``` + +当前输出表应该为空。您可以在等待一秒后通过以下查询查看结果: + +```sql +/* for now sink table will be empty */ +SELECT * FROM temp_alerts; +``` + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); + +``` + +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql +/* now sink table will have the max temperature data */ +SELECT * FROM temp_alerts; +``` + +上述的查询将从 `temp_sensor_data` 表中不断聚合数据到 `temp_alerts` 表中。它计算每个传感器和位置的最高温度读数,并过滤出最高温度超过 100 度的数据。`temp_alerts` 表将不断更新聚合数据,并且当温度超过阈值时提供实时警报(即 `temp_alerts` 表中的新行)。 + +## 实时仪表盘示例 + +假设您需要一个柱状图显示每个状态码的数据包大小分布,以监控系统的健康状况。持续聚合的查询将是: + +```sql +/* create input table */ +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); +/* create sink table */ +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, /* auto generated column to store the last update time */ + PRIMARY KEY(stat, bucket_size) +); +/* create flow task to calculate the distribution of packet sizes for each status code */ +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1)::INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `ngx_access_log` 中: + +```sql +INSERT INTO ngx_access_log VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 184, "2022-01-01 00:00:09"); +``` +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql +SELECT * FROM ngx_distribution; +``` + +上述查询将从 `ngx_access_log` 表中的数据放入 `ngx_distribution` 表中。它计算每个状态码的数据包大小分布,并将数据分组到每个时间窗口中。`ngx_distribution` 表将不断更新聚合数据,提供实时洞察,显示每个状态码的数据包大小分布。 + +## 结论 + +持续聚合是实时分析、监控和仪表盘的强大工具。它允许您不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。通过将数据降采样到较低分辨率,您可以减少存储和处理的数据量,从而更容易提供实时洞察和警报,同时保持较低的数据存储和处理成本。持续聚合是任何实时数据处理系统的关键组件,可以在各种用例中使用,以提供基于流数据的实时洞察和警报。 \ No newline at end of file diff --git a/sidebars.ts b/sidebars.ts index 2861caba0..3204b8898 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -125,6 +125,7 @@ const sidebars: SidebarsConfig = { label: 'Continuous Aggregation', items: [ 'user-guide/continuous-aggregation/overview', + 'user-guide/continuous-aggregation/usecase-example', 'user-guide/continuous-aggregation/manage-flow', 'user-guide/continuous-aggregation/query', 'user-guide/continuous-aggregation/define-time-window',