Testing Spark Streaming code ingesting data from Kusto, using ScalaTest

Mr. Marcin Kuthan has a post on testing Spark Streaming code that, in production, would be ingesting data from Apache Kafka.  However, his strategy is to mock out the Kafka source, so as to unit test the Spark code.  That approach is more pure, certainly, than the one that led me to this post of mine.

As a test-driven developer, my aim was to validate that Spark Streaming code could, indeed, ingest data from Apache Kafka, and to have that validation incorporated within a test that could be executed repeatedly and catch, for instance, mishaps that might transpire from a change in the version of the libraries one was using.  The test assumes that Kafka is running locally and either already has a topic named, "test," or has been configured to create topics on-demand.  A better version of the test might merely assume that Kafka could be run locally, and start it.

Scala programmers may well object to one intentional departure from the Scala idiom, which is to explicitly declare the type of every value.  In my view, a little extra typing by the author of any code will save a lot of mental effort by subsequent readers.

The use, and, indeed, the naming, of the WaiterSink object follows Mr. Zubair Nabi's discussion of "The Art of Side Effects," in his terrific book, Pro Spark Streaming: The Zen of Real-time Analytics using Apache Spark.

The version of Kafka I am using is 0.10.2.1 and the version of Spark is 2.1.1 prebuilt for Hadoop 2.7 and later.  Here is the sbt file, first ...

 

name := "Party"

version := "1.0"

scalaVersion := "2.11.8"

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.5"
dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-paranamer" % "2.6.5"

// kafka: https://mvnrepository.com/artifact/org.apache.kafka/kafka\_2.11
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.10.2.1"

// spark core: https://mvnrepository.com/artifact/org.apache.spark/spark-core\_2.11
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"

// spark streaming: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming\_2.11/2.1.1/
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.1"

// spark streaming kafka: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly\_2.11/2.1.1/
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10-assembly_2.11" % "2.1.1"

// ScalaTest: https://www.scalatest.org/
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.2" % Test

 

Here's the ScalaTest code.  The code starts the Spark Stream, publishes an event in Kafka, and wait until that event is observed in the Spark stream, or until a timeout expires.

 

 import java.util.{Properties, UUID}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.scalatest._
import org.scalatest.concurrent.AsyncAssertions._
import org.scalatest.concurrent.PatienceConfiguration._
import org.scalatest.concurrent.TimeLimitedTests
import org.scalatest.time.Span

final class SparkTests extends FunSuite with TimeLimitedTests {

  private val CONSUMPTION_TIMEOUT_UNITS: Int = 10
  private val COUNT_CONSUMER_THREADS: Int = 1

  private val KAFKA_AUTO_OFFSET_RESET_LATEST : String = "latest"
  private val KAFKA_MESSAGE_BROKER: String = "localhost:9092"
  private val KAFKA_MESSAGE_ACKNOWLEDGEMENTS_LEADER: String = "1"
  private val KAFKA_DESERIALIZER_STRING: Object = classOf[StringDeserializer]
  private val KAFKA_SERIALIZER_STRING: String = "org.apache.kafka.common.serialization.StringSerializer"
  private val KAFKA_TOPIC: String = "test"
  private val KAFKA_DISABLE_AUTO_COMMIT: java.lang.Boolean = false

  private val KAFKA_PROPERTY_KEY_AUTO_OFFSET_RESET: String = "auto.offset.reset"
  private val KAFKA_PROPERTY_KEY_BROKER_ACKNOWLEDGEMENTS: String = "acks"
  private val KAFKA_PROPERTY_KEY_BOOTSTRAP_SERVERS: String = "bootstrap.servers"
  private val KAFKA_PROPERTY_KEY_ENABLE_AUTO_COMMIT: String = "enable.auto.commit"
  private val KAFKA_PROPERTY_KEY_GROUP_IDENTIFIER: String = "group.id"
  private val KAFKA_PROPERTY_KEY_DESERIALIZER_KEYS: String = "key.deserializer"
  private val KAFKA_PROPERTY_KEY_DESERIALIZER_VALUES: String = "value.deserializer"
  private val KAFKA_PROPERTY_KEY_SERIALIZER_KEYS: String = "key.serializer"
  private val KAFKA_PROPERTY_KEY_SERIALIZER_VALUES: String = "value.serializer"

  private val SPARK_BATCH_DURATION: Duration = Seconds(10)
  private val SPARK_MASTER_NODE: String = "local[*]"
  private val SPARK_CHECKPOINT = "SparkStreamingTestsCheckpoint"

