1. StreamingContext source code analysis
### Entry point: org.apache.spark.streaming/StreamingContext.scala

    /**
     * Once the StreamingContext has been created and initialized, along with associated
     * components such as DStreamGraph and JobScheduler, the application calls methods such as
     * socketTextStream on it to create input DStreams, applies a series of transformations to
     * them, and finally runs an output operation, which triggers the generation and execution
     * of a job for each batch.
     *
     * After that setup, start() must be called; without it the Spark Streaming application
     * never runs. StreamingContext.start() launches the application. Two more important
     * components come into play here: ReceiverTracker, which is created at this point, and
     * JobGenerator, which was created together with JobScheduler and is started now. Most
     * importantly, start() launches the Receivers of the application's input DStreams, each
     * inside an Executor on some worker node of the Spark cluster.
     */
    class StreamingContext private[streaming] (
        sc_ : SparkContext,
        cp_ : Checkpoint,
        batchDur_ : Duration
      ) extends Logging {

## DStreamGraph

      // Key component: DStreamGraph records the dependencies between all DStreams defined in
      // the Spark Streaming application, and the operators applied between them
      private[streaming] val graph: DStreamGraph = {
        if (isCheckpointPresent) {
          cp_.graph.setContext(this)
          cp_.graph.restoreCheckpointData()
          cp_.graph
        } else {
          assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
          val newGraph = new DStreamGraph()
          newGraph.setBatchDuration(batchDur_)
          newGraph
        }
      }

## JobScheduler

      // JobScheduler handles job scheduling: at every batch interval JobGenerator generates a
      // job, which JobScheduler then schedules and submits.
      // Underneath, this still relies on the Spark core engine: DAGScheduler, TaskScheduler,
      // Worker, Executor and Task. If reduceByKey is used, a shuffle still takes place; data is
      // still read and written through the BlockManager attached to each Executor, and
      // persisted data is still handled by CacheManager
      private[streaming] val scheduler = new JobScheduler(this)

## The start() method of StreamingContext

      /**
       * This is the entry point for starting a Streaming application
       */
      def start(): Unit = synchronized {
        if (state == Started) {
          throw new SparkException("StreamingContext has already been started")
        }
        if (state == Stopped) {
          throw new SparkException("StreamingContext has already been stopped")
        }
        validate()
        sparkContext.setCallSite(DStream.getCreationSite())
        // Call the start() method of JobScheduler
        scheduler.start()
        state = Started
      }
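The two state checks at the top of start() mean a context can be started at most once and never restarted after it has stopped. A minimal sketch of that guard in plain Scala follows; `ContextState` and `MiniContext` are hypothetical names used only for illustration and are not Spark classes.

```scala
// Hypothetical model of the start() guard; this is not Spark's StreamingContext.
object ContextState extends Enumeration {
  val Initialized, Started, Stopped = Value
}

class MiniContext {
  import ContextState._

  private var state = Initialized

  // Mirrors the two checks in start(): starting twice, or after stop, is an error.
  def start(): Unit = synchronized {
    if (state == Started) throw new IllegalStateException("already started")
    if (state == Stopped) throw new IllegalStateException("already stopped")
    state = Started
  }

  def stop(): Unit = synchronized { state = Stopped }

  def isStarted: Boolean = synchronized { state == Started }
}
```

Calling `start()` on a fresh `MiniContext` succeeds; a second call, or a call after `stop()`, throws.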
start() calls the start() method of JobScheduler, so let's look at that method.
### org.apache.spark.streaming.scheduler/JobScheduler.scala

      /**
       * StreamingContext.start() itself is fairly simple; the important part is its call to
       * JobScheduler.start()
       */
      def start(): Unit = synchronized {
        if (eventActor != null) return // scheduler has already been started

        logDebug("Starting JobScheduler")
        eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
          def receive = {
            case event: JobSchedulerEvent => processEvent(event)
          }
        }), "JobScheduler")

        listenerBus.start()
        // Create the ReceiverTracker component, which deals with receiving data
        receiverTracker = new ReceiverTracker(ssc)
        // ...and start it.
        // At this point all the important components associated with StreamingContext exist.
        // The logic that launches the Receivers of the input DStreams lives in
        // ReceiverTracker.start()
        receiverTracker.start()
        // JobGenerator was created when JobScheduler was constructed; start it here
        jobGenerator.start()
        logInfo("Started JobScheduler")
      }
Next, look at the start() method of ReceiverTracker.
### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

      def start() = synchronized {
        if (actor != null) {
          throw new SparkException("ReceiverTracker already started")
        }

        if (!receiverInputStreams.isEmpty) {
          actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
            "ReceiverTracker")
          // This start() method mainly calls start() on the internal ReceiverLauncher;
          // the main job of ReceiverTracker is to launch the Receivers
          if (!skipReceiverLaunch) receiverExecutor.start()
          logInfo("ReceiverTracker started")
        }
      }
This calls receiverExecutor.start(); receiverExecutor is an instance of ReceiverLauncher, an inner class of ReceiverTracker.
### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

      class ReceiverLauncher {
        @transient val env = ssc.env
        @volatile @transient private var running = false
        @transient val thread = new Thread() {
          override def run() {
            try {
              SparkEnv.set(env)
              // Start the Receivers of all input DStreams
              startReceivers()
            } catch {
              case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
            }
          }
        }

        // ReceiverLauncher.start() just starts the internal thread, so the Receivers are
        // launched asynchronously
        def start() {
          thread.start()
        }
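The point of the dedicated thread is that launching receivers is a blocking, long-running call, while the caller of start() must return promptly. A minimal sketch of the same idea in plain Scala; `MiniLauncher` and its `await()` helper are hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for ReceiverLauncher: the blocking work runs on its own thread.
class MiniLauncher(work: () => Unit) {
  private val done = new CountDownLatch(1)

  private val thread = new Thread {
    override def run(): Unit = {
      try work()
      catch { case _: InterruptedException => println("launcher interrupted") }
      finally done.countDown()
    }
  }

  // Returns immediately; the calling thread is not blocked by the work.
  def start(): Unit = thread.start()

  // Demo helper only: block until the background work has finished.
  def await(): Unit = done.await()
}
```

In the source, the work passed in corresponds to startReceivers(), which does not return until all receivers have terminated.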
receiverExecutor.start() runs startReceivers() on that thread, which launches the Receivers of all input DStreams.
Look at this method:

### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

      /**
       * Everything up to and including ReceiverTracker.startReceivers() runs on the driver
       */
      private def startReceivers() {
        // Call getReceiver() on every input DStream created in the program to obtain the
        // collection of Receivers
        val receivers = receiverInputStreams.map(nis => {
          val rcvr = nis.getReceiver()
          rcvr.setReceiverId(nis.id)
          rcvr
        })

        // Work out the preferred locations of these Receivers
        // Right now, we only honor preferences if all receivers have them
        val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

        // Create the parallel collection of receivers to distribute them on the worker nodes
        val tempRDD =
          if (hasLocationPreferences) {
            val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
            ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
          } else {
            ssc.sc.makeRDD(receivers, receivers.size)
          }

        val checkpointDirOption = Option(ssc.checkpointDir)
        val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

        // Function to start the receiver on the worker node
        // This defines the core logic for starting a Receiver. It is only defined here, not
        // executed: the startReceiver function and everything that follows from it run on an
        // executor. Receivers are started on executors, not on the driver
        val startReceiver = (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          val receiver = iterator.next()
          // Wrap each Receiver in a ReceiverSupervisorImpl and call its start() method
          val supervisor = new ReceiverSupervisorImpl(
            receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
          supervisor.start()
          supervisor.awaitTermination()
        }

        // Run the dummy Spark job to ensure that all slaves have registered.
        // This avoids all the receivers to be scheduled on the same node.
        if (!ssc.sparkContext.isLocal) {
          ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
        }

        // Distribute the receivers and start them
        logInfo("Starting " + receivers.length + " receivers")
        running = true
        // Call runJob() on the SparkContext of the StreamingContext. This is what actually
        // ships the startReceiver function to executors on the worker nodes and runs it there
        ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
        running = false
        logInfo("All of the receivers have been terminated")
      }
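The placement rule in startReceivers() is all or nothing: location preferences are honored only when every receiver declares one, and otherwise each receiver simply gets its own partition with no preference. The sketch below models that decision in plain Scala. `MiniReceiver` and `ReceiverPlacement` are hypothetical names, and the sketch uses `forall` with an explicit non-empty check, where the source uses `reduce(_ && _)` and relies on the emptiness check in ReceiverTracker.start().

```scala
// Hypothetical receiver description; only the optional preferred host matters here.
case class MiniReceiver(id: Int, preferredLocation: Option[String])

object ReceiverPlacement {
  // Same rule as hasLocationPreferences: preferences count only if every receiver has one.
  // An empty list yields false here instead of throwing.
  def allHavePreferences(rs: Seq[MiniReceiver]): Boolean =
    rs.nonEmpty && rs.forall(_.preferredLocation.isDefined)

  // One entry per receiver, modeling one partition (and so one long-running task) each.
  // The second element is the host to prefer, or None to let the scheduler decide.
  def plan(rs: Seq[MiniReceiver]): Seq[(MiniReceiver, Option[String])] =
    if (allHavePreferences(rs)) rs.map(r => (r, r.preferredLocation))
    else rs.map(r => (r, None))
}
```

With one receiver preferring `host1` and another with no preference, `plan` drops the preference for both, which matches the "only honor preferences if all receivers have them" comment in the source.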
2. Receiver source code
First, what is receiverInputStreams?
### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

      // receiverInputStreams is taken from the graph of the StreamingContext: every time an
      // input DStream is created through StreamingContext, it is added to the
      // receiver input streams held by DStreamGraph
      private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
Every receiver-based input DStream has one important method, getReceiver(). SocketInputDStream is an example:
### org.apache.spark.streaming.dstream/SocketInputDStream.scala

    private[streaming]
    class SocketInputDStream[T: ClassTag](
        @transient ssc_ : StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](ssc_) {

      // Every receiver-based input DStream has this important method, getReceiver(),
      // which returns the Receiver for the DStream
      def getReceiver(): Receiver[T] = {
        new SocketReceiver(host, port, bytesToObjects, storageLevel)
      }
    }
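The relationship between the graph, the input streams and their receivers can be modeled in a few lines of plain Scala. In the sketch below, all `Simple*` names are hypothetical, and registration is done by a factory method on the graph purely to keep the example small; it is a simplifying assumption, not a description of how Spark registers streams.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical minimal types; the names echo the source but none of this is Spark code.
class SimpleReceiver(val description: String) {
  var streamId: Int = -1
}

abstract class SimpleInputStream(val id: Int) {
  // Each input stream knows how to build the receiver that feeds it.
  def getReceiver(): SimpleReceiver
}

class SimpleSocketStream(id: Int, host: String, port: Int) extends SimpleInputStream(id) {
  def getReceiver(): SimpleReceiver = new SimpleReceiver(s"socket://$host:$port")
}

class SimpleGraph {
  private val inputStreams = ArrayBuffer.empty[SimpleInputStream]

  // Plays the role of creating an input DStream: build the stream and register it.
  def socketStream(host: String, port: Int): SimpleInputStream = {
    val s = new SimpleSocketStream(inputStreams.size, host, port)
    inputStreams += s
    s
  }

  def getReceiverInputStreams(): Seq[SimpleInputStream] = inputStreams.toList
}

object SimpleTracker {
  // First step of startReceivers(): one receiver per registered stream, tagged with its id.
  def collectReceivers(graph: SimpleGraph): Seq[SimpleReceiver] =
    graph.getReceiverInputStreams().map { s =>
      val r = s.getReceiver()
      r.streamId = s.id
      r
    }
}
```

Creating two socket streams on one `SimpleGraph` and calling `SimpleTracker.collectReceivers` yields two receivers carrying stream ids 0 and 1.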
Next comes supervisor.start(). supervisor is an instance of ReceiverSupervisorImpl, which does not define start() itself; the method is inherited from its parent class, ReceiverSupervisor. Look at ReceiverSupervisor.start():

### org.apache.spark.streaming.receiver/ReceiverSupervisor.scala

      /** Called when supervisor is started */
      protected def onStart() { }

      /** Called when receiver is stopped */
      protected def onReceiverStop(message: String, error: Option[Throwable]) { }

      /** Start the supervisor */
      def start() {
        onStart()
        startReceiver()
      }
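ReceiverSupervisor.start() calls onStart(). ReceiverSupervisor is an abstract class and its own onStart() is empty, so the version that matters is the override in the implementing class. Look at ReceiverSupervisorImpl.onStart():

### org.apache.spark.streaming.receiver/ReceiverSupervisorImpl.scala

      override protected def onStart() {
        // blockGenerator is important for receiving data. It runs inside the executor on the
        // worker, handles storing the data once it has been received, and cooperates with
        // ReceiverTracker.
        // On the executor, the blockGenerator belonging to a Receiver is started before the
        // Receiver itself; this component is central to data reception
        blockGenerator.start()
      }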
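The start()/onStart() pair is an instance of the template method pattern: the base class fixes the order of steps and the subclass fills in a hook. A minimal sketch in plain Scala, with hypothetical `MiniSupervisor` classes that record calls into a `calls` buffer instead of doing real work:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the start() template in ReceiverSupervisor; not Spark code.
abstract class MiniSupervisor {
  // Records the order in which the steps run, for demonstration only.
  val calls = ArrayBuffer.empty[String]

  // Hook with an empty default body; subclasses override it.
  protected def onStart(): Unit = {}

  private def startReceiver(): Unit = { calls += "startReceiver" }

  // Fixed order: the hook runs first, then the receiver itself is started.
  def start(): Unit = {
    onStart()
    startReceiver()
  }
}

class MiniSupervisorImpl extends MiniSupervisor {
  // Stands in for blockGenerator.start() in ReceiverSupervisorImpl.
  override protected def onStart(): Unit = { calls += "blockGenerator.start" }
}
```

After `new MiniSupervisorImpl().start()`, the recorded order is the block generator first and the receiver second, which is the ordering the source comment describes.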
Back in JobScheduler.start(), the next call is jobGenerator.start():
### org.apache.spark.streaming.scheduler/JobGenerator.scala

      def start(): Unit = synchronized {
        if (eventActor != null) return // generator has already been started

        eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
          def receive = {
            case event: JobGeneratorEvent => processEvent(event)
          }
        }), "JobGenerator")
        if (ssc.isCheckpointPresent) {
          restart()
        } else {
          startFirstTime()
        }
      }
Look at the startFirstTime() method:
### org.apache.spark.streaming.scheduler/JobGenerator.scala

      /**
       * As soon as JobGenerator starts, a start time is initialized here. From then on, at
       * every batch interval, the data that arrived since the previous time (initially this
       * startTime) is packaged into one batch
       */
      private def startFirstTime() {
        val startTime = new Time(timer.getStartTime())
        graph.start(startTime - graph.batchDuration)
        timer.start(startTime.milliseconds)
        logInfo("Started JobGenerator at " + startTime)
      }
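The arithmetic in startFirstTime() is easier to follow with concrete numbers. The sketch below is plain Scala and rests on one assumption that the excerpt does not show: that timer.getStartTime() returns the next multiple of the batch interval after the current time. `BatchClock` and its method names are hypothetical.

```scala
// Hypothetical helper; the alignment rule in firstBatchTime is an assumption,
// since the body of timer.getStartTime() is not part of the excerpt.
object BatchClock {
  // Assumed rule: the first tick is the next multiple of the batch interval after nowMs.
  def firstBatchTime(nowMs: Long, batchMs: Long): Long =
    (nowMs / batchMs + 1) * batchMs

  // The time passed to graph.start(): one batch interval before the first tick.
  def graphZeroTime(nowMs: Long, batchMs: Long): Long =
    firstBatchTime(nowMs, batchMs) - batchMs

  // The first n batch times the timer would fire at.
  def batchTimes(nowMs: Long, batchMs: Long, n: Int): Seq[Long] =
    (0 until n).map(i => firstBatchTime(nowMs, batchMs) + i * batchMs)
}
```

Under that assumption, with a 1000 ms batch interval and a current time of 10500 ms, the graph starts at 10000 and the timer fires at 11000, 12000, 13000 and so on, so the first batch covers the interval that ends at 11000.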
Now look at a DStream output operation, for example print():
### org.apache.spark.streaming.dstream/DStream.scala

      def print() {
        print(10)
      }

      /**
       * Print the first num elements of each RDD generated in this DStream. This is an output
       * operator, so this DStream will be registered as an output stream and there materialized.
       */
      def print(num: Int) {
        def foreachFunc = (rdd: RDD[T], time: Time) => {
          val firstNum = rdd.take(num + 1)
          println ("-------------------------------------------")
          println ("Time: " + time)
          println ("-------------------------------------------")
          firstNum.take(num).foreach(println)
          if (firstNum.size > num) println("...")
          println()
        }
        new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
      }
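The `take(num + 1)` in print(num) is a small trick: fetching one element more than will be shown is enough to tell whether the batch holds further data, without counting the whole RDD. The same logic on an ordinary Scala collection, as a sketch with a hypothetical `PrintSketch.render` helper that returns the lines instead of printing them:

```scala
// Hypothetical helper mirroring the body of print(num) on a plain Seq instead of an RDD.
object PrintSketch {
  def render[T](data: Seq[T], num: Int): Seq[String] = {
    // Fetch one extra element so that "there is more" can be detected cheaply.
    val firstNum = data.take(num + 1)
    val shown = firstNum.take(num).map(_.toString)
    // Append the ellipsis marker only when an extra element was actually found.
    if (firstNum.size > num) shown :+ "..." else shown
  }
}
```

For a three-element input and `num = 2`, the result is the first two elements followed by `"..."`; for a two-element input no marker is added.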
The ForEachDStream class:
### org.apache.spark.streaming.dstream/ForEachDStream.scala

    class ForEachDStream[T: ClassTag] (
        parent: DStream[T],
        foreachFunc: (RDD[T], Time) => Unit
      ) extends DStream[Unit](parent.ssc) {

      override def dependencies = List(parent)

      override def slideDuration: Duration = parent.slideDuration

      override def compute(validTime: Time): Option[RDD[Unit]] = None

      // Every output operation ends up calling ForEachDStream.generateJob(). Each time the
      // DStreamGraph is executed, control eventually arrives here, and this is what
      // triggers job submission underneath
      override def generateJob(time: Time): Option[Job] = {
        parent.getOrCompute(time) match {
          case Some(rdd) =>
            val jobFunc = () => {
              ssc.sparkContext.setCallSite(creationSite)
              foreachFunc(rdd, time)
            }
            Some(new Job(time, jobFunc))
          case None => None
        }
      }
    }
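Two things happen in generateJob(): a batch with no data produces no job at all, and a batch with data produces a job whose body is deferred until something runs it. The sketch below reproduces that shape in plain Scala. `MiniJob` and `MiniForEach` are hypothetical, a `Seq[T]` stands in for the RDD, and a `Long` stands in for `Time`.

```scala
// Hypothetical miniature of Job and ForEachDStream.generateJob(); not Spark code.
class MiniJob(val time: Long, body: () => Unit) {
  // Nothing executes until run() is called, which models the scheduler running the job.
  def run(): Unit = body()
}

class MiniForEach[T](
    parentCompute: Long => Option[Seq[T]],   // stands in for parent.getOrCompute(time)
    foreachFunc: (Seq[T], Long) => Unit) {   // stands in for the output function

  // No data for this batch time means no job; otherwise wrap the output
  // function and its batch in a deferred job.
  def generateJob(time: Long): Option[MiniJob] =
    parentCompute(time).map { batch =>
      new MiniJob(time, () => foreachFunc(batch, time))
    }
}
```

Generating a job does not run the output function; only calling `run()` on the returned `MiniJob` does, which mirrors how the generated Job is handed to JobScheduler for later execution.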