'Testing strategies for Apache Spark based projects' post illustration

Testing strategies for Apache Spark based projects

avatar

Let’s talk about tests. The Apache Spark official guide doesn’t suggest an approach for testing, so we have to do the first steps ourselves.

First, we should understand what we are going to achieve with testing. Often developers think all testing is unit testing. Actually there are a lot of tests: unit, integration, component, contract, end-to-end... you name it. I used to separate it into two categories: unit tests and integration tests. The names are unimportant. The border between these terms is vague. Let’s look at what I mean by those words.

Unit testing

By “unit” I mean isolating code for the test. It doesn’t matter what it would be: function, class or a whole package. The main thing that makes something a “unit” is isolation of part of your code.

Pros of unit tests:

  • These tests are fast and small. When a test fails, you are able to find mistakes quickly
  • It’s easy to achieve a big coverage of code. When you test component A that has 3 test cases and component B with 3 test cases, you need to write only 6 tests. But for the integrations test with components A and B, you need to test any combination of these cases, so 3 * 3 = 9 cases

Cons:

  • A lot of code. You need to write a lot of tests to get a whole system tested, even with small coverage. On the other hand, it’s possible to test a whole system with several integration tests
  • Unit tests often requires a lot of mocking, which makes it harder to write
  • It becomes completely unreadable with a lot of mocking
  • Unit tests fixate implementation of your functions

There is one big problem with unit testing. I deliberately didn't add the common mentioned advantage of unit testing: fearless refactoring. Because the last drawback ruins this item completely: you are able to do only simple things within a function, like a renaming, or method extraction. But often you want to completely rewrite some part of a system without breaking it. And this is a place where integration tests come to the rescue.

Integration/e2e testing

The main reasons why I love integration tests:

  1. You can cover a big part of your code with a few tests
  2. You can throw away a big part of your system, and write it from scratch without rewriting integration tests

This makes it a good choice for a beginning, when you have requirements that change as the project progresses. I split integration tests into two categories. One involves infrastructure (databases, http, etc.) code. The other doesn’t. The latter makes this kind of integration testing looks like a “big unit test”. Also, while you are testing a bigger part with a smaller test code, those tests can be considered as a documentation.

What we have now

Let’s define some "lab rat" code for example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import java.util.Properties

class TestJob() {
 def run(): Unit = {
   val spark: SparkSession = SparkSession.builder()
     .master("local[1]")
     .getOrCreate()

   val users = spark.read
     .jdbc("/database", "users", new Properties()) // simplified for example

   val adults = users
     .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
     .filter(col("age") >= lit(18))
     .drop(col("first_name"))
     .drop(col("last_name"))

   adults.write
     .jdbc("/database", "adults", new Properties()) // simplified for example
 }
}

Unit testing with Spark

To test your code in Spark, you must divide your code in at least 2 parts: domain/computation, and input/output. When your Spark transformation doesn’t have code that writes or reads to/from an outer world, it’s really easy to write unit tests:

1
2
3
4
5
6
// class TestJob() ...

def run(users: DataFrame): DataFrame = {
 return users
   .withColumn... // the same as in TestJob original class
}

And the test (using scalatest):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.scalatest.freespec.AnyFreeSpec

class Test extends AnyFreeSpec {
  "TestJob test" in {
    implicit val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .getOrCreate()

    import spark.implicits._

    val users = Seq(
      ("John",  "Johnson",   17),
      ("Henry", "Petrovich", 18),
      ("Harry", "Harrison",  19)
    ).toDF("first_name", "last_name", "age")

    val adults = new TestJob(users).run()
     
    val expected = Seq(
      ("Henry Petrovich", 18),
      ("Harry Harrison",  19)
    ).toDF("full_name", "age")

    val diff = adults.except(expected)
      .union(expected.except(adults))
    diff.count() should be 0
  }
}

Here we are asserting that data frames are the same by .except method that removes rows from the first dataset that exists in the second dataset. Of course it’s just a simple demo. In the real world you should write a custom assert function with at least a printing of difference in case of an error. An example of such a function, you can find in the repository for this article.

Integration/e2e testing with Spark

The simplest way to write e2e tests for Spark is to use TestContainers to run real databases in the docker, setup input data, and - after a Spark job finishes check output tables. We won’t discuss it in this article, because it doesn’t have any Spark specific details and I think those tests are going to be really long running, making them boring. This is not our way. Instead, let’s use dependency injection to abstract read sources and write targets. Also, let’s add Guice as a dependency injection framework:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.scalatest.freespec.AnyFreeSpec
import javax.inject.Inject
import java.util.Properties

