现在您已经了解了从InfluxDB查询数据的基础知识, 让我们超越基本查询,开始处理查询数据。 “处理”数据可能意味着转换、聚合、下采样或警报 数据。本教程涵盖以下数据处理用例:
大多数数据处理操作需要手动编辑Flux查询。 如果使用InfluxDB数据资源管理器,请切换到脚本编辑器 而不是使用Query Builder。
使用map()
函数 迭代数据中的每一行并更新该行中的值。 map()
是Flux中最有用的函数之一,将帮助您完成 其中许多数据处理操作需要执行。
了解有关map()
工作原理的更多信息
from(bucket: "get-started") |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "hum") |> map(fn: (r) => ({r with _value: r._value / 100.0}))
执行数学运算
map()
允许您对数据执行数学运算。 例如,使用在“开始写入InfluxDB”中写入的数据:
temp
字段以返回以°C为单位的室温。map()
迭代每一行,并将 _value
柱至°F,使用以下公式:°F = (°C * 1.8) + 32.0
.from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> map(fn: (r) => ({r with _value: (r._value * 1.8) + 32.0}))
输入 输出量 单击以查看输出
_start
和_stop
列已被省略。
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
有条件地分配状态
数据警报
使用group()
函数 按特定列值重新组合数据,以准备进一步处理。
from(bucket: "get-started") |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> group(columns: ["room", "_field"])
理解数据分组及其重要性很重要,但可能太多了 这篇“入门”教程。 有关如何对数据进行分组及其重要性的更多信息,请参见 通量数据模型文档。
默认情况下,from()
返回从InfluxDB查询的数据,按系列分组 (测量、标签和字段键)。 返回的表流中的每个表表示一个组。 每个表都包含数据分组所依据的列的相同值。 在聚合数据时,此分组非常重要。
按特定列对数据进行分组
使用“开始写入InfluxDB”中写入的数据:
temp
和hum
字段。group()
仅按_field
列分组。from(bucket: "get-started") |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp" or r._field == "hum") |> group(columns: ["_field"])
输入 输出量 单击以查看输出
以下数据从最后一个filter()
输出,并通过管道传输到group()
:
_start
和_stop
列已被省略。
组密钥实例= [_measurement=home,room=Kitchen,_field=hum]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 厨房 | 嗡嗡声 | 35.9 |
2022-01-01T09:00:00Z | 家 | 厨房 | 嗡嗡声 | 36.2 |
2022-01-01T10:00:00Z | 家 | 厨房 | 嗡嗡声 | 36.1 |
组密钥实例= [_measurement=home,room=Living Room,_field=hum]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 客厅 | 嗡嗡声 | 35.9 |
2022-01-01T09:00:00Z | 家 | 客厅 | 嗡嗡声 | 35.9 |
2022-01-01T10:00:00Z | 家 | 客厅 | 嗡嗡声 | 36 |
组密钥实例= [_measurement=home,room=Kitchen,_field=temp]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 厨房 | 温度 | 21 |
2022-01-01T09:00:00Z | 家 | 厨房 | 温度 | 23 |
2022-01-01T10:00:00Z | 家 | 厨房 | 温度 | 22.7 |
组密钥实例= [_measurement=home,room=Living Room,_field=temp]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 客厅 | 温度 | 21.1 |
2022-01-01T09:00:00Z | 家 | 客厅 | 温度 | 21.4 |
2022-01-01T10:00:00Z | 家 | 客厅 | 温度 | 21.8 |
取消数据分组
使用“开始写入InfluxDB”中写入的数据:
temp
和hum
字段。group()
来“取消”数据分组或不按列分组。 columns
参数的默认值是空数组([]
)。from(bucket: "get-started") |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T10:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp" or r._field == "hum") |> group()
输入 输出量 单击以查看输出
以下数据从最后一个filter()
输出,并通过管道传输到group()
:
_start
和_stop
列已被省略。
组密钥实例= [_measurement=home,room=Kitchen,_field=hum]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 厨房 | 嗡嗡声 | 35.9 |
2022-01-01T09:00:00Z | 家 | 厨房 | 嗡嗡声 | 36.2 |
2022-01-01T10:00:00Z | 家 | 厨房 | 嗡嗡声 | 36.1 |
组密钥实例= [_measurement=home,room=Living Room,_field=hum]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 客厅 | 嗡嗡声 | 35.9 |
2022-01-01T09:00:00Z | 家 | 客厅 | 嗡嗡声 | 35.9 |
2022-01-01T10:00:00Z | 家 | 客厅 | 嗡嗡声 | 36 |
组密钥实例= [_measurement=home,room=Kitchen,_field=temp]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 厨房 | 温度 | 21 |
2022-01-01T09:00:00Z | 家 | 厨房 | 温度 | 23 |
2022-01-01T10:00:00Z | 家 | 厨房 | 温度 | 22.7 |
组密钥实例= [_measurement=home,room=Living Room,_field=temp]
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T08:00:00Z | 家 | 客厅 | 温度 | 21.1 |
2022-01-01T09:00:00Z | 家 | 客厅 | 温度 | 21.4 |
2022-01-01T10:00:00Z | 家 | 客厅 | 温度 | 21.8 |
使用通量聚合 或选择器功能 从每个输入表中返回聚合值或选定值。
from(bucket: "get-started") |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp") |> mean()
如果要查询一段时间内的聚合值,这是一种 下采样
聚合函数丢弃 不在组键中的列 并为每个输入表返回一行,其中包含该表的聚合值。
计算每个房间的平均温度
使用“开始写入InfluxDB”中写入的数据:
temp
字段。默认情况下,from()
返回分组为 _measurement
、room
和_field
,因此每个表代表一个房间。mean()
返回每个房间的平均温度。from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> mean()
输入 输出量 单击以查看输出
_start
和_stop
列已被省略。
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
计算所有房间的整体平均温度
使用“开始写入InfluxDB”中写入的数据:
temp
字段。group()
将数据解组到单个表中。默认情况下, from()
返回按_measurement
、room
和_field
分组的数据。 要获得总体平均值,您需要将所有结果构建为单个表。mean()
返回平均温度。from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> group() |> mean()
输入 输出量 单击以查看输出
以下输入数据表示通过管道向前传输的未分组数据 mean()
_start
和_stop
列已被省略。
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
计算所有字段中每个房间报告的点数
使用“开始写入InfluxDB”中写入的数据:
home
测量进行过滤来查询所有字段。home
测量中的字段是不同的类型。 使用toFloat()
将所有字段值转换为浮点数。group()
按room
对数据进行分组。count()
返回每个输入表中的行数。from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> toFloat() |> group(columns: ["room"]) |> count()
_start
和_stop
列已被省略。
房间 | _值 |
---|---|
厨房 | 21 |
房间 | _值 |
---|---|
客厅 | 21 |
_time
通常不是组密钥的一部分,在使用时将被删除 聚合函数。要为聚合点分配新的时间戳,请复制 表示查询边界的_start
或_stop
列作为 新建_time
列。
from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> mean() |> duplicate(column: "_stop", as: "_time")
选择器功能返回 一个或多个列,并保留所有列及其值。
返回每个房间的第一个温度
使用“开始写入InfluxDB”中写入的数据:
temp
字段。first()
返回 每个表的第一行。from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> first()
输入 输出量 单击以查看输出
_start
和_stop
列已被省略。
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
返回每个房间的最后温度
使用“开始写入InfluxDB”中写入的数据:
temp
字段。last()
返回 每个表的最后一行。from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> last()
输入 输出量 单击以查看输出
_start
和_stop
列已被省略。
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
返回每个房间的最高温度
使用“开始写入InfluxDB”中写入的数据:
temp
字段。max()
返回行 其中最高值在每个表的_value
列中。from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> max()
输入 输出量 单击以查看输出
_start
和_stop
列已被省略。
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 厨房 | 温度 | 22.8 |
2022-01-01T15:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T16:00:00Z | 家 | 厨房 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 厨房 | 温度 | 22.7 |
2022-01-01T18:00:00Z | 家 | 厨房 | 温度 | 23.3 |
2022-01-01T19:00:00Z | 家 | 厨房 | 温度 | 23.1 |
2022-01-01T20:00:00Z | 家 | 厨房 | 温度 | 22.7 |
_时间 | _测量 | 房间 | _字段 | _值 |
---|---|---|---|---|
2022-01-01T14:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T15:00:00Z | 家 | 客厅 | 温度 | 22.3 |
2022-01-01T16:00:00Z | 家 | 客厅 | 温度 | 22.4 |
2022-01-01T17:00:00Z | 家 | 客厅 | 温度 | 22.6 |
2022-01-01T18:00:00Z | 家 | 客厅 | 温度 | 22.8 |
2022-01-01T19:00:00Z | 家 | 客厅 | 温度 | 22.5 |
2022-01-01T20:00:00Z | 家 | 客厅 | 温度 | 22.2 |
如果来自关系SQL或类似SQL的查询语言,如InfluxQL, Flux使用的数据模型与您习惯的不同。 Flux返回多个表,其中每个表包含不同的字段。 “关系”模式将每个字段构造为每行中的一列。
使用pivot()
函数 基于时间戳将数据透视到“关系”模式中。
from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp") |> filter(fn: (r) => r.room == "Kitchen") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
查看输入和透视输出
对数据进行下采样是一种提高查询时性能的策略,并且 优化长期数据存储。简单地说,下采样减少了 点,而不会丢失数据中的一般趋势。
有关对数据进行缩减采样的详细信息,请参见 下采样数据.
下采样数据的最常见方法是按时间间隔或“窗口”。 例如,您可能希望查询最近一小时的数据并返回平均值 每五分钟窗口的值。
使用aggregateWindow()
要按指定的时间间隔对数据进行下采样,请执行以下操作:
every
参数指定每个窗口的持续时间。fn
参数指定聚合 或选择器功能 适用于每个窗口。timeSrc
参数指定要 用于为每个窗口创建新的聚合时间戳。 默认值为_stop
。from(bucket: "get-started") |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z) |> filter(fn: (r) => r._measurement == "home") |> filter(fn: (r) => r._field == "temp") |> aggregateWindow(every: 2h, fn: mean)
查看输入和下采样输出
InfluxDB任务是计划查询 其可以执行上述任何数据处理操作。 一般来说,任务会使用to()
函数 将处理后的结果写回InfluxDB。
有关创建和配置任务的详细信息,请参阅 开始使用InfluxDB任务.
option task = { name: "Example task" every: 1d, } from(bucket: "get-started-downsampled") |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "home") |> aggregateWindow(every: 2h, fn: mean)