什么是滑动窗口
在风控很多场景下,均需要基于用户的行为数据进行特征指标统计分析,在分析特征指标的时候,往往涉及到相关的时间跨度问题,比如1小时内用户的访问频次或者是近1小时内用户的频次,这两个指标仅只有一个字差别,但实际统计出来的数据可能就是千差万别。
1小时内用户的访问频次所指的是自然小时,比如平常所说的 1点-2点,2点-3点,3点-4点类似于这种,这种称之为固定时间窗口。
而近1小时内用户的访问频次,所指的是从当前时间往前推60分钟,所得出得统计时间跨度,比如当前时间是12.53,那所统计得时间跨度是11.53-12.53,这种称之为滑动时间窗口.
由于在用户访问的过程中,用户的行为会出现随时性和不确定性,如果采用固定时间窗口,经常会被一些有经验的黑产摸到对应的规律,很容易出现在跨两个时间段之间的指标漏洞,因此通常更多的情况下会采用滑动时间窗口。
在滑动窗口中统计周期以及窗口的大小,需要根据实际业务使用情况进行设定。
统计周期一致,窗口大小不一致:窗口越大统计精准度越低,但并发性能好;窗口越小,统计精准度越高,但并发性能随之降低
统计周期不一致,窗口大小一致:周期越长抗流量脉冲情况越好
滑动窗口的实现
要实现滑动窗口的指标统计,需要做以下三步
第一步:定义滑动窗口
public class Window { /** * 统计窗口长度 */ private final long windowLengthInMs; /** * 统计窗口的起始时间戳 */ private long windowStart; /** * 当前统计窗口中的统计数据 */ private T value; }
第二步:定义滑动窗口存储空间,将统计窗口分成N个桶,每个桶存储对应的统计数据,比如像按照秒级精度统计近1分钟的指标,则可以把滑动时间窗口定位1分钟分为60个桶。这样就可以得到近一分钟的统计指标,这是滑动窗口的关键部分
public class WindowBucket { /** * 窗口时间长度(毫秒) */ private int windowLengthInMs; /** *滑动窗口的桶数,越多代表统计精度越高 */ private int sampleCount; /** *滑动窗口总的时长,比如1分钟,1小时,单位为毫秒 */ private int intervalInMs; /** * 每个桶存储的数据 */ private final AtomicReferenceArray> array; private final ReentrantLock updateLock = new ReentrantLock; public WindowBucket(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); }//获取给定的时间的时间窗口。 public Window currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); long windowStart = calculateWindowStart(timeMillis); while (true) { Window old = array.get(idx); if (old == null) { /* B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update */ Window window = new Window(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { return window; } else { Thread.yield; } } else if (windowStart == old.windowStart) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * */ return old; } else if (windowStart > old.windowStart) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 */ if (updateLock.tryLock) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock; } } else { Thread.yield; } } else if (windowStart < old.windowStart) { // Should not go through here, as the provided time is already behind. return new Window(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } priavate Window resetWindowTo(Window window, long startTime) { window.resetTo(startTime); window.value.reset; return window; }//根据时间戳,计算出所在桶的 private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; return (int)(timeId % array.length); }//根据给定的时间戳,计算出所在桶的开始时间 private long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; }//根据给定的时间戳,给出时间窗口内的所有时间桶 public List values(long timeMillis) { if (timeMillis < 0) { return new ArrayList; } int size = array.length; List result = new ArrayList(size); for (int i = 0; i < size; i++) { Window window = array.get(i); if (window == null || isWindowDeprecated(timeMillis, window)) { continue; } result.add(window.value); } return result; }//判断是否为过期的桶数据 private boolean isWindowDeprecated(long time, Window window) { return time - window.windowStart > intervalInMs; }}
关键属性
/** * 窗口时间长度(毫秒) */ protected int windowLengthInMs; /** *滑动窗口的桶数,越多代表统计精度越高 */ protected int sampleCount; /** *滑动窗口总的时长,比如1分钟,1小时,单位为毫秒 */ protected int intervalInMs; /** * 每个桶存储的数据 */ protected final AtomicReferenceArray> array;
其中array就是整个时间窗口的存储空间,里面有所有桶的数据。
public WindowBucket(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); }
可以传入样本窗口的数量,以及整个时间窗的大小,如果样本数量是60,时间窗大小是1分钟,那么整个array数组中就会有60个样本窗口每一个样本窗口的时间长度为1秒。代表着每个桶里面存储的是1秒的统计数据。
currentWindow:通过时间戳定位 Bucket
存在几种情况
第一种情况是:当前时间所在Bucket第一次使用,还没有创建,那此时数组是空,进行新建即可,一般是项目启动时,时间未到达一个周期,数组还没有存储满,没有到复用阶段,所以数组元素可能为空
第二种情况:当前时间窗口,刚好等于所在的Bucket,返回当前的Window即可
第三种情况:当前时间窗口,使用了原有过期的数组,这个时候将原有的时间窗口进行复用,进行数据清除。
第三步:定义指标统计对象
public class Metric { private final WindowBucket data; public Metric(int sampleCount, int intervalInMs) { this.data = new WindowBucket(sampleCount, intervalInMs); } //获取整个时间窗口的统计数据 public long total { data.currentWindow; long total = 0; List list = data.values; for (LongAdder window : list) { total += window.sum; } return total; } //添加统计数据 public void add(int count) { Window window = data.currentWindow; window.value.add(count); } //获取当前时间下的时间分片的统计数据 public long get(long time){ Window window = data.currentWindow; return window.value.sum; }}
此次整个基于滑动窗口的统计就完成了,总的来说就是把滑动窗口将时间窗口划分为更小的时间片段,每过一个时间片段,时间窗口就会往右滑动一格,每个时间片段都有独立的计数器。在计算整个时间窗口内的请求总数时会累加所有的时间片段内的计数器。时间窗口划分的越细,那么滑动窗口的滚动就越平滑,统计就会越精确。
写在最后
基于滑动窗口的统计使用范围很广,比如文章前面所说的风控特征指标统计,微服务应用的流控等场景(限流开源框架Sentinel,本文的代码部分就是基于Sentinel的滑动窗口代码进行改写)。
转载此文是出于传递更多信息目的。若来源标注错误或侵犯了您的合法权益,请与本站联系,我们将及时更正、删除、谢谢。
https://www.414w.com/read/271999.html