In Spark 1.3, the team introduced the Data Source API to make it easy to integrate various input formats with Spark SQL. Eventually this version of the API became insufficient, and the team had to add a lot of internal code to provide more efficient solutions for Spark SQL data sources. So Spark 2.3 ships the second version of the Data Source API, which is meant to overcome the limitations of the previous one. In this article, I will demonstrate how to implement a custom data source for Spark SQL with both the V1 and V2 APIs, to illustrate their differences and the advantages of the new API.
DataSource V1 API
The V1 API provides a set of abstract classes and traits, located in spark/sql/sources/interfaces.scala. The basic ones are RelationProvider, BaseRelation, and the Scan traits. A RelationProvider defines a class that can create a relational data source for Spark SQL to work with, and it can initialize itself with the provided options, such as a file path or authentication credentials. BaseRelation defines the data schema, which can be loaded from a database or a Parquet file, or specified by the user. A concrete relation also needs to mix in one of the Scan traits and implement the buildScan method, which returns an RDD.
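For reference, these interfaces look roughly like the following (abridged from the org.apache.spark.sql.sources package; see interfaces.scala for the complete definitions):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType

trait RelationProvider {
  // Create a relation from the user-supplied options (path, credentials, etc.).
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
}

trait TableScan {
  def buildScan(): RDD[Row]
}
```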
JdbcSourceV1
Now let us use the V1 API to implement a JDBC data source. For simplicity, the table schema is hard-coded and only full table scan is supported. The complete example can be found on GitHub (link), and the sample data is here.
```scala
class JdbcSourceV1 extends RelationProvider {
  // ... (abridged)
}
```
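Fleshed out a little, the pieces fit together roughly as follows (the option key, table name, and columns are illustrative and error handling is omitted; the linked repository contains the real implementation):

```scala
import java.sql.DriverManager

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types._

class JdbcSourceV1 extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new JdbcRelationV1(parameters("url"))(sqlContext)   // "url" is an assumed option key
}

class JdbcRelationV1(url: String)(@transient val sqlContext: SQLContext)
    extends BaseRelation with TableScan {

  // Hard-coded schema, as in the original example (column names are illustrative).
  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  override def buildScan(): RDD[Row] = new JdbcRDD(sqlContext.sparkContext, url)
}

class JdbcRDD(sc: SparkContext, url: String) extends RDD[Row](sc, Nil) {
  // A single partition: the whole table is read by one task.
  override def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })

  // The actual reading: open a connection, run the query, convert each JDBC row to a Row.
  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val conn = DriverManager.getConnection(url)
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("SELECT id, emp_name, age FROM employee")
    val rows = ArrayBuffer[Row]()
    while (rs.next()) {
      rows += Row(rs.getInt("id"), rs.getString("emp_name"), rs.getInt("age"))
    }
    conn.close()
    rows.iterator
  }
}
```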
The actual data reading happens in JdbcRDD#compute. It receives the connection options, possibly along with a pruned column list and where conditions, executes the query, and returns an iterator of Row objects corresponding to the defined schema. Now we can create a DataFrame from this custom data source:
```scala
val df = spark.read
  // ... (format and options abridged)
```
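In full, the read might look like this (the format class name and JDBC URL are placeholders, not taken from the original post):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("jdbc-v1").getOrCreate()

val df = spark.read
  .format("com.example.source.JdbcSourceV1")        // placeholder: fully qualified class name
  .option("url", "jdbc:mysql://localhost/spark")    // placeholder JDBC URL
  .load()

df.printSchema()   // prints the hard-coded schema, starting with "root"
df.show()
```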
The outputs are:
```
root
 ... (schema and rows abridged)
```
Limitations of V1 API
As we can see, the V1 API is quite straightforward and meets the initial requirements of Spark SQL use cases. But as Spark moved forward, the V1 API started to show its limitations.
Coupled with Higher Level API
createRelation accepts an SQLContext as a parameter; buildScan returns an RDD of Row; and when implementing a writable data source, the insert method accepts a DataFrame:
```scala
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```
These classes belong to Spark's higher-level API, and some of them have already been upgraded: SQLContext is superseded by SparkSession, and DataFrame is now an alias of Dataset[Row]. Data sources should not be required to track these changes.
Hard to Add New Push Down Operators
Besides TableScan, the V1 API provides PrunedScan to eliminate unnecessary columns and PrunedFilteredScan to push predicates down to the data source. In JdbcSourceV1, they are reflected in the generated SQL statement:
```scala
class JdbcRelationV1 extends BaseRelation with PrunedFilteredScan {
  // ... (abridged)
}
```
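A sketch of how the pruned columns and pushed filters can end up in the generated SQL (only two filter types are handled and values are not escaped; the full version is in the linked repository):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

class JdbcRelationV1(url: String)(@transient val sqlContext: SQLContext)
    extends BaseRelation with PrunedFilteredScan {

  // Hard-coded schema, as before.
  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Turn the pushed-down information into a SQL statement.
    val columns = if (requiredColumns.isEmpty) "1" else requiredColumns.mkString(", ")
    val whereClauses = filters.collect {
      case EqualTo(attr, value)     => s"$attr = '$value'"   // no escaping: sketch only
      case GreaterThan(attr, value) => s"$attr > $value"
    }
    val where = if (whereClauses.isEmpty) "" else whereClauses.mkString(" WHERE ", " AND ", "")
    val sql = s"SELECT $columns FROM employee$where"
    // Run the statement through a JdbcRDD like the one in the previous sketch,
    // extended to accept an arbitrary SQL string.
    new JdbcRDD(sqlContext.sparkContext, url, sql)
  }
}
```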
What if we need to push down a new operator like limit? It would introduce a whole new set of Scan traits, for example:
```scala
trait LimitedScan {
  def buildScan(limit: Int): RDD[Row]   // hypothetical
}
```
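And to combine limit with the existing optimizations, still more traits would be required (hypothetical names, for illustration only):

```scala
trait PrunedLimitedScan {
  def buildScan(requiredColumns: Array[String], limit: Int): RDD[Row]
}

trait PrunedFilteredLimitedScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter], limit: Int): RDD[Row]
}
```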
Hard to Pass Partition Info
The V1 API does not provide native support for partitioning or data locality, which matters for sources like HDFS and Kafka; we have to achieve this by extending the RDD class ourselves. For instance, a Kafka topic contains several partitions, and we want each reading task to run on the server where the partition's leader broker resides.
```scala
case class KafkaPartition(partitionId: Int, leaderHost: String) extends Partition {
  override def index: Int = partitionId
}
```
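A sketch of how such a custom RDD can expose locality information to the scheduler (the actual Kafka consuming logic is omitted):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

class KafkaRDD(sc: SparkContext, kafkaPartitions: Seq[KafkaPartition]) extends RDD[Row](sc, Nil) {
  override def getPartitions: Array[Partition] = kafkaPartitions.toArray[Partition]

  // Ask the scheduler to run each task on the partition's leader broker, if possible.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[KafkaPartition].leaderHost)

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    // consume the topic partition and convert each record to a Row (omitted)
    Iterator.empty
  }
}
```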
Besides, some databases like Cassandra distribute data by primary key. If the query groups by these columns, this information can be used by the optimizer to avoid a shuffle. The V2 API supports this with a dedicated interface (SupportsReportPartitioning).
Lack of Transactional Writing
Spark tasks may fail, and with the V1 API a failed job can leave partially written data behind. For file systems like HDFS, we can put a _SUCCESS file in the output directory to indicate whether the job finished successfully, but this process has to be implemented by the user, while the V2 API provides explicit interfaces to support transactional writing.
Lack of Columnar and Streaming Support
Columnar data and stream processing were both added to Spark SQL without going through the V1 API. Existing implementations such as ParquetFileFormat and KafkaSource are written against dedicated internal APIs. These features are also addressed by the V2 API.
DataSource V2 API
The V2 API starts with a marker interface, DataSourceV2. The implementing class needs to mix in either ReadSupport or WriteSupport. The ReadSupport interface, for instance, creates a DataSourceReader with the initialization options; the DataSourceReader reports the schema of the data source and returns a list of DataReaderFactory; each factory creates the actual DataReader, which works like an iterator. Besides, DataSourceReader can mix in various Supports* interfaces to apply query optimizations such as operator push-down and columnar scan. For WriteSupport, the hierarchy is similar. All of these interfaces are written in Java for better interoperability.
```java
public interface DataSourceV2 {}
```
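For reference, the reader-side interfaces mentioned above look roughly like the following in Spark 2.3 (abridged; see the javadoc of the org.apache.spark.sql.sources.v2 packages for the authoritative definitions):

```java
// from org.apache.spark.sql.sources.v2 and org.apache.spark.sql.sources.v2.reader (Spark 2.3), abridged

public interface ReadSupport extends DataSourceV2 {
  DataSourceReader createReader(DataSourceOptions options);
}

public interface DataSourceReader {
  StructType readSchema();
  List<DataReaderFactory<Row>> createDataReaderFactories();
}

public interface DataReaderFactory<T> extends Serializable {
  DataReader<T> createDataReader();
}

public interface DataReader<T> extends Closeable {
  boolean next() throws IOException;
  T get();
}
```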
You may notice that DataSourceReader#createDataReaderFactories still relies on the Row class; currently only Row is supported, and the V2 API is still marked as Evolving.
JdbcSourceV2
Let us rewrite the JDBC data source with the V2 API. The following is an abridged example of a full table scan; the complete code can be found on GitHub (link).
```scala
class JdbcDataSourceReader extends DataSourceReader {
  // ... (abridged)
}
```
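Fleshed out, the read path might look like the following sketch (the option key, schema, and SQL are illustrative; the real code is in the linked repository):

```scala
import java.sql.DriverManager
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types._

class JdbcSourceV2 extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new JdbcDataSourceReader(options.get("url").get())   // "url" is an assumed option key
}

class JdbcDataSourceReader(url: String) extends DataSourceReader {
  // Hard-coded schema, as in the V1 example.
  override def readSchema(): StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
    Seq[DataReaderFactory[Row]](
      new JdbcDataReaderFactory(url, "SELECT id, emp_name, age FROM employee")).asJava
}

class JdbcDataReaderFactory(url: String, sql: String) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new JdbcDataReader(url, sql)
}

class JdbcDataReader(url: String, sql: String) extends DataReader[Row] {
  private lazy val conn = DriverManager.getConnection(url)
  private lazy val rs = conn.createStatement().executeQuery(sql)

  override def next(): Boolean = rs.next()

  // Convert the current JDBC row into a Spark Row, column by column.
  override def get(): Row =
    Row.fromSeq((1 to rs.getMetaData.getColumnCount).map(i => rs.getObject(i)))

  override def close(): Unit = conn.close()
}
```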
Prune Columns
DataSourceReader can mix in the SupportsPushDownRequiredColumns trait. Spark invokes its pruneColumns method with the required StructType, and the DataSourceReader can pass it on to the underlying DataReader:
```scala
class JdbcDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {
  // ... (abridged)
}
```
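A sketch of the pruning logic, reusing the JdbcDataReaderFactory from the sketch above (the SQL generation is simplified):

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types._

class JdbcDataSourceReader(url: String)
    extends DataSourceReader with SupportsPushDownRequiredColumns {

  private val fullSchema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  // Starts as the full schema; Spark narrows it down through pruneColumns.
  private var requiredSchema = fullSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  override def readSchema(): StructType = requiredSchema

  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
    // Only the required columns make it into the SQL statement.
    val sql = s"SELECT ${requiredSchema.fieldNames.mkString(", ")} FROM employee"
    Seq[DataReaderFactory[Row]](new JdbcDataReaderFactory(url, sql)).asJava
  }
}
```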
We can examine the execution plan with df.explain(true). For instance, the optimized logical plan of the query SELECT emp_name, age FROM employee shows that the column pruning is pushed down to the data source.
```
== Analyzed Logical Plan ==
... (plan output abridged)
```
Push Down Filters
Similarly, with SupportsPushDownFilters, we can add where conditions to the underlying SQL query:
```scala
class JdbcDataSourceReader extends DataSourceReader with SupportsPushDownFilters {
  // ... (abridged)
}
```
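A sketch of the filter push-down, again reusing JdbcDataReaderFactory (only two filter types are translated here; everything else is handed back to Spark):

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader, SupportsPushDownFilters}
import org.apache.spark.sql.types._

class JdbcDataSourceReader(url: String)
    extends DataSourceReader with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  // Keep the filters we can translate into SQL; whatever we return, Spark evaluates itself.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case _: EqualTo | _: GreaterThan => true
      case _ => false
    }
    pushed = supported
    unsupported
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def readSchema(): StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("emp_name", StringType),
    StructField("age", IntegerType)))

  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
    val whereClauses = pushed.collect {
      case EqualTo(attr, value)     => s"$attr = '$value'"   // no escaping: sketch only
      case GreaterThan(attr, value) => s"$attr > $value"
    }
    val where = if (whereClauses.isEmpty) "" else whereClauses.mkString(" WHERE ", " AND ", "")
    val sql = s"SELECT id, emp_name, age FROM employee$where"
    Seq[DataReaderFactory[Row]](new JdbcDataReaderFactory(url, sql)).asJava
  }
}
```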
Multiple Partitions
createDataReaderFactories returns a list, and each reader outputs data for one RDD partition. Say we want to parallelize the reading tasks; we can divide the records into two parts according to primary key ranges:
```scala
def createDataReaderFactories() = {
  // ... (abridged)
}
```
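Inside the reader from the earlier sketches, that could look like this (the split point and queries are arbitrary, for illustration only):

```scala
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
  // Two partitions, split by an arbitrary primary key boundary; each factory
  // becomes one RDD partition and therefore one reading task.
  Seq[DataReaderFactory[Row]](
    new JdbcDataReaderFactory(url, "SELECT id, emp_name, age FROM employee WHERE id < 1000"),
    new JdbcDataReaderFactory(url, "SELECT id, emp_name, age FROM employee WHERE id >= 1000")
  ).asJava
}
```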
Transactional Write
The V2 API provides two sets of commit / abort methods to implement transactional writes:
```java
public interface DataSourceWriter {
  // ... (abridged)
}
```
DataSourceWriter runs on the Spark driver, while DataWriter instances run on the executors. When a DataWriter succeeds in writing, it sends a commit message to the driver, and after the DataSourceWriter has collected all the writers' commit messages, it performs the final job-level commit. If a writer task fails, its abort method is called and a new task attempt is retried; when the retries hit the maximum, abort is called on all tasks.
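A rough Scala sketch of how these pieces fit together for a JDBC sink (the class names and the transaction strategy are illustrative, not taken from the original code; the JDBC details are heavily simplified):

```scala
import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer._

class JdbcDataSourceWriter(url: String) extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[Row] = new JdbcDataWriterFactory(url)

  // Runs on the driver once every task has reported success.
  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // e.g. atomically swap a staging table into place
  }

  // Runs on the driver when the job is given up.
  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    // e.g. drop the staging table
  }
}

class JdbcDataWriterFactory(url: String) extends DataWriterFactory[Row] {
  // Signature as of Spark 2.3.
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
    new JdbcDataWriter(url)
}

case class JdbcCommitMessage(rowsWritten: Long) extends WriterCommitMessage

class JdbcDataWriter(url: String) extends DataWriter[Row] {
  private lazy val conn: Connection = {
    val c = DriverManager.getConnection(url)
    c.setAutoCommit(false)   // buffer this task's rows in a database transaction
    c
  }
  private var count = 0L

  override def write(record: Row): Unit = {
    // insert the record with a prepared statement (omitted)
    count += 1
  }

  // Task-level commit: make this task's rows durable and report back to the driver.
  override def commit(): WriterCommitMessage = {
    conn.commit()
    JdbcCommitMessage(count)
  }

  // Task-level abort: roll back whatever this task wrote.
  override def abort(): Unit = conn.rollback()
}
```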
Columnar and Streaming Support
These features are still in experimental status, and there is no concrete implementation built on them yet. Briefly, DataSourceReader can mix in the SupportsScanColumnarBatch trait and create DataReaderFactory instances that handle ColumnarBatch, which Spark uses to represent columnar data in memory. For streaming support, there are the MicroBatchReader and ContinuousReader interfaces. One can refer to Spark's unit tests for more details.
References
- http://blog.madhukaraphatak.com/spark-datasource-v2-part-1/
- https://databricks.com/session/apache-spark-data-source-v2
- https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html
- https://developer.ibm.com/code/2018/04/16/introducing-apache-spark-data-sources-api-v2/