Flink Windows

2022-04-08 15:31:24



Window Properties

A window stores its contents in ListState or AggregatingState. A window has three important properties: the Assigner, the Trigger, and the Evictor (optional).

WindowAssigner

TumblingEventTimeWindows

protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
    if (Math.abs(offset) >= size) {
        throw new IllegalArgumentException(
                "TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
    }

    this.size = size;
    this.globalOffset = offset;
    this.windowStagger = windowStagger;
}

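What is a WindowAssigner? Given a record, the WindowAssigner decides which window (or windows) the record belongs to, according to how the assigner was defined. TumblingEventTimeWindows, for example, is defined by a window size and a window offset.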

public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        if (staggerOffset == null) {
            staggerOffset =
                    windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
        }
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start =
                TimeWindow.getWindowStartWithOffset(
                        timestamp, (globalOffset + staggerOffset) % size, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                        + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                        + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}



public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}

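Assigning a window is simple: the window start is timestamp - (timestamp - offset + windowSize) % windowSize. Suppose the timestamp is 10:30 and the user-defined windowSize is 1h. With no offset, the window is [10:00, 11:00). If you want [10:15, 11:15) instead, specify an offset of 15min.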
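The arithmetic can be checked with a small standalone sketch. It copies the getWindowStartWithOffset formula quoted above and uses milliseconds since midnight as timestamps. The class name WindowStartDemo and the helper hm are made up for this illustration and are not part of Flink.

```java
class WindowStartDemo {
    // Same formula as the getWindowStartWithOffset method quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical helper: hours and minutes to milliseconds since midnight.
    static long hm(int hours, int minutes) {
        return (hours * 60L + minutes) * 60_000L;
    }

    public static void main(String[] args) {
        long size = hm(1, 0);   // 1h window
        long ts = hm(10, 30);   // record timestamp 10:30

        long startNoOffset = getWindowStartWithOffset(ts, 0L, size);
        long startWithOffset = getWindowStartWithOffset(ts, hm(0, 15), size);

        System.out.println(startNoOffset == hm(10, 0));    // prints true: window [10:00, 11:00)
        System.out.println(startWithOffset == hm(10, 15)); // prints true: window [10:15, 11:15)
    }
}
```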

With sliding windows, for example, one record can belong to several windows, which is why assignWindows returns a Collection of windows.
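To make the "one record, several windows" case concrete, here is a minimal sketch of how a sliding assigner can enumerate the windows that contain a timestamp. It is a simplified model written for this article, not Flink's source: the class SlidingAssignDemo, the method assignSlidingWindows, and the use of long[] pairs instead of TimeWindow are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAssignDemo {
    // Same formula as the getWindowStartWithOffset method quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Returns {start, end} pairs for every window of length `size`,
    // started every `slide`, that contains `timestamp`.
    static List<long[]> assignSlidingWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= timestamp, aligned to the slide.
        long lastStart = getWindowStartWithOffset(timestamp, offset, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Timestamp 10:30 (630 minutes), size 1h, slide 30min, no offset.
        List<long[]> windows = assignSlidingWindows(630 * minute, 60 * minute, 30 * minute, 0L);
        for (long[] w : windows) {
            System.out.println("[" + w[0] / minute + ", " + w[1] / minute + ") minutes");
        }
    }
}
```

With a 1h size and a 30min slide, the 10:30 record lands in two windows, [10:30, 11:30) and [10:00, 11:00).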

Trigger

A Trigger decides when a window is evaluated.

TriggerResult

public enum TriggerResult {

    /** No action is taken on the window. */
    CONTINUE(false, false),

    /** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
    FIRE_AND_PURGE(true, true),

    /**
     * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
     * though, all elements are retained.
     */
    FIRE(true, false),

    /**
     * All elements in the window are cleared and the window is discarded, without evaluating the
     * window function or emitting any elements.
     */
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}

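TriggerResult has four values:

CONTINUE: do nothing.

FIRE: evaluate the window and keep its state.

PURGE: clear the state without evaluating the window.

FIRE_AND_PURGE: evaluate the window, then clear its state.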
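The effect of the two flags can be sketched with a toy model. The enum below is a local copy made for illustration, not Flink's TriggerResult, and the apply method is a hypothetical stand-in for what a window operator does with the result: emit on fire, clear on purge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TriggerResultDemo {
    // Local copy of the four results with their (fire, purge) flags.
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Applies a result to the window contents held in `state`.
    // Returns the emitted elements (empty if the window did not fire).
    static List<Integer> apply(Result result, List<Integer> state) {
        List<Integer> emitted = new ArrayList<>();
        if (result.fire) {
            emitted.addAll(state); // evaluate: here we simply emit a copy of the contents
        }
        if (result.purge) {
            state.clear();         // drop the window contents
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> state = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(apply(Result.FIRE, state) + " state=" + state);
        System.out.println(apply(Result.FIRE_AND_PURGE, state) + " state=" + state);
    }
}
```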

EventTimeTrigger

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

}

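onElement is called once for every incoming record. EventTimeTrigger compares the current watermark with the window's maximum timestamp: if the watermark has already reached it, the window fires immediately; otherwise an event-time timer is registered for that timestamp.

onEventTime is called when an event-time timer registered through the trigger context fires. The window fires only if the timer's timestamp equals the window's maximum timestamp.

onProcessingTime is called when a processing-time timer registered through the trigger context fires. EventTimeTrigger ignores it and returns CONTINUE.

clear deletes the event-time timer that was registered through the trigger context.

onMerge is used by merging windows such as session windows.

One detail worth noting: onElement registers the time at which the window should fire, so that the window can be triggered once the watermark passes that time.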

ctx.registerEventTimeTimer(window.maxTimestamp());
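The onElement decision can be modeled without Flink. The sketch below is a simplified, hypothetical stand-in (class EventTimeTriggerDemo, a TreeSet in place of the timer service) and assumes a window's maximum timestamp is its end minus one, as in TimeWindow.

```java
import java.util.TreeSet;

class EventTimeTriggerDemo {
    long currentWatermark = Long.MIN_VALUE;
    // Stand-in for the timer service; a set keeps one timer per timestamp.
    final TreeSet<Long> timers = new TreeSet<>();

    // Mirrors the shape of EventTimeTrigger.onElement for a window ending at windowEnd.
    // Returns true for FIRE and false for CONTINUE.
    boolean onElement(long windowEnd) {
        long maxTimestamp = windowEnd - 1; // assumption: maxTimestamp = end - 1
        if (maxTimestamp <= currentWatermark) {
            // The watermark is already past the window: fire immediately.
            return true;
        }
        // Otherwise wait for the watermark to reach the end of the window.
        timers.add(maxTimestamp);
        return false;
    }

    public static void main(String[] args) {
        EventTimeTriggerDemo trigger = new EventTimeTriggerDemo();
        System.out.println(trigger.onElement(1000L)); // prints false, timer registered
        System.out.println(trigger.timers);           // prints [999]
        trigger.currentWatermark = 1500L;
        System.out.println(trigger.onElement(1000L)); // prints true, watermark passed 999
    }
}
```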

Evictor

An Evictor removes elements from the window's iterable before or after the window function is evaluated.

CountEvictor

public class CountEvictor<W extends Window> implements Evictor<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final boolean doEvictAfter;

    private CountEvictor(long count, boolean doEvictAfter) {
        this.maxCount = count;
        this.doEvictAfter = doEvictAfter;
    }

    private CountEvictor(long count) {
        this.maxCount = count;
        this.doEvictAfter = false;
    }

    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            return;
        } else {
            int evictedCount = 0;
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                    iterator.hasNext(); ) {
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements. Eviction is done
     * before the window function.
     *
     * @param maxCount The number of elements to keep in the pane.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount) {
        return new CountEvictor<>(maxCount);
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements in the pane Eviction
     * is done before/after the window function based on the value of doEvictAfter.
     *
     * @param maxCount The number of elements to keep in the pane.
     * @param doEvictAfter Whether to do eviction after the window function.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
        return new CountEvictor<>(maxCount, doEvictAfter);
    }
}

Flink's CountEvictor, for example, keeps only the last N elements of the window. evictBefore evicts before the window function runs; evictAfter evicts after it.
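The eviction loop is easy to try on a plain list. This sketch reuses the loop shape of the evict method quoted above, but on a java.util.List instead of Flink's Iterable<TimestampedValue<Object>>; the class name CountEvictDemo is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class CountEvictDemo {
    // Removes elements from the front until at most maxCount remain.
    static <T> void evict(List<T> elements, long maxCount) {
        int size = elements.size();
        if (size <= maxCount) {
            return; // nothing to evict
        }
        int evictedCount = 0;
        for (Iterator<T> iterator = elements.iterator(); iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break; // the remaining maxCount elements are kept
            }
            iterator.remove();
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        evict(window, 2);
        System.out.println(window); // prints [4, 5]
    }
}
```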

WindowedStream

WindowedStream

public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {

    this.input = input;

    this.builder =
            new WindowOperatorBuilder<>(
                    windowAssigner,
                    windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
                    input.getExecutionConfig(),
                    input.getType(),
                    input.getKeySelector(),
                    input.getKeyType());
}


public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
    builder.trigger(trigger);
    return this;
}


public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
    outputTag = input.getExecutionEnvironment().clean(outputTag);
    builder.sideOutputLateData(outputTag);
    return this;
}



public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
    builder.evictor(evictor);
    return this;
}



public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction,
        TypeInformation<ACC> accumulatorType,
        TypeInformation<V> aggregateResultType,
        TypeInformation<R> resultType) {
    checkNotNull(aggregateFunction, "aggregateFunction");
    checkNotNull(windowFunction, "windowFunction");
    checkNotNull(accumulatorType, "accumulatorType");
    checkNotNull(aggregateResultType, "aggregateResultType");
    checkNotNull(resultType, "resultType");
    if (aggregateFunction instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "This aggregate function cannot be a RichFunction.");
    }
    // clean the closures
    windowFunction = input.getExecutionEnvironment().clean(windowFunction);
    aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
    final String opName = builder.generateOperatorName(aggregateFunction, windowFunction);
    OneInputStreamOperator<T, R> operator =
            builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
    return input.transform(opName, resultType, operator);
}




public <R> SingleOutputStreamOperator<R> process(
        ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
    function = input.getExecutionEnvironment().clean(function);
    final String opName = builder.generateOperatorName(function, null);
    OneInputStreamOperator<T, R> operator = builder.process(function);
    return input.transform(opName, resultType, operator);
}

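WindowedStream constructs a WindowOperatorBuilder and passes it the assigner, the trigger, the late-data output tag (sideOutputLateData), and the evictor.

When aggregate or process is called afterwards, a WindowOperator is built; if an evictor is set, an EvictingWindowOperator is built instead.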

Building the WindowOperator

Windows with aggregate

WindowOperatorBuilder#aggregate(AggregateFunction<T,ACC,V>, ProcessWindowFunction<V,R,K,W>, TypeInformation)

public <ACC, V, R> WindowOperator<K, T, ?, R, W> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction,
        TypeInformation<ACC> accumulatorType) {

    Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
    Preconditions.checkNotNull(windowFunction, "ProcessWindowFunction cannot be null");

    if (aggregateFunction instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "This aggregate function cannot be a RichFunction.");
    }

    if (evictor != null) {
        return buildEvictingWindowOperator(
                new InternalAggregateProcessWindowFunction<>(
                        aggregateFunction, windowFunction));
    } else {
        AggregatingStateDescriptor<T, ACC, V> stateDesc =
                new AggregatingStateDescriptor<>(
                        WINDOW_STATE_NAME,
                        aggregateFunction,
                        accumulatorType.createSerializer(config));

        return buildWindowOperator(
                stateDesc, new InternalSingleValueProcessWindowFunction<>(windowFunction));
    }
}



private <R> WindowOperator<K, T, Iterable<T>, R, W> buildEvictingWindowOperator(
        InternalWindowFunction<Iterable<T>, R, K, W> function) {
    @SuppressWarnings({"unchecked", "rawtypes"})
    TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>)
                    new StreamElementSerializer(inputType.createSerializer(config));

    ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>(WINDOW_STATE_NAME, streamRecordSerializer);

    return new EvictingWindowOperator<>(
            windowAssigner,
            windowAssigner.getWindowSerializer(config),
            keySelector,
            keyType.createSerializer(config),
            stateDesc,
            function,
            trigger,
            evictor,
            allowedLateness,
            lateDataOutputTag);
}




private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(
        StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc,
        InternalWindowFunction<ACC, R, K, W> function) {

    return new WindowOperator<>(
            windowAssigner,
            windowAssigner.getWindowSerializer(config),
            keySelector,
            keyType.createSerializer(config),
            stateDesc,
            function,
            trigger,
            allowedLateness,
            lateDataOutputTag);
}

  1. aggregate can produce either an EvictingWindowOperator or a WindowOperator. Both take a StateDescriptor; that state holds the window's contents.

  2. A WindowOperator uses an AggregatingStateDescriptor, while an EvictingWindowOperator uses a ListStateDescriptor. The reason is simple: the evictor must inspect every element in the window to decide what to evict, which is impossible once the elements have been folded into a single aggregate.

Windows with process

public <R> WindowOperator<K, T, ?, R, W> process(ProcessWindowFunction<T, R, K, W> function) {
    Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
    return apply(new InternalIterableProcessWindowFunction<>(function));
}

private <R> WindowOperator<K, T, ?, R, W> apply(
        InternalWindowFunction<Iterable<T>, R, K, W> function) {
    if (evictor != null) {
        return buildEvictingWindowOperator(function);
    } else {
        ListStateDescriptor<T> stateDesc =
                new ListStateDescriptor<>(
                        WINDOW_STATE_NAME, inputType.createSerializer(config));

        return buildWindowOperator(stateDesc, function);
    }
}
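
  1. process can likewise produce either an EvictingWindowOperator or a WindowOperator. Both use a ListStateDescriptor; that state holds the window's contents.
  2. In terms of performance, AggregatingState generally beats ListState, because it keeps a single accumulator per window instead of buffering every element.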
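The difference between the two state shapes can be sketched in plain Java. This is a conceptual model only: the class StateShapeDemo and its methods are hypothetical, and a long[] {sum, count} pair stands in for an AggregateFunction accumulator.

```java
import java.util.ArrayList;
import java.util.List;

class StateShapeDemo {
    // ListState-like: every element is buffered and the result is computed at fire time.
    static double averageFromList(List<Long> buffered) {
        long sum = 0;
        for (long v : buffered) {
            sum += v;
        }
        return (double) sum / buffered.size();
    }

    // AggregatingState-like: each element is folded into a {sum, count} accumulator on arrival.
    static long[] add(long[] acc, long value) {
        return new long[] {acc[0] + value, acc[1] + 1};
    }

    static double getResult(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    public static void main(String[] args) {
        long[] input = {10L, 20L, 30L};

        List<Long> listState = new ArrayList<>();
        long[] accumulator = {0L, 0L};
        for (long v : input) {
            listState.add(v);                  // state grows with the window
            accumulator = add(accumulator, v); // state stays two longs
        }

        System.out.println(averageFromList(listState)); // prints 20.0
        System.out.println(getResult(accumulator));     // prints 20.0
    }
}
```

Both paths give the same average, but the list grows with every element while the accumulator stays constant in size. That is also why an evictor needs the list form: individual elements cannot be recovered from the accumulator.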

How Windows Process Data

How WindowOperator processes window data

Element-driven

WindowOperator

public void processElement(StreamRecord<IN> element) throws Exception {
    // assign the element to windows using the windowAssigner
    final Collection<W> elementWindows =
            windowAssigner.assignWindows(
                    element.getValue(), element.getTimestamp(), windowAssignerContext);

    // if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
       // merging windows are not covered here
       ... 
    } else {
        for (W window : elementWindows) {

            // drop if the window is already late, i.e. the watermark has passed
            // the window's max timestamp plus the allowed lateness
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

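            // windowState is the ListState or AggregatingState defined earlier.
            // State is already scoped to the current key (or the stream is non-keyed),
            // but each window's data must also be isolated, so the window is set as the
            // namespace, which effectively acts as an additional key.
            windowState.setCurrentNamespace(window);
            // add the element to the state
            windowState.add(element.getValue());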

            triggerContext.key = key;
            triggerContext.window = window;
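            // pass the element to the trigger, which decides whether the window fires
            TriggerResult triggerResult = triggerContext.onElement(element);
            // on FIRE, evaluate the contents held in state by calling
            // the user-defined window function (e.g. our ProcessWindowFunction)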
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }
            // on PURGE, clear the window's state
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            // register a timer that cleans up the window state once the window expires
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than
    // element timestamp
    // emit late data to the side output
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}



// invoke the user-defined window function (e.g. our ProcessWindowFunction)
private void emitWindowContents(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    processContext.window = window;
    userFunction.process(
            triggerContext.key, window, processContext, contents, timestampedCollector);
}

  1. The code above is driven by incoming elements: if no element arrives, this path cannot trigger the window.

Watermark-driven
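The isWindowLate check used in processElement is not shown in the excerpt. As a hedged sketch of what such a check can look like for event-time windows, the model below treats a window as late once the watermark reaches its maximum timestamp plus the allowed lateness. The class LatenessDemo and both methods are written for illustration and assume maxTimestamp = end - 1; they are not copied from Flink.

```java
class LatenessDemo {
    // Time after which the window's state may be dropped.
    static long cleanupTime(long windowEnd, long allowedLateness) {
        long maxTimestamp = windowEnd - 1; // assumption: maxTimestamp = end - 1
        long cleanup = maxTimestamp + allowedLateness;
        // Guard against long overflow for very large lateness values.
        return cleanup >= maxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    // A window is late when the watermark has reached its cleanup time.
    static boolean isWindowLate(long windowEnd, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowEnd, allowedLateness) <= currentWatermark;
    }

    public static void main(String[] args) {
        // Window [0, 1000) with 500 ms of allowed lateness: cleanup time is 1499.
        System.out.println(isWindowLate(1000L, 500L, 1200L)); // prints false
        System.out.println(isWindowLate(1000L, 500L, 1499L)); // prints true
    }
}
```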

InternalTimerServiceImpl#advanceWatermark

public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;
    InternalTimer<K, N> timer;
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}

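eventTimeTimersQueue holds the registered timers, here the windows' maximum timestamps. Each is compared with the new watermark, and every timer whose timestamp is at or below the watermark is fired.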
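The same loop can be tried in isolation. The sketch below is a simplified model of advanceWatermark that uses a java.util.PriorityQueue of timestamps instead of Flink's keyed timer queue and collects the fired timers in a list instead of calling onEventTime; the class name TimerQueueDemo is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class TimerQueueDemo {
    long currentWatermark = Long.MIN_VALUE;
    // Min-heap: the earliest timer is always at the head.
    final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    // Fires (here: returns) every timer whose timestamp is <= the new watermark.
    List<Long> advanceWatermark(long time) {
        currentWatermark = time;
        List<Long> fired = new ArrayList<>();
        Long timer;
        while ((timer = eventTimeTimers.peek()) != null && timer <= time) {
            eventTimeTimers.poll();
            fired.add(timer);
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerQueueDemo service = new TimerQueueDemo();
        service.registerEventTimeTimer(2999L);
        service.registerEventTimeTimer(999L);
        service.registerEventTimeTimer(1999L);

        System.out.println(service.advanceWatermark(1500L)); // prints [999]
        System.out.println(service.advanceWatermark(3000L)); // prints [1999, 2999]
    }
}
```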

WindowOperator#onEventTime

@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();

    MergingWindowSet<W> mergingWindows;

    if (windowAssigner instanceof MergingWindowAssigner) {
        mergingWindows = getMergingWindowSet();
        W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
        if (stateWindow == null) {
            // Timer firing for non-existent window, this can only happen if a
            // trigger did not clean up timers. We have already cleared the merging
            // window and therefore the Trigger state, however, so nothing to do.
            return;
        } else {
            windowState.setCurrentNamespace(stateWindow);
        }
    } else {
        windowState.setCurrentNamespace(triggerContext.window);
        mergingWindows = null;
    }

    TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

    if (triggerResult.isFire()) {
        ACC contents = windowState.get();
        if (contents != null) {
            emitWindowContents(triggerContext.window, contents);
        }
    }

    if (triggerResult.isPurge()) {
        windowState.clear();
    }

    if (windowAssigner.isEventTime()
            && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
        clearAllState(triggerContext.window, windowState, mergingWindows);
    }

    if (mergingWindows != null) {
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    }
}

This is where the window is triggered and evaluated.

How EvictingWindowOperator processes window data

EvictingWindowOperator#emitWindowContents


private void emitWindowContents(
        W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState)
        throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    FluentIterable<TimestampedValue<IN>> recordsWithTimestamp =
            FluentIterable.from(contents)
                    .transform(
                            new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
                                @Override
                                public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                                    return TimestampedValue.from(input);
                                }
                            });
    evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    FluentIterable<IN> projectedContents =
            recordsWithTimestamp.transform(
                    new Function<TimestampedValue<IN>, IN>() {
                        @Override
                        public IN apply(TimestampedValue<IN> input) {
                            return input.getValue();
                        }
                    });

    processContext.window = triggerContext.window;
    userFunction.process(
            triggerContext.key,
            triggerContext.window,
            processContext,
            projectedContents,
            timestampedCollector);
    evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    // work around to fix FLINK-4369, remove the evicted elements from the windowState.
    // this is inefficient, but there is no other way to remove elements from ListState, which
    // is an AppendingState.
    windowState.clear();
    for (TimestampedValue<IN> record : recordsWithTimestamp) {
        windowState.add(record.getStreamRecord());
    }
}

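  1. After reading WindowOperator, adding an evictor is straightforward: fetch the window's contents from state and evict elements before and after the window function runs.
  2. One detail to note: after eviction, the window state is cleared and the surviving elements are written back, so evicted elements are no longer present the next time the window fires.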

Source: https://www.cnblogs.com/chouc/p/16117336.html
