Data extraction is the very first step of an ETL process. We need to load data from external data stores such as an RDBMS or log files before we can clean, transform, and summarize it. In a modern web stack, MySQL is the most widely used database, and it’s common to extract data from different instances and load it into a central MySQL database, or directly into Hive. There are several query-based techniques for doing the extraction, including the popular open source tool Sqoop, but they are not meant for real-time data ingestion. The binlog, on the other hand, is a real-time data stream used for replication between master and slave instances. With the help of Alibaba’s open source Canal project, we can easily use the binlog facility to extract data from MySQL databases to various destinations.
In brief, Canal poses as a MySQL slave, dumps the binlog from the master, parses it, and sends the changes to downstream sinks. Canal consists of two major components: the Canal server and the Canal client. A Canal server can connect to multiple MySQL instances and maintains an event queue for each instance. Canal clients can then subscribe to these queues and receive data changes. The following is a quick start guide to get Canal going.
MySQL binlog is not enabled by default. Locate your my.cnf file and make these changes:
binlog_format must be ROW, because in STATEMENT or MIXED mode most changes are logged and transferred as SQL statements (to save log size), but what we need is the full data of the changed rows.
A slave connects to the master via a dedicated account, which must have the global REPLICATION privileges. We can create the account with the GRANT statement.
Download the Canal server from its GitHub Releases page. The config files reside in the conf directory. A typical layout is:
conf/canal.properties holds the main configuration; canal.port, for example, defines the port the Canal server listens on.
instanceA/instance.properties defines the MySQL instance that the Canal server will pull the binlog from. Important settings are:
Start the server with sh bin/startup.sh, and you’ll see the following output in
To consume update events from the Canal server, we can create a Canal client in our application, specify the instance and tables we’re interested in, and start polling.
Add the com.alibaba.otter:canal.client dependency to your pom.xml, and construct a Canal client:
The code is quite similar to consuming from a message queue. The update events are sent in batches, and you acknowledge each batch once it has been properly processed.
Each Entry in a message represents a set of row changes with the same event type, e.g. INSERT, UPDATE, or DELETE. For each row, we can get the column data like this:
The full example can be found on GitHub.
For a database-backed data warehouse, we can use the REPLACE statement directly and let the database deduplicate rows by primary key. One concern is insertion performance, so it’s often necessary to buffer the data for a while and insert it in batches.
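The buffering idea above can be sketched in plain Java. This is only an illustration, not the author's code: the class name `ReplaceBatcher`, the table `users`, and its columns are hypothetical, and the sketch only builds the multi-row statement and its parameter list without talking to a database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: buffers rows and produces one multi-row REPLACE per batch.
class ReplaceBatcher {
    private final String table;
    private final List<String> columns;
    private final int batchSize;
    private final List<Object[]> buffer = new ArrayList<>();

    ReplaceBatcher(String table, List<String> columns, int batchSize) {
        this.table = table;
        this.columns = columns;
        this.batchSize = batchSize;
    }

    // Buffers one row; returns true once the buffer is full and should be flushed.
    boolean add(Object... row) {
        if (row.length != columns.size()) {
            throw new IllegalArgumentException("expected " + columns.size() + " values");
        }
        buffer.add(row);
        return buffer.size() >= batchSize;
    }

    // Builds "REPLACE INTO t (a, b) VALUES (?, ?), (?, ?)" for the buffered rows.
    String buildSql() {
        if (buffer.isEmpty()) {
            throw new IllegalStateException("nothing buffered");
        }
        String tuple = "(" + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
        return "REPLACE INTO " + table + " (" + String.join(", ", columns) + ") VALUES "
                + String.join(", ", Collections.nCopies(buffer.size(), tuple));
    }

    // Returns the parameters in placeholder order and clears the buffer.
    List<Object> drainParams() {
        List<Object> params = new ArrayList<>();
        for (Object[] row : buffer) {
            params.addAll(Arrays.asList(row));
        }
        buffer.clear();
        return params;
    }

    public static void main(String[] args) {
        ReplaceBatcher batcher = new ReplaceBatcher("users", Arrays.asList("id", "name"), 2);
        batcher.add(1, "alice");
        if (batcher.add(2, "bob")) {
            System.out.println(batcher.buildSql());
            System.out.println(batcher.drainParams());
        }
    }
}
```

In a real client the generated SQL would be passed to a JDBC `PreparedStatement` and each parameter bound with `setObject`. A timer-based flush is usually needed as well, so that a partially filled buffer does not wait indefinitely.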
Another approach is to write the extracted data into a delimited text file, then execute a LOAD DATA statement. These files can also be used to import data into Hive. For both approaches, make sure you escape the string columns properly to avoid insertion errors.
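As a rough illustration of the escaping mentioned above, the following Java sketch writes tab-separated lines. It assumes MySQL's default LOAD DATA format (fields terminated by a tab, lines by a newline, backslash as the escape character, and `\N` for NULL); the class and method names are hypothetical.

```java
// Hypothetical helper for producing lines in MySQL's default LOAD DATA text format.
class TsvEscaper {
    // Escapes one field so that delimiters inside the value cannot break the row.
    static String escapeField(String value) {
        if (value == null) {
            return "\\N"; // MySQL's textual representation of NULL
        }
        StringBuilder sb = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break; // the escape character itself
                case '\t': sb.append("\\t"); break;  // field delimiter
                case '\n': sb.append("\\n"); break;  // line delimiter
                case '\r': sb.append("\\r"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    // Joins the escaped fields with tabs and terminates the line with a newline.
    static String toLine(String... fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append('\t');
            }
            sb.append(escapeField(fields[i]));
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(toLine("1", "multi\nline\tvalue", null));
    }
}
```

If the same files are loaded into Hive, the table definition most likely needs a matching escape clause (for example `ESCAPED BY '\\'` in a delimited row format), otherwise the backslashes are read literally.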
Hive tables are stored on HDFS, which is an append-only file system, so it takes effort to update data in a previously loaded table. One can use a JOIN-based approach, Hive transactions, or switch to HBase.
Data can be categorized into base and delta. For example, yesterday’s user table is the base, while today’s updated rows are the delta. Using a FULL OUTER JOIN, we can generate the latest snapshot.
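To make the merge semantics concrete, here is a small Java sketch of the same logic on in-memory maps keyed by primary key. It is not the Hive query itself, only an illustration: every key from either side appears in the result, and the delta value wins where both sides have the key. The class name and sample data are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of merging a base snapshot with a delta by primary key.
class SnapshotMerge {
    // Keeps every key from both inputs; for keys present in both, the delta row wins.
    static <K extends Comparable<K>, V> Map<K, V> merge(Map<K, V> base, Map<K, V> delta) {
        Map<K, V> snapshot = new TreeMap<>(base);
        snapshot.putAll(delta);
        return snapshot;
    }

    public static void main(String[] args) {
        Map<Integer, String> base = new TreeMap<>();
        base.put(1, "alice");
        base.put(2, "bob");

        Map<Integer, String> delta = new TreeMap<>();
        delta.put(2, "bobby"); // updated row
        delta.put(3, "carol"); // new row

        System.out.println(merge(base, delta));
    }
}
```

Note that this handles inserts and updates only. Deleted rows would need an extra marker in the delta, such as a hypothetical `is_deleted` flag, so they can be filtered out of the snapshot.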
Hive 0.13 introduced transactions and ACID tables, 0.14 brought the UPDATE and DELETE statements, and Hive 2.0.0 provides a new Streaming Mutation API that can be used to submit insert/update/delete transactions to Hive tables programmatically. Currently, ACID tables must use the ORC file format and be bucketed by primary key. Hive stores the mutation operations in delta files. When reading from such a table, OrcInputFormat will figure out which record is the latest. The official sample code can be found in the test suite.
The final approach is to use HBase, a key-value store built on HDFS, which makes it well suited to data updates. Its tables can also be used by MapReduce jobs, or you can create an external Hive table that points directly to HBase. More information can be found on the official website.
Data extraction is usually set up on demand, so there may already be historical data in the source table. One obvious approach is to dump the full table manually and load it into the destination. Alternatively, we can reuse the Canal facility: notify the client to query the data from the source and apply the updates.
First, we create a helper table in the source database:
To reload all records in
When the Canal client receives a row change from the retl_buffer table, it can extract the table name and primary key value from the record, query the source database, and write to the destination.
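The client-side handling described above might look roughly like the Java sketch below. The helper table's definition is not shown here, so the column names `FULL_NAME` (qualified table name) and `PK_DATA` (primary key value) are assumptions, as are the class name and the single-column primary key. The row is modeled as a plain map rather than a Canal type.

```java
import java.util.Map;

// Hypothetical value object for a "please reload this row" request.
class ReloadRequest {
    final String tableName;
    final String pkValue;

    private ReloadRequest(String tableName, String pkValue) {
        this.tableName = tableName;
        this.pkValue = pkValue;
    }

    // Reads the assumed FULL_NAME and PK_DATA columns from a retl_buffer row.
    static ReloadRequest fromBufferRow(Map<String, String> row) {
        String table = row.get("FULL_NAME");
        String pk = row.get("PK_DATA");
        if (table == null || pk == null) {
            throw new IllegalArgumentException("row lacks FULL_NAME or PK_DATA");
        }
        // The table name is concatenated into SQL below, so accept only plain identifiers.
        if (!table.matches("[A-Za-z0-9_]+(\\.[A-Za-z0-9_]+)?")) {
            throw new IllegalArgumentException("unexpected table name: " + table);
        }
        return new ReloadRequest(table, pk);
    }

    // Builds the lookup query; the primary key value is bound as a parameter.
    String selectSql(String pkColumn) {
        return "SELECT * FROM " + tableName + " WHERE " + pkColumn + " = ?";
    }

    public static void main(String[] args) {
        Map<String, String> row = new java.util.HashMap<>();
        row.put("FULL_NAME", "test.user");
        row.put("PK_DATA", "42");
        ReloadRequest request = fromBufferRow(row);
        System.out.println(request.selectSql("id") + " with pk " + request.pkValue);
    }
}
```

The fetched row can then go through the same write path as ordinary binlog events, so reloads and live updates share one code path.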
This approach is used in Otter, another Alibaba project.
- Canal instances can be supplied with a standby MySQL source, typically in a Master-Master HA scenario. Make sure you turn on the log_slave_updates option in both MySQL instances. Canal uses a dedicated heartbeat check, i.e. it updates a row periodically to check whether the current source is alive.
- The Canal server itself also supports HA. You’ll need a ZooKeeper quorum to enable this feature. Clients get the current server location from ZooKeeper, and the server records the last binlog offset that has been consumed.
For more information, please check out the AdminGuide.
- https://github.com/alibaba/canal/wiki (in Chinese)
- https://github.com/alibaba/otter/wiki (in Chinese)