Exactly-once semantics is one of the advanced topics of stream processing. To process every message once and only once, in spite of system or network failure, the stream processing framework alone is not enough: the message delivery system, the output data store, and the way we implement the processing procedure must all cooperate to ensure exactly-once semantics. In this article, I’ll demonstrate how to use Spark Streaming, with Kafka as the data source and MySQL as the output storage, to achieve exactly-once stream processing.
First let’s implement a simple yet complete stream processing application that receives access logs from Kafka, parses them and counts the errors, then writes the errors-per-minute metric into a MySQL database.
Sample access logs:
Output table, where log_time should be truncated to minutes:
Scala projects are usually managed by the sbt tool. Let’s add the following dependencies into the build.sbt file. We’re using Spark 2.2 with Kafka 0.10. The database library of choice is ScalikeJDBC 3.0.
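As a rough sketch, the dependency section could look like the following. The artifact coordinates are the published ones for these libraries, but the Scala version, the patch versions and the MySQL driver version are assumptions chosen to match Spark 2.2, Kafka 0.10 and ScalikeJDBC 3.0; adjust them to your environment.

```scala
// build.sbt (sketch; patch versions are assumptions, not prescribed by this article)
scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  // Spark Streaming core
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  // Kafka 0.10 direct stream integration
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
  // ScalikeJDBC for database access
  "org.scalikejdbc" %% "scalikejdbc" % "3.0.1",
  // MySQL JDBC driver (version is an assumption)
  "mysql" % "mysql-connector-java" % "5.1.43"
)
```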
The complete code can be found on GitHub (link), so only the major parts of the application are shown here:
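To make the transformation step concrete, here is a minimal sketch of the parse-and-count logic in plain Scala, without any Spark or Kafka dependency. The log layout (timestamp, level, message), the `Log` case class and the names `parseLog` and `errorsPerMinute` are assumptions made for illustration; in the streaming job the same steps would run on each batch rather than on a `Seq`.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import scala.util.Try

object ErrorCount {
  // Assumed log layout: "<uuuu-MM-dd HH:mm:ss> <LEVEL> <message>"
  final case class Log(time: LocalDateTime, level: String, message: String)

  private val fmt = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")

  // Returns None for lines that do not match the assumed layout.
  def parseLog(line: String): Option[Log] = {
    val parts = line.split(" ", 4)
    if (parts.length < 4) None
    else Try(LocalDateTime.parse(parts(0) + " " + parts(1), fmt)).toOption
      .map(time => Log(time, parts(2), parts(3)))
  }

  // Keep ERROR lines only, truncate the timestamp to the minute, count per minute.
  def errorsPerMinute(lines: Seq[String]): Map[LocalDateTime, Int] =
    lines.flatMap(line => parseLog(line).toList)
      .filter(_.level == "ERROR")
      .groupBy(_.time.truncatedTo(ChronoUnit.MINUTES))
      .map { case (minute, logs) => minute -> logs.size }
}
```

Each entry of the resulting map corresponds to one row of the output table: the truncated minute goes to log_time and the count is the metric.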
There are three semantics in stream processing: at-most-once, at-least-once, and exactly-once. In a typical Spark Streaming application, there are three processing phases: receive data, do transformation, and push outputs. Each phase requires a different effort to achieve a given semantics.
For receiving data, the semantics largely depend on the data source. For instance, reading files from a fault-tolerant file system like HDFS gives us exactly-once semantics. For upstream queues that support acknowledgement, e.g. RabbitMQ, we can combine them with Spark’s write-ahead logs to achieve at-least-once semantics. For unreliable receivers like socketTextStream, data may be lost on worker or driver failure, which gives us undefined semantics. Kafka, on the other hand, is offset based, and its direct API can give us exactly-once semantics.
When transforming data with Spark’s RDD, we automatically get exactly-once semantics, for an RDD is itself immutable, fault-tolerant and deterministically re-computable. As long as the source data is available and there are no side effects during transformation, the result will always be the same.
Output operations have at-least-once semantics by default. The foreachRDD function will execute more than once if there’s a worker failure, thus writing the same data to external storage multiple times. There are two approaches to solve this issue: idempotent updates and transactional updates. They are further discussed in the following sections.
If multiple writes produce the same data, then the output operation is idempotent. saveAsTextFile is a typical idempotent update; messages with unique keys can be written to a database without duplication. This approach gives us the equivalent of exactly-once semantics. Note though that it’s usually for map-only procedures, and it requires some setup on the Kafka DStream.
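The difference between an idempotent and a non-idempotent write can be seen in a small in-memory model. This is only a sketch: the `Table` type stands in for a database table with a unique key, and the names `upsert` and `increment` are made up for the example.

```scala
object IdempotentWrites {
  // Stand-in for a table with a unique key (key -> value).
  type Table = Map[String, Int]

  // Idempotent: writing the same (key, value) again overwrites it with the same value.
  def upsert(table: Table, batch: Seq[(String, Int)]): Table =
    table ++ batch

  // Not idempotent: a replayed batch is added a second time.
  def increment(table: Table, batch: Seq[(String, Int)]): Table =
    batch.foldLeft(table) { case (t, (key, value)) =>
      t.updated(key, t.getOrElse(key, 0) + value)
    }
}
```

If a failed batch is replayed, `upsert` leaves the table exactly as a single write would, while `increment` counts the batch twice. That is why the idempotent approach fits map-only outputs keyed by a unique id better than running aggregates.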
- Set enable.auto.commit to false. By default, Kafka DStream will commit the consumer offsets right after it receives the data. We want to postpone this action until the batch is fully processed.
- Turn on Spark Streaming’s checkpointing to store Kafka offsets. But if the application code changes, checkpointed data is not reusable. This leads to a second option:
- Commit Kafka offsets after outputs. Kafka provides a commitAsync API, and the HasOffsetRanges class can be used to extract offsets from the initial RDD:
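The ordering matters more than the particular API, so here is a toy model of "commit after output" in plain Scala. It does not use Spark or Kafka: `Partition` is a hypothetical in-memory stand-in for one Kafka partition with a committed consumer offset, and `runBatch` is a made-up name for one batch run.

```scala
object CommitAfterOutput {
  // In-memory stand-in for one Kafka partition plus its committed consumer offset.
  final class Partition(val log: Vector[String]) {
    var committed: Long = 0L

    // Reads up to `max` records starting at the committed offset.
    def poll(max: Int): Vector[String] = {
      val from = committed.toInt
      log.slice(from, from + max)
    }
  }

  // Processes one batch; the offset is committed only after every write succeeded.
  def runBatch(p: Partition, max: Int)(write: (Long, String) => Unit): Unit = {
    val from = p.committed
    val records = p.poll(max)
    records.zipWithIndex.foreach { case (record, i) => write(from + i, record) }
    p.committed = from + records.size // not reached if a write throws
  }
}
```

If a write throws, the committed offset stays where it was, so the next run re-reads the same records. Combined with writes keyed by offset (or any other unique key), the replay overwrites instead of duplicating.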
Transactional updates require a unique identifier. One can generate it from the batch time, partition id, or Kafka offsets, and then write the result along with the identifier into external storage within a single transaction. This atomic operation gives us exactly-once semantics, and can be applied to both map-only and aggregation procedures.
Usually writing to the database should happen in foreachPartition, i.e. on worker nodes. This works for map-only procedures, because each Kafka RDD partition corresponds to a Kafka partition, so we can extract each partition’s offset like this:
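The one-to-one mapping can be illustrated without Spark. In the sketch below, `OffsetRange` is a local case class that only mirrors the fields of Spark’s Kafka offset range, and `Txn` and `buildTxn` are hypothetical names. In a real job the partition id would come from the task context; here it is passed in explicitly.

```scala
object PartitionOffsets {
  // Local stand-in mirroring the fields of Spark's Kafka OffsetRange.
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // What one worker-side transaction would contain: the rows plus the new offset.
  final case class Txn(topic: String, partition: Int, rows: Vector[String], newOffset: Long)

  // RDD partition i was read from offsetRanges(i), so the partition id is the index.
  def buildTxn(offsetRanges: Array[OffsetRange], partitionId: Int, records: Iterator[String]): Txn = {
    val range = offsetRanges(partitionId)
    Txn(range.topic, range.partition, records.toVector, range.untilOffset)
  }
}
```

Each worker would then write `rows` and store `newOffset` for its own partition inside a single database transaction.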
But for shuffled operations like the error log count example, we need to first collect the result back to the driver and then perform the transaction.
If the offsets fail to update, or a duplicate offset range is detected by offset != $fromOffset, the whole transaction will roll back, which guarantees exactly-once semantics.
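The check-then-update logic can be modelled with immutable state, which makes the all-or-nothing behaviour easy to see. This is a sketch in plain Scala rather than real database code: `State` stands in for the result table plus an offsets table, and `commit` is a hypothetical name for the transaction body.

```scala
object TransactionalUpdate {
  // Stand-in for the database: per-minute counts plus the stored offset per Kafka partition.
  final case class State(counts: Map[String, Int], offsets: Map[Int, Long])

  // Applies both the counts and the offset update, or neither (Left models a rollback).
  def commit(state: State, partition: Int, fromOffset: Long, untilOffset: Long,
             result: Map[String, Int]): Either[String, State] = {
    val stored = state.offsets.getOrElse(partition, 0L)
    if (stored != fromOffset)
      Left(s"offset mismatch for partition $partition: stored $stored, batch starts at $fromOffset")
    else {
      val merged = result.foldLeft(state.counts) { case (acc, (minute, n)) =>
        acc.updated(minute, acc.getOrElse(minute, 0) + n)
      }
      Right(State(merged, state.offsets.updated(partition, untilOffset)))
    }
  }
}
```

A replayed batch starts at an offset that no longer matches the stored one, so it is rejected and the counts stay untouched. The next batch, starting where the previous one ended, goes through.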
Exactly-once is a very strong semantics in stream processing, and it will inevitably bring some overhead to your application and impact the throughput. It’s also not applicable to windowed operations. So you need to decide whether it’s worth the effort, or whether weaker semantics, even with a little data loss, will suffice. But knowing how to achieve exactly-once is surely a good learning opportunity, and it’s great fun.