FlinkTimerService机制分析

ProcessingTimeService

  • ProcessingTime时间服务,提供获取当前processing时间,注册timer等操作

/**
 * Processing-time service: exposes the current processing time and lets callers
 * register one-shot or repeating timers whose actions are delivered through a
 * {@link ProcessingTimeCallback}.
 */
public interface ProcessingTimeService {

	/**
	 * Returns the current processing time.
	 *
	 * @return the current processing time
	 */
	long getCurrentProcessingTime();

	/**
	 * Registers a one-shot timer that is executed asynchronously.
	 *
	 * @param timestamp processing time at which the timer should fire
	 * @param target    callback invoked when the timer fires
	 * @return a future representing the scheduled timer
	 */
	ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);

	/**
	 * Registers a task to be executed repeatedly at a fixed rate.
	 *
	 * @param callback     callback invoked on every execution
	 * @param initialDelay delay before the first execution
	 * @param period       interval between the start of consecutive executions
	 * @return a future representing the repeating task
	 */
	ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period);

	/**
	 * Registers a task to be executed repeatedly with a fixed delay.
	 *
	 * @param callback     callback invoked on every execution
	 * @param initialDelay delay before the first execution
	 * @param period       delay between the end of one execution and the start of the next
	 * @return a future representing the repeating task
	 */
	ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period);

	/**
	 * Puts the service into a quiesced state.
	 *
	 * <p>In {@code SystemProcessingTimeService} this shuts down the underlying executor
	 * (without the hard {@code shutdownNow}) and the returned future is completed once
	 * that executor has terminated.
	 *
	 * @return a future that completes when quiescing has finished
	 */
	CompletableFuture<Void> quiesce();
}

ProcessingTimeCallback

  • 用于执行timer触发的动作

/**
 * Action executed when a processing-time timer fires.
 */
public interface ProcessingTimeCallback {

	/**
	 * Invoked when the timer fires.
	 *
	 * @param timestamp the timestamp the timer was registered for
	 * @throws Exception any failure raised by the timer action; the caller decides how to handle it
	 */
	void onProcessingTime(long timestamp) throws Exception;
}

TimerService

  • 在ProcessingTimeService的基础上,提供检测TimerService是否已终止的接口以及关闭服务的方法

/**
 * A {@link ProcessingTimeService} with lifecycle control: callers can query whether the
 * service has terminated and shut it down.
 */
public interface TimerService extends ProcessingTimeService {

	/**
	 * Tells whether the timer service has been terminated.
	 *
	 * @return {@code true} if the service is in its terminated (shut down) state
	 */
	boolean isTerminated();

	/**
	 * Shuts down the timer service.
	 */
	void shutdownService();

	/**
	 * Shuts down the timer service and waits for it to terminate, for at most
	 * {@code timeoutMs} milliseconds.
	 *
	 * <p>In {@code SystemProcessingTimeService} an interrupt received while waiting does
	 * not abort the wait; the thread's interrupt flag is restored before returning.
	 *
	 * @param timeoutMs maximum time to wait for termination, in milliseconds
	 * @return {@code true} if the service terminated within the timeout, {@code false} otherwise
	 */
	boolean shutdownServiceUninterruptible(long timeoutMs);
}

SystemProcessingTimeService

  • 系统ProcessingTime服务,以System#currentTimeMillis()的返回值作为当前的ProcessingTime

/**
 * A {@link TimerService} whose processing time is {@link System#currentTimeMillis()} and
 * whose timers run on a {@link ScheduledThreadPoolExecutor} created with a core pool size of 1.
 *
 * <p>The service moves through three states: {@code STATUS_ALIVE} to {@code STATUS_QUIESCED}
 * (via {@link #quiesce()}) to {@code STATUS_SHUTDOWN} (via {@link #shutdownService()}, which
 * may also be reached directly from {@code STATUS_ALIVE}).
 */
public class SystemProcessingTimeService implements TimerService {

