本文通过案例为您介绍如何使用实时计算完成订单与销量的统计。

背景信息

以下案例是实时计算的合作伙伴袋鼠云通过阿里云实时计算来完成电商订单管理的案例。

业务架构图



业务流程:

  1. 用阿里云的DTS数据实时同步把您的数据同步到大数据总线(DataHub)。具体步骤请参见MySQL到DataHub数据实时同步
  2. 阿里云实时计算订阅大数据总线(DataHub)的数据进行实时计算。
  3. 将实时数据插入到RDS的云数据库。
  4. 再通过阿里云的DataV或者是其他的大屏做数据展示。

准备工作

将RDS for MySQL产生的增量数据实时同步到DataHub中的Topic。由RDS经过DTS数据同步到大数据总线(DataHub)Schema表的信息。

具体步骤请参见MySQL到DataHub数据实时同步

订单源表

字段名 数据类型 详情
dts_ordercodeofsys VARCHAR 订单编号
dts_paytime BIGINT 订单付款时间
dts_deliveredtime VARCHAR 订单发货时间
dts_storecode VARCHAR 店铺编号
dts_warehousecode VARCHAR 仓库code
dts_cancelled BIGINT 是否取消
dts_delivered BIGINT 是否发货
dts_receivercity VARCHAR 收货人城市
dts_receiverprovince VARCHAR 收货人省份
dts_record_id VARCHAR 记录ID
dts_operation_flag VARCHAR 操作Flag
dts_instance_id VARCHAR 数据库instanceId
dts_db_name VARCHAR 数据库名
dts_table_name VARCHAR 数据表
dts_utc_timestamp VARCHAR 更新时间
dts_before_flag VARCHAR 变更前标识
dts_after_flag VARCHAR 变更后标识

订单详情源表

字段名 数据类型 详情
dts_ordercodeofsys VARCHAR 订单编号
dts_skuname VARCHAR 商品名字
dts_skucode VARCHAR 商品编号
dts_quantity BIGINT 数量
dts_dividedamount DOUBLE 发货金额
dts_salechanneldividedamount DOUBLE 渠道销售金额
dts_initialcost DOUBLE 成本
dts_record_id VARCHAR 记录ID
dts_operation_flag VARCHAR 操作Flag
dts_instance_id VARCHAR 数据库instanceId
dts_db_name VARCHAR 数据库名字
dts_table_name VARCHAR 表名
dts_utc_timestamp VARCHAR 更新时间
dts_before_flag VARCHAR 变更前标识
dts_after_flag VARCHAR 变更后标识

编写业务逻辑

--数据的订单源表
create table orders_real(
dts_ordercodeofsys varchar,
dts_paytime bigint,
dts_deliveredtime varchar,
dts_storecode varchar,
dts_warehousecode varchar,
dts_cancelled bigint,
dts_delivered bigint,
dts_receivercity varchar,
dts_receiverprovince varchar,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
)with(
type='datahub',
endPoint='http://dh-cn-****.com',
project='your',
topic='表名',
accessId='您的ID',
accessKey='您的KEY'
);

create table orderdetail_real(
dts_ordercodeofsys varchar,
dts_skuname varchar,
dts_skucode varchar,
dts_quantity bigint,
dts_dividedamount double,
dts_salechanneldividedamount double,
dts_initialcost double,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
)with(
type='datahub',
endPoint='http://dh-cn-****.com',
project='yourPorjectName',
topic='yourTableName',
accessId='yourAccessId',
accessKey='yourAccessSecret'
);


create table ads_all_count_amount(
bill_date varchar,--下单时间
bill_count bigint,--总的订单总数
qty bigint,--总的销售量
primary key (bill_date)
)with(
type='rds',
url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
tableName='yourDatabaseTableName',
userName='yourDatabaseAccount',
password='yourDatabasePassword'
);


--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS
SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys ;

--订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag='U'
AND dts_before_flag='Y'
AND dts_after_flag='N' THEN -1*dts_quantity
WHEN dts_operation_flag='U'
AND dts_before_flag='N'
AND dts_after_flag='Y' THEN dts_quantity
WHEN dts_operation_flag='D' THEN -1*dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM
orderdetail_real ;

