流式数据库PipelineDB使用手册

简述

  PipelineDB是一个高性能的PostgreSQL扩展,用于在时间序列数据上连续运行SQL查询。这些连续查询的输出存储在常规表中,可以像查询任何其他表或视图一样查询这些表。因此,连续查询可以看作是非常高的吞吐量、增量更新的物化视图。与传统的实时计算引擎(storm、kafka streams等)相比,PipelineDB不需要写任何程序代码,只需要写SQL即可实现实时指标的聚合计算,统计实时指标开发耗时可以从原来的天级骤降到分钟级。

官方文档
PipelineDB文档地址:http://docs.pipelinedb.com/
PostgreSQL文档地址:https://www.postgresql.org/docs/11/index.html

Stream:进行时序数据聚合计算的第一个必需基础表,外部数据一条一条的流入Stream表。该表不实际储存数据,当一条数据被后续所有的必须View/Transform读取后即会被丢弃。Stream作为PipelineDB的第一道关口,承接从外部实时写入到PipelineDB内的数据。之后,可以以Stream为基础表,在其上创建若干个统计结果的View或者转换内容的Transform。

Continuous View:PipelineDB的基本抽象,从Stream读取数据并将新数据按照SQL内条件实时聚合,并将结果实时增量更新到View内。

Continuous Transform:进行实时转换数据,比如解析URL、关联维表等,不支持聚合,不储存数据。转换后的数据可以实时写入到另一个Stream或者表。

安装

推荐使用docker客户端kitematic,然后直接安装pipelinedb的镜像,安装启动后,用本地工具连接

命令行连接,默认数据库、用户名、密码都是pipeline

1
> psql -h localhost -p 32768 -d pipeline -U pipeline

基本命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
-- 创建schema
> CREATE SCHEMA IF NOT EXISTS test;

-- 查看已创建schema
> \dn

-- 查看已创建表
> \d 或者 \d+

-- psql命令帮助
> \?

-- 设置搜索路径,切换schema
> SET search_path TO test, public;

-- 激活和停止VIEW/TRANSFORM
> SELECT pipelinedb.activate('continuous_view_or_transform');
> SELECT pipelinedb.deactivate('continuous_view_or_transform');

--回填数据到老的VIEW
> SELECT pipelinedb.combine_table('continuous_view_3', 'continuous_view_mrel');

-- 获取所有的VIEW
> SELECT * FROM pipelinedb.get_views();

-- 获取所有的TRANSFORM
> SELECT * FROM pipelinedb.get_transforms();

-- 创建表,当字段是字符串类型时,除非你知道这个字段值永远不会超过某个长度,
-- 否则请使用text格式,从此再也不用担心长度问题!

-- 使用Postgres作为api的后端数据库时,可以在SQL前添加下面的语句,避免一些转义符引起的错误
SET standard_conforming_strings = off;

敲重点:因为官方的bug,在创建view等前请确保目前的schema是public,或者使用上述方式把public加到搜索路径内,否则会报函数找不到等错误。

创建STREAM

1
2
3
4
5
6
7
8
9
CREATE FOREIGN TABLE IF NOT EXISTS test.streams_track_page_log (
data json
)
SERVER pipelinedb;

-- 特别情况下,可以对STREAM增加字段(只能增,不能删)
ALTER FOREIGN TABLE test.streams_track_page_log ADD COLUMN x integer;
-- 删除STREAM
DROP FOREIGN TABLE test.streams_track_page_log;

注意:建议stream以”kafka内topic名称” + “数据归属”规则进行命名。如果topic不带stream,建议命名是添加上stream,方便区分。

