【Spark】Spark WebUI 原理和工作方式
Spark应用运行时的详细进度信息,性能指标等数据和信息对于我们分析Spark应用是十分重要的。而Spark的WebUI便是观测应用、作业运行情况的一个很重要的窗口。本文主要从源码层面分析下Spark WebUI原理和工作方式。并从Job信息的一个切面阐述WebUI数据获取和更新的过程。
目录:
页面
流程图
源码分析
Step1、SparkContext初始化时构建SparkUI
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
Step2、执行SparkUI的create方法,实例化各个监听器
在创建SparkUI的过程中,会实例化几个重要的listener并添加到ListenerBus中,这是一种观察者模式。在数据获取和更新中会详细介绍监听器数据产生和更新的原理。
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
listener
}
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
上述的几个监听对象分别与UI上的
这几个Tab项的是对应的,具体是:
JobProgressListener -> Jobs和Stages,即Spark应用运行过程中的Job和Stage信息和数据。 EnvironmentListener -> Environment,即Spark应用的作业配置和Spark参数等环境变量和配置信息。 StorageListener -> Storage, RDD的存储状态等信息。 ExecutorListener ->Executors,即Spark应用运行时的所有Executor的数据。 而operationGraphListener -> Jobs, Stages主要是作业的DAG图数据。 也就是说,Spark WebUI中的所有数据正是来源于这些监听器对象。
Step3、执行SparkUI的initialize初始化方法
当实例化SparkUI的过程中会执行初始化方法,绑定如下的tab项对应的对象数据以及注册页面处理句柄
即
def initialize() {
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))
}
SparkUI初始化过程全部结束。Spark WebUI的Tab项对应了相应的SparkUI的Tab类,Tab类中封装了页面数据。
Step4、调用SparkUI的bind方法启动JettyServer
_ui.foreach(_.bind())
bind方法会启动spark内嵌的jetty。Jetty采用java编写,是非常轻巧的servlet engine和http server,Spark使用内嵌的Jetty响应web请求。
serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
Step5、接收UI请求,数据呈现 当发起Spark WebUI的数据请求时,Spark引擎会进行Tab和Page数据的渲染然后返回给用户。
数据获取和更新原理
因为Spark WebUI上的不同Tab项的数据实际上来源于不同的监听器对象,所以这边抛砖引玉,以JobProgressListener来说明。JobProgressListener中封装了Job和Stage运行状况以及运行进度等全部作业信息。
1 JobProgressListener生成
根据前文所述,SparkUI对象构建过程中会实例化JobProgressListener然后把它add到ListenerBus中。
2 JobProgressListener接收事件
2.1 事件到达ListenerBus
根据前文所述JobProgressListener与ListenerBus是一种观察者模式,为什么这么说呢,这是因为ListenerBus中同时维护了listener的一个set集合和eventQueue。eventQueue即一个事件的队列。
private[spark] val listeners = new CopyOnWriteArrayList[L]
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
Spark作业在运行的时候,事件发生后(即某些方法的具体调用,如Job提交、Job结束等事件)会通过ListenerBus的post方法传入eventQueue。比如说当Job提交事件发生时,DAGScheduler调用handleJobSubmitted方法执行然后将Job开始事件通过post方法加入eventQueue中。
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
val eventAdded = eventQueue.offer(event)
事件的种类是很多的,以DAGScheduler类为例,会有如下的事件。
对应的DAGScheduler方法 | SparkListenerEvent事件 | 描述 |
---|---|---|
executorHeartbeatReceived | SparkListenerExecutorMetricsUpdate | executor向master发送心跳表示BlockManager仍然存活 |
handleBeginEvent | SparkListenerTaskStart | task开始执行事件 |
cleanUpAfterSchedulerStop | SparkListenerJobEnd | Job结束事件 |
handleGetTaskResult | SparkListenerTaskGettingResult | task获取结果事件 |
handleJobSubmitted | SparkListenerJobStart | Job开始事件 |
handleMapStageSubmitted | SparkListenerJobStart | Job开始事件 |
submitMissingTasks | SparkListenerStageSubmitted | Stage提交事件 |
handleTaskCompletion | SparkListenerTaskEnd | Task结束事件 |
handleTaskCompletion | SparkListenerJobEnd | Job结束事件 |
markStageAsFinished | SparkListenerStageCompleted | Stage结束事件 |
failJobAndIndependentStages | SparkListenerJobEnd | Job结束事件 |
markMapStageJobAsFinished | SparkListenerJobEnd | Job结束事件 |
2.2 事件到达JobProgressListener
ListenerBus的run方法会持续运转,
try {
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event)
}
从eventQueue取出事件后,调用ListenerBus的postToAll方法,将事件分发到各Listener中。具体的ListenerBus实现类封装了相应的事件。
event match {
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
...
}
3 JobProgressListener对事件进行响应
以JobStart事件为例,相应的listener具体实现——JobProgressListener便接收JobStart的事件,并触发自己的onJobStart方法开始产生和更新数据啦。
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
...省略
jobIdToData(jobStart.jobId) = jobData
activeJobs(jobStart.jobId) = jobData
for (stageId <- jobStart.stageIds) {
stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId)
}
for (stageInfo <- jobStart.stageInfos) {
stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
}
...省略
}
jobIdToData、activeJobs等对象和集合就是JobProgressListener中封装的数据啦。
JobProgressListener封装的其他数据还有:
Job的,completedJobs,activeJobs,failedJobs,jobIdToData
Stage的,pendingStages,activeStages,completedStages,failedStages等。
至此JobProgressListener的各项数据就产生了,其他事件触发的时候,或下次同样事件到达的时候,JobProgressListener依然会进行同样的逻辑,然后对数据进行更新。对于Spark WebUI来说,便可以从JobProgressListener中取得数据进行页面呈现了。对于其他的listener,如EnvironmentListener,StorageListener,ExecutorListener等等,数据产生和更新的原理是一致的。
敲重点:明白了listener的数据产生和更新原理以后对于Spark应用的其他开发是很有意义的,比方说你想设计一个自定义metrics,设计metrics子系统,设计开发spark作业分析诊断系统等等,就可以从spark的各个后台listener中去获取数据啦。
ps:公众号已正式接入图灵机器人,快去和我聊聊吧。
本文系本人个人公众号「梦回少年」原创发布,扫一扫加关注。