influxdb处理数据 influxdb教程
influxdb处理数据 influxdb教程

现在您已经了解了从InfluxDB查询数据的基础知识, 让我们超越基本查询,开始处理查询数据。 “处理”数据可能意味着转换、聚合、下采样或警报 数据。本教程涵盖以下数据处理用例:

  • 重新映射或分配数据中的值
  • 组数据
  • 聚合或选择特定数据
  • Pivot data into a relational schema相关文章
  • 下采样数据
  • 使用InfluxDB任务自动处理

大多数数据处理操作需要手动编辑Flux查询。 如果使用InfluxDB数据资源管理器,请切换到脚本编辑器 而不是使用Query Builder。

重新映射或分配数据中的值

使用map()函数 迭代数据中的每一行并更新该行中的值。 map()是Flux中最有用的函数之一,将帮助您完成 其中许多数据处理操作需要执行。

了解有关map()工作原理的更多信息

from(bucket: "get-started")     |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "hum")     |> map(fn: (r) => ({r with _value: r._value / 100.0})) 

地图示例

执行数学运算

map()允许您对数据执行数学运算。 例如,使用在“开始写入InfluxDB”中写入的数据:

  1. 查询temp字段以返回以°C为单位的室温。
  2. 使用map()迭代每一行,并将 _value柱至°F,使用以下公式:°F = (°C * 1.8) + 32.0.
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> map(fn: (r) => ({r with _value: (r._value * 1.8) + 32.0})) 

输入 输出量 单击以查看输出

_start_stop列已被省略。

_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 厨房 温度 22.8
2022-01-01T15:00:00Z 厨房 温度 22.7
2022-01-01T16:00:00Z 厨房 温度 22.4
2022-01-01T17:00:00Z 厨房 温度 22.7
2022-01-01T18:00:00Z 厨房 温度 23.3
2022-01-01T19:00:00Z 厨房 温度 23.1
2022-01-01T20:00:00Z 厨房 温度 22.7
_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 客厅 温度 22.3
2022-01-01T15:00:00Z 客厅 温度 22.3
2022-01-01T16:00:00Z 客厅 温度 22.4
2022-01-01T17:00:00Z 客厅 温度 22.6
2022-01-01T18:00:00Z 客厅 温度 22.8
2022-01-01T19:00:00Z 客厅 温度 22.5
2022-01-01T20:00:00Z 客厅 温度 22.2

有条件地分配状态

数据警报

组数据

使用group()函数 按特定列值重新组合数据,以准备进一步处理。

from(bucket: "get-started")     |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> group(columns: ["room", "_field"]) 

理解数据分组及其重要性很重要,但可能太多了 这篇“入门”教程。 有关如何对数据进行分组及其重要性的更多信息,请参见 通量数据模型文档。

默认情况下,from()返回从InfluxDB查询的数据,按系列分组 (测量、标签和字段键)。 返回的表流中的每个表表示一个组。 每个表都包含数据分组所依据的列的相同值。 在聚合数据时,此分组非常重要。

组示例

按特定列对数据进行分组

使用“开始写入InfluxDB”中写入的数据:

  1. 查询temphum字段。
  2. 使用group()仅按_field列分组。
from(bucket: "get-started")     |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp" or r._field == "hum")     |> group(columns: ["_field"]) 

输入 输出量 单击以查看输出

以下数据从最后一个filter()输出,并通过管道传输到group()

_start_stop列已被省略。

组密钥实例= [_measurement=home,room=Kitchen,_field=hum]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 厨房 嗡嗡声 35.9
2022-01-01T09:00:00Z 厨房 嗡嗡声 36.2
2022-01-01T10:00:00Z 厨房 嗡嗡声 36.1

组密钥实例= [_measurement=home,room=Living Room,_field=hum]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 客厅 嗡嗡声 35.9
2022-01-01T09:00:00Z 客厅 嗡嗡声 35.9
2022-01-01T10:00:00Z 客厅 嗡嗡声 36

组密钥实例= [_measurement=home,room=Kitchen,_field=temp]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 厨房 温度 21
2022-01-01T09:00:00Z 厨房 温度 23
2022-01-01T10:00:00Z 厨房 温度 22.7

组密钥实例= [_measurement=home,room=Living Room,_field=temp]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 客厅 温度 21.1
2022-01-01T09:00:00Z 客厅 温度 21.4
2022-01-01T10:00:00Z 客厅 温度 21.8

取消数据分组

使用“开始写入InfluxDB”中写入的数据:

  1. 查询temphum字段。
  2. 使用不带任何参数的group()来“取消”数据分组或不按列分组。 columns参数的默认值是空数组([])。
from(bucket: "get-started")     |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp" or r._field == "hum")     |> group() 

输入 输出量 单击以查看输出

以下数据从最后一个filter()输出,并通过管道传输到group()

_start_stop列已被省略。

组密钥实例= [_measurement=home,room=Kitchen,_field=hum]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 厨房 嗡嗡声 35.9
2022-01-01T09:00:00Z 厨房 嗡嗡声 36.2
2022-01-01T10:00:00Z 厨房 嗡嗡声 36.1