	private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);
	/**
	 * Lifecycle states of the timer service, stored in {@link #status}.
	 */
	private static final int STATUS_ALIVE = 0;
	private static final int STATUS_QUIESCED = 1;
	private static final int STATUS_SHUTDOWN = 2;

	// ------------------------------------------------------------------------

	/**
	 * The executor service that schedules and calls the triggers of this task.
	 * It is created with a core pool size of 1.
	 */
	private final ScheduledThreadPoolExecutor timerService;
	// Receives exceptions thrown by timer callbacks.
	private final ExceptionHandler exceptionHandler;
	// Current lifecycle state of the service (one of the STATUS_* constants).
	private final AtomicInteger status;
	// Completed when the executor has terminated; this is the future returned by quiesce().
	private final CompletableFuture<Void> quiesceCompletedFuture;

	/**
	 * Creates a service that uses the executor's default thread factory.
	 *
	 * @param exceptionHandler handler for exceptions thrown by timer callbacks; must not be null
	 */
	@VisibleForTesting
	SystemProcessingTimeService(ExceptionHandler exceptionHandler) {
		this(exceptionHandler, null);
	}

	/**
	 * Creates a service in the {@code STATUS_ALIVE} state.
	 *
	 * @param exceptionHandler handler for exceptions thrown by timer callbacks; must not be null
	 * @param threadFactory    factory for the timer thread, or {@code null} to use the executor's default
	 */
	SystemProcessingTimeService(ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {

		this.exceptionHandler = checkNotNull(exceptionHandler);
		// The service starts out alive.
		this.status = new AtomicInteger(STATUS_ALIVE);
		this.quiesceCompletedFuture = new CompletableFuture<>();

		if (threadFactory == null) {
			this.timerService = new ScheduledTaskExecutor(1);
		} else {
			this.timerService = new ScheduledTaskExecutor(1, threadFactory);
		}

		// tasks should be removed if the future is canceled
		this.timerService.setRemoveOnCancelPolicy(true);

		// make sure shutdown removes all pending tasks:
		// periodic tasks are not continued once the executor has been shut down ...
		this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
		// ... and delayed tasks that are still waiting are not executed either
		// (true would let them run after shutdown, false discards them).
		this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
	}

	/**
	 * Returns the current processing time, which for this service is the system clock.
	 */
	@Override
	public long getCurrentProcessingTime() {
		// wall-clock time of the JVM
		return System.currentTimeMillis();
	}

	/**
	 * Registers a task to be executed no sooner than time {@code timestamp}, but without strong
	 * guarantees of order.
	 *
	 * @param timestamp Time when the task is to be enabled (in processing time)
	 * @param callback    The task to be executed
	 * @return The future that represents the scheduled task. If the service has been quiesced,
	 *         a {@code NeverCompleteFuture} is returned instead of a scheduled task.
	 * @throws IllegalStateException if the service has already been shut down
	 */
	@Override
	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {

		// Delay between the current processing time and the requested timestamp.
		// NOTE(review): the original note states that the helper adds 1 ms on top of the plain
		// difference to avoid boundary problems when timers are compared against watermarks;
		// the helper is not visible in this file - confirm against ProcessingTimeServiceUtil.
		long delay = ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime());

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
			// run the wrapped callback after 'delay' milliseconds
			return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException e) {
			final int status = this.status.get();
			// quiesced: the timer is not scheduled, a placeholder future is handed back instead
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(delay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

	/**
	 * Schedules {@code callback} repeatedly at a fixed rate; delegates to the shared
	 * repeated-scheduling helper.
	 */
	@Override
	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
		return scheduleRepeatedly(callback, initialDelay, period, false);
	}

	/**
	 * Schedules {@code callback} repeatedly with a fixed delay between executions; delegates to
	 * the shared repeated-scheduling helper.
	 */
	@Override
	public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
		return scheduleRepeatedly(callback, initialDelay, period, true);
	}

	/**
	 * Common implementation of the two repeating schedule methods.
	 *
	 * @param callback     callback invoked on every execution
	 * @param initialDelay delay before the first execution, in milliseconds
	 * @param period       period (fixed rate) or delay (fixed delay) between executions, in milliseconds
	 * @param fixedDelay   {@code true} for fixed-delay scheduling, {@code false} for fixed-rate scheduling
	 * @return the future of the scheduled task, or a {@code NeverCompleteFuture} if the service is quiesced
	 * @throws IllegalStateException if the service has already been shut down
	 */
	private ScheduledFuture<?> scheduleRepeatedly(ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
		// timestamp that is passed to the callback on its first execution
		final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
		// runnable wrapping the callback; it advances its timestamp by 'period' after each run
		final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
			return fixedDelay
					? timerService.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.MILLISECONDS)
					: timerService.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
		} catch (RejectedExecutionException e) {
			final int status = this.status.get();
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(initialDelay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

	/**
	 * @return {@code true} if the status of the service
	 * is {@link #STATUS_ALIVE}, {@code false} otherwise.
	 */
	@VisibleForTesting
	boolean isAlive() {
		return status.get() == STATUS_ALIVE;
	}

	/**
	 * @return {@code true} if the status of the service is {@link #STATUS_SHUTDOWN};
	 * a merely quiesced service is not reported as terminated.
	 */
	@Override
	public boolean isTerminated() {
		return status.get() == STATUS_SHUTDOWN;
	}

	/**
	 * Moves the service from alive to quiesced and shuts down the executor (only on the call
	 * that performs the transition).
	 *
	 * @return the future that is completed when the executor has terminated
	 */
	@Override
	public CompletableFuture<Void> quiesce() {
		if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
			timerService.shutdown();
		}

		return quiesceCompletedFuture;
	}

	/**
	 * Moves the service to the shut-down state from either alive or quiesced, and calls
	 * {@code shutdownNow()} on the executor. Calls on an already shut-down service do nothing.
	 */
	@Override
	public void shutdownService() {
		if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) ||
				status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) {
			timerService.shutdownNow();
		}
	}

	/**
	 * Shuts down and cleans up the timer service provider hard and immediately. This does wait
	 * for all timers to complete or until the time limit is exceeded. Any call to
	 * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.
	 * @param time time to wait for termination.
	 * @param timeUnit time unit of parameter time.
	 * @return {@code true} if this timer service and all pending timers are terminated and
	 *         {@code false} if the timeout elapsed before this happened.
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	@VisibleForTesting
	boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
		shutdownService();
		return timerService.awaitTermination(time, timeUnit);
	}

	/**
	 * Shuts the service down and waits for termination for at most {@code timeoutMs}
	 * milliseconds. An interrupt received while waiting does not abort the wait; it is
	 * remembered and the thread's interrupt flag is restored before returning.
	 *
	 * @param timeoutMs maximum time to wait, in milliseconds
	 * @return {@code true} if the service terminated before the deadline, {@code false} otherwise
	 */
	@Override
	public boolean shutdownServiceUninterruptible(long timeoutMs) {

		final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));

		boolean shutdownComplete = false;
		boolean receivedInterrupt = false;

		do {
			try {
				// wait for a reasonable time for all pending timer threads to finish
				shutdownComplete = shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
			} catch (InterruptedException iex) {
				// interrupted while waiting: remember it and keep waiting until the deadline
				receivedInterrupt = true;
				LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex);
			}
		} while (deadline.hasTimeLeft() && !shutdownComplete);

		if (receivedInterrupt) {
			// restore the interrupt flag that was cleared when InterruptedException was thrown
			Thread.currentThread().interrupt();
		}

		return shutdownComplete;
	}

	// safety net to destroy the thread pool
	// invoked by the garbage collector; forcibly shuts down the executor
	@Override
	protected void finalize() throws Throwable {
		super.finalize();
		timerService.shutdownNow();
	}

	/**
	 * @return the number of tasks currently held in the executor's queue
	 */
	@VisibleForTesting
	int getNumTasksScheduled() {
		// size of the executor's work queue, or 0 if there is no queue
		BlockingQueue<?> queue = timerService.getQueue();
		if (queue == null) {
			return 0;
		} else {
			return queue.size();
		}
	}

	// ------------------------------------------------------------------------

	/**
	 * Executor that completes {@link #quiesceCompletedFuture} when it has terminated.
	 */
	private class ScheduledTaskExecutor extends ScheduledThreadPoolExecutor {

		public ScheduledTaskExecutor(int corePoolSize) {
			super(corePoolSize);
		}

		public ScheduledTaskExecutor(int corePoolSize, ThreadFactory threadFactory) {
			super(corePoolSize, threadFactory);
		}

		@Override
		protected void terminated() {
			super.terminated();
			// signal everyone waiting on quiesce() that the executor is done
			quiesceCompletedFuture.complete(null);
		}
	}

	/**
	 * An exception handler, called when {@link ProcessingTimeCallback} throws an exception.
	 */
	interface ExceptionHandler {
		void handleException(Exception ex);
	}

	/**
	 * Wraps a {@link ProcessingTimeCallback} into a {@link Runnable} for a one-shot timer
	 * (the period is 0).
	 *
	 * @param callback  the callback to invoke
	 * @param timestamp the timestamp passed to the callback
	 * @return the runnable to hand to the executor
	 */
	private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) {
		return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0);
	}

	/**
	 * Wraps a {@link ProcessingTimeCallback} into a {@link Runnable} for a repeating timer.
	 *
	 * @param callback      the callback to invoke
	 * @param nextTimestamp the timestamp passed to the callback on its first execution
	 * @param period        amount added to the timestamp after every execution
	 * @return the runnable to hand to the executor
	 */
	private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long nextTimestamp, long period) {
		return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
	}

	/**
	 * The task the executor runs for a timer: invokes the callback while the service is alive
	 * and forwards callback exceptions to the exception handler.
	 */
	private static final class ScheduledTask implements Runnable {
		// shared status of the owning service
		private final AtomicInteger serviceStatus;
		// receives exceptions thrown by the callback
		private final ExceptionHandler exceptionHandler;
		// the processing-time callback to invoke
		private final ProcessingTimeCallback callback;
		// timestamp passed to the callback on the next execution
		private long nextTimestamp;
		// amount added to nextTimestamp after each execution (0 for one-shot timers)
		private final long period;

		ScheduledTask(
				AtomicInteger serviceStatus,
				ExceptionHandler exceptionHandler,
				ProcessingTimeCallback callback,
				long timestamp,
				long period) {
			this.serviceStatus = serviceStatus;
			this.exceptionHandler = exceptionHandler;
			this.callback = callback;
			this.nextTimestamp = timestamp;
			this.period = period;
		}

		@Override
		public void run() {
			// skip the callback entirely unless the service is still alive
			if (serviceStatus.get() != STATUS_ALIVE) {
				return;
			}
			try {
				// fire the timer
				callback.onProcessingTime(nextTimestamp);
			} catch (Exception ex) {
				exceptionHandler.handleException(ex);
			}
			// advance the timestamp for the next periodic execution; runs even if the callback threw
			nextTimestamp += period;
		}
	}
}

