From 3ea4f8b70decf327adcf3ad2cfec03ed1e33444b Mon Sep 17 00:00:00 2001 From: localhost Date: Wed, 11 Sep 2024 10:54:06 +0800 Subject: [PATCH 1/3] chore: add doc for dryrun pipeline api (#1177) Co-authored-by: Yiran --- docs/user-guide/logs/manage-pipelines.md | 136 +++++++++++++++++- .../user-guide/logs/manage-pipelines.md | 134 ++++++++++++++++- 2 files changed, 268 insertions(+), 2 deletions(-) diff --git a/docs/user-guide/logs/manage-pipelines.md b/docs/user-guide/logs/manage-pipelines.md index 5c36c3cc5..67209a053 100644 --- a/docs/user-guide/logs/manage-pipelines.md +++ b/docs/user-guide/logs/manage-pipelines.md @@ -87,4 +87,138 @@ Output: Readable timestamp (UTC): 2024-06-27 12:02:34.257312110Z ``` -The output `Readable timestamp (UTC)` represents the creation time of the pipeline and also serves as the version number. \ No newline at end of file +The output `Readable timestamp (UTC)` represents the creation time of the pipeline and also serves as the version number. + +## Debug + +First, please refer to the [Quick Start example](/user-guide/logs/quick-start.md#write-logs-by-pipeline) to see the correct execution of the Pipeline. + +### Debug creating a Pipeline + +You may encounter errors when creating a Pipeline. For example, when creating a Pipeline using the following configuration: + + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/test" \ + -H 'Content-Type: application/x-yaml' \ + -d $'processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + ignore_missing: true + - gsub: + fields: + - message + pattern: "\\\." + replacement: + - "-" + ignore_missing: true + +transform: + - fields: + - message + type: string + - field: time + type: time + index: timestamp' +``` + +The pipeline configuration contains an error. The `gsub` Processor expects the `replacement` field to be a string, but the current configuration provides an array. As a result, the pipeline creation fails with the following error message: + + +```json +{"error":"Failed to parse pipeline: 'replacement' must be a string"} +``` + +Therefore, We need to modify the configuration of the `gsub` Processor and change the value of the `replacement` field to a string type. + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/test" \ + -H 'Content-Type: application/x-yaml' \ + -d $'processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + ignore_missing: true + - gsub: + fields: + - message + pattern: "\\\." + replacement: "-" + ignore_missing: true + +transform: + - fields: + - message + type: string + - field: time + type: time + index: timestamp' +``` + +Now that the Pipeline has been created successfully, you can test the Pipeline using the `dryrun` interface. + +### Debug writing logs + +We can test the Pipeline using the `dryrun` interface. We will test it with erroneous log data where the value of the message field is in numeric format, causing the pipeline to fail during processing. + +**This API is only used to test the results of the Pipeline and does not write logs to GreptimeDB.** + + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/dryrun?pipeline_name=test" \ + -H 'Content-Type: application/json' \ + -d $'{"message": 1998.08,"time":"2024-05-25 20:16:37.217"}' + +{"error":"Failed to execute pipeline, reason: gsub processor: expect string or array string, but got Float64(1998.08)"} +``` + +The output indicates that the pipeline processing failed because the `gsub` Processor expects a string type rather than a floating-point number type. We need to adjust the format of the log data to ensure the pipeline can process it correctly. +Let's change the value of the message field to a string type and test the pipeline again. + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/dryrun?pipeline_name=test" \ + -H 'Content-Type: application/json' \ + -d $'{"message": "1998.08","time":"2024-05-25 20:16:37.217"}' +``` + +At this point, the Pipeline processing is successful, and the output is as follows: + +```json +{ + "rows": [ + [ + { + "data_type": "STRING", + "key": "message", + "semantic_type": "FIELD", + "value": "1998-08" + }, + { + "data_type": "TIMESTAMP_NANOSECOND", + "key": "time", + "semantic_type": "TIMESTAMP", + "value": "2024-05-25 20:16:37.217+0000" + } + ] + ], + "schema": [ + { + "colume_type": "FIELD", + "data_type": "STRING", + "fulltext": false, + "name": "message" + }, + { + "colume_type": "TIMESTAMP", + "data_type": "TIMESTAMP_NANOSECOND", + "fulltext": false, + "name": "time" + } + ] +} +``` + +It can be seen that the `.` in the string `1998.08` has been replaced with `-`, indicating a successful processing of the Pipeline. \ No newline at end of file diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/logs/manage-pipelines.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/logs/manage-pipelines.md index f0175c261..5d6514072 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/logs/manage-pipelines.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/logs/manage-pipelines.md @@ -87,4 +87,136 @@ timestamp_ns="1719489754257312110"; readable_timestamp=$(TZ=UTC date -d @$((${ti Readable timestamp (UTC): 2024-06-27 12:02:34.257312110Z ``` -输出的 `Readable timestamp (UTC)` 即为 Pipeline 的创建时间同时也是版本号。 \ No newline at end of file +输出的 `Readable timestamp (UTC)` 即为 Pipeline 的创建时间同时也是版本号。 + +## 问题调试 + +首先,请参考 [快速入门示例](/user-guide/logs/quick-start.md#使用-pipeline-写入日志)来查看 Pipeline 正确的执行情况。 + +### 调试创建 Pipeline + +在创建 Pipeline 的时候你可能会遇到错误,例如使用如下配置创建 Pipeline: + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/test" \ + -H 'Content-Type: application/x-yaml' \ + -d $'processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + ignore_missing: true + - gsub: + fields: + - message + pattern: "\\\." + replacement: + - "-" + ignore_missing: true + +transform: + - fields: + - message + type: string + - field: time + type: time + index: timestamp' +``` + +Pipeline 配置存在错误。`gsub` processor 期望 `replacement` 字段为字符串,但当前配置提供了一个数组。因此,该 Pipeline 创建失败,并显示以下错误消息: + + +```json +{"error":"Failed to parse pipeline: 'replacement' must be a string"} +``` + +因此,你需要修改 `gsub` processor 的配置,将 `replacement` 字段的值更改为字符串类型。 + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/test" \ + -H 'Content-Type: application/x-yaml' \ + -d $'processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + ignore_missing: true + - gsub: + fields: + - message + pattern: "\\\." + replacement: "-" + ignore_missing: true + +transform: + - fields: + - message + type: string + - field: time + type: time + index: timestamp' +``` + +此时 Pipeline 创建成功,可以使用 `dryrun` 接口测试该 Pipeline。 + +### 调试日志写入 + +我们可以使用 `dryrun` 接口测试 Pipeline。我们将使用错误的日志数据对其进行测试,其中消息字段的值为数字格式,会导致 Pipeline 在处理过程中失败。 + +**此接口仅仅用于测试 Pipeline 的处理结果,不会将日志写入到 GreptimeDB 中。** + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/dryrun?pipeline_name=test" \ + -H 'Content-Type: application/json' \ + -d $'{"message": 1998.08,"time":"2024-05-25 20:16:37.217"}' + +{"error":"Failed to execute pipeline, reason: gsub processor: expect string or array string, but got Float64(1998.08)"} +``` + +输出显示 Pipeline 处理失败,因为 `gsub` Processor 期望的是字符串类型,而不是浮点数类型。我们需要修改日志数据的格式,确保 Pipeline 能够正确处理。 +我们再将 message 字段的值修改为字符串类型,然后再次测试该 Pipeline。 + +```bash +curl -X "POST" "http://localhost:4000/v1/events/pipelines/dryrun?pipeline_name=test" \ + -H 'Content-Type: application/json' \ + -d $'{"message": "1998.08","time":"2024-05-25 20:16:37.217"}' +``` + +此时 Pipeline 处理成功,输出如下: + +```json +{ + "rows": [ + [ + { + "data_type": "STRING", + "key": "message", + "semantic_type": "FIELD", + "value": "1998-08" + }, + { + "data_type": "TIMESTAMP_NANOSECOND", + "key": "time", + "semantic_type": "TIMESTAMP", + "value": "2024-05-25 20:16:37.217+0000" + } + ] + ], + "schema": [ + { + "colume_type": "FIELD", + "data_type": "STRING", + "fulltext": false, + "name": "message" + }, + { + "colume_type": "TIMESTAMP", + "data_type": "TIMESTAMP_NANOSECOND", + "fulltext": false, + "name": "time" + } + ] +} +``` + +可以看到,`1998.08` 字符串中的 `.` 已经被替换为 `-`,Pipeline 处理成功。 \ No newline at end of file From 671a4f1f6575a4993b5b6046a32e4daae2d550c0 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 11 Sep 2024 12:40:38 +0800 Subject: [PATCH 2/3] docs: superset and metabase integration (#1179) Co-authored-by: Yiran Co-authored-by: jeremyhi --- docs/greptimecloud/integrations/metabase.md | 17 ++++++ docs/greptimecloud/integrations/superset.md | 18 +++++++ docs/user-guide/integrations/metabase.md | 28 ++++++++++ docs/user-guide/integrations/overview.md | 3 +- docs/user-guide/integrations/superset.md | 53 +++++++++++++++++++ .../greptimecloud/integrations/metabase.md | 16 ++++++ .../greptimecloud/integrations/superset.md | 17 ++++++ .../user-guide/integrations/metabase.md | 25 +++++++++ .../user-guide/integrations/overview.md | 3 +- .../user-guide/integrations/superset.md | 51 ++++++++++++++++++ sidebars.ts | 4 ++ 11 files changed, 233 insertions(+), 2 deletions(-) create mode 100644 docs/greptimecloud/integrations/metabase.md create mode 100644 docs/greptimecloud/integrations/superset.md create mode 100644 docs/user-guide/integrations/metabase.md create mode 100644 docs/user-guide/integrations/superset.md create mode 100644 i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/metabase.md create mode 100644 i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/superset.md create mode 100644 i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/metabase.md create mode 100644 i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/superset.md diff --git a/docs/greptimecloud/integrations/metabase.md b/docs/greptimecloud/integrations/metabase.md new file mode 100644 index 000000000..e7bf400f4 --- /dev/null +++ b/docs/greptimecloud/integrations/metabase.md @@ -0,0 +1,17 @@ +# Metabase + +[Metabase](https://github.com/metabase/metabase) is an open source BI tool that +written in Clojure. You can configure GreptimeDB as a metabase data source from +a community driver plugin. + +See [our docs](https://docs.greptime.com/user-guide/integrations/metabase) for +the instructions of plugin installation. + +## Connection information + +- Database type: `GreptimeDB` +- Host: `` +- Port: `4003` +- Database name: `` +- User: `` +- Password: `` diff --git a/docs/greptimecloud/integrations/superset.md b/docs/greptimecloud/integrations/superset.md new file mode 100644 index 000000000..e41c031b3 --- /dev/null +++ b/docs/greptimecloud/integrations/superset.md @@ -0,0 +1,18 @@ +# Superset + +[Superset](https://superset.apache.org) is an open source BI tool that written +in Python. You can configure GreptimeDB as a metabase data source from python +package. + +See [our docs](https://docs.greptime.com/user-guide/integrations/superset) for +the instructions of plugin installation. + +## Connection information + +Select `GreptimeDB` from database list. + +Use following SQlAlchemy URL for connection: + +``` +greptimedb://:@:4003/ +``` diff --git a/docs/user-guide/integrations/metabase.md b/docs/user-guide/integrations/metabase.md new file mode 100644 index 000000000..c69a69b3f --- /dev/null +++ b/docs/user-guide/integrations/metabase.md @@ -0,0 +1,28 @@ +# Metabase + +[Metabase](https://github.com/metabase/metabase) is an open source BI tool that +written in Clojure. You can configure GreptimeDB as a metabase data source from +a community driver plugin. + +## Installation + +Download the driver plugin file `greptimedb.metabase-driver.jar` from its +[release +page](https://github.com/greptimeteam/greptimedb-metabase-driver/releases/latest/). Copy +the jar file to `plugins/` of metabase's working directory. It will be +discovered by Metabase automatically. + +## Add GreptimeDB as database + +To add GreptimeDB database, select *Settings* / *Admin Settings* / *Databases*, +click *Add Database* button and select GreptimeDB from *Database type*. + +You will be asked to provide host, port, database name and authentication +information. + +- Use Greptime's Postgres protocol port `4003` as port. If you changed the + defaults, use you own settings. +- Username and password are optional if you didn't enable + [authentication](/user-guide/operations/authentication.md). +- Use `public` as default *Database name*. When using GreptimeCloud instance, + use the database name from your instance. diff --git a/docs/user-guide/integrations/overview.md b/docs/user-guide/integrations/overview.md index ed2c58ce8..637b09ac8 100644 --- a/docs/user-guide/integrations/overview.md +++ b/docs/user-guide/integrations/overview.md @@ -6,5 +6,6 @@ The subsequent sections offer comprehensive guidance on integrating GreptimeDB w - [Prometheus](./prometheus.md) - [Vector](./vector.md) - [Grafana](./grafana.md) +- [Superset](./superset.md) +- [Metabase](./metabase.md) - [EMQX](./emqx.md) - diff --git a/docs/user-guide/integrations/superset.md b/docs/user-guide/integrations/superset.md new file mode 100644 index 000000000..9f0e4c5d8 --- /dev/null +++ b/docs/user-guide/integrations/superset.md @@ -0,0 +1,53 @@ +# Superset + +[Apache Superset](https://superset.apache.org) is an open source BI tool that +written in Python. To configure GreptimeDB as a database in Superset, you can +follow this guide. + +## Installation + +### Running Superset with Docker Compose + +[Docker compose](https://superset.apache.org/docs/installation/docker-compose) +is the recommended way to run Superset. To add GreptimeDB extension, create a +`requirements-local.txt` file in `docker/` of Superset codebase. + +Add GreptimeDB dependency in `requirements-local.txt`: + +```txt +greptimedb-sqlalchemy +``` + +Start Superset services: + +```bash +docker compose -f docker-compose-non-dev.yml up +``` + +### Running Superset Locally + +If you are [running Superset from +pypi](https://superset.apache.org/docs/installation/pypi), install our extension +to the same environment. + +```bash +pip install greptimedb-sqlalchemy +``` + +## Add GreptimeDB as database + +To add GreptimeDB database, select *Settings* / *Database Connections*. + +Add database and select *GreptimeDB* from list of supported databases. + +Follow the SQLAlchemy URI pattern to provide your connection information: + +``` +greptimedb://:@:/ +``` + +- Ignore `:@` if you don't have + [authentication](/user-guide/operations/authentication.md) enabled. +- Use `4003` for default port (this extension uses Postgres protocol). +- Use `public` as default `database`. When using GreptimeCloud instance, use the + database name from your instance. diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/metabase.md b/i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/metabase.md new file mode 100644 index 000000000..e1bc1e8d3 --- /dev/null +++ b/i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/metabase.md @@ -0,0 +1,16 @@ +# Metabase + +[Metabase](https://github.com/metabase/metabase) 是一个用 Clojure 编写的开源 BI +工具,可以通过社区维护的数据库驱动将 GreptimeDB 添加到 Metabase。 + +关于插件的安装,请[查看文 +档](https://docs.greptime.com/user-guide/integrations/metabase). + +## 连接信息 + +- 数据库类型: `GreptimeDB` +- 主机名: `` +- 端口: `4003` +- 数据库名: `` +- 用户名: `` +- 密码: `` diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/superset.md b/i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/superset.md new file mode 100644 index 000000000..15128f359 --- /dev/null +++ b/i18n/zh/docusaurus-plugin-content-docs/current/greptimecloud/integrations/superset.md @@ -0,0 +1,17 @@ +# Superset + +[Apache Superset](https://superset.apache.org) 是开源的 BI 工具,用 Python 编写。 +以下内容可以帮助你把 GreptimeDB 作为 Superset 的数据源。 + +关于插件的安装,请[查看文 +档](https://docs.greptime.com/user-guide/integrations/superset). + +## 连接信息 + +从数据库列表中选择 `GreptimeDB`。 + +填写以下 URL + +``` +greptimedb://:@:4003/ +``` diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/metabase.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/metabase.md new file mode 100644 index 000000000..7da0c711c --- /dev/null +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/metabase.md @@ -0,0 +1,25 @@ +# Metabase + +[Metabase](https://github.com/metabase/metabase) 是一个用 Clojure 编写的开源 BI +工具,可以通过社区维护的数据库驱动将 GreptimeDB 添加到 Metabase。 + +## 安装 + +从 [发布 +页](https://github.com/greptimeteam/greptimedb-metabase-driver/releases/latest/) +下载最新的驱动插件文件 `greptimedb.metabase-driver.jar`,并将文件拷贝到 Metabase +的工作目录下 `plugins/` 目录中(如果不存在需要创建 `plugins/`)。当 Metabase 启 +动时,会自动检测到插件。 + +## 添加 GreptimeDB 数据库 + +选择 *设置* / *管理员设置* / *数据库*, 点击 *添加数据库* 按钮并选择 GreptimeDB +作为 *数据库类型*. + +进一步添加其他数据库信息: + +- 端口请填写 GreptimeDB 的 Postgres 协议端口 `4003`。 +- 如果没有开启[认证](/user-guide/operations/authentication.md),用户名和密码字段 + 是可选的。 +- 默认填写 `public` 作为 *数据库名*。如果是使用 GreptimeCloud 的实例,可以从控制 + 台复制数据库名称。 diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/overview.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/overview.md index e0b79131e..0f8c22182 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/overview.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/overview.md @@ -6,5 +6,6 @@ GreptimeDB 可以与流行的数据写入、查询和可视化工具无缝集成 - [Prometheus](./prometheus.md) - [Vector](./vector.md) - [Grafana](./grafana.md) +- [Superset](./superset.md) +- [Metabase](./metabase.md) - [EMQX](./emqx.md) - diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/superset.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/superset.md new file mode 100644 index 000000000..9341afd83 --- /dev/null +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/integrations/superset.md @@ -0,0 +1,51 @@ +# Superset + +[Apache Superset](https://superset.apache.org) 是开源的 BI 工具,用 Python 编写。 +以下内容可以帮助你把 GreptimeDB 作为 Superset 的数据源。 + +## 安装 + +### 用 Docker Compose 运行 Superset + +[Docker compose](https://superset.apache.org/docs/installation/docker-compose) +是 Superset 的推荐使用方式。在这种运行方式下,需要在 Superset 代码目录下的 +`docker/` 中添加一个 `requirements-local.txt`。 + +并将 GreptimeDB 依赖加入到 `requirements-local.txt`: + +```txt +greptimedb-sqlalchemy +``` + +启动 Supertset 服务: + +```bash +docker compose -f docker-compose-non-dev.yml up +``` + +### 本地运行 Superset + +假如你通过 [Pypi 包安装和运行 +Superset](https://superset.apache.org/docs/installation/pypi),需要将 GreptimeDB +的依赖安装到相同的 Python 环境。 + +```bash +pip install greptimedb-sqlalchemy +``` + +## 添加 GreptimeDB 数据库 + +准备添加,选择 *设置* / *数据库连接*. + +添加数据库,并在支持的数据库列表中选择 *GreptimeDB*。 + +根据 SQLAlchemy URI 的规范,填写以下格式的数据库连接地址。 + +``` +greptimedb://:@:/ +``` + +- 如果没有启动[认证](/user-guide/operations/authentication.md),可以忽略 + `:@` 部分。 +- 默认端口 `4003` (我们用 PostgresSQL 协议通信)。 +- 默认数据库 `public`。如果是使用 GreptimeCloud 实例,可以从控制台复制数据库名称。 diff --git a/sidebars.ts b/sidebars.ts index ee78b5c5c..2861caba0 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -101,6 +101,8 @@ const sidebars: SidebarsConfig = { 'user-guide/integrations/prometheus', 'user-guide/integrations/vector', 'user-guide/integrations/grafana', + 'user-guide/integrations/superset', + 'user-guide/integrations/metabase', 'user-guide/integrations/emqx', ], }, @@ -233,6 +235,8 @@ const sidebars: SidebarsConfig = { 'greptimecloud/integrations/vector', 'greptimecloud/integrations/emqx', 'greptimecloud/integrations/streamlit', + 'greptimecloud/integrations/superset', + 'greptimecloud/integrations/metabase', 'greptimecloud/integrations/mindsdb', { type: 'category', From 20f03037195775a941e175982f71818e3287d849 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 11 Sep 2024 17:07:25 +0800 Subject: [PATCH 3/3] docs: add three usecase for flow (#1165) --- .../continuous-aggregation/overview.md | 6 +- .../continuous-aggregation/query.md | 89 +------ .../continuous-aggregation/usecase-example.md | 223 +++++++++++++++++ .../continuous-aggregation/overview.md | 4 +- .../continuous-aggregation/query.md | 96 +------- .../continuous-aggregation/usecase-example.md | 224 ++++++++++++++++++ sidebars.ts | 1 + 7 files changed, 467 insertions(+), 176 deletions(-) create mode 100644 docs/user-guide/continuous-aggregation/usecase-example.md create mode 100644 i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md diff --git a/docs/user-guide/continuous-aggregation/overview.md b/docs/user-guide/continuous-aggregation/overview.md index 313c0fe68..a6b801f4b 100644 --- a/docs/user-guide/continuous-aggregation/overview.md +++ b/docs/user-guide/continuous-aggregation/overview.md @@ -64,7 +64,7 @@ SELECT min(size) as min_size, max(size) as max_size, avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, + sum(case when `size` > 550 then 1 else 0 end) as high_size_count, date_bin(INTERVAL '1 minutes', access_time) as time_window, FROM ngx_access_log GROUP BY @@ -133,11 +133,15 @@ Here is the explanation of the columns in the `ngx_statistics` table: - `time_window`: The time window of the aggregation. - `update_at`: The time when the aggregation is updated. + + + ## Next Steps Congratulations you already have a preliminary understanding of the continuous aggregation feature. Please refer to the following sections to learn more: +- [Usecase Examples](./usecase-example.md) provides more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard. - [Manage Flows](./manage-flow.md) describes how to create, update, and delete a flow. Each of your continuous aggregation query is a flow. - [Write a Query](./query.md) describes how to write a continuous aggregation query. - [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. Time window is an important attribute of your continuous aggregation query. It defines the time interval for the aggregation. diff --git a/docs/user-guide/continuous-aggregation/query.md b/docs/user-guide/continuous-aggregation/query.md index b9e574f0b..f21ee2af0 100644 --- a/docs/user-guide/continuous-aggregation/query.md +++ b/docs/user-guide/continuous-aggregation/query.md @@ -15,91 +15,12 @@ Only two kinds of expression are allowed after `SELECT` keyword: The query should have a `FROM` clause to identify the source table. As the join clause is currently not supported, the query can only aggregate columns from a single table. -`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions. - -Others things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query. - -## Rewrite an existing query to a continuous aggregation query - -Some of simple existing aggregation queries can be directly used as continuous aggregation queries. For example, the example in [overview](./overview.md) can be used to query both in standard SQL and continuous aggregation query, since it's also a valid SQL query without any flow-specific syntax or functions: +`WHERE` and `HAVING` clauses are supported in the continuous aggregation query. They work as in a normal query. The `WHERE` clause filters the data before aggregation, and the `HAVING` clause filters the data after aggregation. -```sql -SELECT - status, - count(client) AS total_logs, - min(size) as min_size, - max(size) as max_size, - avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` +`DISTINCT` currently only work with `SELECT DISTINCT column1 ..` syntax. It is used to remove duplicate rows from the result set. Support for `SELECT count(DISTINCT column1) ...` is not available yet, but it will be added in the future. -However, there are other types of queries that cannot be directly used as continuous aggregation queries. -For example, a query that needs to compute percentiles would be unwise to repeatedly calculate the percentile for each time window everytime a new batch of data arrive. In this case, you can pre-aggregate the data into buckets of the desired size, and then calculate the percentile in the sink table using standard SQL when needed. The original SQL might be: -```sql -SELECT - status, - percentile_approx(size, 0.5) as median_size, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` -The above query can be rewritten to first aggregate the data into buckets of size 10, and then calculate the percentile in the sink table. -The flow query would be: -```sql -CREATE FLOW calc_ngx_distribution -SINK TO ngx_distribution -AS -SELECT - status, - trunc(size, -1) as bucket, - count(client) AS total_logs, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window, - bucket; -``` - -And then you can calculate the percentile in the sink table using standard SQL: -```sql -SELECT - outer.status, - outer.time_window, - outer.bucket, - SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile -FROM ngx_distribution AS outer -JOIN ngx_distribution AS in1 -ON in1.status = outer.status -AND in1.time_window = outer.time_window -GROUP BY - status, - time_window, - bucket -ORDER BY status, time_window, bucket; -``` - -The SQL query groups the data by status, time_window, and bucket. The percentile column calculates the percentage within each group by taking the sum of all buckets not greater than the current bucket and dividing it by the total count of all logs. The result would be something like this: +`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions. -```sql - status | time_window | bucket | percentile ---------+----------------------------+--------+------------ - 404 | 1970-01-01 00:00:00.000000 | 0 | 22 - 404 | 1970-01-01 00:00:00.000000 | 1 | 55 - 404 | 1970-01-01 00:00:00.000000 | 2 | 66 - 404 | 1970-01-01 00:00:00.000000 | 3 | 100 -(4 rows) -``` +Others things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query. - \ No newline at end of file +See [Usecase Examples](./usecase-example.md) for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard. \ No newline at end of file diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md new file mode 100644 index 000000000..d59140c74 --- /dev/null +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -0,0 +1,223 @@ +# Usecase Examples +Following are three major usecase examples for continuous aggregation: + +1. **Real-time Analytics**: A real-time analytics platform that continuously aggregates data from a stream of events, delivering immediate insights while optionally downsampling the data to a lower resolution. For instance, this system can compile data from a high-frequency stream of log events (e.g., occurring every millisecond) to provide up-to-the-minute insights such as the number of requests per minute, average response times, and error rates per minute. + +2. **Real-time Monitoring**: A real-time monitoring system that continuously aggregates data from a stream of events and provides real-time alerts based on the aggregated data. For example, a system that aggregates data from a stream of sensor events and provides real-time alerts when the temperature exceeds a certain threshold. + +3. **Real-time Dashboard**: A real-time dashboard that shows the number of requests per minute, the average response time, and the number of errors per minute. This dashboard can be used to monitor the health of the system and to detect any anomalies in the system. + +In all these usecases, the continuous aggregation system continuously aggregates data from a stream of events and provides real-time insights and alerts based on the aggregated data. The system can also downsample the data to a lower resolution to reduce the amount of data stored and processed. This allows the system to provide real-time insights and alerts while keeping the data storage and processing costs low. + +## Real-time analytics example + +See [Overview](/user-guide/continuous-aggregation/overview.md#quick-start-with-an-example) for an example of real-time analytics. Which is to calculate the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with the size greater than 550 for each status code in a 1-minute fixed window for access logs. + +Another example of real-time analytics is to get all distinct country from the `ngx_access_log` table. The query for continuous aggregation would be: + +```sql +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, + PRIMARY KEY(country) +); + +/* create flow task to calculate the distinct country */ +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, +FROM ngx_access_log; +``` + +now that we have created the flow task, we can insert some data into the source table `ngx_access_log`: + +```sql +/* insert some data */ +INSERT INTO ngx_access_log VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +``` + + +Wait for one second for the Flow to write the result to the sink table and then query: + + +```sql +select * from ngx_country; +``` + +Or if you want to group the data by time window, you can use the following query: + +```sql +/* input table create same as above */ +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + PRIMARY KEY(country) +); +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM ngx_access_log +GROUP BY + country, + time_window; +/* insert data using the same data as above */ +``` + +The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. + +Note that there is currently no persistent storage for the internal state of the flow. The internal state refers to the intermediate state used in computing incremental query results, such as the accumulator's value for an aggregation query (e.g., `count(col)`'s accumulator records the current count number). However, there is persistent storage for the data in the sink table. +Therefore, it is recommended to use an appropriate time window (e.g., hourly) to minimize data loss. This is because if the internal state is lost, the related data within that time window will also be lost. + +## Real-time monitoring example + +Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold. Then the query for continuous aggregation would be: + +```sql +/* create input table */ +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +/* create sink table */ +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + +CREATE FLOW temp_monitoring +SINK TO temp_alerts +AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING max_temp > 100; +``` + +Now that we have created the flow task, we can insert some data into the source table `temp_sensor_data`: + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); +``` +table should be empty now, but still wait at least one second for flow to update results to sink table: + +```sql +SELECT * FROM temp_alerts; +``` + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); +``` +wait at least one second for flow to update results to sink table: +```sql +/* wait at least one second for flow to update results to sink table */ +SELECT * FROM temp_alerts; +``` + +The above query continuously aggregates the data from the `temp_sensor_data` table into the `temp_alerts` table. It calculates the maximum temperature reading for each sensor and location and filters out the data where the maximum temperature exceeds 100 degrees. The `temp_alerts` table will be continuously updated with the aggregated data, providing real-time alerts (Which is a new row in the `temp_alerts` table) when the temperature exceeds the threshold. + + +## Real-time dashboard + +Consider a usecase in which you need a bar graph that show the distribution of packet sizes for each status code to monitor the health of the system. The query for continuous aggregation would be: + +```sql +/* create input table */ +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); +/* create sink table */ +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, /* auto generated column to store the last update time */ + PRIMARY KEY(stat, bucket_size) +); +/* create flow task to calculate the distribution of packet sizes for each status code */ +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1)::INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; +``` + +Now that we have created the flow task, we can insert some data into the source table `ngx_access_log`: + +```sql +INSERT INTO ngx_access_log VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 184, "2022-01-01 00:00:09"); +``` +wait at least one second for flow to update results to sink table: +```sql +SELECT * FROM ngx_distribution; +``` + +The above query puts the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket (in this case, since `trunc`'s second argument is -1, meaning a bucket size of 10) for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code. + +## Conclusion + +Continuous aggregation is a powerful tool for real-time analytics, monitoring, and dashboarding. It allows you to continuously aggregate data from a stream of events and provide real-time insights and alerts based on the aggregated data. By downsampling the data to a lower resolution, you can reduce the amount of data stored and processed, making it easier to provide real-time insights and alerts while keeping the data storage and processing costs low. Continuous aggregation is a key component of any real-time data processing system and can be used in a wide range of usecases to provide real-time insights and alerts based on streaming data. \ No newline at end of file diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md index 03794d1eb..282535fe1 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md @@ -67,7 +67,7 @@ SELECT min(size) as min_size, max(size) as max_size, avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, + sum(case when `size` > 550 then 1 else 0 end) as high_size_count, date_bin(INTERVAL '1 minutes', access_time) as time_window, FROM ngx_access_log GROUP BY @@ -141,6 +141,8 @@ SELECT * FROM ngx_statistics; 恭喜你已经初步了解了持续聚合功能。 请参考以下章节了解更多: + +- [用例](./usecase-example.md) 提供了更多关于如何在实时分析、监控和仪表板中使用持续聚合的示例。 - [管理 Flow](./manage-flow.md) 描述了如何创建、更新和删除 flow。你的每个持续聚合查询都是一个 flow。 - [编写查询语句](./query.md) 描述了如何编写持续聚合查询。 - [定义时间窗口](./define-time-window.md) 描述了如何为持续聚合定义时间窗口。时间窗口是持续聚合查询的一个重要属性,它定义了聚合的时间间隔。 diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md index 77c05ac92..2e98aec98 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md @@ -16,98 +16,14 @@ SELECT AGGR_FUNCTION(column1, column2,..) FROM GROUP BY TIME_WIND 查询应该有一个 `FROM` 子句来标识 source 表。由于不支持 join 子句,目前只能从单个表中聚合列。 -`GROUP BY` 子句与普通查询中的工作方式相同。 -它根据指定的列对数据进行分组。 -`GROUP BY` 子句中使用的时间窗口函数 `hop()` 和 `tumble()` 在 [定义时间窗口](./define-time-window.md) 部分中有描述。 +`WHERE` 和 `HAVING` 子句在持续聚合查询中是支持的。它们的工作方式与普通查询中的相同。`WHERE` 子句在聚合之前过滤数据,而 `HAVING` 子句在聚合之后过滤数据。 + +`DISTINCT` 用于从结果集中删除重复行,目前仅支持 `SELECT DISTINCT column1 ..` 语法,不支持 `SELECT count(DISTINCT column1) ...`,但将来会添加支持。 + +`GROUP BY` 子句与普通查询中的工作方式相同。它根据指定的列对数据进行分组。`GROUP BY` 子句中使用的时间窗口函数 `hop()` 和 `tumble()` 在 [定义时间窗口](./define-time-window.md) 部分中有描述。 它们用于在聚合中定义时间窗口。 `GROUP BY` 中的其他表达式可以是 literal、列名或 scalar 表达式。 持续聚合查询不支持 `ORDER BY`、`LIMIT`、`OFFSET` 等其他操作。 - -## 将现有查询重写为持续聚合查询 - -一些简单的现有聚合查询可以直接用作持续聚合查询。例如,[概述](./overview.md) 部分中的示例可以用于标准 SQL 查询和持续聚合查询,因为它也是一个有效的 SQL 查询,没有任何特定于流的语法或函数: - -```sql -SELECT - status, - count(client) AS total_logs, - min(size) as min_size, - max(size) as max_size, - avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` - -然而,还有其他类型的查询不能直接用作持续聚合查询。 -例如,需要计算百分位数的查询不应该在每次新数据到达时重复计算每个时间窗口的百分位数。 -在这种情况下,您可以将数据预聚合到所需大小的桶中,然后在需要时使用标准 SQL 在 sink 表中计算百分位数。原始 SQL 可能是: - -```sql -SELECT - status, - percentile_approx(size, 0.5) as median_size, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` -上述查询可以重写为首先将数据聚合到大小为 10 的桶中,然后在 sink 表中计算百分位数。 -流查询将是: - -```sql -CREATE FLOW calc_ngx_distribution -SINK TO ngx_distribution -AS -SELECT - status, - trunc(size, -1) as bucket, - count(client) AS total_logs, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window, - bucket; -``` -接下来,您可以使用标准 SQL 在 sink 表中计算百分位数: -```sql -SELECT - outer.status, - outer.time_window, - outer.bucket, - SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile -FROM ngx_distribution AS outer -JOIN ngx_distribution AS in1 -ON in1.status = outer.status -AND in1.time_window = outer.time_window -GROUP BY - status, - time_window, - bucket -ORDER BY status, time_window, bucket; -``` - -上述 SQL 查询按 status、time_window 和 bucket 对数据进行分组。percentile 列通过计算小于或等于当前 bucket 的所有 bucket 的总和,并将其除以所有日志的总数来计算每个组内的百分比。结果可能如下所示: - -```sql - status | time_window | bucket | percentile ---------+----------------------------+--------+------------ - 404 | 1970-01-01 00:00:00.000000 | 0 | 22 - 404 | 1970-01-01 00:00:00.000000 | 1 | 55 - 404 | 1970-01-01 00:00:00.000000 | 2 | 66 - 404 | 1970-01-01 00:00:00.000000 | 3 | 100 -(4 rows) -``` - - \ No newline at end of file +更多关于如何在实时分析、监控和仪表盘中使用持续聚合的示例,请参考 [用例](./usecase-example.md)。 \ No newline at end of file diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md new file mode 100644 index 000000000..1f739b9a4 --- /dev/null +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -0,0 +1,224 @@ +# 用例 +持续聚合的三个主要用例示例如下: + +1. **实时分析**:一个实时分析平台,不断聚合来自事件流的数据,提供即时洞察,同时可选择将数据降采样到较低分辨率。例如,此系统可以编译来自高频日志事件流(例如,每毫秒发生一次)的数据,以提供每分钟的请求数、平均响应时间和每分钟的错误率等最新洞察。 + +2. **实时监控**:一个实时监控系统,不断聚合来自事件流的数据,根据聚合数据提供实时警报。例如,此系统可以处理来自传感器事件流的数据,以提供当温度超过某个阈值时的实时警报。 + +3. **实时仪表盘**:一个实时仪表盘,显示每分钟的请求数、平均响应时间和每分钟的错误数。此仪表板可用于监控系统的健康状况,并检测系统中的任何异常。 + +在所有这些用例中,持续聚合系统不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。系统还可以将数据降采样到较低分辨率,以减少存储和处理的数据量。这使得系统能够提供实时洞察和警报,同时保持较低的数据存储和处理成本。 + +## 实时分析示例 + +请参阅[概述](/user-guide/continuous-aggregation/overview.md#快速开始示例)中的实时分析示例。该示例用于计算日志的总数、包大小的最小、最大和平均值,以及大小大于 550 的数据包数量按照每个状态码在 1 分钟固定窗口中的实时分析。 + +另外,您还可以使用持续聚合来计算其他类型的实时分析。例如,要从 `ngx_access_log` 表中获取所有不同的国家。持续聚合的查询如下: + +```sql +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, + PRIMARY KEY(country) +); + +/* create flow task to calculate the distinct country */ +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, +FROM ngx_access_log; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `ngx_access_log` 中: + +```sql +/* insert some data */ +INSERT INTO ngx_access_log VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); +``` +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql +/* check the result */ +select * from ngx_country; +``` + +或者,如果您想要按时间窗口对数据进行分组,可以使用以下查询: + +```sql +/* input table create same as above */ +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + PRIMARY KEY(country) +); +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM ngx_access_log +GROUP BY + country, + time_window; +/* insert data using the same data as above */ +``` + +上述的查询将 `ngx_access_log` 表中的数据放入 `ngx_country` 表中。它计算每个时间窗口的不同国家。`date_bin` 函数用于将数据分组为一小时的间隔。`ngx_country` 表将不断更新聚合数据,提供实时洞察,显示正在访问系统的不同国家。 + +请注意,目前 Flow 的内部状态没有持久存储。内部状态指的是用于计算增量查询结果的中间状态,例如聚合查询的累加器值(如count(col)的累加器记录了目前为止的 count 计数)。然而,Sink 表的数据是有持久存储的。因此,建议您使用适当的时间窗口(例如设置为每小时)来最小化数据丢失。因为一旦内部状态丢失,相关时间窗口的数据也将随之丢失。 + +## 实时监控示例 + +假设您希望实时监控一个来自温度传感器网络的传感器事件流。传感器事件包含传感器 ID、温度读数、读数的时间戳和传感器的位置等信息。您希望不断聚合这些数据,以在温度超过某个阈值时提供实时警报。那么持续聚合的查询将是: + +```sql +/* create input table */ +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +/* create sink table */ +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + +CREATE FLOW temp_monitoring +SINK TO temp_alerts +AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING max_temp > 100; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `temp_sensor_data` 中: + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); + +/* You may want to flush the flow task to see the result */ +ADMIN FLUSH_FLOW('temp_monitoring'); +``` + +当前输出表应该为空。您可以在等待一秒后通过以下查询查看结果: + +```sql +/* for now sink table will be empty */ +SELECT * FROM temp_alerts; +``` + +```sql + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); + +``` + +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql +/* now sink table will have the max temperature data */ +SELECT * FROM temp_alerts; +``` + +上述的查询将从 `temp_sensor_data` 表中不断聚合数据到 `temp_alerts` 表中。它计算每个传感器和位置的最高温度读数,并过滤出最高温度超过 100 度的数据。`temp_alerts` 表将不断更新聚合数据,并且当温度超过阈值时提供实时警报(即 `temp_alerts` 表中的新行)。 + +## 实时仪表盘示例 + +假设您需要一个柱状图显示每个状态码的数据包大小分布,以监控系统的健康状况。持续聚合的查询将是: + +```sql +/* create input table */ +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); +/* create sink table */ +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, /* auto generated column to store the last update time */ + PRIMARY KEY(stat, bucket_size) +); +/* create flow task to calculate the distribution of packet sizes for each status code */ +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1)::INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `ngx_access_log` 中: + +```sql +INSERT INTO ngx_access_log VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 184, "2022-01-01 00:00:09"); +``` +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql +SELECT * FROM ngx_distribution; +``` + +上述查询将从 `ngx_access_log` 表中的数据放入 `ngx_distribution` 表中。它计算每个状态码的数据包大小分布,并将数据分组到每个时间窗口中。`ngx_distribution` 表将不断更新聚合数据,提供实时洞察,显示每个状态码的数据包大小分布。 + +## 结论 + +持续聚合是实时分析、监控和仪表盘的强大工具。它允许您不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。通过将数据降采样到较低分辨率,您可以减少存储和处理的数据量,从而更容易提供实时洞察和警报,同时保持较低的数据存储和处理成本。持续聚合是任何实时数据处理系统的关键组件,可以在各种用例中使用,以提供基于流数据的实时洞察和警报。 \ No newline at end of file diff --git a/sidebars.ts b/sidebars.ts index 2861caba0..3204b8898 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -125,6 +125,7 @@ const sidebars: SidebarsConfig = { label: 'Continuous Aggregation', items: [ 'user-guide/continuous-aggregation/overview', + 'user-guide/continuous-aggregation/usecase-example', 'user-guide/continuous-aggregation/manage-flow', 'user-guide/continuous-aggregation/query', 'user-guide/continuous-aggregation/define-time-window',