TimescaleDB | 时序数据库

打印 上一主题 下一主题

主题 1492|帖子 1492|积分 4476

为啥需要时序数据库

● 关系数据库单表数据量大后查询慢(关系数据库单表数据几万万上亿后查询变慢明显)
● 时序数据特点,高写入并发,写入后很少修改
● 数据压缩、自动清算本领
为啥利用TimescaleDB

灵活性:TimescaleDB支持标准SQL,对于风俗SQL的团队来说更轻易上手,且能更好地与现有的分析工具和框架集成(我们的业务存在多指标关联分析行转列等需求可以方便处理)。
扩展性:作为PostgreSQL的扩展,TimescaleDB继承了其强盛的生态系统和可扩展性,适用于从小型到超大规模的数据集(比如大模型需要的向量数据库和搜索场景需要的倒排索引和相关性排序)。
运维与学习本钱:如果团队对PostgreSQL或Mysql熟悉,转向TimescaleDB的迁移本钱相对较低。
生产本钱:同时支持关系表和时序表,不需要摆设两套数据库。
局限性:相比原生时序数据库如InfluxDB、TDengine等,大数据量时性能略低,存储占用空间更大。
一些概念

TimescaleDB:本质是Postgresql数据库的扩展,Postgresql原本关系数据库的本领依旧在,可以同时支持时序表和关系表。
分区(partition):把数据按时间或者其他维度分别成多个物理表(chunk),每个表就是一个分区。
时序超表hypertable):本质上是Postgresql的分区表,相比于分区表提供了自动分区、元数据管理、查询优化、插入优化、数据压缩保留策略、连续聚合本领等。
VACUUM:主要用于清算数据库中的死元组,以及接纳空间以进步存储效率和性能。
本地摆设

  1. docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=123456 -v /home/timescaledb/data:/var/lib/postgresql/data timescale/timescaledb
复制代码
客户端访问

可以用DBeaver + PostgreSQL驱动
日常利用

普通表的DML及DDL等操作兼容Postgresql语法。
创建时序

  1. -- 1. 创建普通表
  2. CREATE TABLE IF NOT EXISTS dm_tag_value (
  3.                               tag_name VARCHAR(200)  NOT NULL ,
  4.                               tag_value VARCHAR(1000) DEFAULT '',
  5.                               time TIMESTAMP  NOT NULL
  6. );
  7. COMMENT ON COLUMN dm_tag_value.tag_name IS '位号名';
  8. COMMENT ON COLUMN dm_tag_value.tag_value IS '位号值';
  9. COMMENT ON COLUMN dm_tag_value.time IS '时间';
  10. -- 2. 创建超表
  11. -- 默认按周分区(字段类型不同默认值不同)
  12. SELECT create_hypertable('dm_tag_value', by_range('time'));
  13. -- 指定分区间隔
  14. SELECT create_hypertable('dm_tag_value', by_range('time', INTERVAL '7 hours'));
  15. -- 修改时间间隔,修改只对未来数据有效(等现有分区结束)
  16. SELECT set_chunk_time_interval('dm_tag_value', INTERVAL '24 hours');
  17. -- 高级能力,时间字段在大字段里面
  18. CREATE TABLE my_table (
  19.    metric_id serial not null,
  20.    data jsonb,
  21. );
  22. CREATE FUNCTION get_time(jsonb) RETURNS timestamptz AS $$
  23.   SELECT ($1->>'time')::timestamptz
  24. $$ LANGUAGE sql IMMUTABLE;
  25. SELECT create_hypertable('my_table', by_range('data', '1 day', 'get_time'));
  26. -- 添加更多分区维度
  27. SELECT add_dimension('m_tag_value', by_hash('tag_name', 4));
复制代码
查看分区

  1. SELECT show_chunks('dm_tag_value');
  2. -- 根据时间过滤
  3. SELECT show_chunks('dm_tag_value', created_before => INTERVAL '3 months');
  4. SELECT show_chunks('dm_tag_value', older_than => DATE '2017-01-01');
  5. -- 直接查元数据
  6. SELECT *
  7. FROM timescaledb_information.chunks
  8. WHERE hypertable_name = 'dm_tag_value';
复制代码
查询数据量和空间占用

  1. -- 表数据行数(表大了后count(*)会很慢给,不一定能查出来)
  2. SELECT approximate_row_count('dm_tag_value');
  3. -- 空间占用
  4. SELECT hypertable_size('dm_tag_value');
  5. --更多细节
  6. SELECT * FROM hypertable_detailed_size('dm_tag_value') ORDER BY node_name;
  7. --按分区
  8. SELECT * FROM chunks_detailed_size('dm_tag_value')
  9.   ORDER BY chunk_name, node_name;
  10.   
  11. --获取所有表占用的空间
  12. SELECT hypertable_name, hypertable_size(format('%I.%I', hypertable_schema, hypertable_name)::regclass)
  13.   FROM timescaledb_information.hypertables;
  14. -- 查看分区压缩情况
  15. SELECT * FROM hypertable_compression_stats('dm_tag_value');
  16. SELECT pg_size_pretty(after_compression_total_bytes) as total
  17.   FROM hypertable_compression_stats('dm_tag_value');
  18.         
  19. -- 其他方法
  20. SELECT relname AS table_name,
  21.        pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
  22.        pg_size_pretty(pg_relation_size(relid)) AS table_size,
  23.        pg_size_pretty(pg_indexes_size(relid)) AS indexes_size
  24. FROM pg_stat_user_tables
  25. ORDER BY pg_total_relation_size(relid) DESC;
  26. -- 按大小排序
  27. SELECT
  28.     table_name,
  29.     total_size,
  30.     table_size,
  31.     indexes_size,
  32.     -- 抽取数值并转换单位为字节数
  33.     CASE
  34.         WHEN total_size LIKE '%kB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint
  35.         WHEN total_size LIKE '%MB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint * 1024
  36.         WHEN total_size LIKE '%GB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint * 1024 * 1024
  37.         ELSE regexp_replace(total_size, '[^0-9]', '', 'g')::bigint
  38.     END AS total_size_k
  39. FROM
  40. (   
  41. SELECT relname AS table_name,
  42.        pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
  43.        pg_size_pretty(pg_relation_size(relid)) AS table_size,
  44.        pg_size_pretty(pg_indexes_size(relid)) AS indexes_size
  45. FROM pg_stat_user_tables where relname like '%_chunk'
  46. )
  47. ORDER BY total_size_k DESC;
  48. --查看未压缩分区容量
  49. select sum(total_size_m) from (
  50. SELECT
  51.     CASE
  52.         WHEN total_size LIKE '%kB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint / 1024
  53.         WHEN total_size LIKE '%MB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint  
  54.         WHEN total_size LIKE '%GB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint * 1024
  55.         ELSE regexp_replace(total_size, '[^0-9]', '', 'g')::bigint
  56.     END AS total_size_m
  57. FROM
  58. (   
  59. SELECT relname AS table_name,
  60.        pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
  61.        pg_size_pretty(pg_relation_size(relid)) AS table_size,
  62.        pg_size_pretty(pg_indexes_size(relid)) AS indexes_size
  63. FROM pg_stat_user_tables where relname in (SELECT chunk_name
  64. FROM timescaledb_information.chunks
  65. WHERE range_start >= '2024-07-18' AND range_end < '2024-07-20')
  66. ))
  67. --查看压缩分区容量
  68. select sum(total_size_m) from (
  69. SELECT
  70.     CASE
  71.         WHEN total_size LIKE '%kB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint / 1024
  72.         WHEN total_size LIKE '%MB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint  
  73.         WHEN total_size LIKE '%GB' THEN regexp_replace(total_size, '[^0-9]', '', 'g')::bigint * 1024
  74.         ELSE regexp_replace(total_size, '[^0-9]', '', 'g')::bigint
  75.     END AS total_size_m
  76. FROM
  77. (   
  78. SELECT relname AS table_name,
  79.        pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
  80.        pg_size_pretty(pg_relation_size(relid)) AS table_size,
  81.        pg_size_pretty(pg_indexes_size(relid)) AS indexes_size
  82. FROM pg_stat_user_tables where relname in (select table_name
  83.         FROM _timescaledb_catalog.chunk c
  84.         where creation_time >= '2024-11-19' and creation_time < '2024-11-20' and table_name  like 'compress%')
  85. ))