NeverFireProcessingTimeService

  • 处理时间服务,其计时器永不触发,因此所有计时器都包含在savepoint中,可以获取当前系统的processing time

TimerService

  • 面向用户的timer服务,提供获取当前processing时间、获取当前watermark、注册和删除timer等操作。

/**
 * Timer service exposed to user code: provides the current processing time and watermark,
 * and allows registering and deleting processing-time and event-time timers.
 */
public interface TimerService {

	/**
	 * Error string for {@link UnsupportedOperationException} on registering timers
	 * (registering timers is only supported on keyed streams).
	 */
	String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
	/**
	 * Error string for {@link UnsupportedOperationException} on deleting timers
	 * (deleting timers is only supported on keyed streams).
	 */
	String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

	/** Returns the current processing time. */
	long currentProcessingTime();

	/** Returns the current event-time watermark. */
	long currentWatermark();

	/**
	 * Registers a timer to be fired when processing time passes the given time.
	 *
	 * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
	 * in a keyed context, such as in an operation on
	 * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
	 * will also be active when you receive the timer notification.
	 *
	 * @param time the processing time at which the timer should fire
	 */
	void registerProcessingTimeTimer(long time);

	/**
	 * Registers a timer to be fired when the event time watermark passes the given time.
	 *
	 * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
	 * in a keyed context, such as in an operation on
	 * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
	 * will also be active when you receive the timer notification.
	 *
	 * @param time the event time the watermark has to pass for the timer to fire
	 */
	void registerEventTimeTimer(long time);

