64. Spark Streaming: StreamingContext Initialization and Receiver Startup, Principles and Source Code Analysis
Published: 2019-06-14


1. StreamingContext Source Code Analysis

### Entry point: org.apache.spark.streaming/StreamingContext.scala

```scala
/**
 * After a StreamingContext has been created and initialized, i.e. after DStreamGraph, JobScheduler and the
 * other associated components exist, we call methods such as socketTextStream() on the StreamingContext to
 * create input DStreams, apply a series of transformations to those input DStreams, and finally apply an
 * output operation, which is what triggers job generation and execution for each individual batch.
 *
 * Once that setup is done, start() must be called; without it the whole Spark Streaming application simply
 * never runs. StreamingContext.start() launches the application and creates the two remaining key components
 * of the StreamingContext, ReceiverTracker and JobGenerator. Most importantly, it starts the Receiver of each
 * input DStream, on an Executor on some worker node of the Spark cluster.
 */
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {
```

## DStreamGraph

```scala
  // Key component: DStreamGraph. It holds the dependency relationships between the DStreams defined in our
  // Spark Streaming application, together with the operators applied between them.
  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }
```

## JobScheduler

```scala
  // JobScheduler handles job scheduling: JobGenerator produces one job per batch interval, and JobScheduler
  // schedules and submits it. Underneath it is still Spark's core engine, with DAGScheduler, TaskScheduler,
  // Worker, Executor and Task. If reduceByKey is used there is still a shuffle; data is still stored through
  // the Executor's BlockManager, and persisted data is still managed by CacheManager.
  private[streaming] val scheduler = new JobScheduler(this)
```

## StreamingContext's start() method

```scala
  /**
   * This is the entry point for starting a Streaming application.
   */
  def start(): Unit = synchronized {
    if (state == Started) {
      throw new SparkException("StreamingContext has already been started")
    }
    if (state == Stopped) {
      throw new SparkException("StreamingContext has already been stopped")
    }
    validate()
    sparkContext.setCallSite(DStream.getCreationSite())
    // Call JobScheduler's start() method
    scheduler.start()
    state = Started
  }
```
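For reference, here is a minimal driver-side sketch of the flow the comment above describes (this is an assumed example, not code from the Spark source; the application name, host, and port are placeholders): create the StreamingContext, define an input DStream, apply transformations, register an output operation, and only then call start().

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCountSketch")
    // batch interval of 5 seconds: every 5 seconds the received data is packaged into one batch
    val ssc = new StreamingContext(conf, Seconds(5))

    // creates a SocketInputDStream; its Receiver will later be started on an executor
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // output operation: registers a ForEachDStream, which triggers one job per batch
    counts.print()

    // creates ReceiverTracker and JobGenerator, and starts the Receivers on executors
    ssc.start()
    ssc.awaitTermination()
  }
}
```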

StreamingContext.start() delegates to JobScheduler's start() method, so let's look at that next.

### org.apache.spark.streaming.scheduler/JobScheduler.scala

```scala
  /**
   * StreamingContext's start() is actually quite simple; what really matters is the call to
   * JobScheduler's start().
   */
  def start(): Unit = synchronized {
    if (eventActor != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobSchedulerEvent => processEvent(event)
      }
    }), "JobScheduler")

    listenerBus.start()
    // Create the ReceiverTracker component, which is responsible for data reception,
    receiverTracker = new ReceiverTracker(ssc)
    // and start it.
    // At this point every important StreamingContext component we mentioned has been created.
    // Starting the Receivers associated with the DStreams all happens inside ReceiverTracker's start() method.
    receiverTracker.start()
    // JobGenerator was created directly when the JobScheduler was constructed; start it here.
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }
```

Now look at ReceiverTracker's start() method.

### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

```scala
  def start() = synchronized {
    if (actor != null) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
        "ReceiverTracker")
      // This start() method mainly calls the start() method of the inner ReceiverLauncher;
      // the main job of ReceiverTracker is to start the Receivers.
      if (!skipReceiverLaunch) receiverExecutor.start()
      logInfo("ReceiverTracker started")
    }
  }
```

It calls receiverExecutor.start(); receiverExecutor is an instance of ReceiverTracker's inner ReceiverLauncher class.

### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

```scala
  class ReceiverLauncher {
    @transient val env = ssc.env
    @volatile @transient private var running = false
    @transient val thread = new Thread() {
      override def run() {
        try {
          SparkEnv.set(env)
          // start the Receivers of all the DStreams
          startReceivers()
        } catch {
          case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
        }
      }
    }

    // ReceiverLauncher's start() just starts an internal thread, so the Receivers are started asynchronously
    def start() {
      thread.start()
    }
```

receiverExecutor.start() therefore runs startReceivers() on that thread, which starts the Receiver of every input DStream.

Let's look at that method.

### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

```scala
    /**
     * Up to and including this point, ReceiverTracker's startReceivers() is still executing on the Driver.
     */
    private def startReceivers() {
      // Call getReceiver() on every input DStream defined in the program to obtain the set of Receivers
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

      // Collect the preferred locations of these Receivers
      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

      // Create the parallel collection of receivers to distribute them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node.
      // This is where the core logic for starting a Receiver is defined, but only defined, not executed here:
      // the startReceiver function, and everything it does, runs on the executors. Receivers are started
      // on executors, not on the driver.
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException(
            "Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        // Wrap each Receiver in a ReceiverSupervisorImpl and call its start() method to start it
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      }

      // Run the dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers to be scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // Distribute the receivers and start them
      logInfo("Starting " + receivers.length + " receivers")
      running = true
      // Call runJob() on the StreamingContext's SparkContext to actually distribute the startReceiver
      // function to the executors on the worker nodes and execute it there
      ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
      running = false
      logInfo("All of the receivers have been terminated")
    }
```
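To make the distribution trick in startReceivers() more concrete, here is a toy sketch of the same pattern in plain Spark (nothing here is Streaming-specific, and all of the names are made up for illustration): put the objects to be started into an RDD with one element per partition, then use runJob() so that the startup function executes on the executors rather than on the driver.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RunOnExecutorsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RunOnExecutorsSketch"))

    val tasks = Seq("receiver-0", "receiver-1", "receiver-2")
    // one partition per element, just as tempRDD above gets one partition per Receiver
    val taskRDD = sc.makeRDD(tasks, tasks.size)

    // this function body runs on the executors, analogous to the startReceiver closure
    val startOne = (it: Iterator[String]) => {
      it.foreach(name => println(s"starting $name on an executor"))
    }
    sc.runJob(taskRDD, startOne)

    sc.stop()
  }
}
```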

2. Receiver Source Code

Let's see what receiverInputStreams is.

### org.apache.spark.streaming.scheduler/ReceiverTracker.scala

```scala
  // receiverInputStreams is taken from the StreamingContext's graph: every time an input DStream is created
  // through the StreamingContext, it is added to the DStreamGraph's receiverInputStreams
  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
```

Every input DStream has one important method, getReceiver(). Take SocketInputDStream as an example:

### org.apache.spark.streaming.dstream/SocketInputDStream.scala

```scala
private[streaming]
class SocketInputDStream[T: ClassTag](
    @transient ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // Every input DStream has this key method, getReceiver(), which is responsible for returning the
  // DStream's Receiver
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
```
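For comparison, here is a hedged sketch of a custom Receiver written against Spark Streaming's public Receiver API (the class name SimpleSocketReceiver and its details are illustrative, not from the Spark source). getReceiver() on an input DStream only has to return an object like this; SocketReceiver is Spark's built-in equivalent for sockets, and store() is what eventually hands each record to the ReceiverSupervisor and BlockGenerator discussed below.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SimpleSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    // receive on a separate thread; onStart() must return quickly and not block
    new Thread("SimpleSocketReceiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // the receiving thread checks isStopped() and exits on its own
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hands the record over for storage by the supervisor
        line = reader.readLine()
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

// Driver side (illustrative): val lines = ssc.receiverStream(new SimpleSocketReceiver("localhost", 9999))
```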

Next comes supervisor.start(). supervisor is an instance of ReceiverSupervisorImpl, but ReceiverSupervisorImpl itself does not define a start() method; that start() lives in its parent class, ReceiverSupervisor. Let's look at ReceiverSupervisor's start() method.

### org.apache.spark.streaming.receiver/ReceiverSupervisor.scala

```scala
  /** Called when supervisor is started */
  protected def onStart() { }

  /** Called when receiver is stopped */
  protected def onReceiverStop(message: String, error: Option[Throwable]) { }

  /** Start the supervisor */
  def start() {
    onStart()
    startReceiver()
  }
```

ReceiverSupervisor's start() calls onStart(). Since ReceiverSupervisor is an abstract class, we need to look at the onStart() of its concrete implementation, ReceiverSupervisorImpl.

### org.apache.spark.streaming.receiver/ReceiverSupervisorImpl.scala

```scala
  override protected def onStart() {
    // blockGenerator is very important here: it is tied to data reception. It runs on the worker's executor,
    // handles storing the data after it has been received, and cooperates with ReceiverTracker.
    // On the Executor, before the Receiver itself is started, the BlockGenerator associated with that
    // Receiver is started first; this component is central to data reception.
    blockGenerator.start()
  }
```

Going back to JobScheduler's start() method once more: it also calls jobGenerator.start().

### org.apache.spark.streaming.scheduler/JobGenerator.scala

```scala
  def start(): Unit = synchronized {
    if (eventActor != null) return // generator has already been started

    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobGeneratorEvent => processEvent(event)
      }
    }), "JobGenerator")
    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }
```

Let's look at startFirstTime().

### org.apache.spark.streaming.scheduler/JobGenerator.scala

```scala
  /**
   * As soon as JobGenerator starts, a start time is initialized here. From then on, at every batch interval,
   * the data received since the previous time (initially this startTime) is packaged into one batch.
   */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }
```
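A tiny standalone sketch of how the batch boundaries line up once the timer starts (this is a simplification assumed for illustration, not the real RecurringTimer code): the start time is aligned to a multiple of the batch interval, and batch k covers the interval that ends at startTime + k * batchInterval, which is why graph.start() above is called with startTime - batchDuration.

```scala
object BatchBoundariesSketch {
  def main(args: Array[String]): Unit = {
    val batchIntervalMs = 5000L
    val now = System.currentTimeMillis()
    // round up to the next multiple of the batch interval so batches start on a clean boundary
    val startTime = (now / batchIntervalMs + 1) * batchIntervalMs

    (0 until 3).foreach { k =>
      val batchEnd = startTime + k * batchIntervalMs
      println(s"batch $k covers (${batchEnd - batchIntervalMs}, $batchEnd]")
    }
  }
}
```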

Finally, look at a DStream output operation, for example print().

### org.apache.spark.streaming.dstream/DStream.scala

```scala
  def print() {
    print(10)
  }

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int) {
    def foreachFunc = (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.size > num) println("...")
      println()
    }
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
  }
```

The ForEachDStream class:

### org.apache.spark.streaming.dstream/ForEachDStream.scala

```scala
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  // Every output operation ends up calling ForEachDStream's generateJob(). So each time the DStreamGraph is
  // processed, execution eventually arrives here, and this is what ultimately triggers job submission.
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          ssc.sparkContext.setCallSite(creationSite)
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
```
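A short usage sketch to close the loop (reusing the counts DStream from the driver sketch near the top of this post, so it is illustrative only): foreachRDD is the general-purpose output operation, and like print() it registers a ForEachDStream; the function body below becomes the foreachFunc that generateJob() wraps into a Job at each batch time.

```scala
counts.foreachRDD { (rdd, time) =>
  // this body is the foreachFunc passed to ForEachDStream; generateJob() turns it into a Job per batch
  val recordCount = rdd.count()
  println(s"batch at $time contained $recordCount records")
}
```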

Reposted from: https://www.cnblogs.com/weiyiming007/p/11383208.html