复制代码
删除分区过期数据

  1. -- 删除多少天之前的分区
  2. SELECT drop_chunks('dm_tag_value', created_before => now() -  INTERVAL '3 months');
  3. --删除指定日期之前的分区
  4. SELECT drop_chunks('dm_tag_value','2024-06-21'::timestamp);
  5. --删除所有表历史分区
  6. SELECT drop_chunks(format('%I.%I', hypertable_schema, hypertable_name)::regclass, INTERVAL '3 months')
  7.   FROM timescaledb_information.hypertables;
  8.   
  9. -- 添加分区过期策略
  10. SELECT add_retention_policy('dm_tag_value', drop_after => INTERVAL '6 months');
  11. -- 删除过期策略
  12. SELECT remove_retention_policy('dm_tag_value');
复制代码
压缩分区

  1. -- 修改表为可压缩,同时指定压缩策略
  2. ALTER TABLE <table_name> SET (timescaledb.compress,
  3.    timescaledb.compress_orderby = '<column_name> [ASC | DESC] [ NULLS { FIRST | LAST } ] [, ...]',
  4.    timescaledb.compress_segmentby = '<column_name> [, ...]',
  5.    timescaledb.compress_chunk_time_interval='interval'
  6. );
  7. -- 压缩指定分区
  8. SELECT compress_chunk('_timescaledb_internal._hyper_1_3_chunk', if_not_compressed => true);
  9. -- 解压指定分区
  10. SELECT decompress_chunk('_timescaledb_internal._hyper_2_2_chunk');
  11. -- 解压所有分区
  12. SELECT decompress_chunk(c, true) FROM show_chunks('dm_tag_value') c;
  13. -- 修改压缩分区后重新压缩
  14. recompress_chunk(
  15.     chunk REGCLASS,
  16.     if_not_compressed BOOLEAN = false
  17. )
  18. -- 添加压缩策略
  19. SELECT add_compression_policy('dm_tag_value', INTERVAL '30 days');
  20. -- 删除压缩策略
  21. SELECT remove_compression_policy('dm_tag_value');
复制代码
利用函数

系统函数

Timescale Documentation | Hyperfunctions
自界说函数

  1. CREATE OR REPLACE FUNCTION get_devices_avg_custom_time(
  2.     device_names TEXT[],
  3.     interval_seconds INT,
  4.     duration_minutes INT,
  5.     delay_seconds INT
  6. )
  7. RETURNS TABLE (device_name TEXT, interval_time TIMESTAMPTZ, avg_value DOUBLE PRECISION) AS $$
  8. BEGIN
  9.     RETURN QUERY
  10.     SELECT
  11.         tag_name AS device_name,
  12.         time_bucket(INTERVAL '1 second' * interval_seconds, app_time) AS interval_time,
  13.         AVG(tag_value) AS avg_value
  14.     FROM
  15.         timeseries_data
  16.     WHERE
  17.         tag_name = ANY(device_names)
  18.         AND app_time >= NOW() - INTERVAL '1 minute' * duration_minutes - INTERVAL '1 second' * delay_seconds
  19.         AND app_time < NOW() - INTERVAL '1 second' * delay_seconds
  20.     GROUP BY
  21.         device_name, interval_time
  22.     ORDER BY
  23.         device_name, interval_time;
  24. END;
  25. $$ LANGUAGE plpgsql;
  26. SELECT * FROM get_devices_avg_custom_time(
  27.     ARRAY['device_1', 'device_2'],
  28.     30,   -- 每隔30秒
  29.     60,   -- 最近60分钟
  30.     120   -- 延时120秒
  31. );
复制代码
用自界说函数作为调理任务
  1. CREATE OR REPLACE PROCEDURE user_defined_action(job_id int, config jsonb) LANGUAGE PLPGSQL AS
  2. $$
  3. BEGIN
  4.   RAISE NOTICE 'Executing action % with config %', job_id, config;
  5. END
  6. $$;
  7. SELECT add_job('user_defined_action','1h');
  8. SELECT add_job('user_defined_action','1h', fixed_schedule => false);
复制代码
连续聚合

  1. -- 删除视图
  2. drop MATERIALIZED  view cagg_dm_tag_value_agg;
  3. -- 创建视图
  4. CREATE MATERIALIZED VIEW cagg_dm_tag_value_agg
  5. WITH (timescaledb.continuous) AS
  6. select
  7. tag_name,
  8.    TIME_BUCKET('5 MIN', app_time) AS app_time,
  9. first(tag_value,app_time) as tag_value ,
  10. first(tag_time,app_time) as tag_time ,
  11. first(quality,app_time) as quality
  12. FROM dm_tag_value
  13. GROUP BY tag_name ,TIME_BUCKET('5 MIN', app_time)
  14. WITH NO DATA;
  15. -- 添加持续聚合运行策略
  16. SELECT add_continuous_aggregate_policy('cagg_dm_tag_value_agg',
  17. start_offset => INTERVAL '3 days',
  18. end_offset => INTERVAL '1 day',
  19. schedule_interval => INTERVAL '1 hour');
  20. -- 手动执行聚合
  21. CALL refresh_continuous_aggregate(
  22.     'cagg_dm_tag_value_agg',
  23.     (NOW() - INTERVAL '3 days')::timestamp,
  24.     (NOW() - INTERVAL '1 day')::timestamp
  25. );
  26. -- 删除聚合策略
  27. SELECT remove_continuous_aggregate_policy('cagg_dm_tag_value_agg');