	/**
	 * Deletes the processing-time timer with the given trigger time. This method has only an effect if such a timer
	 * was previously registered and did not already expire.
	 *
	 * <p>Timers can internally be scoped to keys and/or windows. When you delete a timer,
	 * it is removed from the current keyed context.
	 *
	 * @param time the trigger time of the timer to delete
	 */
	void deleteProcessingTimeTimer(long time);

	/**
	 * Deletes the event-time timer with the given trigger time. This method has only an effect if such a timer
	 * was previously registered and did not already expire.
	 *
	 * <p>Timers can internally be scoped to keys and/or windows. When you delete a timer,
	 * it is removed from the current keyed context.
	 *
	 * @param time the trigger time of the timer to delete
	 */
	void deleteEventTimeTimer(long time);
}

SimpleTimerService

  • 使用InternalTimerService实现的SimpleTimerService

InternalTimerService

  • Timer是维护在JVM堆内存中的,如果频繁注册大量Timer,或者同时触发大量Timer,也是一笔不小的开销。

/**
 * Internal counterpart of the user-facing timer service: every timer operation additionally
 * takes a namespace.
 *
 * @param <N> type of the namespace that timers are scoped to
 */
public interface InternalTimerService<N> {

	/** Returns the current processing time. */
	long currentProcessingTime();