trait UsersDao {
 def getAll: DataFrame
}

class SparkUsersDao @Inject()(spark: SparkSession) extends UsersDao {
 override def getAll: DataFrame = spark.read
   .jdbc("/database", "users", new Properties())
}

trait AdultsDao {
 def save(dataFrame: DataFrame)
}

class SparkAdultsDao @Inject()(spark: SparkSession) extends AdultsDao {
 override def save(dataFrame: DataFrame): Unit = dataFrame.write
   .jdbc("/database", "adults", new Properties())
}

class TestJob @Inject()(spark: SparkSession,
                       usersDao: UsersDao,
                       adultsDao: AdultsDao) {
 def run(): Unit = {
   val users = usersDao.getAll

   val adults = users
     .withColumn... // the same as in TestJob original class

   adultsDao.save(adults)
 }
}

Now it becomes easy to mock implementations with Mockito or Scalamock and write integration tests.

Also we can consider another mocking option. When we are reading and writing we are using Spark, why not mock it? Is it possible? Yes, but I don’t recommend doing this, despite the convenience of using such a mock, Spark was not designed to be replaced with a mock, so mocking Spark will involve a lot of reflection usage. And reflection leads to unstable code: library developers usually provide some backward compatibility for open API, but don’t (and they shouldn’t) provide any guarantees about private API.

But if you are really curious, you can see at SparkStub class in the repository an example that contains experimental implementation of that approach. Now the test job will look like:

1
2
3
4
5
6
7
8
9
10
class TestJob @Inject()(spark: SparkSession) {
 def run(): Unit = {
   spark.read... // the same as in TestJob original class

   val adults = users
     .withColumn... // the same as in TestJob original class

   spark.save... // the same as in TestJob original class
 }
}

But caller of this method will look like this screenshot below:

1
2
3
4
5
6
7
8
9
10
import com.sysgears.{DataFrames, SparkStub}
// ...

implicit val spark: SparkSession = SparkStub.create()
// ...
DataFrames.threadLocal.addTableToRead("jdbc", "users", users)

new TestJob(spark).run()

DataFrames.threadLocal.written("jdbc", "adults").show()

In this case Spark context was mocked by SparkStub util, and - using Mockito implementation of SparkSession and Dataset classes - was modified to read from and write to DataFrames.threadLocal (that actually is just a map with dataframes).

Cucumber

Foreword. This paragraph may require a basic knowledge of Cucumber. For this article, It’s enough to think about Cucumber as a middle layer that provides human readable BDD test syntax. Instead of writing Scala test code as above we are going to get this syntax:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Feature: Cucumber demo
 Scenario: run test job
   Given table: "users" has data:
     | first_name STRING | last_name STRING | age INT |
     | John              | Petrovich        | 17      |
     | Henry             | Johnson          | 18      |
     | Harry             | Potter           | 19      |

   When test job is started

   Then table: "adults" was written with data:
     | full_name STRING | age INT |
     | Henry Johnson    | 18      |
     | Harry Potter     | 19      |

If you want to dive into Cucumber deeper, you can start f by reading the official introduction here or/and read about Cucumber’s DataTable here.


We can slightly increase readability. Let’s use infix method call, and define methods named: ! as unary operator for creating row builder, binary | to add more values to a row, and method | with no parameters that transforms the builder to a dataframe. So we can define dataframes as in the screen shot below:

1
2
3
4
5
6
7
8
9
import com.sysgears.DataFrameBuilder._
// ...
implicit val spark: SparkSession = // ...

val users: DataFrame =
   ! "first_name" | "last_name" | "age" |
   ! "John"       | "Johnson"   | 17    |
   ! "Henry"      | "Petrovich" | 18    |
   ! "Harry"      | "Harrison"  | 19    |

Full code for DataFrameBuilder you can find in the repository.

Another approach is to use Cucumber for providing human readable syntax. I’m not going to dwell on connecting Cucumber to your project, it’s well described in official documentation and other guides. Let’s jump directly to the step definitions.

First of all, we need to store datasets somewhere that can be read and written. A sample version of this will look like this:

