[spark] Checkpoint Source Code Analysis


In Spark applications, an RDD is often the result of a long series of complex transformations, which leaves it with a long lineage chain and wide dependencies. In such cases it is worth considering persisting the RDD.

cache (persist) can also store data on disk, but it writes the output of each partition directly as that partition is computed. checkpoint works differently: it waits until the logical job has finished, checks whether any RDD needs to be checkpointed, and then starts a separate job to write the checkpoint. As a result the RDD is computed twice. It is therefore recommended to cache the RDD in memory before checkpointing it, so that the checkpoint job can write the cached data straight to disk instead of recomputing it.
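To make the double computation concrete, here is a minimal sketch in plain Scala with no Spark dependency. ToyRDD, materialize and computeCount are hypothetical names invented for this illustration; the sketch only models the fact that the user's action and the follow-up checkpoint job both need the data.

```scala
object RecomputeDemo {
  // Hypothetical stand-in for an RDD: `body` is the expensive lineage to evaluate.
  final class ToyRDD[T](body: () => Seq[T]) {
    var computeCount = 0 // how many times the lineage was evaluated
    private var cacheEnabled = false
    private var cached: Option[Seq[T]] = None

    def cache(): this.type = { cacheEnabled = true; this }

    // Returns the data, evaluating the lineage only when nothing is cached.
    def materialize(): Seq[T] = cached.getOrElse {
      computeCount += 1
      val out = body()
      if (cacheEnabled) cached = Some(out)
      out
    }
  }

  // Models one action followed by the separate checkpoint job; returns the compute count.
  def actionThenCheckpoint[T](rdd: ToyRDD[T]): Int = {
    rdd.materialize() // the job triggered by the user's action
    rdd.materialize() // the extra job that writes the checkpoint
    rdd.computeCount
  }
}
```

Without cache() the lineage is evaluated by both jobs; with cache() the second job reuses the stored result.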

Implementation of checkpoint

To use checkpoint, first set a directory for storing the checkpoint data through the setCheckpointDir method of SparkContext.

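def setCheckpointDir(directory: String) {
    // On a cluster, warn if the directory looks local: the checkpoint files would
    // then live on the executor machines, not where the driver looks for them.
    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
      logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
        s"must not be on the local filesystem. Directory '$directory' " +
        "appears to be on the local filesystem.")
    }
    checkpointDir = Option(directory).map { dir =>
      // Create a uniquely named subdirectory for this SparkContext
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
}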

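In non-local mode the directory should be on a shared file system such as HDFS; if it appears to be on the local file system, Spark logs a warning. Inside that directory Spark creates a subdirectory whose unique name is generated from a UUID. An RDD can then be marked for checkpointing with rdd.checkpoint():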

def checkpoint(): Unit = RDDCheckpointData.synchronized {
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      // Only mark the RDD; nothing is written yet
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
}

The method first checks that checkpointDir has been set, and then whether checkpointData.isEmpty holds. checkpointData is defined as follows:

private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

RDDCheckpointData corresponds one-to-one with an RDD and holds the information related to its checkpoint. Here checkpointData is instantiated with new ReliableRDDCheckpointData(this), where ReliableRDDCheckpointData is a subclass of RDDCheckpointData. At this point the object only acts as a marker that the RDD should be checkpointed; no checkpoint is actually performed yet.
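The "mark now, write later" behaviour can be sketched in plain Scala as follows. This is a simplified model, not Spark's code: Marker and ToyRDD are hypothetical names, and filesWritten exists only to show that marking performs no I/O.

```scala
object MarkDemo {
  // Hypothetical stand-in for RDDCheckpointData: just remembers which RDD it belongs to.
  final class Marker(val owner: String)

  final class ToyRDD(val name: String, checkpointDir: Option[String]) {
    var checkpointData: Option[Marker] = None
    var filesWritten = 0 // never incremented here: marking does no I/O

    // Marks the RDD for checkpointing; calling it again is a no-op.
    def checkpoint(): Unit = synchronized {
      if (checkpointDir.isEmpty) {
        throw new IllegalStateException("Checkpoint directory has not been set")
      } else if (checkpointData.isEmpty) {
        checkpointData = Some(new Marker(name))
      }
    }
  }
}
```

Calling checkpoint() twice keeps the first marker, and calling it without a directory fails immediately, which mirrors the two branches in the method above.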

When the checkpoint is executed

When an action is invoked, SparkContext's runJob method is called.

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    // After the job has finished, checkpoint any RDD that was marked
    rdd.doCheckpoint()
}

We can see that rdd.doCheckpoint() is executed after the job has finished. This is where the RDDs that were marked earlier actually get checkpointed, so let us follow this method.

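private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
            // Checkpoint parents first: our lineage is truncated once we checkpoint ourselves
            dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        } else {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
}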
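The order in which this method visits the lineage can be modelled with a small plain-Scala sketch. It is an illustration under assumptions, not Spark's implementation: ToyRDD, marked and the written buffer are hypothetical, and the allMarkedAncestors parameter stands in for checkpointAllMarkedAncestors.

```scala
import scala.collection.mutable.ArrayBuffer

object TraversalDemo {
  final class ToyRDD(val name: String, parents: Seq[ToyRDD], marked: Boolean) {
    private var doCheckpointCalled = false

    // Appends to `written` the names of the RDDs whose checkpoint would be written, in order.
    def doCheckpoint(allMarkedAncestors: Boolean, written: ArrayBuffer[String]): Unit = {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (marked) {
          // Visit parents first, because this RDD's lineage is cut once it is checkpointed
          if (allMarkedAncestors) parents.foreach(_.doCheckpoint(allMarkedAncestors, written))
          written += name
        } else {
          parents.foreach(_.doCheckpoint(allMarkedAncestors, written))
        }
      }
    }
  }

  // Lineage a -> b -> c, where a and c are marked and b is not.
  def run(allMarkedAncestors: Boolean): List[String] = {
    val a = new ToyRDD("a", Nil, marked = true)
    val b = new ToyRDD("b", Seq(a), marked = false)
    val c = new ToyRDD("c", Seq(b), marked = true)
    val written = ArrayBuffer.empty[String]
    c.doCheckpoint(allMarkedAncestors, written)
    written.toList
  }
}
```

In this model, run(false) checkpoints only c, because the search stops at the first marked RDD, while run(true) checkpoints the marked ancestor a before c.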

If the RDD has checkpointData defined and all marked ancestors should be checkpointed as well, the parents must be checkpointed first, because once the RDD has checkpointed itself its parents are cut out of its lineage. If the RDD is not marked, the call is simply passed on to its parents. Next, follow checkpointData.get.checkpoint():

final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    val newRDD = doCheckpoint()

    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
}

The state is first changed to CheckpointingInProgress, then doCheckpoint is executed and returns a new RDD. Let us see what doCheckpoint does:

protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    // Optionally clean our checkpoint files if the reference is out of scope
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }
    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
}

ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir) writes the RDD to a set of checkpoint files and returns a ReliableCheckpointRDD that represents it:

def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val sc = originalRDD.sparkContext
    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }
    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }
    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
    }
    newRDD
}

The method creates the output directory and broadcasts the Hadoop configuration, then starts a job that writes the checkpoint files. The actual writing is done by ReliableCheckpointRDD.writePartitionToCheckpointFile. Once the checkpoint has been written, a new ReliableCheckpointRDD instance is created and returned. Here is the implementation of writePartitionToCheckpointFile:

def writePartitionToCheckpointFile[T: ClassTag](
      path: String,
      broadcastedConf: Broadcast[SerializableConfiguration],
      blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
    val env = SparkEnv.get
    val outputDir = new Path(path)
    val fs = outputDir.getFileSystem(broadcastedConf.value.value)

    val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
    val finalOutputPath = new Path(outputDir, finalOutputName)
    val tempOutputPath =
      new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")

    if (fs.exists(tempOutputPath)) {
      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
    }
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

    val fileOutputStream = if (blockSize < 0) {
      fs.create(tempOutputPath, false, bufferSize)
    } else {
      // This is mainly for testing purpose
      fs.create(tempOutputPath, false, bufferSize,
        fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
    }
    val serializer = env.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    // Write every record of the partition, closing the stream even on failure
    Utils.tryWithSafeFinally {
      serializeStream.writeAll(iterator)
    } {
      serializeStream.close()
    }

    // Publish the finished file under its final name
    if (!fs.rename(tempOutputPath, finalOutputPath)) {
      if (!fs.exists(finalOutputPath)) {
        logInfo(s"Deleting tempOutputPath $tempOutputPath")
        fs.delete(tempOutputPath, false)
        throw new IOException("Checkpoint failed: failed to save output of task: " +
          s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
      } else {
        // Some other copy of this task must've finished before us and renamed it
        logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
        if (!fs.delete(tempOutputPath, false)) {
          logWarning(s"Error deleting ${tempOutputPath}")
        }
      }
    }
}

This is the usual way of writing a file to HDFS: the data of one RDD partition is serialized to a temporary file in the checkpoint directory, and the temporary file is then renamed to its final name.
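The write-to-a-temporary-file-then-rename pattern can be tried out locally with java.nio instead of the Hadoop FileSystem API. The following is a minimal sketch under that assumption; commitPartition is a hypothetical helper, records are written as plain text rather than through a Spark serializer, and the part-NNNNN file name is chosen here only for illustration.

```scala
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path}

object AtomicWriteDemo {
  // Writes one partition to a temp file, then publishes it under its final name.
  // If another attempt has already published the file, the temp file is discarded.
  def commitPartition(dir: Path, partitionId: Int, attempt: Int, records: Seq[String]): Path = {
    val finalName = f"part-$partitionId%05d"
    val finalPath = dir.resolve(finalName)
    val tempPath = dir.resolve(s".$finalName-attempt-$attempt")

    if (Files.exists(tempPath)) {
      throw new IOException(s"Checkpoint failed: temporary path $tempPath already exists")
    }
    Files.write(tempPath, records.mkString("\n").getBytes(StandardCharsets.UTF_8))

    try {
      // Without REPLACE_EXISTING, move fails if the final file is already there
      Files.move(tempPath, finalPath)
    } catch {
      case _: FileAlreadyExistsException =>
        // Some other attempt finished first; keep its output and drop ours
        Files.delete(tempPath)
    }
    finalPath
  }
}
```

Readers only ever see complete files under the final name, and a second attempt for the same partition does not overwrite the output of the first.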

Once doCheckpoint() has completed, the returned ReliableCheckpointRDD is stored in cpRDD and the checkpoint state is set to Checkpointed. Finally rdd.markCheckpointed() is called. What does it do?

private[spark] def markCheckpointed(): Unit = {
    clearDependencies()
    partitions_ = null
    deps = null    // Forget the constructor argument for dependencies too
}

It clears all dependencies of the RDD, along with its partition information, so the original lineage is no longer referenced.

Summary of writing a checkpoint

An RDD passes through the following states, in order, while it is checkpointed:

  • initialized
  • marked for checkpointing
  • checkpointing in progress
  • checkpointed
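The state handling in RDDCheckpointData.checkpoint() can be modelled as a small state machine. This is a sketch in plain Scala, not Spark's code: ToyCheckpointData and its write function are hypothetical, and the "marked for checkpointing" stage is represented simply by the object existing in the Initialized state, which is an assumption of the model.

```scala
object CheckpointStateDemo {
  sealed trait State
  case object Initialized extends State
  case object CheckpointingInProgress extends State
  case object Checkpointed extends State

  // `write` is a hypothetical stand-in for the job that writes the checkpoint files.
  final class ToyCheckpointData(write: () => String) {
    private var state: State = Initialized
    private var result: Option[String] = None

    def cpState: State = synchronized(state)
    def cpRDD: Option[String] = synchronized(result)

    def checkpoint(): Unit = {
      // Flip the state under a lock so that only one caller performs the write
      val proceed = synchronized {
        if (state == Initialized) { state = CheckpointingInProgress; true } else false
      }
      if (proceed) {
        val out = write() // the slow part runs outside the lock
        synchronized { result = Some(out); state = Checkpointed }
      }
    }
  }
}
```

A second call to checkpoint() finds the state already past Initialized and returns without writing again.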

When the checkpoint is read

When the data of a partition is needed, rdd.iterator() is used to compute that partition of the RDD. Let us look at the implementation of RDD's iterator():

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
}

If the data cannot be read from the cache, Spark checks whether the RDD has been checkpointed. isCheckpointedAndMaterialized is the flag that indicates a successful checkpoint, that is, cpState = Checkpointed.

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
}

When the RDD has been successfully checkpointed, the iterator() of its parent RDD is used directly, which at that point is CheckpointRDD.iterator(). Otherwise the RDD's own compute method is called.
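The lookup order on the read path (cache first, then checkpoint, then recomputation) can be summarised in a few lines of plain Scala. The names below are hypothetical and the three booleans are a deliberate simplification of the checks Spark performs.

```scala
object ReadPathDemo {
  sealed trait Source
  case object FromCache extends Source
  case object FromCheckpoint extends Source
  case object Recomputed extends Source

  // Decides where the data of a partition comes from, in priority order.
  def resolve(storageLevelSet: Boolean, blockCached: Boolean, checkpointed: Boolean): Source =
    if (storageLevelSet && blockCached) FromCache // cache hit
    else if (checkpointed) FromCheckpoint         // read the checkpoint files
    else Recomputed                               // fall back to compute()
}
```

Note that a cache miss on a persisted RDD still falls through to the checkpoint before any recomputation happens.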

final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
}

When the dependencies of an RDD are requested, Spark first tries the checkpointRDD. If one exists, it returns the ReliableCheckpointRDD wrapped in a OneToOneDependency; otherwise it returns the RDD's real dependencies.
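The effect on the lineage can be illustrated with a toy model in plain Scala. ToyRDD and lineage are hypothetical names and dependencies are reduced to direct parent references, so this is only a sketch of the idea.

```scala
object LineageDemo {
  final class ToyRDD(val name: String, parents: Seq[ToyRDD]) {
    private var deps: Seq[ToyRDD] = parents
    private var checkpointRDD: Option[ToyRDD] = None

    // Prefer the checkpoint RDD as the single parent; otherwise use the real parents.
    def dependencies: Seq[ToyRDD] = checkpointRDD.map(r => Seq(r)).getOrElse(deps)

    // Records the checkpoint RDD and forgets the original parents.
    def markCheckpointed(cp: ToyRDD): Unit = {
      checkpointRDD = Some(cp)
      deps = Nil
    }

    // Names of this RDD and all of its ancestors, nearest first.
    def lineage: List[String] = name :: dependencies.toList.flatMap(_.lineage)
  }
}
```

Before the checkpoint, the lineage of the last RDD lists every ancestor; afterwards it consists only of the RDD itself and its checkpoint RDD.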