复制代码
按小时连续聚合
  1. CREATE MATERIALIZED VIEW dm_tag_value_hourly
  2.                 WITH (timescaledb.continuous) AS
  3.        SELECT
  4.             dm.tag_name AS tag_name,
  5.             time_bucket(INTERVAL '1 hour', dm.app_time) AS app_time ,
  6.             first(tag_value,app_time)
  7.         FROM
  8.             dm_tag_value dm
  9.         GROUP BY
  10.             dm.tag_name, time_bucket(INTERVAL '1 hour', dm.app_time);
  11.            
  12.         SELECT add_continuous_aggregate_policy('dm_tag_value_hourly',
  13.                   start_offset => INTERVAL '1 day',
  14.                   end_offset => INTERVAL '1 hour',
  15.                   schedule_interval => INTERVAL '1 hour');
复制代码
任务

  1. CREATE OR REPLACE PROCEDURE user_defined_action(job_id int, config jsonb) LANGUAGE PLPGSQL AS
  2. $$
  3. BEGIN
  4.   RAISE NOTICE 'Executing action % with config %', job_id, config;
  5. END
  6. $$;
  7. SELECT add_job('user_defined_action','1h');
  8. SELECT add_job('user_defined_action','1h', fixed_schedule => false);
  9. SELECT alter_job(1000, schedule_interval => INTERVAL '2 days');SELECT delete_job(1000);SET client_min_messages TO DEBUG1;CALL run_job(1000);
复制代码
其他

一些常用的元信息表

  1. -- 查看有哪些时序表
  2. SELECT * from timescaledb_information.hypertables;
  3. -- 查看有哪些任务
  4. SELECT * FROM timescaledb_information.jobs;
  5. -- 查看有哪些分区表
  6. SELECT * FROM timescaledb_information.chunks ;-- 查看有哪些连续聚合
  7. SELECT * FROM timescaledb_information.continuous_aggregates;
  8. -- 查看有哪些分区维度
  9. SELECT * from timescaledb_information.dimensions
  10.   ORDER BY hypertable_name, dimension_number;
  11.   
  12. -- 查看压缩情况
  13. SELECT * FROM timescaledb_information.chunk_compression_settings
  14. -- 查看数据库活跃连接情况
  15. SELECT
  16.     pid,
  17.     usename,
  18.     state,
  19.     state_change,
  20.     wait_event_type,
  21.     wait_event,
  22.     query
  23. FROM pg_stat_activity
  24. WHERE state = 'active';
复制代码
行转列

  1. CREATE EXTENSION IF NOT EXISTS tablefunc;
  2. SELECT *
  3. FROM crosstab(
  4.     'SELECT app_time, tag_name, tag_value FROM dm_tag_value where tag_name in (''TAG41'',''TAG42'',''TAG43'') and  app_time  between ''2024-06-04 00:00:00.000'' and ''2024-06-04 23:59:59.000''  ORDER BY 1, 2',
  5.     'SELECT DISTINCT tag_name FROM dm_tag_value where tag_name in (''TAG41'',''TAG42'',''TAG43'') ORDER BY 1'
  6. ) AS ct (
  7.     app_time TIMESTAMPTZ,
  8.     tag_v1 NUMERIC,
  9.     tag_v2 numeric,
  10.     tag_v3 NUMERIC
  11. );
