Extracting data from upstream message queues is a common task in ETL. In a Hadoop-based data warehouse, we usually use Flume to import event logs from Kafka into HDFS, then run MapReduce jobs against them, or create Hive external tables partitioned by time. A key part of this process is extracting the event time from the logs, since real-time data can arrive with a lag, or the system may go offline temporarily and need to catch up afterwards. Flume provides various facilities that make this job easy.
Here is a simple HDFS Sink config (the path here is illustrative):

```
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/events/dt=%Y%m%d
```
`%Y%m%d` are placeholders supported by this sink. They are replaced with the date computed from the milliseconds stored in the event’s `timestamp` header. The HDFS Sink also provides an `hdfs.useLocalTimeStamp` option, which fills these placeholders from the local wall-clock time instead, but that is not what we intend: when data arrives late or a catch-up is in progress, the local time is not the event time.
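One of the facilities Flume provides for this is the built-in `regex_extractor` source interceptor, which can parse the event time out of the event body and store it in the `timestamp` header. Below is a minimal sketch, assuming each log line begins with a `yyyy-MM-dd HH:mm:ss` timestamp; the source name and regex are illustrative:

```
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
# Capture the leading date-time string from the event body
a1.sources.r1.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})
a1.sources.r1.interceptors.i1.serializers = s1
# Convert the captured string into epoch milliseconds
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
```

With the `timestamp` header populated this way, the HDFS Sink above routes each event into the partition matching its event time rather than the time it was processed.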
Another sink we could use is the Hive Sink, which communicates directly with the Hive metastore and loads data into HDFS as a Hive table. It supports both delimited text and JSON serializers, and it likewise requires a `timestamp` header (a sample config follows the list below). But we don’t choose it, for the following reasons:
- It doesn’t support a regular-expression serializer, so we cannot extract columns from arbitrary text formats such as access logs;
- The columns to be extracted are defined in the Hive metastore. If upstream events add new keys to the JSON, those keys are dropped until the Hive table definition is updated. In a data warehouse, it’s better to preserve the original source data for a period of time.
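For reference, a Hive Sink config would look roughly like the following sketch (the metastore URI, database, table, and field names are all illustrative; note that it fills its time-based partition values from the same `timestamp` header):

```
a1.sinks = k2
a1.sinks.k2.type = hive
a1.sinks.k2.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k2.hive.database = logs
a1.sinks.k2.hive.table = events
# Time escapes in partition values are resolved from the timestamp header
a1.sinks.k2.hive.partition = %Y%m%d
a1.sinks.k2.serializer = DELIMITED
a1.sinks.k2.serializer.delimiter = "\t"
a1.sinks.k2.serializer.fieldnames = id,user,action
```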