UFIV
问题
这边接到后台的需求需要提供指标支持游戏平台的流量控速,按几个维度字段计算最近 72h 时间范围内的指标,每 1min 更新,统计逻辑比较简单,SQL 实现:
SELECT
window_start,
window_end,
SUM(amount) AS total_amount,
COUNT(DISTINCT user_id) AS distinct_users
FROM TABLE(
HOP(
TABLE orders,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE,
INTERVAL '72' HOUR
)
)
GROUP BY window_start, window_end
问题在于窗口很大而步长很短,对于 sum 能够增量累计加和的指标比较简单,而对于去重指标相当于要对每个窗口内的 uid 做去重。最直接的方式当然是直接实现在窗口内去重,但这要求所有的数据都缓存到 window state 中,当窗口触发时得把所有数据拉出来计算一遍,会使窗口触发的时间 cpu 飙升,这是任务开始 15min 后的截图,此时资源都跑满了。
实现
有这么高的资源消耗,需要看下 flink 对窗口是如何处理的,网站上只有概念的解释,直接看代码实现。当数据流转到 windowOperator#processElement,会根据 windowAssigner 分配窗口。
在我们当前的场景下,则会分配这条数据时间戳所在的所有滑动窗口,即 72*60=4320,再按每一个分配的窗口切换 window state,把数据写入到对应 window state 中,等于把每一条数据 copy 4320 次。对于滚动窗口或很小的滑动窗口,同时能实现增量累加的任务一般没有问题,因为 state的数据只有数据源里维度排列组合的数量,一般 state 需要存储的数据都很小。
但需求中数据源 72h 符合条件的上报有 15 万数据量,每条数据有 4320 个窗口,等于需要缓存 6.5 亿条数据,并在每个窗口触发时平均需要从 state 获取 2 万条(一个维度的key)数据。
优化
根据上述的逻辑,我们应该把去重转化成能增量累计的计算,这样每个窗口只需要缓存一条数据。理想是在 reduce function 中实现去重,但 flink 的实现 reduce 只是一个纯函数,框架不支持访问 state 或 context,当一条数据合并时也不带当前所在的窗口信息,即使访问 cache 也无法判断去重的范围。
因此需要换个思路在前面数据处理加点 trick,当消费到一条数据以 维度+uid 为 key,从 cache 中获取到同一个 uid 最近的时间戳(不存在则为0)。重新定义滑动窗口分配逻辑,如果 uid 最近的时间戳 previous_ts 在某个时间窗内,说明之前已经有一条数据进同一个时间窗了,那这条新数据则跳过分配相同的时间窗,从而保证一个窗口中只有唯一一个 uid。
timestamp = context.getCurrentProcessingTime();
long preConvTS = 0;
long curConvTS = 0;
if (element instanceof QuotaStat) {
QuotaStat quotaStat = (QuotaStat) element;
preConvTS = quotaStat.getConvInfo().getPreConvTS();
curConvTS = quotaStat.getConvInfo().getCurConvTS();
}
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart; start > timestamp - size; start -= slide) {
if (start > preConvTS) {
windows.add(new TimeWindow(start, start + size));
}
}
return windows;
跳过时间窗分配反而跳过后续时间窗处理数据的流程,只要能进入 reduce 函数都是新增,去重指标直接加 1。下面是优化后的效果,cpu 占用和 checkpoint 都很低。