	/** Returns the current event-time watermark. */
	long currentWatermark();

	/**
	 * Registers a processing-time timer.
	 *
	 * @param namespace the namespace the timer belongs to
	 * @param time      the processing time at which the timer should fire
	 */
	void registerProcessingTimeTimer(N namespace, long time);

	/**
	 * Deletes the processing-time timer for the given namespace and time.
	 *
	 * @param namespace the namespace the timer belongs to
	 * @param time      the trigger time of the timer to delete
	 */
	void deleteProcessingTimeTimer(N namespace, long time);

	/**
	 * Registers an event-time timer.
	 *
	 * @param namespace the namespace the timer belongs to
	 * @param time      the event time at which the timer should fire
	 */
	void registerEventTimeTimer(N namespace, long time);

	/**
	 * Deletes the event-time timer for the given namespace and time.
	 *
	 * @param namespace the namespace the timer belongs to
	 * @param time      the trigger time of the timer to delete
	 */
	void deleteEventTimeTimer(N namespace, long time);

	/**
	 * Iterates over all registered event-time timers and passes each one to {@code consumer}.
	 *
	 * @param consumer invoked once per timer; presumably receives (namespace, timestamp) - TODO confirm
	 * @throws Exception if the consumer throws
	 */
	void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;

	/**
	 * Iterates over all registered processing-time timers and passes each one to {@code consumer}.
	 *
	 * @param consumer invoked once per timer; presumably receives (namespace, timestamp) - TODO confirm
	 * @throws Exception if the consumer throws
	 */
	void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
}
  • TimerService接口的实现类为SimpleTimerService,它实际上又是InternalTimerService的非常简单的代理。

  • InternalTimerService的实例由getInternalTimerService()方法取得,该方法定义在所有算子的基类AbstractStreamOperator中.

  • KeyedProcessOperator.processElement()方法调用用户自定义函数的processElement()方法,顺便将上下文实例ContextImpl传了进去,所以用户可以由它获得TimerService来注册Timer。

  • Timer在代码中叫做InternalTimer。

  • 当Timer触发时,实际上是根据时间特征调用onProcessingTime()/onEventTime()方法(这两个方法来自Triggerable接口),进而触发用户函数的onTimer()回调逻辑。后面还会见到它们。

最后更新于