Extracting data from upstream message queues is a common task in ETL. In a Hadoop-based data warehouse, we usually use Flume to import event logs from Kafka into HDFS, and then run MapReduce jobs against them, or create Hive external tables partitioned by time. One of the keys to this process is extracting the event time from the logs, since real-time data can lag behind, and the system may go offline temporarily and need to catch up afterwards. Flume provides various facilities to help us do this job easily.
HDFS Sink and Timestamp Header
Here is a simple HDFS Sink config:
a1.sinks = k1
%Y%m%d is one of the placeholder patterns supported by this sink. The sink replaces the placeholders using the millisecond value in the timestamp header. HDFS Sink also provides the hdfs.useLocalTimeStamp option, which makes it use the local time instead, but that is not what we intend here.
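To make the substitution concrete, here is a minimal Java sketch of how a millisecond timestamp header maps to a date partition. It is not Flume's own code: the class name PartitionPathSketch, the method resolve and the path prefix are made up for illustration, and the time zone is passed in explicitly as an assumption.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;

class PartitionPathSketch {
    // Java pattern that plays the role of the %Y%m%d placeholders.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Builds a partition path from the event's timestamp header (epoch milliseconds).
    static String resolve(String prefix, Map<String, String> headers, ZoneId zone) {
        String ts = headers.get("timestamp");
        if (ts == null) {
            throw new IllegalArgumentException("missing timestamp header");
        }
        long millis = Long.parseLong(ts);
        return prefix + DAY.withZone(zone).format(Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args) {
        Map<String, String> headers = Collections.singletonMap("timestamp", "1498525680023");
        // Hypothetical path prefix; prints /user/flume/ds_alog/dt=20170627
        System.out.println(resolve("/user/flume/ds_alog/dt=", headers, ZoneOffset.UTC));
    }
}
```

Because the partition is derived from the header rather than the wall clock, a late or replayed event still lands in the directory of the day it actually happened.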
Another sink we could use is the Hive Sink, which communicates directly with the Hive metastore and loads data into HDFS as a Hive table. It supports both delimited text and JSON serializers, and also requires a timestamp header. But we don’t choose it, for the following reasons:
- It doesn’t support a regular expression serializer, so we cannot extract columns from arbitrary data formats like access logs;
- The columns to be extracted are defined in the Hive metastore. If the upstream events add new keys to the JSON, those keys will be dropped until the Hive table definition is updated. In a data warehouse, it’s better to preserve the original source data for a period of time.
Regex Extractor Interceptor
Flume has a mechanism called Interceptor, i.e. a chain of optional operations attached to a Source that perform various but primitive transformations. For instance, the TimestampInterceptor adds the current local timestamp to the event headers. In this section, I’ll demonstrate how to extract the event time from access logs and JSON serialized logs with the help of interceptors.
0.123 [2017-06-27 09:08:00] GET /
RegexExtractorInterceptor can be used to extract values based on regular expressions. Here’s the config:
a1.sources.r1.interceptors = i1
It searches the string with the pattern \[(.*?)\], captures the first group as s1, parses it as a datetime string, and finally stores the result in the headers under the name timestamp.
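The same three steps (match, parse, store as milliseconds) can be sketched in plain Java. This only illustrates the logic and is not the interceptor's implementation; the class RegexTimestampSketch, the yyyy-MM-dd HH:mm:ss pattern and the explicit time zone argument are my assumptions.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexTimestampSketch {
    // Same pattern as in the text: the first bracketed group of the line.
    private static final Pattern BRACKETED = Pattern.compile("\\[(.*?)\\]");
    // Assumed datetime layout of the access log.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns epoch milliseconds as a string (the header value), or null if nothing matches.
    static String extractTimestamp(String line, ZoneId zone) {
        Matcher m = BRACKETED.matcher(line);
        if (!m.find()) {
            return null;
        }
        LocalDateTime local = LocalDateTime.parse(m.group(1), FORMAT);
        return String.valueOf(local.atZone(zone).toInstant().toEpochMilli());
    }

    public static void main(String[] args) {
        String line = "0.123 [2017-06-27 09:08:00] GET /";
        // Assuming the log was written in UTC+8; prints 1498525680000
        System.out.println(extractTimestamp(line, ZoneOffset.ofHours(8)));
    }
}
```

Note that the log line carries no zone information, so the resulting milliseconds depend on the zone in which the string is interpreted.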
Search And Replace Interceptor
For JSON strings:
{"actionTime":1498525680.023,"actionType":"pv"}
We can also extract actionTime with a regular expression, but note that HDFS Sink requires the timestamp in milliseconds, so we first have to convert the timestamp with SearchAndReplaceInterceptor.
a1.sources.r1.interceptors = i1 i2
There are two chained interceptors: the first replaces 1498525680.023 with 1498525680023, and the second extracts actionTime into the headers.
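The effect of the two steps can be reproduced with ordinary Java regular expressions. This is a sketch under my own assumptions: the class SearchReplaceSketch and the exact patterns are illustrative and are not copied from the interceptor config.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SearchReplaceSketch {
    // Step 1: drop the decimal point so that seconds.millis becomes plain milliseconds.
    static String dropDecimalPoint(String body) {
        return body.replaceAll("\"actionTime\":(\\d+)\\.(\\d+)", "\"actionTime\":$1$2");
    }

    // Step 2: pull the (now integral) actionTime value out of the body.
    static String extractActionTime(String body) {
        Matcher m = Pattern.compile("\"actionTime\":(\\d+)").matcher(body);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String body = "{\"actionTime\":1498525680.023,\"actionType\":\"pv\"}";
        String replaced = dropDecimalPoint(body);
        System.out.println(replaced);                    // {"actionTime":1498525680023,"actionType":"pv"}
        System.out.println(extractActionTime(replaced)); // 1498525680023
    }
}
```

Simply removing the decimal point yields milliseconds only when the fraction has exactly three digits; a value such as 1498525680.5 would come out wrong with this approach.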
Custom Interceptor
It’s also possible to write your own interceptor, and thus do the extraction and conversion in one step. Your interceptor should implement org.apache.flume.interceptor.Interceptor and do the job in its intercept method. The source code and unit test can be found on GitHub (link). Please add flume-ng-core to your project dependencies.
public class ActionTimeInterceptor implements Interceptor {
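The conversion that such an interceptor needs can be sketched without any Flume dependency. The helper below is hypothetical (ActionTimeSketch and toMillis are my names, not part of the ActionTimeInterceptor source); inside a real intercept method, the result would be written to the event's timestamp header.

```java
import java.math.BigDecimal;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ActionTimeSketch {
    // Matches "actionTime": followed by seconds with an optional fraction.
    private static final Pattern ACTION_TIME =
            Pattern.compile("\"actionTime\"\\s*:\\s*(\\d+(?:\\.\\d+)?)");

    // Extracts actionTime (in seconds) and converts it to epoch milliseconds.
    static OptionalLong toMillis(String body) {
        Matcher m = ACTION_TIME.matcher(body);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        // BigDecimal avoids floating point rounding; digits beyond milliseconds are truncated.
        BigDecimal seconds = new BigDecimal(m.group(1));
        return OptionalLong.of(seconds.movePointRight(3).longValue());
    }

    public static void main(String[] args) {
        String body = "{\"actionTime\":1498525680.023,\"actionType\":\"pv\"}";
        System.out.println(toMillis(body).getAsLong()); // 1498525680023
    }
}
```

Doing the arithmetic in code rather than by string replacement also copes with fractions that are shorter or longer than three digits.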
Use Kafka Channel Directly
When the upstream is Kafka and you have control over the message format, you can eliminate the Source altogether and pass data directly from Kafka to HDFS. The trick is to write messages in AvroFlumeEvent format, so that Kafka Channel can deserialize them and use the timestamp header within. Otherwise, Kafka Channel will parse messages as plain text with no headers, and HDFS Sink will complain about the missing timestamp header.
// construct an AvroFlumeEvent, this class can be found in flume-ng-sdk artifact