Today's Big Data tutorial from 千锋扣丁学堂 takes a close look at why Spark broadcast variables should be declared with the singleton pattern. Anyone who has used Spark Streaming has probably worked with broadcast, and in almost every example the broadcast variable is declared as a singleton. Have you ever wondered why? 浪尖 breaks it down here. Two reasons stand out: a lazily initialized singleton can be re-instantiated after the driver restarts from a checkpoint, and, more fundamentally, Spark Streaming re-runs your driver-side output code on every batch, so without the singleton the same data would be re-broadcast on each batch interval. Let's trace that second reason through the source code, starting with the canonical singleton from Spark's RecoverableNetworkWordCount example:
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    // Double-checked locking: the blacklist is broadcast at most once per
    // driver process, no matter how many batches call getInstance().
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}
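The driver code below also calls DroppedWordsCounter.getInstance, which this excerpt doesn't define. In Spark's RecoverableNetworkWordCount example it is a LongAccumulator wrapped in the exact same lazy-singleton pattern; a minimal sketch (the accumulator name is illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // The label only affects how the accumulator shows up in the web UI
          instance = sc.longAccumulator("DroppedWordsCounter")
        }
      }
    }
    instance
  }
}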
These singletons are then fetched inside foreachRDD, once per batch:

val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = s"Counts at time $time $counts"
  println(output)
  println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
  println(s"Appending to ${outputFile.getAbsolutePath}")
  // Files here is Guava's com.google.common.io.Files, as in the Spark example
  Files.append(output + "\n", outputFile, Charset.defaultCharset())
}
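For contrast, a hypothetical version without the singleton, broadcasting directly inside foreachRDD, would create and ship a brand-new broadcast of the same blacklist on every batch interval, because (as we are about to see) the closure body runs on the driver once per batch:

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], _: Time) =>
  // Anti-pattern: a fresh Broadcast of identical data is created each batch
  val blacklist = rdd.sparkContext.broadcast(Seq("a", "b", "c"))
  val kept = rdd.filter { case (word, _) => !blacklist.value.contains(word) }
  println(kept.collect().mkString("[", ", ", "]"))
}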
So why does that closure run repeatedly? The answer lives in JobGenerator on the driver. A RecurringTimer fires once per batch interval and posts a GenerateJobs event:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
The event loop hands every event to processEvent:

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()
A GenerateJobs(time) event dispatches to generateJobs(time):

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
generateJobs allocates the blocks received during the interval to the batch, asks the DStreamGraph to build the batch's jobs, and submits the resulting JobSet:

/** Generate jobs and perform checkpointing for the given `time`. */
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
First, graph.generateJobs(time). It asks every registered output stream to generate a job for this batch:
 
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
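Each outputStream here is a DStream registered by an output operation; foreachRDD registers a ForEachDStream, and its generateJob wraps your foreachRDD closure into the job's body. That is exactly why WordBlacklist.getInstance runs on the driver for every single batch. Lightly abridged from Spark's ForEachDStream:

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      // jobFunc will invoke the user's foreachRDD closure on the driver
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}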
Next, jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) hands the batch's jobs to the scheduler. Its body:
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
Every job is wrapped in a JobHandler and executed on the jobExecutor thread pool, whose size is controlled by spark.streaming.concurrentJobs and defaults to 1, so batch jobs run strictly one after another:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

If you raise that value so several batch jobs can run at once, you will usually also want fair scheduling instead of the default FIFO:

conf.set("spark.scheduler.mode", "FAIR")
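Putting those two settings together, a minimal sketch (the app name and values are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ConcurrentJobsDemo")            // illustrative name
  .set("spark.streaming.concurrentJobs", "4")  // default is 1
  .set("spark.scheduler.mode", "FAIR")         // fair sharing instead of FIFO

val ssc = new StreamingContext(conf, Seconds(5))

Whatever the concurrency, the chain above (RecurringTimer, GenerateJobs, generateJobs, submitJobSet) re-executes the foreachRDD body on the driver every batch interval, which is exactly why the broadcast must be a lazily initialized singleton: created once per driver, not once per batch.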