拉链表是数据仓库设计中用来处理数据变化的一种技术,它允许保存历史数据,记录一个事物从开始到当前状态的所有变化信息,可以反映任意时间点数据的状态。本文将为您介绍基于 MaxCompute 引擎在 DataWorks 上实现拉链表 ETL 的案例。
适用场景
在设计数据仓库的数据模型时,拉链存储技术可作为一种解决方案,满足以下需求:
-
数据量较大。
-
表中的部分字段被更新。 例如,用户的地址、产品的描述信息、订单的状态和手机号码等。
-
需要查看某一个时间点或时间段的历史快照信息。 例如,查看某一个订单在某一个历史时间点的状态,或查看某一个用户在过去某段时间内更新过几次等。
-
变化的比例不大或频率不高。 假设总共有1000万个会员,且每天新增和发生变化的会员只有10万左右,如果每天都在表中保留一份全量,那么每次全量中会保存很多不变的信息,极大地浪费了存储资源。
关键字段介绍
拉链表通常包含以下几个关键字段:
字段 | 描述 |
主键 | 唯一标识每一行数据 |
业务主键 | 唯一标识实体的主键。例如,会员ID等 |
属性字段 | 需要追踪变化的实体属性。例如,会员昵称、会员手机号等 |
有效开始日期 | 数据版本的有效开始时间 |
有效结束日期 | 数据版本的有效结束时间。通常用一个极大值“9999-12-31”表示当前数据有效 |
状态标识符 | 标识是否为最新版本 |
版本 | 标识相同业务主键的不同历史版本号 |
交易订单拉链表
案例介绍
数据表设计,本案例共设计2张数据表。
- 交易下单源表:ods_order_d,用于存放从业务库同步而来的每日增量数据。包含如下两个字段:
字段 | 描述 |
id | 业务主键订单 |
data_status | 追踪变化的订单状态 |
- 交易下单事实表(拉链表):dwd_order,用于拉链存储全量有效和失效的数据。包含如下四个字段:
字段 | 描述 |
id | 业务主键订单ID |
data_status | 追踪变化的订单状态 |
start_date | 有效开始日期 |
end_date | 有效结束时间 |
拉链表实现逻辑。
- 本案例将使用拉链表记录电商订单从开始到当前状态(创建/支付/完成)的所有变化信息。
任务开发:增量表数据准备
在数据准备环节,需要创建 ODPS SQL 临时查询,用于产出交易下单源表数据。
ods_order_di
-
表数据:交易下单源表,包含业务主键订单 ID 和需要追踪变化的订单状态 data_status 字段,用于存放每日新增交易下单数据。
-
在
ods_order_di
节点编辑页面,需要将以下测试数据,存储在ods_order_di
交易下单源表中。
id gmt_create gmt_modified data_status pt ------ ---------- -------------- ------------ ---------- 210001 2023-10-04 2023-10-04 创建 2023-10-04 210002 2023-10-04 2023-10-04 创建 2023-10-04 210001 2023-10-04 2023-10-05 支付 2023-10-05 210003 2023-10-05 2023-10-05 创建 2023-10-05 210004 2023-10-05 2023-10-05 创建 2023-10-05 210001 2023-10-04 2023-10-06 完成 2023-10-06 210002 2023-10-04 2023-10-06 支付 2023-10-06 210004 2023-10-05 2023-10-06 支付 2023-10-06 210005 2023-10-06 2023-10-06 创建 2023-10-06
- 在插入测试数据的时候,需要按照对应的分区(pt)进行补充,操作如下:
-- 1. 创建 ods_order_di 订单源表,存放每日新增交易下单数据。 -- 补充说明:为了便于理解,我们约定案例中的“订单创建时间、修改时间字段”粒度到天级别,且默认一个订单一天仅执行一次操作; -- 实际业务中一般时间字段粒度更细,并且一天可能有多次操作。 create table if not exists ods_order_di ( id bigint comment '订单id' ,gmt_create date comment '创建时间,格式为yyyy-mm-dd' ,gmt_modified date comment '更新时间,格式为yyyy-mm-dd' ,data_status string comment '状态' ) comment '交易下单源表' partitioned by ( pt string comment '日期,格式为yyyy-mm-dd' ) lifecycle 7; -- 2. 初始化历史交易下单新增数据。 -- 补充说明:由于案例测试需要,我们将3天的每日新增数据一次性分别插入分区'2023-10-04、2023-10-05、2023-10-06', -- 在实际业务场景中,一般每日更新当前业务日期分区,仅需将分区日期设置成调度参数变量即可。 insert overwrite table ods_order_di partition (pt='2023-10-04') values (210001,date'2023-10-04',date'2023-10-04','创建') ,(210002,date'2023-10-04',date'2023-10-04','创建'); insert overwrite table ods_order_di partition (pt='2023-10-05') values (210001,date'2023-10-04',date'2023-10-05','支付') ,(210003,date'2023-10-05',date'2023-10-05','创建') ,(210004,date'2023-10-05',date'2023-10-05','创建'); insert overwrite table ods_order_di partition (pt='2023-10-06') values (210001,date'2023-10-04',date'2023-10-06','完成') ,(210002,date'2023-10-04',date'2023-10-06','支付') ,(210004,date'2023-10-05',date'2023-10-06','支付') ,(210005,date'2023-10-06',date'2023-10-06','创建');
数据开发:拉链表的实现
创建 ODPS SQL 节点:dwd_order
,用于产出交易下单事实明细表数据。
-
表数据:交易下单事实明细表(拉链表),用于存放全量有效数据和全量失效数据。包含有效开始日期 start_date、有效结束时间 end_date、业务主键ID和追踪订单状态变化的 data_status 字段。
-
在
dwd_order
临时查询页面,输入如下示例代码:
create table if not exists dwd_order ( id bigint comment '订单id' ,gmt_create date comment '创建时间,格式为yyyy-mm-dd' ,gmt_modified date comment '更新时间,格式为yyyy-mm-dd' ,data_status string comment '状态' ,start_date date comment '生效日期,格式为yyyy-mm-dd' ,end_date date comment '失效日期,格式为yyyy-mm-dd,9999-12-31表示有效' ) comment '交易下单事实明细表(拉链表)';
- 在
dwd_order
节点编辑页面,输入如下示例代码:
-- 唯一标识:id + gmt_create + gmt_modified + data_status -- 拉链存储的实现逻辑:业务日期当日 ods_order_di 新增或更新的所有数据插入 dwd_order,且通过 end_date 标识为有效数据 -- dwd_order 中有更新过的有效数据修改成失效数据。 insert overwrite table dwd_order select id ,gmt_create ,gmt_modified ,data_status ,start_date ,end_date from ( select id ,gmt_create ,gmt_modified ,data_status ,start_date ,end_date -- 支持重跑:使用开窗函数,按 id, gmt_create, gmt_modified, data_status 分组,按 end_date 降序排序,仅取第一条数据。 ,row_number() over (distribute by id,gmt_create,gmt_modified,data_status sort by end_date desc ) as row_num from ( -- 存量历史数据与业务日期当日新增数据,通过主键关联 select a.id as id ,a.gmt_create as gmt_create ,a.gmt_modified as gmt_modified ,a.data_status as data_status ,a.start_date as start_date -- 如果当日新增数据不为空且存量历史数据的 end_date 等于固定日期 '9999-12-31',则将存量历史数据的 end_date 置为业务日期 -- 即:将存量历史数据置为失效数据 ,case when b.id is not null and cast(a.end_date as string) = '9999-12-31' then date'${biz_date}' else a.end_date end as end_date from dwd_order as a left outer join ( -- 当日新增数据 select id ,gmt_create ,gmt_modified ,data_status from ods_order_di t where pt = '${biz_date}' -- 若存量历史表中已存在相同的数据,则不重新纳入计算(防止数据重算而导致数据有效时间异常) and not exists ( select 1 from dwd_order p where t.id = p.id and t.gmt_create = p.gmt_create and t.gmt_modified = p.gmt_modified and t.data_status = p.data_status ) ) b on a.id = b.id union all -- 当日新增或更新的有效数据:业务日期当日 ods_order_di 新增或更新的所有数据插入 dwd_order,且通过 end_date 标识为有效数据。 select id ,gmt_create ,gmt_modified ,data_status ,date'${biz_date}' as start_date ,date'9999-12-31' as end_date from ods_order_di t where pt = '${biz_date}' -- 若存量历史表中已存在相同的数据,则不重新纳入计算(防止数据重算而导致数据有效时间异常) and not exists ( select 1 from dwd_order p where t.id = p.id and t.gmt_create = p.gmt_create and t.gmt_modified = p.gmt_modified and t.data_status = p.data_status ) ) ) where row_num = 1;
-
调度参数配置,使用手动输入的方式添加调度参数
biz_date=$[yyyy-mm-dd-1]
-
配置跨周期依赖。由于 dwd_order 表的当天数据处理依赖于自身前一日的数据产出,为了确保数据处理的正确性和连续性。通过配置自依赖,可以保障只有在上一周期的数据成功处理后,才会进行当前周期的数据加载,从而保障了数据处理流程的顺利进行。
拉链表数据加载逻辑
-
业务日期当日 dwd_order 表中,有被更新过的有效数据 “修改” 成失效数据;
-
业务日期当日 ods_order_di 表中,新增的所有数据插入 dwd_order 表中,状态标识为有效数据。
以业务日期 10.5 日拉链表数据加载为例,具体步骤如下:
- 原始数据:在 10.4 日的拉链表记录中,存在一条订单ID为 “210001”,其状态为 “创建”。
- 状态变更:到 10.5 日该订单状态被更新为 “支付”。
- 数据对比逻辑:
a. 将 10.4 日拉链表历史全量有效数据和增量表 “2023-10-05” 分区新增的数据关联查询。
b. 更新状态判断:如果订单ID为 “210001” 的订单在 10.4 日拉链表中存在,且 end_data 为 “9999-12-31”,说明该记录有效,同时该订单出现在增量表 “2023-10-05” 的分区中,则说明状态已更新。 - 更新处理:
a. 需要将原拉链表中记录 “end_date=9999-12-31” 修改为当前业务日期 “end_date=2023-10-05”。
b. 如果该订单ID未出现在增量表“2023-10-05”的分区中,则说明无更新,维持原记录不变。 - 数据覆盖:
完成上述步骤后,将拉链表中处理好的历史数据与增量表中新增的所有数据,一同覆盖写回拉链表中,最终产出 10.5 日拉链表数据。
拉链表的使用
- 查询 dwd_order 表中全量订单的所有历史快照。
- 查询 dwd_order 表中全量最新状态的有效数据,根据 end_date='9999-12-31' 查询
重跑数据验证
- 如上图,
id='210001 and gmt_modified='2023-10-06
这条记录为最新数据,现重跑pt='2023-10-04'
分区的数据,并检查历史数据状态。
由于已存在最新的数据 start_date='2023-10-06
,且存量历史数据已存在相同的记录 start_date='2023-10-04
所以重跑历史数据,不会对数据产生影响。
债券评级拉链表
案例介绍
数据表设计,本案例共设计2张数据表。
- 债券评级源表:ods_bondcredit_d,用于存放从业务库同步而来的每日全量数据。包含如下几个字段:
字段 | 描述 |
bondid | 业务主键债券ID |
bondcode | 债券组合代码 |
declaredate | 披露日期 |
changedate | 变动日期 |
creditrank | 信用等级 |
- 债券评级事实表(拉链表):dwd_bondcredit,用于拉链存储全量有效和失效的数据。包含如下几个字段:
字段 | 描述 |
bondid | 业务主键债券ID |
bondcode | 债券组合代码 |
declare_date | 披露日期 |
start_date | 有效开始日期 |
end_date | 有效结束时间 |
creditrank | 信用等级 |
status | 数据状态(0: 无效;1:有效) |
拉链表实现逻辑。
- 本案例将使用拉链表记录债券评级从开始到当前状态的所有变动信息。
任务开发:增量表数据准备
-
表数据:债券评级源表,包含业务主键债券 ID 和债券评级 creditrank 等 字段,用于存放全量债券评级数据。
-
将数据写入增量表
create table if not exists ods_bondcredit_d ( bondid bigint comment '债券ID' ,bondcode string comment '债券组合代码' ,declaredate date comment '披露日期' ,changedate date comment '变动日期' ,creditrank string comment '信用等级' ) comment '债券评级源表' partitioned by ( ds string comment '日期,格式为yyyy-mm-dd' ) lifecycle 7; insert overwrite table ods_bondcredit_d partition (ds = '2023-10-01') select cast(bond_cgchg_id as bigint) as bondid ,bondcode ,cast(declaredate as date) as declaredate ,cast(changedate as date) as changedate ,creditrank from ( select t.* ,row_number() over(partition by bondcode order by changedate) rn from ( select * from v_cjht_finchina_bond_cgchg where ds = '99991231' and bondcode in ( '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ' ) and cevaluitcode in ( '80002192', '80117870', '80002172', '80129270', '80062388', '80002154', '80000858', '80002099' ) ) t ) where rn = 1; insert overwrite table ods_bondcredit_d partition (ds = '2023-10-02') select cast(bond_cgchg_id as bigint) as bondid ,bondcode ,cast(declaredate as date) as declaredate ,cast(changedate as date) as changedate ,creditrank from ( select t.* ,row_number() over(partition by bondcode order by changedate) rn from ( select * from v_cjht_finchina_bond_cgchg where ds = '99991231' and bondcode in ( '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ' ) and cevaluitcode in ( '80002192', '80117870', '80002172', '80129270', '80062388', '80002154', '80000858', '80002099' ) ) t ) where rn = 2; insert overwrite table ods_bondcredit_d partition (ds = '2023-10-03') select cast(bond_cgchg_id as bigint) as bondid ,bondcode ,cast(declaredate as date) as declaredate ,cast(changedate as date) as changedate ,creditrank from ( select t.* ,row_number() over(partition by bondcode order by changedate) rn from ( select * from v_cjht_finchina_bond_cgchg where ds = '99991231' and bondcode in ( '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ' ) and cevaluitcode in ( '80002192', '80117870', '80002172', '80129270', '80062388', '80002154', '80000858', '80002099' ) ) t ) where rn = 3; insert overwrite table ods_bondcredit_d partition (ds = '2023-10-04') select cast(bond_cgchg_id as bigint) as bondid ,bondcode ,cast(declaredate as date) as declaredate ,cast(changedate as date) as changedate ,creditrank from ( select t.* ,row_number() over(partition by bondcode order by changedate) rn from ( select * from v_cjht_finchina_bond_cgchg where ds = '99991231' and bondcode in ( '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ' ) and cevaluitcode in ( '80002192', '80117870', '80002172', '80129270', '80062388', '80002154', '80000858', '80002099' ) ) t ) where rn = 4; insert overwrite table ods_bondcredit_d partition (ds = '2023-10-05') select cast(bond_cgchg_id as bigint) as bondid ,bondcode ,cast(declaredate as date) as declaredate ,cast(changedate as date) as changedate ,creditrank from ( select t.* ,row_number() over(partition by bondcode order by changedate) rn from ( select * from v_cjht_finchina_bond_cgchg where ds = '99991231' and bondcode in ( '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ' ) and cevaluitcode in ( '80002192', '80117870', '80002172', '80129270', '80062388', '80002154', '80000858', '80002099' ) ) t ) where rn = 5;
数据开发:拉链表的实现
创建 ODPS SQL 节点:dwd_bondcredit
,用于产出债券评级事实明细表数据。
-
表数据:债券评级事实明细表(拉链表),用于存放全量有效数据和全量失效数据。包含有效开始日期 start_date、有效结束时间 end_date 等字段。
-
在
dwd_bondcredit
临时查询页面,输入如下示例代码:
create table if not exists dwd_bondcredit ( bondid bigint comment '债券ID' ,bondcode string comment '债券组合代码' ,declare_date date comment '披露日期' ,start_date date comment '生效日期,格式为yyyy-mm-dd' ,end_date date comment '失效日期,格式为yyyy-mm-dd' ,creditrank string comment '信用等级' ,status string comment '状态' ) comment '债券评级事实明细表(拉链表)';
在 dwd_bondcredit
节点编辑页面,输入如下示例代码:
唯一标识:bondid insert overwrite table dwd_bondcredit select bondid ,bondcode ,declare_date ,start_date ,end_date ,creditrank ,status from ( select a.bondid as bondid ,a.bondcode as bondcode ,a.declare_date as declare_date ,a.start_date as start_date ,case when b.bondid is not null and a.status = '1' then b.changedate else a.end_date end as end_date ,a.creditrank as creditrank ,case when b.bondid is not null and a.status = '1' then '0' else a.status end as status from dwd_bondcredit a left outer join ( -- 当日新增数据 select bondid ,bondcode ,declaredate ,changedate ,creditrank from ods_bondcredit_d t where ds = '${biz_date}' and not exists ( select 1 from dwd_bondcredit p where t.bondid = p.bondid ) ) b on a.bondid = b.bondid union all select bondid ,bondcode ,declaredate ,changedate as start_date ,date'9999-12-31' as end_date ,creditrank as creditrank ,'1' as status from ods_bondcredit_d t where ds = '${biz_date}' and not exists ( select 1 from dwd_bondcredit p where t.bondid = p.bondid ) );
结果查询
将存量历史表变更为时间拉链表
保留历史变动信息
存在如下数据,同一个 bondcode,存在多个 changedate 字段值。现通过 changedate 字段衍生出 begin_dt 和 end_dt 时间拉链字段,并保留所有 changedate 历史数据。
with -- 删除表中的重复数据 a as ( select * from ( select t.* ,row_number() over(partition by bondcode, cevaluitcode, changedate order by changedate) rna from v_cjht_finchina_bond_cgchg t where t.bondcode in ( '21030600001SKZ', '18030200002ATZ', '19140100001TYY' ) ) where rna = 1 ), --按照 bondcode 和 cevaluitcode 字段分组并按照 changedate 排序 b as ( select t.* ,row_number() over(partition by bondcode, cevaluitcode order by changedate) rnb from a t ) -- 向下自关联 select t1.bondcode ,cast(t1.declaredate as date) as declaredate ,cast(t1.changedate as date) as start_dt ,case when cast(t2.changedate as date) is null then date'9999-12-31' else cast(t2.changedate as date) - 1 end as end_dt ,t1.cevaluitcode ,t1.cevaluit ,t1.creditrank ,case when cast(t2.changedate as date) is null then '1' else '0' end as status from b t1 left outer join b t2 on t1.bondcode = t2.bondcode and t1.cevaluitcode = t2.cevaluitcode and t1.rnb + 1 = t2.rnb order by t1.bondcode, t1.cevaluitcode, t1.changedate;
不保留历史变动信息
从上图中可知,bondcode='19140100001TYY' and cevaluitcode='80002172'
存在多笔相同的记录(changedate 不一样但是 creditrank 一样)。可进行合并。
基于排序后的结果集向下向上自关联,并重新编号:
where ( -- 上下都不同 (t1.creditrank != coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or -- 与下相同与上不同 (t1.creditrank = coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or -- 保留尾条 t2.creditrank is null )
根据如下条件筛选数据:
t1 = t2 and t1 = t3
:上下都相同,去除t1 != t2 and t1 != t3
:上下都不同,保留t1 = t2 and t1 != t3
:与下相同,与上不同,保留t1 != t2 and t1 = t3
:与下不同,与上相同,去除- 另外遇到倒数2条记录都相同的情况,尾条必须保留
向下自关联,并更新有效日期-完整代码:
with -- 删除表中的重复数据 a as ( select * from ( select t.* ,row_number() over(partition by bondcode, cevaluitcode, changedate order by changedate) rna from v_cjht_finchina_bond_cgchg t where t.bondcode in ( '21030600001SKZ', '18030200002ATZ', '19140100001TYY' ) ) where rna = 1 ), --按照 bondcode 和 cevaluitcode 字段分组并按照 changedate 排序 b as ( select t.bondcode ,cast(t.changedate as date) as changedate ,t.cevaluitcode ,t.cevaluit ,t.creditrank ,row_number() over(partition by bondcode, cevaluitcode order by changedate) rnb from a t ) -- 向下向上自关联,并重新编号 ,c as ( select t1.bondcode ,t1.changedate ,t1.cevaluitcode ,t1.cevaluit ,t1.creditrank ,row_number() over(partition by t1.bondcode, t1.cevaluitcode order by t1.changedate) rnc from b t1 left outer join b t2 on t1.bondcode = t2.bondcode and t1.cevaluitcode = t2.cevaluitcode and t1.rnb + 1 = t2.rnb left outer join b t3 on t1.bondcode = t3.bondcode and t1.cevaluitcode = t3.cevaluitcode and t1.rnb - 1 = t3.rnb where ( -- 上下都不同 (t1.creditrank != coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or -- 与下相同与上不同 (t1.creditrank = coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or -- 保留尾条 t2.creditrank is null ) ) -- 向下自关联 select t1.bondcode ,t1.changedate as begin_dt ,case when t2.changedate is null then date'9999-12-31' else t2.changedate - 1 end as end_dt ,t1.cevaluitcode ,t1.cevaluit ,t1.creditrank ,case when t2.changedate is null then '1' else '0' end as end_dt from c t1 left outer join c t2 on t1.bondcode = t2.bondcode and t1.cevaluitcode = t2.cevaluitcode and t1.rnc + 1 = t2.rnc order by t1.bondcode, t1.cevaluitcode, t1.changedate;
参考资料
MaxCompute 任务最佳实践 > 基于 MaxCompute 实现拉链表
原创文章,转载请注明出处:http://www.opcoder.cn/article/65/