--订单总单数,总销售量
INSERT INTO ads_all_count_amount
SELECT
FROM_UNIXTIME(cast(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date,
COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
new_paytime a
join
new_orderdetail b
ON a.dts_ordercodeofsys=b.dts_ordercodeofsys
GROUP BY
FROM_UNIXTIME(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');
			

难点解析

为了方便您理解结构化代码和代码维护,推荐使用View(数据视图概念)把业务逻辑差分成三个模块。

  • 模块1

    根据订单编号做分组。

    同一个编号订单会有多次业务操作(例如下单、付款、发货),会在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。

    CREATE VIEW new_paytime AS
    SELECT
    	dts_ordercodeofsys,
    	MAX(dts_paytime) AS dts_paytime
    FROM orders_real
    GROUP BY dts_ordercodeofsys
    					
  • 模块2
    生成有效订单的信息。
    --订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
    CREATE VIEW new_orderdetail AS
    SELECT
    dts_ordercodeofsys,
    dts_skuname,
    dts_skucode,
    CASE WHEN dts_operation_flag='U'
    		AND dts_before_flag='Y'
    		AND dts_after_flag='N' THEN -1*dts_quantity
    	WHEN dts_operation_flag='U'
    		AND dts_before_flag='N'
    		AND dts_after_flag='Y' THEN dts_quantity
    	WHEN dts_operation_flag='D' THEN -1*dts_quantity
    	ELSE dts_quantity
    	END AS dts_quantity
    		FROM orderdetail_real

    数据库日志会获取所有的数据记录的变更,而每个订单是有状态的。如下列表所示。

    字段名 数据类型 详情
    dts_record_id VARCHAR 记录ID。增量日志的唯一标识,唯一递增。如果变更类型为Update,那么增量更新会被拆分成2条,一条Insert,一条Delete。这两条记录具有相同的record_id
    dts_operation_flag VARCHAR

    标示这条增量日志的操作类型。取值为

    • I:Insert
    • D:Delete
    • U:Update
    dts_instance_id VARCHAR 数据库instanceId。这条增量日志所对应的数据库的Server ID
    dts_db_name VARCHAR 这条增量更新日志更新的表所在的数据库库名。
    dts_table_name VARCHAR 这条增量更新日志更新的表名。
    dts_utc_timestamp VARCHAR 这条增量日志的操作时间戳,为这个更新操作记录Binlog的时间戳。时间戳为UTC时间。
    dts_before_flag VARCHAR 表示这条增量日志后面带的各个Column值是否为更新前的值。取值包括:YN。当后面的Column为更新前的值时,dts_before_flag=Y,当后面的Column值为更新后的值时,dts_before_flag=N
    dts_after_flag VARCHAR 表示这条增量日志后面带的各个Column值是否为更新后的值。取值包括:YN。当后面的Column为更新前的值时,dts_after_flag=N,当后面的Column值为更新后的值时,dts_after_flag=Y

    对于不同的操作类型,增量日志中的dts_before_flagdts_after_flag定义如下:

    1. 操作类型为Insert
      所有Column值为新插入的记录值,即为更新后的值。所以 dts_before_flag=Ndts_after_flag=Y

    2. 操作类型为Update
      Update操作被拆为2条增量日志。这两条增量日志的 dts_record_iddts_operation_flagdts_utc_timestamp相同。第一条日志记录更新前的值,所以 dts_before_flag=Ydts_after_flag=N第二条日志记录了更新后的值,所以 dts_before_flag=Ndts_after_flag=Y

    3. 操作类型为Delete
      所有Column值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Ydts_after_flag=N

    说明

    Q:怎么判断是有效交易订单呢?

    A:首先是要满足dts_operation_flag=U或者dts_operation_flag=I,然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态。所以有效交易订单如下。

    dts_operation_flag='U'
    		AND dts_before_flag='N'
    		AND dts_after_flag='Y' THEN dts_quantity
    						

    Q:为什么THEN -1*dts_quantity呢?

    A:订单的取消或者是交易没有成功在总的销量里也会记录;为了保证总销量的正确性,所以把没有成交的订单数量设为负数在计算总的销量会减去这个数量。

  • 模块3
    统计总订单数和销售额。
    
    SELECT
    	from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd') AS bill_date,
    	COUNT(DISTINCTa.dts_ordercodeofsys) AS bill_count,
    	SUM(b.dts_quantity) AS qty
    from
    	(new_paytime) a
    join
    	(new_orderdetail) b
    ON
    	a.dts_ordercodeofsys=b.dts_ordercodeofsys
    GROUP BY
    from_unixtime(CAST(a.dts_paytime/1000000 AS bigint),'yyyyMMdd');
    						

    Q:为什么订单源表和订单详情要做JOIN操作?

    A:new_paytime查出的是最新交易的时间的所有的订单编号;new_orderdetail查询的是所有的有效的订单的订单编码、商品名称、商品编号、数量的信息;两张表Join是为方便用户来统计订单总数和总的销量。

DEMO示例以及源代码

根据上文介绍的订单与销量统计解决方案,为您创建了一个包含完整链路的DEMO示例,如下。
  • DataHub作为源表
  • RDS作为结果表
DEMO代码完整,您可参考示例代码,注册上下游数据,制定自己的订单与销量统计解决方案。点击DEMO代码进行下载。