创建VIEW

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CREATE VIEW test.rt_view_stat_daily WITH (action=materialize) AS
SELECT
to_date(data->>'server_date', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
-- 使用SUM时,请务必使用COALESCE把NULL值替换掉,否则会导致严重的数据库后端进程崩溃重启!!!
SUM(COALESCE(cast(data->>'unreal_key', numeric), 0)) AS "testName",
COUNT(*) AS pv,
COUNT(DISTINCT data->>'visit_id') AS uv
FROM test.streams_track_page_log
GROUP BY f_ds
;

-- 清空VIEW内数据
SELECT pipelinedb.truncate_continuous_view('test.rt_view_stat_daily');

-- 删除VIEW
DROP VIEW test.rt_view_stat_daily;

-- 创建TTL(Time-To-Live)VIEW表(TTL表可以按照尽量销毁早于指定时间的数据)
CREATE VIEW test.rt_view_stat_daily_ttl
WITH (action=materialize, ttl='1 month', ttl_column='ttl_ds') AS
SELECT
to_date(data->>'server_date', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
day(to_timestamp(data->>'server_date', 'YYYY-MM-DD HH24:MI:SS')) AS ttl_ds,
COUNT(*) AS pv,
COUNT(DISTINCT data->>'visit_id') AS uv
FROM test.streams_track_page_log
GROUP BY f_ds, ttl_ds
;
-- 可以通过pipelinedb.set_ttl函数对continuous view增加、修改、移除TTL。
-- 具体详见:http://docs.pipelinedb.com/continuous-views.html#modifying-ttls

注意:”action=materialize”默认可不加;建议view以”rt_view“ + “VIEW应用名称” + “数据归属”命名view表,例如:rt_view_survey_daily

注意: 使用SUM时,请务必使用COALESCE把NULL值替换掉,否则会导致严重的数据库后端进程崩溃重启!!!

创建TRANSFORM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
CREATE VIEW test.rt_trans_add_source_name WITH (action=transform) AS
SELECT
v1.data->>'source_bu' AS source_bu,
v1.data->>'source_cate' AS source_cate,
v1.data->>'source_msg_type' AS source_msg_type,
v1.data->>'source_msg' AS source_msg,
v2.source_name,
v1.data->>'visit_id' AS visit_id,
v1.data->>'server_date' AS server_date
FROM test.streams_track_page_log v1
LEFT JOIN dw_setting.dim_source_infos_source v2 ON v1.data->>'source_msg' = v2.source
WHERE v1.data->>'source_msg_type' IN ('a', 'b', 'c')
;

-- 从TRANSFORM创建VIEW
CREATE VIEW test.rt_view_source_stat_daily WITH (action=materialize) AS
SELECT
to_date(server_date, 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
source_msg_type,
source_msg,
COUNT(*) AS pv
FROM output_of('test.rt_trans_add_source_name')
GROUP BY
f_ds,
source_msg_type,
source_msg
;

-- TRANSFORM后的数据写入到STREAM
CREATE FOREIGN TABLE IF NOT EXISTS test.streams_track_page (
source_bu text,
source_cate text,
source_msg_type text,
source_msg text,
source_name text,
visit_id text,
server_date text
)

SERVER pipelinedb;

CREATE VIEW test.rt_trans_add_source_name_to_stream
WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('test.streams_track_page')) AS
SELECT
v1.data->>'source_bu' AS source_bu,
v1.data->>'source_cate' AS source_cate,
v1.data->>'source_msg_type' AS source_msg_type,
v1.data->>'source_msg' AS source_msg,
v2.source_name,
v1.data->>'visit_id' AS visit_id,
v1.data->>'server_date' AS server_date
FROM test.streams_track_page_log v1
LEFT JOIN dim_table.dim_source_infos v2 ON v1.data->>'source_msg' = v2.source
WHERE v1.data->>'source_msg_type' IN ('a', 'b', 'c')
;

注意:因为已知bug,使用TRANSFORM时请不要使用自定义函数,可以使用pipelinedb.insert_into_stream将结果写入到另一个STREAM

JouyPub wechat
欢迎订阅「K叔区块链」 - 专注于区块链技术学习