1
2
3
4
class DataFrames() {
 val readDataFrames = mutable.HashMap[String, DataFrame]
 val writeDataFrames = mutable.HashMap[String, DataFrame]
}

A more mature version you can find in the repository, but for our example it will be enough. Next, we should define DataFrame data type to allow conversion from Cucumber’s DataTable to Spark DataFrame:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import io.cucumber.java.en.{Given, Then}
import io.cucumber.java.DataTableType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.JavaConversions._
import scala.language.implicitConversions

class SparkSteps @Inject()(spark: SparkSession, dataFrames: DataFrames) {
  private implicit val sparkSession: SparkSession = spark

  @DataTableType
  def toDataFrame(table: DataTable): DataFrame = {
    def parseValue(`type`: String, value: String) = {
    `type` match {
      case "BOOLEAN" => value.toBoolean
      case "BYTE" | "TINYINT" => value.toByte
      ... // the same for other spark types
       case _ => value
    }
  }

  val header = dataTable.asLists().get(0)
  val namesAndTypes = header
    .map { nameAndType => nameAndType.split(" ") }
    .map { nameAndTypeSplit => (nameAndTypeSplit(0), nameAndTypeSplit(1)) }

  spark.createDataFrame(
    dataTable.asMaps()
     .map { rows => rows.map { case (key, value) => (key.split(" ")(0), value) } }
     .map { nameAndValueRows =>
       namesAndTypes
         .map { case (name, typ) => 
            parseValue(typ, nameAndValueRows.getOrDefault(name,    "")) 
          }
     }
     .map(Row.fromSeq(_)),
    StructType.fromDDL((header ++ defaultsHeader).mkString(", "))
  )
 }
}

Next, step definitions for input data and output assertion:

1
2
3
4
5
6
7
8
9
10
11
12
@Given("table: {string} has data:")
def setDataFrame(table: String, dataFrame: DataFrame): Unit = {
  dataFrames.read.put(table, dataFrame)
}

@Then("table: {string} was written with data:")
def assertDataFrame(table: String, expected: DataFrame): Unit = {
  val actual = dataFrames.write(table)
  val diff = actual.except(expected)
      .union(expected.except(actual))
  diff.count() should be 0
}

Next we must define a step for running the job. We considered three options above in this article, let’s use the first one here:

1
2
3
4
5
@When("test job is started")
def runTestJob() = {
 val adults = job.run(dataFrames.read("users"))
 dataFrames.write.put("adults", adults)
}

Eventually, we can write our test:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Feature: Cucumber demo
 Scenario: run test job
   Given table: "users" has data:
     | first_name STRING | last_name STRING | age INT |
     | John              | Petrovich        | 17      |
     | Henry             | Johnson          | 18      |
     | Harry             | Potter           | 19      |

   When test job is started

   Then table: "adults" was written with data:
     | full_name STRING | age INT |
     | Henry Johnson    | 18      |
     | Harry Potter     | 19      |

Looks good, doesn’t it?

Conclusions

So what approach should you choose? Let’s define some pros and cons.

Unit testing. Easy to set up, Easy to write. But requires your transformation to be isolated from input and output code. Also it forces you to write a lot of assertion code.

Integration testing. The same as a unit testing, but with the dependency injection and some general interface for your DAOs you can remove some boilerplate code. Doesn’t require transformation logic be isolated, but requires the input/output to be abstracted.

e2e. Good option, when you don't have isolated logic and abstracted input/output/ But bear in mind that this requires real databases, and execution speed of these tests can be disappointing.

Cucumber vs plain Scala code. Cucumber is the perfect solution, when you want to combine your code with documentation. But it will only work when you can cover all job start cases with several Cucumber steps (it may require some general interface for every Spark Job). Also, the format of the data table looks close to Spark’s .show() output, and sometimes it makes test writing easier.

What you can find in the repository:

  • Test demo for this article, DataFrameBuilder that brings table syntax DSL to Scala (as in the beginning of the Cucumber section)
  • SparkStub class that allows you to mock Spark sessions, DataFrames class that makes it easy to implement integration between main code and tests
  • Steps definitions for Cucumber that allow you to set default columns, parsing tables as specific Scala classes

SysGears is happy to support you with doing magic with your data by implementing Big Data pipelines in SMACK stack. Feel free to reach out us to info@sysgears.com

If you're looking for a developer or considering starting a new project,
we are always ready to help!

Comments