组密钥实例= [_measurement=home,room=Living Room,_field=hum]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 客厅 嗡嗡声 35.9
2022-01-01T09:00:00Z 客厅 嗡嗡声 35.9
2022-01-01T10:00:00Z 客厅 嗡嗡声 36

组密钥实例= [_measurement=home,room=Kitchen,_field=temp]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 厨房 温度 21
2022-01-01T09:00:00Z 厨房 温度 23
2022-01-01T10:00:00Z 厨房 温度 22.7

组密钥实例= [_measurement=home,room=Living Room,_field=temp]

_时间 _测量 房间 _字段 _值
2022-01-01T08:00:00Z 客厅 温度 21.1
2022-01-01T09:00:00Z 客厅 温度 21.4
2022-01-01T10:00:00Z 客厅 温度 21.8

聚合或选择特定数据

使用通量聚合 或选择器功能 从每个输入表中返回聚合值或选定值。

from(bucket: "get-started")     |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")     |> mean() 

随时间的聚集

如果要查询一段时间内的聚合值,这是一种 下采样

聚合函数

聚合函数丢弃 不在组键中的列 并为每个输入表返回一行,其中包含该表的聚合值。

聚合实例

计算每个房间的平均温度

使用“开始写入InfluxDB”中写入的数据:

  1. 查询temp字段。默认情况下,from()返回分组为 _measurementroom_field,因此每个表代表一个房间。
  2. 使用mean()返回每个房间的平均温度。
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> mean() 

输入 输出量 单击以查看输出

_start_stop列已被省略。

_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 厨房 温度 22.8
2022-01-01T15:00:00Z 厨房 温度 22.7
2022-01-01T16:00:00Z 厨房 温度 22.4
2022-01-01T17:00:00Z 厨房 温度 22.7
2022-01-01T18:00:00Z 厨房 温度 23.3
2022-01-01T19:00:00Z 厨房 温度 23.1
2022-01-01T20:00:00Z 厨房 温度 22.7
_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 客厅 温度 22.3
2022-01-01T15:00:00Z 客厅 温度 22.3
2022-01-01T16:00:00Z 客厅 温度 22.4
2022-01-01T17:00:00Z 客厅 温度 22.6
2022-01-01T18:00:00Z 客厅 温度 22.8
2022-01-01T19:00:00Z 客厅 温度 22.5
2022-01-01T20:00:00Z 客厅 温度 22.2

计算所有房间的整体平均温度

使用“开始写入InfluxDB”中写入的数据:

  1. 查询temp字段。
  2. 使用group()将数据解组到单个表中。默认情况下, from()返回按_measurementroom_field分组的数据。 要获得总体平均值,您需要将所有结果构建为单个表。
  3. 使用mean()返回平均温度。
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> group()     |> mean() 

输入 输出量 单击以查看输出

以下输入数据表示通过管道向前传输的未分组数据 mean()

_start_stop列已被省略。

_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 厨房 温度 22.8
2022-01-01T15:00:00Z 厨房 温度 22.7
2022-01-01T16:00:00Z 厨房 温度 22.4
2022-01-01T17:00:00Z 厨房 温度 22.7
2022-01-01T18:00:00Z 厨房 温度 23.3
2022-01-01T19:00:00Z 厨房 温度 23.1
2022-01-01T20:00:00Z 厨房 温度 22.7
2022-01-01T14:00:00Z 客厅 温度 22.3
2022-01-01T15:00:00Z 客厅 温度 22.3
2022-01-01T16:00:00Z 客厅 温度 22.4
2022-01-01T17:00:00Z 客厅 温度 22.6
2022-01-01T18:00:00Z 客厅 温度 22.8
2022-01-01T19:00:00Z 客厅 温度 22.5
2022-01-01T20:00:00Z 客厅 温度 22.2

计算所有字段中每个房间报告的点数

使用“开始写入InfluxDB”中写入的数据:

  1. 通过简单地按home测量进行过滤来查询所有字段。
  2. home测量中的字段是不同的类型。 使用toFloat()将所有字段值转换为浮点数。
  3. 使用group()room对数据进行分组。
  4. 使用count()返回每个输入表中的行数。
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> toFloat()     |> group(columns: ["room"])     |> count() 
输出量

_start_stop列已被省略。

房间 _值
厨房 21
房间 _值
客厅 21

分配新的聚合时间戳

_time通常不是组密钥的一部分,在使用时将被删除 聚合函数。要为聚合点分配新的时间戳,请复制 表示查询边界的_start_stop列作为 新建_time列。

from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> mean()     |> duplicate(column: "_stop", as: "_time") 

选择器功能

选择器功能返回 一个或多个列,并保留所有列及其值。

选择器示例

返回每个房间的第一个温度

使用“开始写入InfluxDB”中写入的数据:

  1. 查询temp字段。
  2. 使用first()返回 每个表的第一行。
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> first() 

输入 输出量 单击以查看输出

_start_stop列已被省略。

