Extracting data from upstream message queues is a common task in ETL. In a Hadoop-based data warehouse, we usually use Flume to import event logs from Kafka into HDFS, and then run MapReduce jobs against them or create Hive external tables partitioned by time. A key point of this process is extracting the event time from the logs, because real-time data can arrive late, or the system may go offline temporarily and need to catch up. Flume provides various facilities that make this job easy.
Here is a simple HDFS Sink config:
`%Y%m%d` are placeholders supported by this sink. They are replaced using the milliseconds value in the `timestamp` header. HDFS Sink also provides the `hdfs.useLocalTimeStamp` option, which replaces these placeholders with the local time instead, but that is not what we intend.
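To make the placeholder mechanism concrete, here is a minimal Java sketch of how a path template such as `/user/flume/events/%Y%m%d` (a hypothetical path) could be resolved from the `timestamp` header. The helper `PathEscaper.resolve` is made up for illustration and only handles `%Y`, `%m` and `%d`. The real sink supports many more escapes and resolves them in the zone given by `hdfs.timeZone`, which defaults to local time.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Map;

public class PathEscaper {
    // Hypothetical helper: fills %Y, %m and %d in a path template using the
    // "timestamp" header (epoch milliseconds), interpreted in the given zone.
    static String resolve(String template, Map<String, String> headers, ZoneId zone) {
        String ts = headers.get("timestamp");
        if (ts == null) {
            // Without the header there is nothing to derive the date from.
            throw new IllegalStateException("missing timestamp header");
        }
        ZonedDateTime t = Instant.ofEpochMilli(Long.parseLong(ts)).atZone(zone);
        return template
                .replace("%Y", String.format("%04d", t.getYear()))
                .replace("%m", String.format("%02d", t.getMonthValue()))
                .replace("%d", String.format("%02d", t.getDayOfMonth()));
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of("timestamp", "1498525680023");
        System.out.println(resolve("/user/flume/events/%Y%m%d", headers, ZoneId.of("UTC")));
    }
}
```

With the header value `1498525680023` and the UTC zone, this prints `/user/flume/events/20170627`. The same event always lands in the same directory, however late it is processed.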
Another sink we could use is the Hive Sink, which communicates directly with the Hive metastore and writes data into Hive tables. It supports both delimited text and JSON serializers, and it also requires a `timestamp` header. We didn’t choose it for the following reasons:
- It doesn’t support a regular expression serializer, so we cannot extract columns from arbitrary formats such as access logs;
- The columns to be extracted are defined in the Hive metastore. If upstream events add new keys to their JSON, those keys are dropped until the Hive table definition is updated. In a data warehouse, it’s better to preserve the original source data for a period of time.
Flume has a mechanism called Interceptor: optional operations chained onto a Source that perform simple transformations on each event. For instance, `TimestampInterceptor` adds the current local timestamp to the event headers. In this section, I’ll demonstrate how to extract the event time from access logs and from JSON-serialized logs with the help of interceptors.
RegexExtractorInterceptor can be used to extract values based on regular expressions. Here’s the config:
It searches the event body for the pattern `\[(.*?)\]`, captures the first group as `s1`, parses it as a datetime string, and finally stores the result in the headers under the name `timestamp`.
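What the interceptor does can be approximated in plain Java. The sketch below assumes an access-log timestamp such as `[27/Jun/2017:09:08:00 +0800]`, parsed with the pattern `dd/MMM/yyyy:HH:mm:ss Z`. The class and method names are hypothetical, and Flume performs this work in its own interceptor and serializer classes, not with this code.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessLogTime {
    // Lazy match: captures the contents of the first [...] in the line.
    private static final Pattern BRACKETED = Pattern.compile("\\[(.*?)\\]");
    // Assumed log time format, e.g. 27/Jun/2017:09:08:00 +0800
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Parses the bracketed time and stores it as epoch milliseconds under
    // the "timestamp" header; lines without brackets are left untouched.
    static void extract(String body, Map<String, String> headers) {
        Matcher m = BRACKETED.matcher(body);
        if (m.find()) {
            long millis = OffsetDateTime.parse(m.group(1), FORMAT).toInstant().toEpochMilli();
            headers.put("timestamp", Long.toString(millis));
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        extract("127.0.0.1 - - [27/Jun/2017:09:08:00 +0800] \"GET / HTTP/1.1\" 200 512", headers);
        System.out.println(headers);
    }
}
```

Because the offset is part of the captured string, the resulting milliseconds value does not depend on the time zone of the Flume agent.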
For JSON strings:
We can also extract `actionTime` with a regular expression, but note that HDFS Sink requires the timestamp in milliseconds, so we first have to convert the value, for example with Flume’s `SearchAndReplaceInterceptor`.
There are two chained interceptors: the first rewrites `actionTime` into milliseconds (e.g. `1498525680023`), and the second extracts `actionTime` directly into the headers.
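A plain-Java sketch of the two chained steps follows. Purely for illustration, it assumes that `actionTime` arrives as seconds with a three-digit fraction (for example `1498525680.023`), so the first step removes the decimal point and the second reads the resulting millisecond value. The class name `JsonActionTime` is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonActionTime {
    // Step 1 (search and replace): "actionTime":1498525680.023 -> "actionTime":1498525680023
    // Assumes exactly three fractional digits, i.e. millisecond precision.
    private static final Pattern SECONDS = Pattern.compile("\"actionTime\":(\\d+)\\.(\\d{3})");
    // Step 2 (regex extraction): read the integer millisecond value.
    private static final Pattern MILLIS = Pattern.compile("\"actionTime\":(\\d+)");

    static String toMillis(String body) {
        return SECONDS.matcher(body).replaceAll("\"actionTime\":$1$2");
    }

    // Returns the value destined for the "timestamp" header, or null if absent.
    static Long extractTimestamp(String body) {
        Matcher m = MILLIS.matcher(body);
        return m.find() ? Long.valueOf(m.group(1)) : null;
    }
}
```

As with the chained interceptors, order matters: running the extraction before the rewrite would capture only the seconds part.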
It’s also possible to write your own interceptor and do the extraction and conversion in a single step. Your interceptor should implement `org.apache.flume.interceptor.Interceptor` and do the work in the `intercept` method. The source code and unit tests can be found on GitHub (link). Remember to add `flume-ng-core` to your project dependencies.
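The core of such a one-step interceptor might look like the sketch below. To keep it runnable without Flume on the classpath, it works on a body byte array and a header map, the data a real `intercept(Event)` would obtain from `event.getBody()` and `event.getHeaders()`. The class name, the regex-based lookup of `actionTime`, and the assumption that the value is in (possibly fractional) seconds are all illustrative. The real `Interceptor` interface also declares `initialize()`, `intercept(List<Event>)` and `close()`, and is created through a nested `Builder`.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ActionTimeExtractor {
    // Hypothetical lookup: a numeric actionTime in seconds, optionally fractional.
    private static final Pattern ACTION_TIME =
            Pattern.compile("\"actionTime\"\\s*:\\s*(\\d+(?:\\.\\d+)?)");

    // Extracts and converts in one pass: seconds -> epoch milliseconds,
    // stored in the "timestamp" header. Returns false if nothing matched.
    static boolean intercept(byte[] body, Map<String, String> headers) {
        Matcher m = ACTION_TIME.matcher(new String(body, StandardCharsets.UTF_8));
        if (!m.find()) {
            return false;
        }
        // Shift the decimal point three places; digits below 1 ms are truncated.
        long millis = new BigDecimal(m.group(1))
                .movePointRight(3)
                .setScale(0, RoundingMode.DOWN)
                .longValueExact();
        headers.put("timestamp", Long.toString(millis));
        return true;
    }
}
```

Parsing the number with `BigDecimal` instead of rewriting the body avoids depending on a fixed number of fractional digits, which is one advantage of doing the conversion in code.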
When the upstream is Kafka and you control the message format, you can eliminate the Source altogether and pass data directly from Kafka to HDFS. The trick is to write messages in `AvroFlumeEvent` format, so that Kafka Channel can deserialize them and use the `timestamp` header they carry. Otherwise, Kafka Channel will parse messages as plain text with no headers, and HDFS Sink will complain about the missing `timestamp` header.
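For illustration, the sketch below hand-encodes a message following the Avro binary encoding rules. It assumes the `AvroFlumeEvent` schema is a record with a `headers` map of strings followed by a `body` of bytes, and that Kafka Channel reads raw Avro binary (not a container file) when its `parseAsFlumeEvent` option is enabled, which is the default. The class name is hypothetical. In a real producer you would more likely serialize the generated `AvroFlumeEvent` class with Avro’s `SpecificDatumWriter` than hand-roll the bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class AvroFlumeEventEncoder {
    // Avro "long": zig-zag encoding, then a variable-length base-128 integer.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long v = (n << 1) ^ (n >> 63);
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
    }

    // Avro "bytes" and "string": length as a long, then the raw (UTF-8) bytes.
    static void writeBytes(ByteArrayOutputStream out, byte[] data) {
        writeLong(out, data.length);
        out.write(data, 0, data.length);
    }

    // Assumed schema: record AvroFlumeEvent { map<string> headers; bytes body; }
    static byte[] encode(Map<String, String> headers, byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (!headers.isEmpty()) {
            writeLong(out, headers.size()); // a single block holding every entry
            for (Map.Entry<String, String> e : headers.entrySet()) {
                writeBytes(out, e.getKey().getBytes(StandardCharsets.UTF_8));
                writeBytes(out, e.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
        writeLong(out, 0); // a zero-count block terminates the map
        writeBytes(out, body);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("timestamp", "1498525680023");
        byte[] message = encode(headers, "{\"actionType\":\"login\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(message.length + " bytes ready to send to Kafka");
    }
}
```

The resulting byte array would be used as the Kafka message value. The event time then travels in the header, so the HDFS Sink can bucket the data without any Source or interceptor in between.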