A source-level look at how Flink window operations work. Only one representative from each component is analyzed here.
Dependencies

    Window
        GlobalWindow (the default window that holds all data)
        TimeWindow (a window that represents a time interval)

    WindowAssigner
        - MergingWindowAssigner (the windows can be merged)
            - DynamicEventTimeSessionWindows
            - DynamicProcessingTimeSessionWindows
            - EventTimeSessionWindows
            - ProcessingTimeSessionWindows
        - SlidingEventTimeWindows (sliding windows)
            - SlidingTimeWindows
        - SlidingProcessingTimeWindows (sliding windows)
        - TumblingEventTimeWindows (tumbling windows)
            - TumblingTimeWindows
        - TumblingProcessingTimeWindows (tumbling windows)
        - GlobalWindows (assigns all elements to a single window)

    Trigger
        - ContinuousEventTimeTrigger (fires repeatedly at a given interval, driven by the watermark)
        - ContinuousProcessingTimeTrigger (fires repeatedly at a given interval, driven by processing time)
        - CountTrigger (fires once every maxCount elements)
            - used by DataStream
            - used by KeyedStream
        - DeltaTrigger (computes the delta between the data point at the last firing and the
          data point that just arrived, and fires if the delta exceeds the given threshold)
            the user has to implement the DeltaFunction
        - EventTimeTrigger (decides whether to fire based on event time)
            - used by EventTimeWindows
        - NeverTrigger (a trigger that never fires; the default trigger of GlobalWindows)
            - used by GlobalWindows
        - ProcessingTimeTrigger (decides whether to fire based on processing time)
            - used by ProcessingTimeWindows
        - PurgingTrigger (a wrapper that turns a TriggerResult of FIRE into FIRE_AND_PURGE)
            - used by DataStream
            - used by KeyedStream
            - used by DataStreamGroupWindowAggregateBase
        - StateCleaningCountTrigger (GlobalWindow) (fires when the cleanup timer fires or when
          the element count reaches maxCount)
            - used by DataStreamGroupWindowAggregateBase

    TriggerResult
        CONTINUE (do nothing)
        FIRE_AND_PURGE
        FIRE (process the window data)
        PURGE (remove the window and the data in it)

    Evictor
        - CountEvictor (uses maxCount to decide whether an element is removed)
        - DeltaEvictor (computes the delta between each element and the last element and
          compares it with threshold; if it is greater than or equal, the element is removed)
        - TimeEvictor (uses time to decide whether an element is removed)

    Timer
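To make the four TriggerResult values and the PurgingTrigger wrapper a bit more concrete, here is a minimal sketch in plain Java. It does not use Flink: the `Result` enum and the `toPurging` helper are hypothetical stand-ins that only mirror the behaviour described in the list above (each result carries a "fire" flag and a "purge" flag, and the wrapper upgrades any firing result to FIRE_AND_PURGE).

```java
class PurgingSketch {
    // Hypothetical stand-in for TriggerResult: each value is a (fire, purge) pair.
    enum Result {
        CONTINUE(false, false),
        FIRE_AND_PURGE(true, true),
        FIRE(true, false),
        PURGE(false, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }
        boolean isPurge() { return purge; }
    }

    // Assumed behaviour of a purging wrapper: every firing result also purges,
    // non-firing results pass through unchanged.
    static Result toPurging(Result inner) {
        return inner.isFire() ? Result.FIRE_AND_PURGE : inner;
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            System.out.println(r + " -> " + toPurging(r));
        }
    }
}
```

The two flags are what processElement later checks through isFire() and isPurge(), so FIRE_AND_PURGE simply takes both branches.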
SlidingEventTimeWindows

    # Member variables
    size: window size, slide: window slide (step), offset: offset

    # Assigning windows
    Data structure ArrayList<TimeWindow>: initial capacity size / slide
    Window start time: timestamp - (timestamp - offset + slide) % slide;
    Windows in the list: new TimeWindow(start, start + size)

    # Default Trigger
    EventTimeTrigger

    # Usage
    DataStream<Tuple2<String, Integer>> in = ...;
    KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
    WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
        keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
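The window-assignment arithmetic above can be tried out without Flink. The sketch below is a plain-Java approximation: `SlidingWindowSketch`, `lastWindowStart` and `assignWindows` are names made up for this example, and windows are represented as `long[]{start, end}` pairs instead of TimeWindow objects. It assumes the assigner walks backwards from the latest window start in steps of slide, which is consistent with a list of size / slide windows.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Start of the latest window that contains the timestamp (formula from the notes above).
    static long lastWindowStart(long timestamp, long offset, long slide) {
        return timestamp - (timestamp - offset + slide) % slide;
    }

    // Returns {start, end} for every sliding window that contains the timestamp.
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        long lastStart = lastWindowStart(timestamp, offset, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 60 s, slide = 10 s, offset = 0, element timestamp = 65_000 ms
        for (long[] w : assignWindows(65_000L, 60_000L, 10_000L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 1-minute size and a 10-second slide, an element at 65 000 ms lands in six windows, starting at 60 000 ms and going back to 10 000 ms.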
EventTimeTrigger

    # If the window's max timestamp is less than or equal to the current watermark
    FIRE
    # If the window's max timestamp is greater than the current watermark
    register a Timer
    CONTINUE

    # Registers an event-time callback. When the current watermark passes the specified
    # time, onEventTime is called with the time specified here.
    Trigger
    TriggerContext.registerEventTimeTimer(long time)

    # WindowOperator
    Context.registerEventTimeTimer(long time)

    # InternalTimerService
    registerEventTimeTimer(N namespace, long time)

    # Registers a timer to be fired when the event-time watermark passes the given time.
    # The namespace passed here is provided again when the timer fires.
    InternalTimerServiceImpl
    registerEventTimeTimer(N namespace, long time)

    # Queue of the currently running event-time timers
    KeyGroupedInternalPriorityQueue
    add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))
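As a rough illustration of the fire-or-register decision and of the timer queue at the end of the call chain, here is a self-contained sketch. It is not Flink code: `EventTimeTriggerSketch`, its `Result` enum and the `advanceWatermark` method are invented for this example, and a `java.util.PriorityQueue<Long>` stands in for the keyed, namespaced timer queue (key and namespace are left out).

```java
import java.util.PriorityQueue;

class EventTimeTriggerSketch {
    enum Result { CONTINUE, FIRE }

    // Hypothetical stand-in for the timer queue: a min-heap of timer timestamps.
    final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    long currentWatermark = Long.MIN_VALUE;

    // Fire immediately if the watermark has already reached the window's max timestamp,
    // otherwise register a timer for that timestamp and keep waiting.
    Result onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        if (!eventTimeTimers.contains(windowMaxTimestamp)) {
            eventTimeTimers.add(windowMaxTimestamp); // one timer per timestamp
        }
        return Result.CONTINUE;
    }

    // Advancing the watermark pops every timer whose time has been reached
    // and returns how many timers fired.
    int advanceWatermark(long watermark) {
        currentWatermark = watermark;
        int fired = 0;
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            eventTimeTimers.poll();
            fired++;
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch();
        System.out.println(trigger.onElement(9_999L));        // window [0, 10000) not complete yet
        System.out.println(trigger.advanceWatermark(9_999L)); // number of timers that fired
        System.out.println(trigger.onElement(9_999L));        // late element for the same window
    }
}
```

The de-duplication in `onElement` is a simplification made for the sketch; it stands in for the fact that many elements of the same window only need one timer at the window's max timestamp.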
CountEvictor

    evict method
    If size is less than or equal to the configured maximum count, return right away.
    Otherwise iterate over the elements and remove the surplus ones from the front.

    int evictedCount = 0;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();) {
        iterator.next();
        evictedCount++;
        if (evictedCount > size - maxCount) {
            break;
        } else {
            iterator.remove();
        }
    }
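The same loop can be run on an ordinary list to see which elements survive. In this sketch `CountEvictorSketch` and its generic `evict` signature are assumptions made for the example; the loop body follows the snippet above, with plain elements instead of TimestampedValue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class CountEvictorSketch {
    // Removes elements from the front until at most maxCount remain.
    static <T> void evict(Iterable<T> elements, int size, long maxCount) {
        if (size <= maxCount) {
            return; // nothing to evict
        }
        int evictedCount = 0;
        for (Iterator<T> iterator = elements.iterator(); iterator.hasNext();) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break; // the remaining maxCount elements are kept
            } else {
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(List.of(1, 2, 3, 4, 5));
        evict(window, window.size(), 2);
        System.out.println(window); // prints [4, 5]
    }
}
```

Since iteration starts at the head of the window, it is the earliest elements that are dropped and the most recent maxCount elements that remain.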
WindowOperator workflow

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Ask the assigner which windows this element belongs to.
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        // Stays true if the element is not handled by any of the assigned windows.
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            // Mergeable windows (for example session windows).
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window : elementWindows) {

                // Adding the new window may cause a merge; the callback updates trigger and state.
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                "event-time window cannot become earlier than the current watermark " +
                                "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime()) {
                            long currentProcessingTime = internalTimerService.currentProcessingTime();
                            if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                                throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + currentProcessingTime +
                                    " window: " + mergeResult);
                            }
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;

                        triggerContext.onMerge(mergedWindows);

                        // Clear trigger state and cleanup timers of the windows that were merged away.
                        for (W m : mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // Merge the window state of the merged windows into the result window.
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // Drop the window if it is already late.
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                // Add the element to the window state.
                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                // Let the trigger decide what to do with the window.
                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // Persist the merging-window bookkeeping.
            mergingWindows.persist();
        } else {
            // Non-merging windows (for example tumbling and sliding windows).
            for (W window : elementWindows) {

                // Drop the window if it is already late.
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // The element was added to no window and is late:
        // send it to the side output if one is configured, otherwise count it as dropped.
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null) {
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }
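processElement relies on isWindowLate and isElementLate, whose bodies are not shown above. The sketch below is a guess at their event-time logic, written as plain static methods: it assumes a window is late once its max timestamp plus allowedLateness is at or behind the watermark (the same comparison the merge callback uses), and that an element is late under the same rule applied to its own timestamp. `LatenessSketch`, `cleanupTime` and the parameter names are hypothetical, and the processing-time case is left out.

```java
class LatenessSketch {
    // Assumed cleanup time of an event-time window: max timestamp plus allowed lateness,
    // clamped to Long.MAX_VALUE if the addition overflows.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    // A window counts as late once the watermark has reached its cleanup time.
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= watermark;
    }

    // An element counts as late when its timestamp plus the allowed lateness
    // is at or behind the watermark.
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long watermark) {
        return elementTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long maxTimestamp = 9_999L;    // window [0, 10000)
        long allowedLateness = 1_000L; // tolerate one extra second
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 10_998L)); // false
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 10_999L)); // true
        System.out.println(isElementLate(5_000L, allowedLateness, 6_000L));       // true
    }
}
```

Under these assumptions, allowedLateness only shifts the point at which data is discarded; the element still ends up in the side output or in numLateRecordsDropped once that point is passed.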