复制代码
取最近一条记录

  1. -- opt: interpolate、before、after
  2. CREATE OR REPLACE FUNCTION public.get_latest_records(tag_names text[], t timestamp without time zone DEFAULT now(), opt text DEFAULT 'before'::text)
  3. RETURNS TABLE(tag_name character varying, app_time timestamp without time zone, tag_value character varying)
  4. LANGUAGE plpgsql
  5. AS $function$
  6. DECLARE
  7.     tag text;
  8.     query text;
  9.     current_tags text[];
  10. BEGIN
  11.     -- 获取当前记录
  12.     current_tags := ARRAY(
  13.         SELECT a.tag_name
  14.         FROM public.get_current_records(tag_names, t, opt) a
  15.     );
  16.     -- 先返回所有当前记录
  17.     RETURN QUERY SELECT * FROM public.get_current_records(tag_names, t, opt);
  18.     -- 查询不在当前记录内的标签
  19.     FOR tag IN SELECT unnest(tag_names)
  20.     LOOP
  21.         IF NOT (tag = ANY(current_tags)) THEN
  22.             IF opt = 'after' THEN
  23.                 query := format('
  24.                     SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  25.                     FROM dm_tag_value
  26.                     WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time >= %L
  27.                     ORDER BY dm_tag_value.app_time ASC
  28.                     LIMIT 1', tag, t);
  29.             ELSIF  opt = 'interpolate' THEN
  30.                 query := format('WITH closest_before AS (
  31.                             SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  32.                             FROM dm_tag_value
  33.                             WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time <= %L
  34.                             ORDER BY app_time DESC
  35.                             LIMIT 1
  36.                         ),
  37.                         closest_after AS (
  38.                             SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  39.                             FROM dm_tag_value
  40.                             WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time > %L
  41.                             ORDER BY app_time ASC
  42.                             LIMIT 1
  43.                         ),
  44.                         combined AS (
  45.                             SELECT
  46.                                     b.tag_name AS name,
  47.                                     EXTRACT(EPOCH FROM b.app_time) AS time_before,
  48.                                     CAST(b.tag_value AS DOUBLE PRECISION) AS value_before,
  49.                                     EXTRACT(EPOCH FROM a.app_time) AS time_after,
  50.                                     CAST(a.tag_value AS DOUBLE PRECISION) AS value_after
  51.                             FROM closest_before b
  52.                             FULL JOIN closest_after a ON 1=1
  53.                         )
  54.                         SELECT
  55.                             COALESCE(combined.name,ca.tag_name, cb.tag_name) AS tag_name,
  56.                             CAST(%L AS TIMESTAMP) AS app_time,
  57.                             CASE
  58.                                 WHEN cb.app_time IS NOT NULL AND ca.app_time IS NOT NULL THEN
  59.                                     CAST((value_before + (value_after - value_before) * (EXTRACT(EPOCH FROM TIMESTAMP %L) - time_before) / (time_after - time_before)) AS VARCHAR(1000))
  60.                                 WHEN cb.app_time IS NOT NULL THEN
  61.                                     CAST(cb.tag_value AS VARCHAR(1000))
  62.                                 WHEN ca.app_time IS NOT NULL THEN
  63.                                     CAST(ca.tag_value AS VARCHAR(1000))
  64.                                 ELSE
  65.                                     NULL
  66.                             END AS tag_value
  67.                         FROM combined
  68.                         LEFT JOIN closest_before cb ON 1=1
  69.                         LEFT JOIN closest_after ca ON 1=1'
  70.                     , tag, t , tag,t ,t, t);
  71.             ELSE
  72.                 query := format('
  73.                     SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  74.                     FROM dm_tag_value
  75.                     WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time <= %L
  76.                     ORDER BY dm_tag_value.app_time DESC
  77.                     LIMIT 1', tag, t);
  78.             END IF;
  79.             RETURN QUERY EXECUTE query;
  80.         END IF;
  81.     END LOOP;
  82. END;
  83. $function$
  84. ;
  85. SELECT *
  86. FROM get_latest_records(ARRAY['TAG8710_ISYS7.1', 'TAG8728_ISYS7.1', 'TAG8727_ISYS7.1'], '2024-07-18','interpolate');
复制代码
隔断采样

  1. --按间隔取位号数据
  2. CREATE OR REPLACE FUNCTION get_one_tag_with_interval_seconds(
  3.     tag_names TEXT[],
  4.     interval_seconds INT,
  5.     duration_minutes INT,
  6.     delay_seconds INT
  7. )
  8. RETURNS TABLE (tag_name VARCHAR, app_time timestamp,tag_value TEXT) AS $$
  9. BEGIN
  10.     RETURN QUERY
  11.     SELECT
  12.         dm_tag_value.tag_name,
  13.         time_bucket(INTERVAL '1 second' * interval_seconds, dm_tag_value.app_time) AS interval_time,
  14.         min(concat(cast(dm_tag_value.app_time as VARCHAR),'|',dm_tag_value.tag_value,'|',dm_tag_value.quality)) as app_time_tag_value
  15.     FROM
  16.         dm_tag_value
  17.     WHERE
  18.         dm_tag_value.tag_name = ANY(tag_names)
  19.         AND dm_tag_value.app_time >= NOW() - INTERVAL '1 minute' * duration_minutes - INTERVAL '1 second' * delay_seconds
  20.         AND dm_tag_value.app_time < NOW() - INTERVAL '1 second' * delay_seconds
  21.     GROUP BY
  22.         dm_tag_value.tag_name, interval_time;
  23. END;
  24. $$ LANGUAGE plpgsql;
复制代码
多线程执行

  1. DO $$
  2. DECLARE
  3.     start_time TIMESTAMPTZ := '2024-07-14 00:00:00';
  4.     end_time TIMESTAMPTZ := '2024-07-14 03:00:00';
  5. BEGIN
  6.     WHILE start_time < '2024-07-17 18:15:00' LOOP
  7.         INSERT INTO dm_tag_value_1 (id,ds_id,tag_name, tag_value,tag_time, app_time,quality,create_time)
  8.                 SELECT id,ds_id,tag_name, tag_value,tag_time, app_time,quality,create_time
  9.                 FROM dm_tag_value_his
  10.         WHERE app_time >= start_time and app_time < end_time;
  11.         start_time := end_time;
  12.         end_time := end_time + INTERVAL '3 hours';
  13.     END LOOP;
  14. END $$;
复制代码
数据库参数配置

数据库保举配置

PGTune - calculate configuration for PostgreSQL based on the maximum performance for a given hardware configuration
8核/32G下的保举配置

  1. -- 8核/32G下的推荐配置
  2. -- 日志时区
  3. ALTER SYSTEM SET log_timezone = 'Asia/Shanghai';
  4. -- 数据时区
  5. ALTER SYSTEM SET timezone = 'Asia/Shanghai';
  6. -- 加载扩展
  7. ALTER SYSTEM SET shared_preload_libraries TO timescaledb,pg_stat_statements;
  8. -- 最大连接数
  9. ALTER SYSTEM SET max_connections TO '100';
  10. -- 预分配给时序库缓存区的内存大小(用于缓存数据和索引,一般系统内存的25%左右)
  11. ALTER SYSTEM SET shared_buffers TO '8GB';
  12. -- 后台工作线程的可用内存
  13. ALTER SYSTEM SET maintenance_work_mem TO '2GB';
  14. -- 查询线程用于排序、hash等操作的内存
  15. ALTER SYSTEM SET work_mem = '64MB';
  16. -- 可用缓存大小(包含shared_buffers缓存和系统自身缓存,一般系统内存的50%-75%)
  17. ALTER SYSTEM SET effective_cache_size TO '24GB';
  18. -- 设置控制检查点的完成目标时(数据从内存刷到磁盘),值范围从 0 到 1,数字越大表示越慢,相对的对查询影响抖动越小
  19. ALTER SYSTEM SET checkpoint_completion_target = '0.9';
  20. -- 设置随机访问页面的成本,默认值是4(根据磁盘类型不同修改,一般HDD硬盘设置4,SSD硬盘可以设置1.1或2)
  21. ALTER SYSTEM SET random_page_cost  TO 4;
  22. -- 为表和索引收集统计信息的目标行数,提高这个值可以让优化器获取更详细的统计数据,帮助生成更优化的查询计划。在有复杂查询的情况下增加这个值,可以提高查询性能,但同时会耗更多资源。
  23. ALTER SYSTEM SET default_statistics_target = '100';
  24. -- IO并发限
  25. ALTER SYSTEM SET effective_io_concurrency = '20';
  26. -- 写缓冲区大小
  27. ALTER SYSTEM SET wal_buffers = '32MB';
  28. -- WAL文件最大尺寸,设置较大的 WAL 文件能在高负载的写操作情况下,减少检查点的发生频率,提高性能。
  29. ALTER SYSTEM SET max_wal_size TO '4GB';
  30. -- WAL文件最小尺寸,较大的最小 WAL 大小可以在写负载下降时保留一些 WAL 文件,避免频繁的文件创建和删除,提高性能。
  31. ALTER SYSTEM SET min_wal_size TO '1GB';
  32. -- 后台工作线程最大数量
  33. ALTER SYSTEM SET max_parallel_maintenance_workers TO 8;
  34. -- 每个事务允许的最大锁定数量允许最大锁表/分区数量(因为分区拆的比较多,所以需要设置比较大)
  35. ALTER SYSTEM SET max_locks_per_transaction to 2048;
  36. -- 最大工作进程数量
  37. ALTER SYSTEM SET max_worker_processes = '32';
  38. -- 一个查询可用的最大并行工作进程数量
  39. ALTER SYSTEM SET max_parallel_workers = '16';
  40. -- 允许尝试使用(Huge Pages)能力,内存大于等于32G可以配置
  41. ALTER SYSTEM SET huge_pages = 'try';
  42. -- 限制每次解压最大影响行数,0表示不限制
  43. ALTER SYSTEM SET timescaledb.max_tuples_decompressed_per_dml_transaction = 0;
  44. -- 触发自动分析的阈值,即表中最少达到多少修改后会触发分析,高该值可减少分析的频率,节省资源,但也可能导致统计数据过时。
  45. ALTER SYSTEM SET autovacuum_analyze_threshold = 10;
  46. -- 触发自动分析的比例,设置较小的比例可以让自动分析更频繁地运行,帮助保持统计信息更新,但也会增加数据库的开销
  47. ALTER SYSTEM SET autovacuum_analyze_scale_factor = 0.01;
  48. -- 记录执行时间超过指定毫秒的语句的日志,这里是 30000 毫秒
  49. ALTER SYSTEM SET log_min_duration_statement = '30000';
  50. -- 记录指定类型SQL的日志,none表示不记录,all表示记录所有
  51. ALTER SYSTEM SET log_statement = 'none';
  52. -- 是否记录每个sql的执行时间off表示不记录
  53. ALTER SYSTEM SET log_duration = 'off';
  54. -- 记录日志格式
  55. ALTER SYSTEM SET log_line_prefix = '%m [%p]: [%l-1] db=%d,user=%u,app=%a,client=%h ';      
  56. -- 设置检查点的最大间隔时间
  57. ALTER SYSTEM SET checkpoint_timeout = '10min';
复制代码
常见题目

有题目可以先看日记,怎么看日记?
登录Kuboard,找到devops空间下的timescaledb容器,追踪日记

1.数据库频繁recover重启
原因:一次查询数据量多需要内存多,可用内存不敷,一般是缓存空间不敷。
排查:进Kuboard找到TimescaleDB的内存利用情况,是不是已经满负荷运作。
如果内存已经100%:可能需要考虑扩容,或者修改查询,减少单次请求数据量。
如果内存没用满:检查下数据库配置是否合理,具体根据内存巨细设置合理值(参考数据库配置)
  1. show shared_buffers;
  2. show effective_cache_size;
复制代码
2.could not resize shared memory segment “/PostgreSQL.xx“ to xx bytes: No space left on device题目
原因:
1. PostgreSQL动态共享内存过小,Docker的默认/dev/shm巨细为64MB
解决:修改PostgreSQL共享内存的巨细为256M
2. 单次查询涉及分区过多,超过配置限定
  1. show max_locks_per_transaction;
  2. -- 修改配置需要重启数据库
  3. ALTER SYSTEM SET max_locks_per_transaction to 2048;
复制代码
3.查询慢题目
1. 检查数据库负荷(内存、CPU),是否资源满了
2. 查看执行筹划,是否走到索引
  1. EXPLAIN analyze
  2. select * from dm_tag_value dtv where tag_name='XX.LJS.FIQ_8A196.SUM' and app_time > '2024-12-30 23:50:50' order by app_time
复制代码
4. 毗连卡住题目,时序表数据量大了后,一次普通的查询重复执行有概率会出现毗连卡死,要等几分钟才释放。
原因:目前看可能是bug(druid分页查询或数据库函数内直接 RETURN QUERY返回数据时会出现)
解决:用数据库函数,在数据库函数里利用RETURN QUERY EXECUTE query 方式返回数据,比如
  1. query := 'WITH RankedData AS (
  2.                                                           SELECT
  3.                                                             tag_name,
  4.                                                             app_time,
  5.                                                             tag_value,
  6.                                                             ROW_NUMBER() OVER (PARTITION BY tag_name ORDER BY app_time desc) as row_num
  7.                                                           FROM
  8.                                                             dm_tag_value
  9.                                                           WHERE
  10.                                                               app_time <= $2 AND app_time > $1   AND tag_name = ANY ($4::text[])
  11.                                                         )
  12.                                                         SELECT
  13.                                                           tag_name,
  14.                                                           app_time,
  15.                                                           tag_value
  16.                                                         FROM
  17.                                                           RankedData
  18.                                                         WHERE
  19.                                                           row_num = 1;';
  20.     RETURN QUERY EXECUTE query USING t1, t ,t2, tag_names;
  21. ``
  22. ## **<font style="color:#1a1a1a;">最佳实践</font>**
  23. ### **<font style="color:#1a1a1a;">建表</font>**
  24. <font style="color:#333333;">时序表必须要有</font><font style="color:#333333;">TIMESTAMP列,用于分区(分区列会默认建索引)</font>
  25. <font style="color:#333333;">示例:</font>
  26. ```sql
  27. begin;
  28. --建普通表
  29. CREATE table if not exists  tpt_warning_record (
  30.   warning_time TIMESTAMP NOT NULL,
  31.   tag_name VARCHAR(200)  NOT NULL ,
  32.   info TEXT DEFAULT NULL
  33. );
  34. COMMENT ON COLUMN tpt_warning_record.warning_time IS '预警时间';
  35. COMMENT ON COLUMN tpt_warning_record.tag_name IS '位号';
  36. COMMENT ON COLUMN tpt_warning_record.info IS '预警信息';
  37. --转时序表,同时会自动创建索引
  38. SELECT create_hypertable('tpt_warning_record', 'warning_time', chunk_time_interval => interval '1 day');
  39. --建其他索引,比如位号
  40. CREATE INDEX idx_tpt_warning_record_time_name ON tpt_warning_record (tag_name,warning_time desc);
  41. --设置分区保留时间30天
  42. SELECT add_retention_policy('tpt_warning_record', INTERVAL '30 days');
  43. --修改表为支持压缩
  44. ALTER TABLE dm_tag_value SET (timescaledb.compress);
  45. --设置压缩策略压缩30天前数据,每天执行
  46. SELECT add_compression_policy('tpt_warning_record', compress_after => INTERVAL '7d');
  47. end;
复制代码
1. 非分区列要建索引的话,需要跟分区列建复合索引,除时间分区列外其他分区列只管不要超过一个.
2. 分区巨细规范:为了进步性能最好是整个分区能完全加载到内存,建议每个分区存储数据量巨细是内存的四分之一,比如内存是4G那分区不要超过1G,具体多少条记录根据每行数据巨细估算,以此来确定是7天照旧1天或几个小时一个分区。
3. 建索引的时候可以选择正序或倒序,根据现实业务查询选择,比如利用的时候是时间倒序那么索引语句可以这样,如果查询排序和索引排序方向不一致影响性能。
  1. CREATE INDEX idx_tpt_warning_record_time_name ON tpt_warning_record (tag_name,warning_time desc);
复制代码
写入性能

1. 数据插入只管利用批量插入提升性能
2. 数据同时会插入到多个分区,可以采用并行插入提升性能
查询性能

  时序数据表一般存储大量数据,利用不当会严重影响性能而且轻易出现内存耗尽.尽可能的减少扫描行数.
1. 对于只插入不更新的数据只管尽早压缩(比如压缩一天前数据),压缩后能大大提升按索引的查询性能.
2. 查询都要带上索引字段(都带上时间分区列字段,减少扫描行数),不走索引的全量扫描的性能和普通数据库没有区别,表数据量大了就很慢。
3. 因为数据库内存有限,查询结果大的情况利用分页查询,带上limit数量
4. 查询不要随意order by,只管走默认排序(order by 和索引顺序一致)
5. 时间过滤时不要用时间函数,会影响性能
比如:
  1. --错误如下:
  2. SELECT app_time,tag_name,tag_value,quality FROM dm_tag_value WHERE DATE(app_time) = '2024-06-19'
  3. --正确如下:
  4. SELECT app_time,tag_name,tag_value,quality FROM dm_tag_value WHERE app_time >= '2024-06-19' and app_time < '2024-06-20'
复制代码
1. 时序数据库有一些特有函数可以大大进步性能。
比如:按时间窗口聚合
  1. SELECT time_bucket('1 day', time_column) AS day, AVG(value_column)
  2. FROM table_name
  3. GROUP BY day;
复制代码
比如:用物理视图做连续预聚合,适合要对大时间范围做聚合的场景
  1. CREATE MATERIALIZED VIEW continuous_aggregate_view WITH (timescaledb.continuous) ASSELECT time_bucket('1 day', time_column) AS day, AVG(value_column)
  2. FROM table_name
  3. GROUP BY day;
  4. SELECT add_continuous_aggregate_policy('continuous_aggregate_view',     start_offset => INTERVAL '1 month',     end_offset => INTERVAL '1 day',     schedule_interval => INTERVAL '1 hour');
复制代码
其他一些高级查询:Timescale Documentation | Perform advanced analytic queries
性能优化

  1. -- 查看vacuum和autovacuum情况
  2. SELECT
  3.     schemaname,
  4.     relname,
  5.     last_vacuum,
  6.     last_autovacuum,
  7.     vacuum_count,
  8.     autovacuum_count
  9. FROM
  10.     pg_stat_user_tables;
  11.    
  12. SELECT relname,
  13.        last_vacuum,
  14.        last_autovacuum,
  15.        vacuum_count,
  16.        autovacuum_count
  17. FROM pg_stat_user_tables
  18. WHERE relname like '%chunk';
  19. -- 手动vacuum
  20. VACUUM ANALYZE dm_tag_value;
  21. -- vacuum分区表
  22. VACUUM ANALYZE _timescaledb_internal._hyper_1_7_chunk;
  23. -- 打开慢查询日志
  24. ALTER SYSTEM SET log_min_duration_statement = '30000';
  25. ALTER SYSTEM SET log_statement = 'none';
  26. ALTER SYSTEM SET log_duration = 'off';
  27. ALTER SYSTEM SET log_line_prefix = '%m [%p]: [%l-1] db=%d,user=%u,app=%a,client=%h ';   
  28. SELECT pg_reload_conf();
复制代码
数据管理和维护

1. 利用 add_retention_policy 定期删除旧数据,管理数据库的存储。
2. 定期利用 VACUUM 和 ANALYZE 维护表,保持数据库性能。
3. 慢SQL查询
  1. -- 安装扩展
  2. CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
  3. -- 查询慢sql
  4. SELECT *
  5. FROM pg_stat_statements
  6. ORDER BY max_exec_time DESC
  7. LIMIT 10;
  8. --清空重新统计
  9. SELECT pg_stat_statements_reset();
复制代码
位号时序表

  1. -- 创建时序表
  2. CREATE TABLE IF NOT EXISTS dm_tag_value (
  3.                               id BIGINT DEFAULT NULL,
  4.                               ds_id BIGINT DEFAULT NULL,
  5.                               tag_name VARCHAR(200)  NOT NULL ,
  6.                               tag_value VARCHAR(1000) DEFAULT '',
  7.                               tag_time TIMESTAMP DEFAULT NULL,
  8.                               app_time TIMESTAMP  NOT NULL,
  9.                               quality INTEGER DEFAULT NULL,
  10.                               create_time TIMESTAMP DEFAULT NULL
  11. );
  12. COMMENT ON COLUMN dm_tag_value.id IS 'id';
  13. COMMENT ON COLUMN dm_tag_value.ds_id IS '数据源id';
  14. COMMENT ON COLUMN dm_tag_value.tag_name IS '位号名';
  15. COMMENT ON COLUMN dm_tag_value.tag_value IS '位号值';
  16. COMMENT ON COLUMN dm_tag_value.tag_time IS '实时数据库tag返回时间';
  17. COMMENT ON COLUMN dm_tag_value.app_time IS '查询实时数据库时间';
  18. COMMENT ON COLUMN dm_tag_value.quality IS '质量码';
  19. COMMENT ON COLUMN dm_tag_value.create_time IS '创建时间';
  20. DO $$
  21. begin
  22.     PERFORM create_hypertable('dm_tag_value', 'app_time', 'tag_name', 50);
  23.     PERFORM set_chunk_time_interval('dm_tag_value', INTERVAL '1 day');
  24.     ALTER TABLE dm_tag_value SET (timescaledb.compress, timescaledb.compress_orderby = 'app_time DESC', timescaledb.compress_segmentby = 'tag_name');
  25.     PERFORM add_compression_policy('dm_tag_value', INTERVAL '2 days');
  26.     CREATE INDEX idx_dm_tag_value_time_name ON dm_tag_value (app_time,tag_name);
  27.     PERFORM add_retention_policy('dm_tag_value', INTERVAL '1 year');
  28. EXCEPTION
  29.         WHEN OTHERS THEN
  30.         RAISE NOTICE 'An unexpected error occurred: %', SQLERRM;
  31. END $$;
  32. -- 创建连续聚合
  33. CREATE EXTENSION IF NOT EXISTS tablefunc;
  34. DO $$
  35. begin
  36.     CREATE MATERIALIZED VIEW dm_tag_value_hourly
  37.         WITH (timescaledb.continuous) AS
  38.                 SELECT
  39.                     dm.tag_name AS tag_name,
  40.                     time_bucket(INTERVAL '1 hour', dm.app_time) AS app_time ,
  41.                     first(tag_value,app_time)  as tag_value
  42.                 FROM
  43.                     dm_tag_value dm
  44.                 GROUP BY
  45.                     dm.tag_name, time_bucket(INTERVAL '1 hour', dm.app_time)
  46.     WITH NO DATA;
  47.     CREATE INDEX idx_dm_tag_value_hourly_name_time ON dm_tag_value_hourly (tag_name,app_time);
  48.     PERFORM add_continuous_aggregate_policy('dm_tag_value_hourly',
  49.                               start_offset => INTERVAL '1 day',
  50.                               end_offset => INTERVAL '30 minutes',
  51.                               schedule_interval => INTERVAL '1 hour');
  52. EXCEPTION
  53.         WHEN OTHERS THEN
  54.         RAISE NOTICE 'An unexpected error occurred: %', SQLERRM;
  55. END $$;
  56. -- 函数
  57. CREATE OR REPLACE FUNCTION get_current_records(
  58.     tag_names text[],
  59.     t timestamp DEFAULT now(),
  60.     opt text DEFAULT 'before'
  61. )
  62.     RETURNS TABLE(tag_name varchar(200), app_time timestamp, tag_value varchar(1000)) AS $$
  63. declare
  64.     t1 timestamp;
  65.     t2 timestamp;
  66.     query text;
  67. begin
  68.     IF opt = 'after' then
  69.         t1 := t;
  70.         t2 := t + INTERVAL '1 minute';
  71.     ELSIF  opt = 'interpolate' then
  72.         t1 := t - INTERVAL '1 minute';
  73.         t2 := t + INTERVAL '1 minute';
  74.     else
  75.         t1 := t - INTERVAL '1 minute';
  76.         t2 := t;
  77.     END IF;
  78.     IF opt = 'after' then
  79.         query := 'WITH RankedData AS (
  80.                                                           SELECT
  81.                                                             tag_name,
  82.                                                             app_time,
  83.                                                             tag_value,
  84.                                                             ROW_NUMBER() OVER (PARTITION BY tag_name ORDER BY app_time) as row_num
  85.                                                           FROM
  86.                                                             dm_tag_value
  87.                                                           WHERE
  88.                                                             app_time >= $2  and  app_time < $3 AND tag_name = ANY ($4::text[])
  89.                                                         )
  90.                                                         SELECT
  91.                                                           tag_name,
  92.                                                           app_time,
  93.                                                           tag_value
  94.                                                         FROM
  95.                                                           RankedData
  96.                                                         WHERE
  97.                                                           row_num = 1;';
  98.     ELSIF  opt = 'interpolate' THEN
  99.         query := 'WITH RankedData_b AS (
  100.                                                           SELECT
  101.                                                             tag_name,
  102.                                                             app_time,
  103.                                                             tag_value,
  104.                                                             ROW_NUMBER() OVER (PARTITION BY tag_name ORDER BY app_time desc) as row_num
  105.                                                           FROM
  106.                                                             dm_tag_value
  107.                                                           WHERE
  108.                                                               app_time <= $2 AND app_time > $1 AND tag_name = ANY ($4::text[])
  109.                                                         ),
  110.                                                          closest_before AS (
  111.                                                     SELECT
  112.                                                           RankedData_b.tag_name,
  113.                                                           RankedData_b.app_time,
  114.                                                           RankedData_b.tag_value
  115.                                                         FROM
  116.                                                           RankedData_b
  117.                                                         WHERE
  118.                                                           RankedData_b.row_num = 1
  119.                                                 ),
  120.                                                    RankedData_a AS (
  121.                                                           SELECT
  122.                                                             tag_name,
  123.                                                             app_time,
  124.                                                             tag_value,
  125.                                                             ROW_NUMBER() OVER (PARTITION BY tag_name ORDER BY app_time) as row_num
  126.                                                           FROM
  127.                                                             dm_tag_value
  128.                                                           WHERE
  129.                                                             app_time > $2 and app_time < $3 AND tag_name = ANY ($4::text[])
  130.                                                         ),
  131.                                                   closest_after AS (
  132.                                                            SELECT
  133.                                                           RankedData_a.tag_name,
  134.                                                           RankedData_a.app_time,
  135.                                                           RankedData_a.tag_value
  136.                                                         FROM
  137.                                                           RankedData_a
  138.                                                         WHERE
  139.                                                           RankedData_a.row_num = 1
  140.                                         ),
  141.                                         combined AS (
  142.                                                 SELECT
  143.                                                         b.tag_name AS name,
  144.                                                         EXTRACT(EPOCH FROM b.app_time) AS time_before,
  145.                                                         CAST(b.tag_value AS DOUBLE PRECISION) AS value_before,
  146.                                                         EXTRACT(EPOCH FROM a.app_time) AS time_after,
  147.                                                         CAST(a.tag_value AS DOUBLE PRECISION) AS value_after
  148.                                             FROM closest_before b
  149.                                             FULL JOIN closest_after a ON a.tag_name=b.tag_name
  150.                                         )
  151.                                         SELECT
  152.                                             COALESCE(combined.name,ca.tag_name, cb.tag_name) AS tag_name,
  153.                                             CAST($2 AS TIMESTAMP) AS app_time,
  154.                                             CASE
  155.                                                 WHEN cb.app_time IS NOT NULL AND ca.app_time IS NOT NULL THEN
  156.                                                     CAST((value_before + (value_after - value_before) * (EXTRACT(EPOCH FROM $2) - time_before) / (time_after - time_before)) AS VARCHAR(1000))
  157.                                                 WHEN cb.app_time IS NOT NULL THEN
  158.                                                     CAST(cb.tag_value AS VARCHAR(1000))
  159.                                                 WHEN ca.app_time IS NOT NULL THEN
  160.                                                     CAST(ca.tag_value AS VARCHAR(1000))
  161.                                                 ELSE
  162.                                                     NULL
  163.                                             END AS tag_value
  164.                                         FROM combined
  165.                                         LEFT JOIN closest_before cb ON combined.name=cb.tag_name
  166.                                         LEFT JOIN closest_after ca ON combined.name=ca.tag_name';
  167.     else
  168.         query := 'WITH RankedData AS (
  169.                                                           SELECT
  170.                                                             tag_name,
  171.                                                             app_time,
  172.                                                             tag_value,
  173.                                                             ROW_NUMBER() OVER (PARTITION BY tag_name ORDER BY app_time desc) as row_num
  174.                                                           FROM
  175.                                                             dm_tag_value
  176.                                                           WHERE
  177.                                                               app_time <= $2 AND app_time > $1   AND tag_name = ANY ($4::text[])
  178.                                                         )
  179.                                                         SELECT
  180.                                                           tag_name,
  181.                                                           app_time,
  182.                                                           tag_value
  183.                                                         FROM
  184.                                                           RankedData
  185.                                                         WHERE
  186.                                                           row_num = 1;';
  187.     END IF;
  188.     RETURN QUERY EXECUTE query USING t1, t ,t2, tag_names;
  189. END;
  190. $$ LANGUAGE plpgsql;
  191. CREATE OR REPLACE FUNCTION public.get_latest_records(
  192.     tag_names text[],
  193.     t timestamp without time zone DEFAULT now(),
  194.     opt text DEFAULT 'before'::text
  195. )
  196.     RETURNS TABLE(tag_name character varying, app_time timestamp without time zone, tag_value character varying)
  197.     LANGUAGE plpgsql
  198. AS $function$
  199. DECLARE
  200.     tag text;
  201.     query text;
  202.     current_tags text[];
  203. BEGIN
  204.     -- 获取当前记录
  205.     current_tags := ARRAY(
  206.         SELECT a.tag_name
  207.         FROM public.get_current_records(tag_names, t, opt) a
  208.     );
  209.     -- 先返回所有当前记录
  210.     RETURN QUERY SELECT * FROM public.get_current_records(tag_names, t, opt);
  211.     -- 查询不在当前记录内的标签
  212.     FOR tag IN SELECT unnest(tag_names)
  213.     LOOP
  214.         IF NOT (tag = ANY(current_tags)) THEN
  215.             IF opt = 'after' THEN
  216.                 query := format('
  217.                     SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  218.                     FROM dm_tag_value
  219.                     WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time >= %L
  220.                     ORDER BY dm_tag_value.app_time ASC
  221.                     LIMIT 1', tag, t);
  222.             ELSIF  opt = 'interpolate' THEN
  223.                 query := format('WITH closest_before AS (
  224.                             SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  225.                             FROM dm_tag_value
  226.                             WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time <= %L
  227.                             ORDER BY app_time DESC
  228.                             LIMIT 1
  229.                         ),
  230.                         closest_after AS (
  231.                             SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  232.                             FROM dm_tag_value
  233.                             WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time > %L
  234.                             ORDER BY app_time ASC
  235.                             LIMIT 1
  236.                         ),
  237.                         combined AS (
  238.                             SELECT
  239.                                     b.tag_name AS name,
  240.                                     EXTRACT(EPOCH FROM b.app_time) AS time_before,
  241.                                     CAST(b.tag_value AS DOUBLE PRECISION) AS value_before,
  242.                                     EXTRACT(EPOCH FROM a.app_time) AS time_after,
  243.                                     CAST(a.tag_value AS DOUBLE PRECISION) AS value_after
  244.                             FROM closest_before b
  245.                             FULL JOIN closest_after a ON 1=1
  246.                         )
  247.                         SELECT
  248.                             COALESCE(combined.name,ca.tag_name, cb.tag_name) AS tag_name,
  249.                             CAST(%L AS TIMESTAMP) AS app_time,
  250.                             CASE
  251.                                 WHEN cb.app_time IS NOT NULL AND ca.app_time IS NOT NULL THEN
  252.                                     CAST((value_before + (value_after - value_before) * (EXTRACT(EPOCH FROM TIMESTAMP %L) - time_before) / (time_after - time_before)) AS VARCHAR(1000))
  253.                                 WHEN cb.app_time IS NOT NULL THEN
  254.                                     CAST(cb.tag_value AS VARCHAR(1000))
  255.                                 WHEN ca.app_time IS NOT NULL THEN
  256.                                     CAST(ca.tag_value AS VARCHAR(1000))
  257.                                 ELSE
  258.                                     NULL
  259.                             END AS tag_value
  260.                         FROM combined
  261.                         LEFT JOIN closest_before cb ON 1=1
  262.                         LEFT JOIN closest_after ca ON 1=1'
  263.                     , tag, t , tag,t ,t, t);
  264.             ELSE
  265.                 query := format('
  266.                     SELECT dm_tag_value.tag_name, dm_tag_value.app_time, dm_tag_value.tag_value
  267.                     FROM dm_tag_value
  268.                     WHERE dm_tag_value.tag_name = %L AND dm_tag_value.app_time <= %L
  269.                     ORDER BY dm_tag_value.app_time DESC
  270.                     LIMIT 1', tag, t);
  271.             END IF;
  272.             RETURN QUERY EXECUTE query;
  273.         END IF;
  274.     END LOOP;
  275. END;
  276. $function$
  277. ;
  278. CREATE OR REPLACE FUNCTION public.get_tag_history_count_with_interval_seconds(tag_names text[], begin_time timestamp without time zone, end_time timestamp without time zone, interval_seconds integer)
  279.     RETURNS TABLE(tag_name character varying, tag_count bigint)
  280.     LANGUAGE plpgsql
  281. AS $function$
  282. DECLARE
  283.     sql_query text;
  284. BEGIN
  285.     interval_seconds := GREATEST(interval_seconds, 1);
  286.     -- 构建动态 SQL 查询
  287.     sql_query := '
  288.         WITH grouped_data AS (
  289.             SELECT
  290.                 dm.tag_name,
  291.                 time_bucket(INTERVAL ''1 second'' * ' || interval_seconds || ', dm.app_time) AS interval_time,
  292.                 min(dm.app_time) AS app_time_tag_value
  293.             FROM
  294.                 dm_tag_value dm
  295.             WHERE
  296.                 dm.tag_name = ANY($1)
  297.                 AND dm.app_time >= $2
  298.                 AND dm.app_time < $3
  299.             GROUP BY
  300.                 dm.tag_name, interval_time
  301.         )
  302.         SELECT
  303.             gd.tag_name AS tag_name,
  304.             count(gd.tag_name) AS tag_count
  305.         FROM
  306.             grouped_data gd
  307.         GROUP BY
  308.             gd.tag_name;';
  309.     -- 执行动态 SQL
  310.     RETURN QUERY EXECUTE sql_query USING tag_names, begin_time, end_time;
  311. END;
  312. $function$
  313. ;
  314. DROP FUNCTION IF EXISTS  public.get_tag_history_with_interval_seconds(_text, timestamp, timestamp, int4, int4, int4);
  315. CREATE OR REPLACE FUNCTION public.get_tag_history_with_interval_seconds(
  316.     tag_names text[],
  317.     begin_time timestamp without time zone,
  318.     end_time timestamp without time zone,
  319.     interval_seconds integer,
  320.     page_size integer,
  321.     page_index integer,
  322.     sort_order text DEFAULT 'ASC'
  323. )
  324. RETURNS TABLE(tag_name character varying, app_time timestamp without time zone, tag_value text)
  325. LANGUAGE plpgsql
  326. AS $function$
  327. DECLARE
  328.     sql_query text;
  329. BEGIN
  330.     interval_seconds := GREATEST(interval_seconds, 1);
  331.     page_size := LEAST(page_size, 200000);
  332.     IF sort_order IS NULL OR (sort_order <> 'ASC' AND sort_order <> 'DESC') THEN
  333.             sort_order := 'ASC'; -- 默认排序
  334.         END IF;
  335.     -- 构建动态 SQL 查询
  336.     sql_query := 'WITH grouped_data AS (
  337.         SELECT
  338.             dm.tag_name AS tag_name,
  339.             time_bucket(INTERVAL ''1 second'' * ''' || interval_seconds || ''', dm.app_time) AS interval_time,
  340.             min(concat(cast(dm.app_time AS VARCHAR), ''|'', dm.tag_value, ''|'', dm.quality, ''|'', cast(dm.tag_time AS VARCHAR))) as app_time_tag_value,
  341.             ROW_NUMBER() OVER (PARTITION BY dm.tag_name ORDER BY time_bucket(INTERVAL ''1 second'' * ''' || interval_seconds || ''', dm.app_time) ' || sort_order || ') AS row_num
  342.         FROM
  343.             dm_tag_value dm
  344.         WHERE
  345.             dm.tag_name = ANY($1)
  346.             AND dm.app_time >= $2
  347.             AND dm.app_time < $3
  348.         GROUP BY
  349.             dm.tag_name, interval_time
  350.     )
  351.     SELECT
  352.         gd.tag_name,
  353.         gd.interval_time AS app_time,
  354.         gd.app_time_tag_value AS tag_value
  355.     FROM
  356.         grouped_data gd
  357.     WHERE
  358.         gd.row_num > ($4 * ($5 - 1)) AND gd.row_num <= ($4 * $5)
  359.     ORDER BY
  360.         gd.tag_name, gd.interval_time ' || sort_order || ' ;';
  361.     -- 执行动态 SQL
  362.     RETURN QUERY EXECUTE sql_query USING tag_names, begin_time, end_time, page_size, page_index;
  363. END;
  364. $function$;
复制代码
版本升级(只升TimescaleDB版本不升级PG)

1. timescaledb切换新版本镜像(pg差别版本可能不兼容)
2. 在容器或客户端中登录psql,利用 -X 标志(如果需要)
  1. psql -X -h <your_host> -U <your_user> -d <your_database>
复制代码
3. 升级扩展
  1. ALTER EXTENSION timescaledb UPDATE;
复制代码
4. 验证新版本
  1. SELECT * FROM pg_extension WHERE extname = 'timescaledb';
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表