  private val TIME_LIMIT_UNITS: Int = CONSUMPTION_TIMEOUT_UNITS * 2
  private val ZOOKEEPER: String = "localhost:2181"

  private val kafkaConsumerParametersDirect: Map[String, Object] = Map[String, Object](
    (KAFKA_PROPERTY_KEY_BOOTSTRAP_SERVERS, KAFKA_MESSAGE_BROKER),
    (KAFKA_PROPERTY_KEY_DESERIALIZER_KEYS, KAFKA_DESERIALIZER_STRING),
    (KAFKA_PROPERTY_KEY_DESERIALIZER_VALUES, KAFKA_DESERIALIZER_STRING),
    (KAFKA_PROPERTY_KEY_GROUP_IDENTIFIER, UUID.randomUUID.toString),
    (KAFKA_PROPERTY_KEY_AUTO_OFFSET_RESET, KAFKA_AUTO_OFFSET_RESET_LATEST),
    (KAFKA_PROPERTY_KEY_ENABLE_AUTO_COMMIT, KAFKA_DISABLE_AUTO_COMMIT))

  private val kafkaProducerProperties: Properties = {
    val result: Properties = new Properties()
    result.put(KAFKA_PROPERTY_KEY_BOOTSTRAP_SERVERS, KAFKA_MESSAGE_BROKER)
    result.put(KAFKA_PROPERTY_KEY_SERIALIZER_KEYS, KAFKA_SERIALIZER_STRING)
    result.put(KAFKA_PROPERTY_KEY_SERIALIZER_VALUES, KAFKA_SERIALIZER_STRING)
    result.put(KAFKA_PROPERTY_KEY_BROKER_ACKNOWLEDGEMENTS, KAFKA_MESSAGE_ACKNOWLEDGEMENTS_LEADER)
    result
  }

  private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer(kafkaProducerProperties)

  private lazy val sparkConfiguration: SparkConf = {
    new SparkConf()
    .setMaster(SPARK_MASTER_NODE)
    .setAppName(UUID.randomUUID.toString)
    .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
  }
  private lazy val sparkConsumerStrategy: ConsumerStrategy[String, String] =
    Subscribe[String, String](Array(KAFKA_TOPIC), kafkaConsumerParametersDirect)

  private lazy val sparkStreamingContext: StreamingContext =
    new StreamingContext(sparkConfiguration, SPARK_BATCH_DURATION)

  private val sparkLocationStrategy: LocationStrategy = PreferConsistent

  private val sparkOffsetRanges: Array[OffsetRange] = Array(OffsetRange(KAFKA_TOPIC, 0, 0, 100))

  val timeLimit = Span(TIME_LIMIT_UNITS, org.scalatest.time.Seconds)
   object WaiterSink {
    val waiter: Waiter = new Waiter

    def apply(): Waiter = waiter
  }
  test("A message can be received via a direct stream") {
    assert(Option(sparkStreamingContext).isDefined)
    assert(Option(sparkConsumerStrategy).isDefined)
    assert(Option(kafkaProducer).isDefined)

    val recordValue: String = UUID.randomUUID.toString

    val waiter: Waiter = new Waiter

    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        sparkStreamingContext,
        sparkLocationStrategy,
        sparkConsumerStrategy)
    val discretizedStream: DStream[(String, String)] =
          stream.map(
            (item: ConsumerRecord[String, String]) =>
              (item.key, item.value))

    val recordFunction: ((String, String)) => Unit =
      (record: (String, String)) => {
        if (recordValue == record._2) {
          WaiterSink().dismiss()
        }
      }
    val outputFunction: (RDD[(String, String)]) => Unit =
      (dataset: RDD[(String, String)]) => {
        dataset.foreach(recordFunction)
      }
    discretizedStream.foreachRDD(outputFunction)

    try {
      sparkStreamingContext.start

      val record: ProducerRecord[String, String] =
        new ProducerRecord[String, String](
          KAFKA_TOPIC,
          recordValue)
      val metadata: RecordMetadata = kafkaProducer.send(record).get()
      assert(Option(metadata).isDefined)

      WaiterSink().await(Timeout(Span(CONSUMPTION_TIMEOUT_UNITS, org.scalatest.time.Seconds)))
    }
    finally {
      sparkStreamingContext.stop(true)
    }
  }
}