Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Spark Checkpointing: Proper Usage and Differences from Caching

Tech 2

Checkpointing materializes critical intermediate results to a fault-toleratn store and cuts off lineage, preventing expensive re-computation across deep DAGs when failures occur. Caching (or persisting) keeps data in memory/disk for faster reuse but retains dependencies, so data loss may still trigger upstream re-calculation.

Configuring a checkpoint directory

A reliable directory must be configured before any RDD/Dataset is marked for checkpointing.

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object App {
  def main(args: Array[String]): Unit = {
    val mode = if (args.nonEmpty) args(0) else "cluster"

    val conf = new SparkConf()
      .setAppName("SkewJoinExample")
      .set("spark.sql.autoBroadcastJoinThreshold", (1L << 20).toString) // 1 MiB
      .set("spark.sql.shuffle.partitions", "4")

    if (mode == "local") {
      conf.setMaster("local[4]")
    }

    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext

    // Choose an appropriate checkpoint directory per environment
    // sc.setCheckpointDir("file:///D:/spark-checkpoints")     // Windows local
    // sc.setCheckpointDir("file:///tmp/spark-checkpoints")     // Linux local
    // sc.setCheckpointDir("hdfs://nn:8020/spark/checkpoints")  // HDFS (recommended in prod)

    spark.stop()
  }
}

Environment-specific examples:

  • Windows (local filesystem):
    • sc.setCheckpointDir("file:///D:/spark-checkpoints")
  • Linux (local filesystem):
    • sc.setCheckpointDir("file:///tmp/spark-checkpoints")
  • HDFS:
    • sc.setCheckpointDir("hdfs://namenode:8020/app/spark-checkpoints")

Applying checkpoint to RDDs

  • Call rdd.checkpoint() before the RDD is first materialized by an action.
  • Persist first to avoid recomputing the RDD twice (once for normal execution, once to write out the checkpoint files).
import org.apache.spark.storage.StorageLevel

val sc = spark.sparkContext
sc.setCheckpointDir("hdfs://namenode:8020/app/spark-checkpoints")

val base = sc.parallelize(1 to 100000, numSlices = 8)
val stage = base
  .map(x => x * 3)
  .filter(_ % 5 != 0)
  .mapPartitions(_.map(_ + 7))

// Persist prior to checkpoint to avoid duplicated computation
stage.persist(StorageLevel.MEMORY_ONLY)

// Mark lineage cutoff point
stage.checkpoint()

// Trigger execution and checkpoint writing
val result = stage.count()

// Optional: verify checkpoint state
if (stage.isCheckpointed) {
  println("Checkpoint file: " + stage.getCheckpointFile.getOrElse("<none>"))
}

Operational notes:

  • rdd.checkpoint() schedules checkpointing; the data is actually written when an action runs and computes the RDD.
  • Without persist(), Spark will compute the RDD once for the action and again to write the checkpoint. Persisting avoids this by reusing cached partitions when checkpointing to storage.

Applying checkpoint to Datasets/DataFrames

Dataset/DataFrame checkpointing truncates the logical plan and writes data to the checkpoint store.

import spark.implicits._

val df = spark.range(0, 1e6.toLong).withColumnRenamed("id", "key")
val transformed = df.filter($"key" % 7 =!= 0).withColumn("v", ($"key" * 2) + 1)

// Eager checkpoint: materializes immediately when invoked
val ckptDf = transformed.checkpoint(eager = true)

// Alternatively, lazy checkpoint: will be materialized on the next action
// val ckptDf = transformed.checkpoint(eager = false)

val cnt = ckptDf.count()

Notes:

  • Dataset/DataFrame checkpoint returns a new Dataset[T]/DataFrame with the plan truncated.
  • Use HDFS or another fault-tolerant filesystem for production; local directories are not resilient.

Cache vs. checkpoint: behavior and use cases

  • Cache/Persist

    • Stores partitions in memory and/or disk for faster reuse.
    • Keeps the RDD/Dataset lineage graph intact.
    • If cached data is lost (e.g., executor failure), Spark may recompute upstream transformations.
    • Best for iterative access to the same intermediate results within a job.
  • Checkpoint

    • Writes data to a reliable store (e.g., HDFS) and severs dependencies to parent RDDs/plans.
    • Eliminates recomputation beyond the checkpoint even if executors fail or the lineage becomes very large.
    • Recommended when DAGs grow deep or when recomputation cost is high.
  • Common pattern

    • Persist first, then checkpoint, then run an action:
val prepared = someRdd
  .mapValues(v => v * v)
  .reduceByKey(_ + _)

prepared.persist()      // or persist(StorageLevel.MEMORY_AND_DISK)
prepared.checkpoint()
val out = prepared.collect() // triggers both compute and checkpoint write

Practical guidance

  • Always configure a checkpoint directory on a distributed, fault-tolerant filesystem for production workloads.
  • Call rdd.checkpoint() before the first action on that RDD; otherwise, Spark may not cut off lineage as expected.
  • Prefer persist() before chekcpoint() to avoid double computation overhead.
  • Use rdd.isCheckpointed and rdd.getCheckpointFile to inspect checkpoint status when debugging.
  • For Datasets/DataFrames, checkpoint(eager = true) forces immediate materialization; eager = false defers until the next action.

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.