_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 厨房 温度 22.8
2022-01-01T15:00:00Z 厨房 温度 22.7
2022-01-01T16:00:00Z 厨房 温度 22.4
2022-01-01T17:00:00Z 厨房 温度 22.7
2022-01-01T18:00:00Z 厨房 温度 23.3
2022-01-01T19:00:00Z 厨房 温度 23.1
2022-01-01T20:00:00Z 厨房 温度 22.7
_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 客厅 温度 22.3
2022-01-01T15:00:00Z 客厅 温度 22.3
2022-01-01T16:00:00Z 客厅 温度 22.4
2022-01-01T17:00:00Z 客厅 温度 22.6
2022-01-01T18:00:00Z 客厅 温度 22.8
2022-01-01T19:00:00Z 客厅 温度 22.5
2022-01-01T20:00:00Z 客厅 温度 22.2

返回每个房间的最后温度

使用“开始写入InfluxDB”中写入的数据:

  1. 查询temp字段。
  2. 使用last()返回 每个表的最后一行。
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> last() 

输入 输出量 单击以查看输出

_start_stop列已被省略。

_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 厨房 温度 22.8
2022-01-01T15:00:00Z 厨房 温度 22.7
2022-01-01T16:00:00Z 厨房 温度 22.4
2022-01-01T17:00:00Z 厨房 温度 22.7
2022-01-01T18:00:00Z 厨房 温度 23.3
2022-01-01T19:00:00Z 厨房 温度 23.1
2022-01-01T20:00:00Z 厨房 温度 22.7
_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 客厅 温度 22.3
2022-01-01T15:00:00Z 客厅 温度 22.3
2022-01-01T16:00:00Z 客厅 温度 22.4
2022-01-01T17:00:00Z 客厅 温度 22.6
2022-01-01T18:00:00Z 客厅 温度 22.8
2022-01-01T19:00:00Z 客厅 温度 22.5
2022-01-01T20:00:00Z 客厅 温度 22.2

返回每个房间的最高温度

使用“开始写入InfluxDB”中写入的数据:

  1. 查询temp字段。
  2. 使用max()返回行 其中最高值在每个表的_value列中。
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> max() 

输入 输出量 单击以查看输出

_start_stop列已被省略。

_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 厨房 温度 22.8
2022-01-01T15:00:00Z 厨房 温度 22.7
2022-01-01T16:00:00Z 厨房 温度 22.4
2022-01-01T17:00:00Z 厨房 温度 22.7
2022-01-01T18:00:00Z 厨房 温度 23.3
2022-01-01T19:00:00Z 厨房 温度 23.1
2022-01-01T20:00:00Z 厨房 温度 22.7
_时间 _测量 房间 _字段 _值
2022-01-01T14:00:00Z 客厅 温度 22.3
2022-01-01T15:00:00Z 客厅 温度 22.3
2022-01-01T16:00:00Z 客厅 温度 22.4
2022-01-01T17:00:00Z 客厅 温度 22.6
2022-01-01T18:00:00Z 客厅 温度 22.8
2022-01-01T19:00:00Z 客厅 温度 22.5
2022-01-01T20:00:00Z 客厅 温度 22.2

Pivot data into a relational schema相关文章

如果来自关系SQL或类似SQL的查询语言,如InfluxQL, Flux使用的数据模型与您习惯的不同。 Flux返回多个表,其中每个表包含不同的字段。 “关系”模式将每个字段构造为每行中的一列。

使用pivot()函数 基于时间戳将数据透视到“关系”模式中。

from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")     |> filter(fn: (r) => r.room == "Kitchen")     |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") 

查看输入和透视输出

下采样数据

对数据进行下采样是一种提高查询时性能的策略,并且 优化长期数据存储。简单地说,下采样减少了 点,而不会丢失数据中的一般趋势。

有关对数据进行缩减采样的详细信息,请参见 下采样数据.

下采样数据的最常见方法是按时间间隔或“窗口”。 例如,您可能希望查询最近一小时的数据并返回平均值 每五分钟窗口的值。

使用aggregateWindow() 要按指定的时间间隔对数据进行下采样,请执行以下操作:

  • 使用every参数指定每个窗口的持续时间。
  • 使用fn参数指定聚合 或选择器功能 适用于每个窗口。
  • (可选)使用timeSrc参数指定要 用于为每个窗口创建新的聚合时间戳。 默认值为_stop
from(bucket: "get-started")     |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)     |> filter(fn: (r) => r._measurement == "home")     |> filter(fn: (r) => r._field == "temp")     |> aggregateWindow(every: 2h, fn: mean) 

查看输入和下采样输出

使用InfluxDB任务自动处理

InfluxDB任务是计划查询 其可以执行上述任何数据处理操作。 一般来说,任务会使用to()函数 将处理后的结果写回InfluxDB。

有关创建和配置任务的详细信息,请参阅 开始使用InfluxDB任务.

下采样任务示例

option task = {     name: "Example task"     every: 1d, }  from(bucket: "get-started-downsampled")     |> range(start: -task.every)     |> filter(fn: (r) => r._measurement == "home")     |> aggregateWindow(every: 2h, fn: mean)
我的微信