很多时候,我们需要快速的处理一些与时间相干性比较高的海量数据,比如IOT、监控、事件、告警等等,这一类数据我们称之为时序数据,传统数据库可以办理此类场景,但是当数据规模达到一定水平之后,开始变得没有那么好用了。
TimescaleDB是时序数据库中的一员,比较与众不同的是,他只是PostgreSQL的一个插件,对于PostgreSQL的爱好者来说,这是对pg非常好的一个加强。这篇文章会带你快速了解TimescaleDB的使用方法。
PS1:固然是快速入门教程,但内容还是有点多。
PS2:最快的学习方法是边学边练,如果以为自己摆设环境麻烦,可以使用一些免费在线数据库。
〇、时序表Hypertable
TimescaleDB的三个焦点特性:Hypertable、持续聚合、压缩。最基础的是Hypertable。
Hypertable是自动按时间对数据进行分区的 PostgreSQL 表。使用Hypertable的方式与常规的 PostgreSQL 表雷同,但增加了一些额外功能,使管理时序数据更加轻松。
0.1 创建Hypertable
先创建一个常规PostgreSQL表:
- CREATE TABLE conditions (
- time TIMESTAMPTZ NOT NULL,
- location TEXT NOT NULL,
- device TEXT NOT NULL,
- temperature DOUBLE PRECISION NULL,
- humidity DOUBLE PRECISION NULL
- );
复制代码 然后将其转换成Hypertable:
- SELECT create_hypertable('conditions', by_range('time'));
复制代码 0.2 调整Hypertable的chuck interval
调整chuck interval可以提升数据库的性能,查询当前chuck interval设置:
- SELECT h.table_name, c.interval_length
- FROM _timescaledb_catalog.dimension c
- JOIN _timescaledb_catalog.hypertable h
- ON h.id = c.hypertable_id;
复制代码 设置chuck interval的方法:
- SELECT create_hypertable(
- 'conditions',
- by_range('time', INTERVAL '1 day')
- );
复制代码 对已经存在的Hypertable调整chuck interval:
- SELECT set_chunk_time_interval('conditions', INTERVAL '24 hours');
复制代码 0.3 创建索引
- 在device_id上创建唯一索引,以time作为分区键。
- CREATE UNIQUE INDEX idx_deviceid_time
- ON hypertable_example(device_id, time);
复制代码
- 在time、user_id、device_id上创建唯一索引
- CREATE UNIQUE INDEX idx_userid_deviceid_time
- ON hypertable_example(user_id, device_id, time);
复制代码 在创建索引的时候,必须包含time字段,因为time字段是分区键。
0.4 删除Hypertable
一、写入数据
Timescale使PostgreSQL的扩展,因此支持以与PostgreSQL雷同的方式写入数据,使用INSERT、UPDATE、INSERT … ON CONFLICT、DELETE等命令进行数据的插入、更新和删除操纵。
1.1 Insert
- INSERT INTO conditions(time, location, temperature, humidity)
- VALUES (NOW(), 'office', 70.0, 50.0);
复制代码
- INSERT INTO conditions
- VALUES
- (NOW(), 'office', 70.0, 50.0),
- (NOW(), 'basement', 66.5, 60.0),
- (NOW(), 'garage', 77.0, 65.2);
复制代码
- INSERT INTO conditions
- VALUES (NOW(), 'office', 70.1, 50.1)
- RETURNING *;
复制代码 上面的语句的返回效果:
- time | location | temperature | humidity
- ------------------------------+----------+-------------+----------
- 2017-07-28 11:42:42.846621+00 | office | 70.1 | 50.1
- (1 row)
复制代码 1.2 Update
- UPDATE conditions
- SET temperature = 70.2, humidity = 50.0
- WHERE time = '2017-07-28 11:42:42.846621+00'
- AND location = 'office';
复制代码
- UPDATE conditions
- SET temperature = temperature + 0.1
- WHERE time >= '2017-07-28 11:40'
- AND time < '2017-07-28 11:50';
复制代码 1.3 Upsert
Upsert的语义是:
- 如果没有匹配(冲突)的数据,则插入一行新数据
- 如果有匹配(冲突)的数据,则更新数据或者什么也不做
下面是冲突则更新的写法(DO UPDATE):
- INSERT INTO conditions
- VALUES ('2017-07-28 11:42:42.846621+00', 'office', 70.2, 50.1)
- ON CONFLICT (time, location) DO UPDATE
- SET temperature = excluded.temperature,
- humidity = excluded.humidity;
复制代码 下面是冲突则什么也不做的写法(DO NOTHING):
- INSERT INTO conditions
- VALUES ('2017-07-28 11:42:42.846621+00', 'office', 70.1, 50.0)
- ON CONFLICT DO NOTHING;
复制代码 1.4 Delete
使用pg语法删除数据:
- DELETE FROM conditions WHERE temperature < 35 OR humidity < 60;
复制代码 1.5 数据保留策略
在时序应用步伐中,数据每每随着时间的增长而变得不那么有用。如果你不需要过期的历史数据,你可以在它达到一定年龄后将其删除。Timescale可以设置自动数据保留策略以丢弃旧数据。你也可以通过手动删除数据块来微调数据。
通常,我们只需要保留历史数据的统计汇总效果,但不需要保留原始数据。Timescale为开发者提供了内置的功能,答应你将持续聚合和数据保留策略合并使用。
1.5.1 按区块删除数据
Timescale数据保留实用于块,而不是行。逐行删除数据(例如使用PostgreSQL DELETE命令)会很慢,但是按块删除数据会更快,因为它会从磁盘中删除整个文件。它不需要垃圾网络和碎片整理。
无论是使用保留策略还是手动删除块,Timescale都会按块删除数据。它只删除所有数据都在指定时间范围内的块,如果该块还包含了其他时间段的数据,则不会被删除。
例如,我们大概存在如下三块数据:
- 数据块1:凌驾36小时
- 数据块2:在12到36小时之间
- 数据块3:过去12小时
此时如果我们手动drop凌驾24小时的数据块,则实际上只会删除数据块1。数据块2会被保留,因为它包含一些24小时内的数据,此时该数据块中的任何行都不会被删除。
1.5.2 创建数据保留策略
语法如下:
- SELECT add_retention_policy('conditions', INTERVAL '24 hours');
复制代码 上面的语句为conditions表设置的数据保留策略为保留24小时内的数据,凌驾24小时的数据块会被删除。
数据保留策略只能用来删除历史数据,要想删除未来时间的数据,可以手动删除:
- -- 删除超过24小时的数据
- SELECT drop_chunks('conditions', INTERVAL '24 hours');
- -- 删除一个范围内的数
- SELECT drop_chunks(
- 'conditions',
- older_than => INTERVAL '3 months',
- newer_than => INTERVAL '4 months'
- )
- -- 删除大于当前时间3个月的数据
- SELECT drop_chunks(
- 'conditions',
- newer_than => now() + INTERVAL '3 months'
- );
复制代码 1.5.3 删除数据保留策略
- SELECT remove_retention_policy('conditions');
复制代码 1.5.4 持续聚合和数据保留
你可以通过将数据保留策略与持续聚合结合起来对数据进行降采样。如果准确设置刷新策略,你可以从Hypertable中删除旧数据,而不会从持续聚合中删除它。如许一来,你就可以节省原始数据存储空间,同时保留用于历史分析的汇总数据。
留意 :当刷新持续聚合数据时,Timescale会根据刷新窗口中原始数据的更改来更新聚合数据。如果发现原始数据已被删除,它也会删除聚合数据。为了防止这种情况发生,请确保聚合数据的刷新窗口不会与任何已删除的数据重叠。
下面是一个示例:
- CREATE MATERIALIZED VIEW conditions_summary_daily (day, device, temp)
- WITH (timescaledb.continuous) AS
- SELECT time_bucket('1 day', time), device, avg(temperature)
- FROM conditions
- GROUP BY (1, 2);
- SELECT add_continuous_aggregate_policy('conditions_summary_daily', '7 days', '1 day', '1 day');
复制代码 上面的SQL创建了一个名为 conditions_summary_daily 的持续聚合,此中存储了每个设备的每日温度数据。该聚合每天刷新一次。每次刷新时,它会更新从 7 天前到 1 天前的任何数据更改。
你不应该在 conditions 表上设置 24 小时的保留策略。如果你如许做了,凌驾 1 天的分块将被删除。然后,根据数据更改,聚合会进行刷新。由于数据更改是删除凌驾 1 天的数据,聚合也会删除数据。最终导致 conditions_summary_daily 表中没有数据。
此时可以将conditions 表的保留策略设置的长一些,比如30天:
- SELECT add_retention_policy('conditions', INTERVAL '30 days');
复制代码 持续聚合表业可以设置数据保留策略,比如我们保留聚合数据600天。
- SELECT add_retention_policy('conditions_summary_daily ', INTERVAL '600 days');
复制代码 二、查询数据
Timescale支持PostgreSQL的所有查询语法,因此你可以像使用pg一样使用Timescale。同时Timescale还提供了一些额外的特性来帮助开发者更高效的分析数据:
- 使用SkipScan 加快 DISTINCT 查询
- 使用Hyperfunction 可以改进数据分析场景的查询体验
- 使用Function pipelines为SQL语言增加函数编程能力,使得对数据进行复杂转换变的简单。
2.1 简单SELECT查询
- SELECT * FROM conditions ORDER BY time DESC LIMIT 100;
复制代码
- SELECT COUNT(*) FROM conditions
- WHERE time > NOW() - INTERVAL '12 hours';
复制代码
- 统计conditions表中过去一天的数据中有空调的地方的数量
- SELECT COUNT(DISTINCT location) FROM conditions
- JOIN locations
- ON conditions.location = locations.location
- WHERE locations.air_conditioning = True
- AND time > NOW() - INTERVAL '1 day';
复制代码 2.2 使用Time bucket
Time bucket可以让我们更方便的按时间段对数据进行聚合。例如,可以将数据分组为5分钟、1小时、3天、1周的Time bucket,以计算统计数据,下面是一些例子:
在 weather_conditions 的表中计算每日平均温度:
- SELECT time_bucket('1 day', time) AS bucket,
- avg(temperature) AS avg_temp
- FROM weather_conditions
- GROUP BY bucket
- ORDER BY bucket ASC;
复制代码
- 按Time bucket分组进行数据统计,且显示Time bucket的结束时间
默认情况下,time_bucket 列的显示值为time bucket的开始时间,如果盼望显示接入时间,可以如下方式写SQL:
- SELECT time_bucket('5 min', time) + '5 min' AS bucket,
- min(cpu),
- max(cpu)
- FROM metrics
- GROUP BY bucket
- ORDER BY bucket DESC;
复制代码
- 按Time bucket分组进行数据统计,并修改时间范围
要更改Time bucket所覆盖的时间范围,可以使用偏移参数,该参数接受一个 INTERVAL 值。正偏移将Time bucket的起始和结束时间推后。负偏移将Time bucket的起始和结束时间提前。
例如,可以计算每5小时的平均 CPU 使用率,并将所有Time bucket的起始和结束时间推后1小时:
- SELECT time_bucket('5 hours', time, '1 hour'::INTERVAL) AS bucket,
- avg(cpu)
- FROM metrics
- GROUP BY bucket
- ORDER BY bucket DESC;
复制代码
- 计算一个特定时间的Time Bucket
- 获取过去3小时内每个位置每15分钟时段的气候状况信息,并计算采集的丈量次数、最高温度和最高湿度。按最高温度对效果进行排序:
- SELECT time_bucket('15 minutes', time) AS fifteen_min,
- location,
- COUNT(*),
- MAX(temperature) AS max_temp,
- MAX(humidity) AS max_hum
- FROM conditions
- WHERE time > NOW() - INTERVAL '3 hours'
- GROUP BY fifteen_min, location
- ORDER BY fifteen_min DESC, max_temp DESC;
复制代码 一些工作原理解释
Time bucket是一个时间段的形貌,比如1小时,起始点则决定了Time bucket的开始和结束时间。默认情况下,Time bucket并不从数据中最早的时间戳开始。例如,第一个数据点是在00:37收到的,但我们通常盼望每天的Time bucket从0点开始。同样的,我们大概在星期三网络到第一个数据点,但通常盼望每周的Time bucket从星期日或星期一开始计算。
假设我们的数据中最早的时间戳是2020年4月24日。如果我们的Time bucket隔断设置为两周,则第一个Time bucket不会从4月24日开始,因为那是一个星期五。它也不会从4月20日开始,因为那是前一周的星期一。它会从2020年4月13日开始,因为您可以通过从2000年1月3日开始以两周为单位计数来到达2020年4月13日,这里Timescale使用2000年1月3日作为默认起始点进行推导。
对于不涉及月份或年份的隔断,默认起始点是2000年1月3日。对于月份、年份或世纪的隔断,默认起始点是2000年1月1日。对于整数时间值, 默认起始点是0。
这些选择使得Time bucket的时间范围更加直观。由于2000年1月3日是星期一,以是周Time bucket从星期一开始。这符合ISO标准计算日历周的要求。月度和年度Time bucket使用2000年1月1日作为起始点。这使它们能够从日历月或年的第一天开始。
起始时间取决于您的时间值的数据类型。
如果您使用 TIMESTAMP,默认情况下,Time bucket的开始时间与 00:00:00 对齐。每日和每周的Time bucket从 00:00:00 开始。较短的Time bucket从您可以通过从原始日期的 00:00:00 开始按Time bucket增量计数到达的时间开始。
如果您使用 TIMESTAMPTZ,默认情况下,Time bucket的开始时间与 00:00:00 UTC 对齐。要将Time bucket与另一个时区对齐,需要设置时区参数。
- ERROR: cannot create continuous aggregate with incompatible bucket width
- DETAIL: Time bucket width of "<BUCKET>" [1 year] should be multiple of the time bucket width of "<BUCKET>" [1 day].
复制代码 如果实验创建分层持续聚合,必须使用兼容的Time bucket。不能在具有可变宽度Time bucket的持续聚合上创建具有固定宽度Time bucket的持续聚合。
2.3 使用SkipScan来加快DISTINCT查询
留意:SkipScan 现在不能在压缩数据上执行。
SkipScan 可以加快 DISTINCT 查询的查询时间。它实用于 PostgreSQL 表、Timescale Hypertable。SkipScan 包含在 TimescaleDB 2.2.1 及更高版本中。
要查询数据库并找到某项的最新值,您可以使用 DISTINCT 查询。例如,您大概想要找到每项投资的最新股票或加密货币代价。或者您大概有一个可视化图表和警报,周期性查询每个设备或监控项的最新值。
随着表变得越来越大,DISTINCT 查询每每会变得更慢。这是因为 PostgreSQL 现在没有很好的机制来从有序索引中获取唯一值列表。即使您有一个与此类查询的确切次序和列匹配的索引,PostgreSQL 也会扫描整个索引以找到所有唯一值。随着表的增长,此操纵变得越来越慢。
SkipScan 答应查询在不读取中心所有行的情况下渐渐从一个有序值跳到下一个值。如果不支持此功能,数据库引擎必须扫描整个有序索引,然后在结束时去重,这是一个非常慢的过程。
SkipScan 是对形式为 SELECT DISTINCT ON column_name 的查询进行优化的。从概念上讲,SkipScan 是一个常规的 IndexScan,它在索引中跳过查找下一个大于当前值的值。
使用SkipScan,需要创建需要满意如下条件的索引:
- DISTINCT 字段必须是索引的第一个字段
- 使用BTREE索引
- 索引的排序规则与查询语句保持一致
查询语句必须满意如下条件:
例如:
- CREATE INDEX "cpu_customer_tags_id_time_idx"
- ON readings (customer_id, tags_id, time DESC)
复制代码 2.4 高级分析
- 2.4.1 计算百分位数和中位数(一种特殊的百分位:50%)
- SELECT percentile_cont(0.5)
- WITHIN GROUP (ORDER BY temperature)
- FROM conditions;
复制代码
使用 sum(sum(column)) OVER(ORDER BY group) 计算累计总和:
- SELECT location, sum(sum(temperature)) OVER(ORDER BY location)
- FROM conditions
- GROUP BY location;
复制代码 上面的窗口函数 sum(sum(temperature)) OVER(ORDER BY location) 计算的方式如下:
- 首先,内部的 SUM(temperature) 函数会计算每个 location 的温度总和。
- 然后,外部的 SUM() 窗口函数将对上述计算的温度总和进行累加。
- OVER(ORDER BY location) 部分指定了窗口的排序方式,即按照 location 的次序进行排序。
因此,这个查询会返回一个按照 location 排序的效果集,此中每行的值表现该行之前所有行中对应位置的温度总和的累加值。
要计算简单移动平均值,使用 OVER 窗口函数跨越一定数量的行,然后在这些行上计算聚合函数。例如,要通过对近来十次读数进行平均来找出设备的平滑温度值:
- SELECT time, AVG(temperature) OVER(ORDER BY time
- ROWS BETWEEN 9 PRECEDING AND CURRENT ROW)
- AS smooth_temp
- FROM conditions
- WHERE location = 'garage' and time > NOW() - INTERVAL '1 day'
- ORDER BY time DESC;
复制代码 上面的SQL从 conditions 表中查询时间(time)和温度的平均值,此中平均值是基于时间窗口计算的。时间窗口包括当前行及其之前的近来的 9 行数据。如许可以计算出每个时间点(当前行)及其之前 9 个时间点的温度的平均值。
要计算值的增加,您需要思量计数器重置。如果主机重新启动或容器重新启动,则大概会发生计数器重置。此示例查找发送的字节数,并将计数器重置思量在内:
- SELECT
- time,
- (
- CASE
- WHEN bytes_sent >= lag(bytes_sent) OVER w
- THEN bytes_sent - lag(bytes_sent) OVER w
- WHEN lag(bytes_sent) OVER w IS NULL THEN NULL
- ELSE bytes_sent
- END
- ) AS "bytes"
- FROM net
- WHERE interface = 'eth0' AND time > NOW() - INTERVAL '1 day'
- WINDOW w AS (ORDER BY time)
- ORDER BY time
复制代码 上面的SQL从 net 表中查询时间(time)和字节数(“bytes”),此中字节数是根据前一行字节数计算得出的。
在 CASE 语句中:
- 如果当前行的字节数(bytes_sent)大于或等于前一行的字节数,则计算当前行字节数与前一行字节数之间的差值,并将效果作为当前行的字节数。
- 如果前一行的字节数为 NULL,则将当前行的字节数设为 NULL。
- 否则,将当前行的字节数设为当前行的字节数(bytes_sent)。
- 2.4.5 计算变革率
与增量类似,变革率实用于具有单调增加计数器的情况。如果您的采样隔断是可变的,或者在不同系列之间使用不同的采样隔断,将值归一化为公共时间隔断以使计算出的值可比较将会很有帮助。此示例查找每秒发送的字节数,并思量计数器重置:
- SELECT
- time,
- (
- CASE
- WHEN bytes_sent >= lag(bytes_sent) OVER w
- THEN bytes_sent - lag(bytes_sent) OVER w
- WHEN lag(bytes_sent) OVER w IS NULL THEN NULL
- ELSE bytes_sent
- END
- ) / extract(epoch from time - lag(time) OVER w) AS "bytes_per_second"
- FROM net
- WHERE interface = 'eth0' AND time > NOW() - INTERVAL '1 day'
- WINDOW w AS (ORDER BY time)
- ORDER BY time
复制代码 上面的SQL从 net 表中查询时间(time)和传输速率(“bytes_per_second”)的数据。
对于每一行数据,使用 CASE 语句来判断:
- 如果当前行的 bytes_sent 大于或等于上一行的 bytes_sent,则计算两行之间的字节差值,然后除以两行之间的时间隔断,得到字节传输速率。
- 如果上一行的 bytes_sent 为 NULL,则返回 NULL。
- 否则,将当前行的 bytes_sent 作为字节传输速率。
在这个SQL中,WINDOW 子句定义了一个窗口(window),命名为 w,并指定了排序规则,即按照时间(time)字段的次序进行排序(ORDER BY time),这个窗口 w 被用于两个地方:
- 在两个 OVER 子句中使用了窗口 w。这些 OVER 子句是用来执行窗口函数的,即 lag() 函数,用于获取前一行的数据。通过指定 w,我们告诉数据库应该按照窗口 w 中定义的排序规则来获取前一行的数据。
- 在主查询中,将窗口 w 用作 window function 的参数。在 CASE 语句中,通过指定 OVER w,我们指示数据库在计算 lag() 函数时应该按照窗口 w 中定义的排序规则来获取前一行的数据。如许可以确保计算字节传输速率时使用的是准确的时间隔断。
总之,WINDOW 子句在这个 SQL 中的作用是定义了一个命名窗口 w,并指定了排序规则,以确保在窗口函数中准确地获取前一行的数据,从而计算出准确的字节传输速率。
在很多监控和物联网(IoT)应用场景中,设备或传感器报告的指标不经常变革,任何变革都被视为异常。当您查询随时间变革的这些值时,通常不盼望传输所有值,而只盼望传输观察到变革的值。这有助于最小化发送的数据量。您可以使用窗口函数和子查询的组合来实现此目的。此示例使用差别来过滤未更改值的行,并仅传输发生更改的行:
- SELECT time, value FROM (
- SELECT time,
- value,
- value - LAG(value) OVER (ORDER BY time) AS diff
- FROM hypertable) ht
- WHERE diff IS NULL OR diff != 0;
复制代码
要根据某个字段对数据进行分组,并计算每个组内指标的变革量,可以使用 LAG … OVER (PARTITION BY …)。例如,给定一些气候数据,计算每个城市的温度变革量:
- SELECT ts, city_name, temp_delta
- FROM (
- SELECT
- ts,
- city_name,
- avg_temp - LAG(avg_temp) OVER (PARTITION BY city_name ORDER BY ts) as temp_delta
- FROM weather_metrics_daily
- ) AS temp_change
- WHERE temp_delta IS NOT NULL
- ORDER BY bucket;
复制代码 上面的SQL从 weather_metrics_daily 表中查询时间戳(ts)、城市名称(city_name)和温度变革量(temp_delta)的数据。
在内部查询中,对每个城市按时间戳排序,计算每个城市平均温度与上一个时间戳的平均温度之间的差值(temp_delta)。
末了,从内部查询中选择非空的温度变革量数据,并按照 bucket 字段进行排序(这里的 bucket 字段没有在内部查询中定义)。
这个查询的目的是找出每个城市在相邻时间戳之间的平均温度变革量。
Timescale提供了first和last函数,答应你按另一列的次序获取一列的值。
- SELECT location, last(temperature, time)
- FROM conditions
- GROUP BY location;
- SELECT time_bucket('5 minutes', time) five_min, location, last(temperature, time)
- FROM conditions
- GROUP BY five_min, location
- ORDER BY five_min DESC LIMIT 12;
复制代码
Timescale的 [histogram](https://docs.timescale.com/api/latest/hyperfunctions/histogram/) 函数答应你天生数据的直方图。此示例定义了一个直方图,此中有五个桶,定义在范围 60 到 85 之间。天生的直方图有七个箱子:第一个箱子用于值低于最小阈值 60,中心的五个箱子用于指定范围内的值,末了一个箱子用于值高于 85 的值:
- SELECT location, COUNT(*),
- histogram(temperature, 60.0, 85.0, 5)
- FROM conditions
- WHERE time > NOW() - INTERVAL '7 days'
- GROUP BY location;
复制代码 效果如下:
- location | count | histogram
- ------------+-------+-------------------------
- office | 10080 | {0,0,3860,6220,0,0,0}
- basement | 10080 | {0,6056,4024,0,0,0,0}
- garage | 10080 | {0,2679,957,2420,2150,1874,0}
复制代码
您可以显示所选时间范围内的记载,即使部分时间范围内没有数据也可以。这称为填补间隙,通常涉及对缺失数据记载空值的操纵。
下面示例中,使用包含时间戳、正在交易的资产代码、资产代价和交易的资产量的交易数据。
创建一个查询,用于获取九月份每天交易的资产“TIMS”的交易量:
- SELECT
- time_bucket('1 day', time) AS date,
- sum(volume) AS volume
- FROM trades
- WHERE asset_code = 'TIMS'
- AND time >= '2021-09-01' AND time < '2021-10-01'
- GROUP BY date
- ORDER BY date DESC;
复制代码 输出如下:
- date | volume
- ------------------------+--------
- 2021-09-29 00:00:00+00 | 11315
- 2021-09-28 00:00:00+00 | 8216
- 2021-09-27 00:00:00+00 | 5591
- 2021-09-26 00:00:00+00 | 9182
- 2021-09-25 00:00:00+00 | 14359
- 2021-09-22 00:00:00+00 | 9855
复制代码 可以看到,上面的输出缺少了一些日期的数据,可以通过time_bucket_gapfill 函数来补全数据,以方便效果的使用:
- SELECT
- time_bucket_gapfill('1 day', time) AS date,
- sum(volume) AS volume
- FROM trades
- WHERE asset_code = 'TIMS'
- AND time >= '2021-09-01' AND time < '2021-10-01'
- GROUP BY date
- ORDER BY date DESC;
复制代码 输出如下:
- date | volume
- ------------------------+--------
- 2021-09-30 00:00:00+00 |
- 2021-09-29 00:00:00+00 | 11315
- 2021-09-28 00:00:00+00 | 8216
- 2021-09-27 00:00:00+00 | 5591
- 2021-09-26 00:00:00+00 | 9182
- 2021-09-25 00:00:00+00 | 14359
- 2021-09-24 00:00:00+00 |
- 2021-09-23 00:00:00+00 |
- 2021-09-22 00:00:00+00 | 9855
复制代码 还可以使用 Timescale 的 time_bucket_gapfill 函数天生包含时间戳的数据点。这对于图表展示非常有用,便于它们可以准确地在图表中绘制间隙。下面这个例子,在过去两周内天生了 1080 个数据点,填充了空缺值,并为每个空值分配了一个时间戳。
- SELECT
- time_bucket_gapfill(INTERVAL '2 weeks' / 1080, time, now() - INTERVAL '2 weeks', now()) AS btime,
- sum(volume) AS volume
- FROM trades
- WHERE asset_code = 'TIMS'
- AND time >= now() - INTERVAL '2 weeks' AND time < now()
- GROUP BY btime
- ORDER BY btime;
复制代码 效果如下:
- btime | volume
- ------------------------+----------
- 2021-03-09 17:28:00+00 | 1085.25
- 2021-03-09 17:46:40+00 | 1020.42
- 2021-03-09 18:05:20+00 |
- 2021-03-09 18:24:00+00 | 1031.25
- 2021-03-09 18:42:40+00 | 1049.09
- 2021-03-09 19:01:20+00 | 1083.80
- 2021-03-09 19:20:00+00 | 1092.66
- 2021-03-09 19:38:40+00 |
- 2021-03-09 19:57:20+00 | 1048.42
- 2021-03-09 20:16:00+00 | 1063.17
- 2021-03-09 20:34:40+00 | 1054.10
- 2021-03-09 20:53:20+00 | 1037.78
复制代码
如果您的数据集仅在实际值发生变革时记载一行数据,但您的可视化界面大概仍旧需要所有数据点才能准确显示效果。在这种情况下,可以将上一次观察到的值通报到缺失的数据点中。例如:
- SELECT
- time_bucket_gapfill(INTERVAL '5 min', time, now() - INTERVAL '2 weeks', now()) as 5min,
- meter_id,
- locf(avg(data_value)) AS data_value
- FROM my_hypertable
- WHERE
- time > now() - INTERVAL '2 weeks'
- AND meter_id IN (1,2,3,4)
- GROUP BY 5min, meter_id
复制代码
您可以在数据库中找到每个唯一项目的末了一个数据点。例如,每个物联网设备的最跋文录的丈量值。减少搜索末了一个数据点所需数据量的标准方法是使用时间谓词来严酷限制要遍历的时间量或块的数量。除非所有项目在时间范围内至少有一条记载,否则此方法不起作用。更结实的方法是使用末了一个数据点查询来确定每个唯一项目的末了一条记载。
我们看一个例子:
- CREATE TABLE vehicles (
- vehicle_id INTEGER PRIMARY KEY,
- vin_number CHAR(17),
- last_checkup TIMESTAMP
- );
- CREATE TABLE location (
- time TIMESTAMP NOT NULL,
- vehicle_id INTEGER REFERENCES vehicles (vehicle_id),
- latitude FLOAT,
- longitude FLOAT
- );
- SELECT create_hypertable('location', by_range('time'));
复制代码 可以使用第一个表来对location表执行 LATERAL JOIN ,该表提供了一组不同的车辆:
- SELECT data.* FROM vehicles v
- INNER JOIN LATERAL (
- SELECT * FROM location l
- WHERE l.vehicle_id = v.vehicle_id
- ORDER BY time DESC LIMIT 1
- ) AS data
- ON true
- ORDER BY v.vehicle_id, data.time DESC;
- time | vehicle_id | latitude | longitude
- ----------------------------+------------+-----------+-------------
- 2017-12-19 20:58:20.071784 | 72 | 40.753690 | -73.980340
- 2017-12-20 11:19:30.837041 | 156 | 40.729265 | -73.993611
- 2017-12-15 18:54:01.185027 | 231 | 40.350437 | -74.651954
复制代码 这种方法需要保留一个单独的表来存储唯一项目的标识符或名称。可以如上所示,通过 REFERENCES 定义Hypertable到元数据表的外键来实现这一点。
元数据表(本例中即:vehicles )可以通过业务逻辑进行填充,例如当车辆首次在系统中注册时。别的,您还可以在对Hypertable执行插入或更新时使用触发器动态填充它。例如:
- CREATE OR REPLACE FUNCTION create_vehicle_trigger_fn()
- RETURNS TRIGGER LANGUAGE PLPGSQL AS
- $BODY$
- BEGIN
- INSERT INTO vehicles VALUES(NEW.vehicle_id, NULL, NULL) ON CONFLICT DO NOTHING;
- RETURN NEW;
- END
- $BODY$;
- CREATE TRIGGER create_vehicle_trigger
- BEFORE INSERT OR UPDATE ON location
- FOR EACH ROW EXECUTE PROCEDURE create_vehicle_trigger_fn();
复制代码 三、数据压缩
TimescaleDB以更高效的格式存储数据,与普通PostgreSQL表相比,压缩率高达20倍。TimescaleDB压缩是在PostgreSQL中原生实现的,不需要特殊的存储格式。
列存压缩的一个额外利益是,某些查询明显更快,因为需要读取到内存中的数据更少。
3.1 设置压缩策略
以下面的表为例,我们演示一下如何使用压缩:
- CREATE TABLE "metrics"(
- created timestamp with time zone default now() not null,
- type_id integer not null,
- value double precision not null
- );
- SELECT create_hypertable('metrics', by_range('created'));
复制代码
- 使用Alter table命令,对表启用压缩,并设置segmentby字段和orderby字段:
- ALTER TABLE metrics
- SET (
- timescaledb.compress,
- timescaledb.compress_segmentby='type_id',
- timescaledb.compress_orderby='created DESC'
- );
复制代码 segmentby和orderby的选择不同,性能和压缩比会不一样,如何准确的选中列,参考这里。
- SELECT compress_chunk(c) from show_chunks('metrics') c;
复制代码 也可以配置自动压缩策略,在下一节中会详细介绍。
- SELECT
- pg_size_pretty(before_compression_total_bytes) as before,
- pg_size_pretty(after_compression_total_bytes) as after
- FROM hypertable_compression_stats('metrics');
复制代码 效果:
- before | after
- --------+-------
- 180 MB | 16 MB
- (1 row)
复制代码 3.2 设置自动压缩策略
为了制止每次有数据要压缩时都手动运行压缩,可以设置压缩策略。压缩策略答应您压缩凌驾特定时间的数据,例如,压缩凌驾8天的所有数据块:
- SELECT add_compression_policy('metrics', INTERVAL '8 days');
复制代码 压缩策略定期运行,默认情况下每天运行一次,这意味着使用上述设置,大概有长达9天的未压缩数据。
更多关于压缩策略的信息可以检察这里。
3.3 查询加快
前面我们将压缩设置为按type_id列值进行分段(segmentby),这意味着通过对该列进行过滤或分组来获取数据将更加高效。同时,我们按created字段进行降序排序,在查询语句中使用该字段进行降序排序查询性能会更好。
下面是一个使用上述规则加快查询的例子:
- SELECT time_bucket('1 day', created, 'Europe/Berlin') AS "time",
- round((last(value, created) - first(value, created)) *
- 100.) / 100. AS value
- FROM metrics
- WHERE type_id = 5
- GROUP BY 1;
复制代码 在压缩和解压的情况下,分别执行上述SQL,会看到相当大的性能差别。
解压数据的方法:
- SELECT decompress_chunk(c) from show_chunks('metrics') c;
复制代码 四、持续聚合
数据规模随着时间推移变得非常庞大是时序数据的典范特点,持续聚合旨在加快对非常大型数据集的查询速度。Timescale 持续聚合使用 PostgreSQL 的雾化视图,并以持续和增量方式在后台刷新数据,因此当您运行查询时,只需计算已更改的数据,而不是整个数据集。
4.1 创建持续聚合
创建一个持续聚合分两步:
持续聚合依赖Time bucket,下面是一个例子:
- CREATE MATERIALIZED VIEW conditions_summary_daily
- WITH (timescaledb.continuous) AS
- SELECT device,
- time_bucket(INTERVAL '1 day', time) AS bucket,
- AVG(temperature),
- MAX(temperature),
- MIN(temperature)
- FROM conditions
- GROUP BY device, bucket;
复制代码- SELECT add_continuous_aggregate_policy('conditions_summary_daily',
- start_offset => INTERVAL '1 month',
- end_offset => INTERVAL '1 day',
- schedule_interval => INTERVAL '1 hour');
复制代码 默认情况下,当您首次创建视图时,它会被填充数据。这是为了使聚合可以跨整个超表进行计算。如果您不盼望发生这种情况,例如如果表非常大,或者正在不断添加新数据,您可以控制数据刷新的次序。您可以通过在持续聚合策略中使用 WITH NO DATA 选项来添加手动刷新。
WITH NO DATA 选项答应立即创建持续聚合,因此您无需等候数据被聚合。数据只有在策略开始运行时才开始填充。这意味着只有新于 start_offset 时间的数据才开始填充持续聚合。如果您有比 start_offset 时间隔断更早的历史数据,则需要手动刷新历史数据,直到当前的 start_offset,以便及时查询能够高效运行。
- CREATE MATERIALIZED VIEW cagg_rides_view
- WITH (timescaledb.continuous) AS
- SELECT vendor_id,
- time_bucket('1h', pickup_datetime) AS hour,
- count(*) total_rides,
- avg(fare_amount) avg_fare,
- max(trip_distance) as max_trip_distance,
- min(trip_distance) as min_trip_distance
- FROM rides
- GROUP BY vendor_id, time_bucket('1h', pickup_datetime)
- WITH NO DATA;
复制代码 手动刷新视图:
- CALL refresh_continuous_aggregate('cagg_rides_view', NULL, localtimestamp - INTERVAL '1 week');
复制代码 添加刷新策略:
- SELECT add_continuous_aggregate_policy('cagg_rides_view',
- start_offset => INTERVAL '1 week',
- end_offset => INTERVAL '1 hour',
- schedule_interval => INTERVAL '30 minutes');
复制代码- CREATE MATERIALIZED VIEW conditions_summary_daily_3
- WITH (timescaledb.continuous) AS
- SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
- AVG(temperature),
- MAX(temperature),
- MIN(temperature),
- name
- FROM devices JOIN conditions USING (device_id)
- GROUP BY name, bucket;
复制代码 4.2 查询持续聚合的数据
跟查询普通表和视图一样:
- SELECT *
- FROM conditions_summary_hourly
- WHERE device = 5
- AND bucket >= '2020-01-01'
- AND bucket < '2020-04-01';
复制代码 4.3 及时聚合
持续聚合是周期性刷新的,因此通常不包括来自底层Hypertable的最新数据块。及时聚合会使用持续聚合数据,并将最新的原始数据添加到此中,以提供准确和最新的效果,而无需在写入数据时进行聚合。
在 Timescale 的版本 1.7 到 2.12 中,默认情况下启用了及时聚合;当您创建一个持续聚合视图时,对该视图的查询将包括最新的数据,即使它尚未被聚合。在 Timescale 2.13 及更高版本中,默认情况下禁用了及时聚合。
- ALTER MATERIALIZED VIEW table_name set (timescaledb.materialized_only = false);
复制代码
- ALTER MATERIALIZED VIEW table_name set (timescaledb.materialized_only = true);
复制代码 开启及时聚合后,在查询持续聚合时会自动添加最新的数据。换句话说,它们包括比您上次物化的数据更新的数据。
如果您向已物化的数据中添加新的历史数据,则这些数据不会反映在及时聚合中。这些历史数据需要等到下一轮刷新,或者通过调用 refresh_continuous_aggregate 进行手动刷新才能被查询到。您可以将及时聚合视为对于历史数据的最终一致性。
4.4 与窗口函数一起使用
现在,持续聚合不支持窗口函数。您可以通过以下方式办理此问题:
- 首先为查询的其他部分创建持续聚合,
- 然后在查询时对您的持续聚合使用窗口函数。
例如,假设您有一个名为 example 的Hypertable,此中包含一个time列和一个value列。您将数据按时间进行分段,并使用 lag 窗口函数计算time bucket之间的增量:
- WITH t AS (
- SELECT
- time_bucket('10 minutes', time) as bucket,
- first(value, time) as value
- FROM example GROUP BY bucket
- )
- SELECT
- bucket,
- value - lag(value, 1) OVER (ORDER BY bucket) delta
- FROM t;
复制代码 你无法直接使用上面的SQL创建持续聚合,因为上面的SQL使用了窗口函数lag。你可以先把窗口函数清除,创建一个持续聚合:
- CREATE MATERIALIZED VIEW example_aggregate
- WITH (timescaledb.continuous) AS
- SELECT
- time_bucket('10 minutes', time) AS bucket,
- first(value, time) AS value
- FROM example GROUP BY bucket;
复制代码 然后在查询时使用lag函数在持续聚合上计算差量:
- SELECT
- bucket,
- value - lag(value, 1) OVER (ORDER BY bucket) AS delta
- FROM example_aggregate;
复制代码 这可以使用持续聚合的能力加快查询。
4.5 级联/多层持续聚合
Timescale答应在其他持续聚合之上创建持续聚合,如许可以让你以不同的粒度汇总数据。例如,你大概有一个按小时汇总逐分钟数据的持续聚合。要获取每日摘要,您可以在小时级聚合的基础上创建一个新的持续聚合。这比在原始Hypertable之上创建每日聚合更高效,因为您可以重用小时级聚合的计算效果。
在持续聚合上创建持续聚合跟在Hypertable上创建持续聚合的语法千篇一律,就不再赘述。
及时聚合
默认情况下,所有持续聚合都使用及时聚合,这意味着对持续聚合的所有查询得到的总是及时最新的数据。Timescale通过将持续聚合产生的物化视图与源表或未雾化的原始数据进行毗连来实现这一点。
当持续聚合被级联时,每个持续聚合仅了解其下一层。未物化数据的毗连会递归进行,直到到达底层,从而确保在任何层级都能访问到最新的及时数据。
如果您将级联中的所有持续聚合都保持为及时聚合,底层是源Hypertable,这意味着级联中的每个持续聚合都可以访问所有近来的数据。
如果级联中的某个位置存在非及时持续聚合,递归JOIN将在该非及时持续聚合处停止。较高层的持续聚合不会从较低层接收任何未物化的数据。
例如,假设您有以下持续聚合:
- 在源Hypertable上进行及时每小时持续聚合
- 在每小时持续聚合上进行及时每日持续聚合
- 在每日持续聚合上进行非及时(仅雾化)每月持续聚合
- 在每月持续聚合上进行及时每年持续聚合
每小时和每日持续聚合的查询包括来自源Hypertable的及时、未物化的数据。每月持续聚合的查询仅返回已物化的数据。每年持续聚合的查询返回来自每年持续聚合本身的物化数据,以及来自每月持续聚合的近期的数据。但是,数据受限于每月持续聚合中已物化的内容,而且不会从源Hypertable获取最新的数据。
4.6 在持续聚合上创建索引
当您创建连续聚合时,会自动为每个 GROUP BY 列创建一个索引。该索引是一个复合索引,将 GROUP BY 列与 time_bucket 列结合起来。
例如,如果您定义了一个带有 GROUP BY 设备、位置、桶的连续聚合视图,那么会创建两个复合索引:一个在 {device, bucket} 上,另一个在 {location, bucket} 上。
可以在创建持续聚合时,将 timescaledb.create_group_indexes 设置为 false 来关闭自动创建索引的功能:
- CREATE MATERIALIZED VIEW conditions_daily
- WITH (timescaledb.continuous, timescaledb.create_group_indexes=false)
- AS
- ...
复制代码
Timescale 2.7开始,答应在任意字段上创建索引,就跟pg索引的语法一样:
- CREATE INDEX avg_temp_idx ON weather_daily (avg_temp);
复制代码 4.7 数据压缩
持续聚合通常是对历史数据的采样,因此如果数据只是用来查询分析用,不会进行修改,则可以开启压缩以节省空间。
连续聚合上的压缩与Hypertable上的压缩工作方式类似。当启用压缩且没有提供其他选项时,segment_by 值将自动设置为连续聚合的 group by 列,而且 time_bucket 列将用作压缩配置中的 order_by 列。
- ALTER MATERIALIZED VIEW cagg_name set (timescaledb.compress = true);
复制代码
- ALTER MATERIALIZED VIEW cagg_name set (timescaledb.compress = false);
复制代码 如果持续聚合上有压缩的块,关闭压缩会失败,此时需要先解压数据
- SELECT decompress_chunk(c, true) FROM show_chunks('cagg_name') c;
复制代码 设置压缩策略
在为连续聚合设置压缩策略之前,需要先设置一个刷新策略。压缩策略的隔断应设置得不会对正在刷新的地区进行压缩。这是为了防止刷新策略失败。
以下面的刷新策略为例:
- SELECT add_continuous_aggregate_policy('cagg_name',
- start_offset => INTERVAL '30 days',
- end_offset => INTERVAL '1 day',
- schedule_interval => INTERVAL '1 hour');
复制代码 压缩策略需要设置compress_after 参数的值大于刷新策略的start_offset 的值:
- SELECT add_compression_policy('cagg_name', compress_after=>'45 days'::interval);
复制代码 4.8 关于时间和时区
在连续聚合中,不支持依赖于本地时区设置的函数,无法调整到本地时间,因为时区设置会因用户而异。
为了办理这个问题,可以在视图定义中使用显式时区。别的,还可以为使用整数时间列的表创建自己的自定义聚合方案。
- CREATE MATERIALIZED VIEW device_summary
- WITH (timescaledb.continuous)
- AS
- SELECT
- time_bucket('1 hour', observation_time) AS bucket,
- min(observation_time AT TIME ZONE 'EST') AS min_time,
- device_id,
- avg(metric) AS metric_avg,
- max(metric) - min(metric) AS metric_spread
- FROM
- device_readings
- GROUP BY bucket, device_id;
复制代码
日期和时间通常以年-月-日和小时:分钟:秒表现。大多数 Timescale 数据库使用日期/时间类型列来表现日期和时间。然而,在某些情况下,您大概需要将这些常见的时间和日期格式转换为使用整数的格式。最常见的整数时间是 Unix 时间戳,它是自 1970-01-01 Unix 纪元以来的秒数,但也有大概使用其他类型的基于整数的时间格式。
要创建一个使用整数列作为时间的Hypertable,需要提供块时间隔断。下面的示例使用每个块10 分钟作为隔断:
- -- 基础表,time字段是整数
- CREATE TABLE devices(
- time BIGINT, -- Time in minutes since epoch
- cpu_usage INTEGER, -- Total CPU usage
- disk_usage INTEGER, -- Total disk usage
- PRIMARY KEY (time)
- );
- -- Hypertable,间隔时间为10分钟
- SELECT create_hypertable('devices', by_range('time', 10));
复制代码 要在使用整数型时间的Hypertable上定义一个连续聚合,您需要一个函数以准确的格式获取当前时间,并将其设置为Hypertable。您可以使用 set_integer_now_func 函数来实现这一点。它可以被定义为一个普通的 PostgreSQL 函数,但必须是 STABLE,不带参数,并返回与表中时间列雷同类型的整数值。当您设置好时间处理后,就可以创建连续聚合了。
创建一个函数将事件转换为Unix时间戳:
- CREATE FUNCTION current_epoch() RETURNS BIGINT
- LANGUAGE SQL STABLE AS $$
- SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::bigint;$$;
- SELECT set_integer_now_func('devices', 'current_epoch');
复制代码 为devices表创建持续聚合:
- CREATE MATERIALIZED VIEW devices_summary
- WITH (timescaledb.continuous) AS
- SELECT time_bucket('500', time) AS bucket,
- avg(cpu_usage) AS avg_cpu,
- avg(disk_usage) AS avg_disk
- FROM devices
- GROUP BY bucket;
复制代码 插入测试数据:
- CREATE EXTENSION tablefunc;
- INSERT INTO devices(time, cpu_usage, disk_usage)
- SELECT time,
- normal_rand(1,70,10) AS cpu_usage,
- normal_rand(1,2,1) * (row_number() over()) AS disk_usage
- FROM generate_series(1,10000) AS time;
复制代码 上面的SQL使用 tablefunc 扩展来天生正态分布的数据,并使用 row_number 函数将其转换为累积序列。
TimescaleDB是时序数据库中的一员,比较与众不同的是,他只是PostgreSQL的一个插件,对于PostgreSQL的爱好者来说,这是对pg非常好的一个加强。这篇文章会带你快速了解TimescaleDB的使用方法。
PS1:固然是快速入门教程,但内容还是有点多。
PS2:最快的学习方法是边学边练,如果以为自己摆设环境麻烦,可以使用一些免费在线数据库。
原文:https://zhuanlan.zhihu.com/p/689183857
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |