Spark Deployment Modes
The heartbeat receiver: HeartbeatReceiver
HeartbeatReceiver runs on the Driver. It receives heartbeat messages from every Executor and monitors each Executor's state.
HeartbeatReceiver properties
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
extends SparkListener with ThreadSafeRpcEndpoint with Logging {
def this(sc: SparkContext) {
this(sc, new SystemClock)
}
sc.listenerBus.addToManagementQueue(this)
override val rpcEnv: RpcEnv = sc.env.rpcEnv
private[spark] var scheduler: TaskScheduler = null
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]
// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
// Executor timeout, resolved from spark.storage.blockManagerSlaveTimeoutMs, falling back to spark.network.timeout
private val executorTimeoutMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s")
// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
// Timeout interval
private val timeoutIntervalMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
// Interval (in ms) at which executor timeouts are checked. Configurable via spark.network.timeoutInterval; defaults to the value of timeoutIntervalMs.
private val checkTimeoutIntervalMs =
sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
// Scheduled task that checks for expired executors
private var timeoutCheckingTask: ScheduledFuture[_] = null
// "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
// block the thread for a long time.
// Single daemon thread that runs the event loop
private val eventLoopThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
// Single daemon thread used to kill executors
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
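These properties come together in the endpoint's lifecycle: onStart schedules a task on eventLoopThread that fires every checkTimeoutIntervalMs and sends an ExpireDeadHosts message to the endpoint itself; handling that message scans executorLastSeen for executors whose last heartbeat is older than executorTimeoutMs. A lightly condensed sketch, based on the Spark 2.x source:
override def onStart(): Unit = {
  timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      // Ask ourselves to expire dead hosts; the actual work runs in the endpoint
      Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
    }
  }, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}

private def expireDeadHosts(): Unit = {
  val now = clock.getTimeMillis()
  for ((executorId, lastSeenMs) <- executorLastSeen) {
    if (now - lastSeenMs > executorTimeoutMs) {
      logWarning(s"Removing executor $executorId with no recent heartbeats: " +
        s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
      // Tell the TaskScheduler the executor is lost...
      scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
        s"timed out after ${now - lastSeenMs} ms"))
      // ...and kill/replace it asynchronously so this thread is not blocked
      killExecutorThread.submit(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          sc.killAndReplaceExecutor(executorId)
        }
      })
      executorLastSeen.remove(executorId)
    }
  }
}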
Related methods
Registering an Executor
// The SparkListener callback; it simply delegates to addExecutor(executorAdded.executorId)
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
// Track the newly added executor
addExecutor(executorAdded.executorId)
}
def addExecutor(executorId: String): Option[Future[Boolean]] = {
// Send an ExecutorRegistered message to this endpoint itself through Spark's own Netty-based RPC framework (Akka is no longer used)
Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
}
// Handling of ExecutorRegistered in receiveAndReply: record the registration time
case ExecutorRegistered(executorId) =>
  executorLastSeen(executorId) = clock.getTimeMillis()
  context.reply(true)
Removing an Executor
// Send an ExecutorRemoved message to the endpoint itself
def removeExecutor(executorId: String): Option[Future[Boolean]] = {
Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId)))
}
case ExecutorRemoved(executorId) =>
executorLastSeen.remove(executorId)
context.reply(true)
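The central message is Heartbeat itself. Condensed from the Spark 2.x receiveAndReply: a known executor gets its timestamp refreshed and its metrics forwarded to the TaskScheduler on eventLoopThread; an executor the scheduler does not recognize is asked to re-register its BlockManager.
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
  if (scheduler != null) {
    if (executorLastSeen.contains(executorId)) {
      // Known executor: refresh its last-seen timestamp
      executorLastSeen(executorId) = clock.getTimeMillis()
      eventLoopThread.submit(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          // Forward the metrics; the scheduler returns false for unknown executors
          val unknownExecutor = !scheduler.executorHeartbeatReceived(
            executorId, accumUpdates, blockManagerId)
          context.reply(HeartbeatResponse(reregisterBlockManager = unknownExecutor))
        }
      })
    } else {
      // An in-flight heartbeat from an executor we just removed (SPARK-4134)
      logDebug(s"Received heartbeat from unknown executor $executorId")
      context.reply(HeartbeatResponse(reregisterBlockManager = true))
    }
  } else {
    // TaskScheduler is not set up yet; ask the executor to register again later
    logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
    context.reply(HeartbeatResponse(reregisterBlockManager = true))
  }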
Executor analysis
Related properties
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil, // User-specified classpath entries, configured via spark.executor.extraClassPath; multiple entries are separated by commas.
isLocal: Boolean = false,
uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler // Spark's handler for uncaught exceptions
)
extends Logging {
logInfo(s"Starting executor ID $executorId on host $executorHostname")
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
// Files needed by the currently running tasks.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
// JARs needed by the currently running tasks.
private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
private val conf = env.conf
// No ip or host:port - just hostname
Utils.checkHost(executorHostname)
// must not have port specified.
assert (0 == Utils.parseHostPort(executorHostname)._2)
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(executorHostname)
if (!isLocal) {
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
// executor process to avoid surprising stalls.
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler)
}
// Start worker thread pool
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Executor task launch worker-%d")
.setThreadFactory(new ThreadFactory {
override def newThread(r: Runnable): Thread =
// Use UninterruptibleThread to run tasks so that we can allow running codes without being
// interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
// will hang forever if some methods are interrupted.
new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
})
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
// Metrics source that reports on the executor and its task-launch thread pool
private val executorSource = new ExecutorSource(threadPool, executorId)
// Pool used for threads that supervise task killing / cancellation
private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")
// For tasks which are in the process of being killed, this map holds the most recently created
// TaskReaper. All accesses to this map should be synchronized on the map itself (this isn't
// a ConcurrentHashMap because we use the synchronization for purposes other than simply guarding
// the integrity of the map's internal state). The purpose of this map is to prevent the creation
// of a separate TaskReaper for every killTask() of a given task. Instead, this map allows us to
// track whether an existing TaskReaper fulfills the role of a TaskReaper that we would otherwise
// create. The map key is a task id.
private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()
if (!isLocal) {
env.blockManager.initialize(conf.getAppId)
env.metricsSystem.registerSource(executorSource)
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
}
// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
// Whether to monitor killed / interrupted tasks
private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
// ClassLoader for the classes that tasks need.
private val urlClassLoader = createClassLoader()
// ClassLoader used by the REPL (spark-shell / spark-sql)
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
// SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads
// for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
env.serializerManager.setDefaultClassLoader(replClassLoader)
/**
 * Executor plugins, loaded from the spark.executor.plugins configuration.
 */
private val executorPlugins: Seq[ExecutorPlugin] = {
val pluginNames = conf.get(EXECUTOR_PLUGINS)
if (pluginNames.nonEmpty) {
logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")
// Plugins need to load using a class loader that includes the executor's user classpath
val pluginList: Seq[ExecutorPlugin] =
Utils.withContextClassLoader(replClassLoader) {
val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
plugins.foreach { plugin =>
plugin.init()
logDebug(s"Successfully loaded plugin " + plugin.getClass().getCanonicalName())
}
plugins
}
logDebug("Finished initializing plugins")
pluginList
} else {
Nil
}
}
// Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
// Maximum size of a direct result: the minimum of spark.task.maxDirectResultSize
// (default 1L << 20, i.e. 1,048,576 bytes) and spark.rpc.message.maxSize (default 128MB).
private val maxDirectResultSize = Math.min(
conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
RpcUtils.maxMessageSizeBytes(conf))
// Upper limit on the total result size, read from MAX_RESULT_SIZE (default 1GB).
// A task result larger than maxResultSize is dropped; a result that fits within
// maxResultSize but exceeds maxDirectResultSize is written to the local storage
// system; a result no larger than maxDirectResultSize is returned to the Driver
// directly. (See the sketch after this listing.)
private val maxResultSize = conf.get(MAX_RESULT_SIZE)
// Maintains the list of running tasks.
// Maps the id of each running task to its TaskRunner.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
/**
* Interval to send heartbeats, in milliseconds
*/
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
// Executor for the heartbeat task.
private val heartbeater: ScheduledExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
// must be initialized before running startDriverHeartbeat()
// RPC endpoint reference to the HeartbeatReceiver on the driver
private val heartbeatReceiverRef =
RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
/**
 * Maximum number of heartbeat failures. When an executor fails to send heartbeats
 * to the driver more than `HEARTBEAT_MAX_FAILURES` times, it kills itself. The
 * default is 60, which amounts to retrying for about 10 minutes at the default
 * 10s heartbeat interval.
 */
private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
/**
* Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
* successful heartbeat will reset it to 0.
*/
private var heartbeatFailures = 0
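The three-tier rule for maxResultSize and maxDirectResultSize described above is applied when a task finishes. A condensed sketch of the relevant part of TaskRunner.run in the Spark 2.x source (logging omitted):
// resultSize is the size in bytes of the serialized direct result
val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    // Too large: drop the result and send only its metadata to the driver
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    // Medium: store the bytes in the local block manager and send the block id
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId,
      new ChunkedByteBuffer(serializedDirectResult.duplicate()),
      StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    // Small enough: return the result to the driver directly
    serializedDirectResult
  }
}
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)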
Executor heartbeat reporting
While initializing, the Executor calls its own startDriverHeartbeater method to start the periodic task that reports heartbeats to the driver.
private def startDriverHeartbeater(): Unit = {
// Heartbeat interval
val intervalMs = HEARTBEAT_INTERVAL_MS
// Wait a random interval so the heartbeats don't end up in sync
// Initial delay = intervalMs + random(0..1) * intervalMs, so heartbeats from different executors don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
// The heartbeat task
val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
// Schedule the heartbeat at a fixed rate
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
Reporting a heartbeat
private def reportHeartBeat(): Unit = {
// list of (task id, accumUpdates) to send back to the driver
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
val curGCTime = computeTotalGcTime()
// Iterate over the tasks in runningTasks and collect each task's metric updates into the accumUpdates buffer.
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
}
}
// Construct the heartbeat message
val message: Heartbeat = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
try {
// Send the Heartbeat message to the HeartbeatReceiver and synchronously await its HeartbeatResponse.
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
// Reset heartbeatFailures to 0 after a successful heartbeat.
heartbeatFailures = 0
} catch {
case NonFatal(e) =>
logWarning("Issue communicating with driver in heartbeater", e)
heartbeatFailures += 1
if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
logError(s"Exit as unable to send heartbeats to driver " +
s"more than $HEARTBEAT_MAX_FAILURES times")
System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
}
}
}
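For reference, the two messages exchanged here are defined in HeartbeatReceiver.scala (Spark 2.x) roughly as:
private[spark] case class Heartbeat(
    executorId: String,
    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
    blockManagerId: BlockManagerId)

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)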
Launching a Task
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
// Create a TaskRunner (a Runnable) for the task
val tr = new TaskRunner(context, taskDescription)
// Register the task id in the runningTasks map
runningTasks.put(taskDescription.taskId, tr)
// Execute the task asynchronously on the thread pool
threadPool.execute(tr)
}
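TaskRunner.run itself runs to several hundred lines in the real source; the outline below is heavily condensed from the Spark 2.x code (happy path only, with most locals and all metrics and error handling elided):
override def run(): Unit = {
  val ser = env.closureSerializer.newInstance()
  // 1. Tell the backend the task is now RUNNING
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  try {
    // 2. Fetch any missing file/jar dependencies, then deserialize the Task
    updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    // 3. Run the task body
    val value = task.run(
      taskAttemptId = taskId,
      attemptNumber = taskDescription.attemptNumber,
      metricsSystem = env.metricsSystem)
    // 4. Serialize the value, apply the direct/indirect result-size rule shown
    //    earlier, and report FINISHED with the (possibly indirect) result
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } finally {
    // 5. Whatever happened, stop tracking the task
    runningTasks.remove(taskId)
  }
}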
Local deployment mode
In local mode there is only a Driver; there is no Master or Worker, and the Executor that runs tasks lives in the same JVM process as the Driver. In local mode both the ExecutorBackend and the SchedulerBackend are implemented by LocalSchedulerBackend.
At the end of task submission, TaskSchedulerImpl's submitTasks method calls LocalSchedulerBackend's reviveOffers method.
LocalSchedulerBackend's reviveOffers method simply sends a ReviveOffers message to LocalEndpoint.
On receiving ReviveOffers, LocalEndpoint calls TaskSchedulerImpl's resourceOffers method to request resources; TaskSchedulerImpl allocates resources based on the requested CPU cores, memory, locality, and other constraints.
Once a task is granted resources, Executor's launchTask method is called to run it.
While a task runs, the TaskRunner inside the Executor updates the task's state by calling LocalSchedulerBackend's statusUpdate method.
LocalSchedulerBackend's statusUpdate method sends a StatusUpdate message to LocalEndpoint.
On receiving StatusUpdate, LocalEndpoint calls TaskSchedulerImpl's statusUpdate method to update the task's state. A condensed sketch of LocalEndpoint follows.
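The message handling just described lives in LocalEndpoint (defined in LocalSchedulerBackend.scala). Condensed from the Spark 2.x source (constructor parameters and the KillTask case omitted):
private var freeCores = totalCores

override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    reviveOffers()
  case StatusUpdate(taskId, state, serializedData) =>
    scheduler.statusUpdate(taskId, state, serializedData)
    if (TaskState.isFinished(state)) {
      // A finished task releases its cores; try to schedule more work
      freeCores += scheduler.CPUS_PER_TASK
      reviveOffers()
    }
}

def reviveOffers(): Unit = {
  // Offer the local executor's free cores to the scheduler,
  // then launch every task the scheduler assigns to the offer
  val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  for (task <- scheduler.resourceOffers(offers).flatten) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, task)
  }
}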
Leader election agent
The leader election mechanism (Leader Election) guarantees that although the cluster may contain multiple Masters, only one Master is in the Active state while the others remain in Standby. When the Active Master fails, one of the Standby Masters is elected as the new Active Master. Because the cluster's Worker, Driver, and Application information has already been saved through the persistence engine, a Master switch only affects the submission of new tasks; tasks that are already running are not affected at all.
@DeveloperApi
trait LeaderElectionAgent {
// The Master instance on whose behalf the election runs
val masterInstance: LeaderElectable
def stop() {} // to avoid noops in implementations.
}
@DeveloperApi
trait LeaderElectable {
/**
 * Called when this instance has been elected leader.
 */
def electedLeader(): Unit
/**
 * Called when this instance's leadership has been revoked.
 */
def revokedLeadership(): Unit
}
MonarchyLeaderAgent
MonarchyLeaderAgent is used whenever ZooKeeper-based election is not configured: with a single Master there is nothing to contest, so the Master is elected leader as soon as the agent is constructed.
private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
extends LeaderElectionAgent {
// Elect the sole Master as leader immediately
masterInstance.electedLeader()
}
ZooKeeperLeaderElectionAgent
private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
// ZooKeeper directory under which Spark's leader election data is stored
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private var zk: CuratorFramework = _
// Client that performs leader election through ZooKeeper, of type LeaderLatch (from Curator).
private var leaderLatch: LeaderLatch = _
// Leadership status: either LEADER or NOT_LEADER.
private var status = LeadershipStatus.NOT_LEADER
/**
 * Start the election as soon as the agent is constructed.
 */
start()
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
// Create the leader latch
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
// Register this agent as a LeaderLatchListener
leaderLatch.addListener(this)
// Start the latch
leaderLatch.start()
}
override def stop() {
leaderLatch.close()
zk.close()
}
/**
 * LeaderLatchListener callback: invoked when this node has gained leadership.
 */
override def isLeader() {
synchronized {
// could have lost leadership by now.
if (!leaderLatch.hasLeadership) {
return
}
logInfo("We have gained leadership")
updateLeadershipStatus(true)
}
}
/**
 * LeaderLatchListener callback: invoked when this node has lost leadership.
 */
override def notLeader() {
synchronized {
// could have gained leadership by now.
if (leaderLatch.hasLeadership) {
return
}
logInfo("We have lost leadership")
updateLeadershipStatus(false)
}
}
/**
 * Update the leadership status and notify the Master of the change.
 * @param isLeader whether this node is now the leader
 */
private def updateLeadershipStatus(isLeader: Boolean) {
if (isLeader && status == LeadershipStatus.NOT_LEADER) {
status = LeadershipStatus.LEADER
// The Master will send itself an ElectedLeader message
masterInstance.electedLeader()
} else if (!isLeader && status == LeadershipStatus.LEADER) {
status = LeadershipStatus.NOT_LEADER
// The Master will send itself a RevokedLeadership message
masterInstance.revokedLeadership()
}
}
private object LeadershipStatus extends Enumeration {
type LeadershipStatus = Value
val LEADER, NOT_LEADER = Value
}
}
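Which agent a Master ends up with is decided in Master.onStart from the spark.deploy.recoveryMode setting. Condensed from the Spark 2.x source (the FILESYSTEM and CUSTOM recovery modes are elided):
// The Master itself implements LeaderElectable, so `this` is the masterInstance
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case _ =>
    // No recovery: nothing is persisted, and the single Master rules unopposed
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}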