The sink is the last component in an Apache Flume data flow; it writes data to storage systems such as local files, HDFS, Elasticsearch, etc. In this article, I will illustrate how Flume's HDFS sink works by analyzing its source code, with diagrams.
Sink Component Lifecycle
In the previous article, we learned that every Flume component implements the `LifecycleAware` interface and is started and monitored by `LifecycleSupervisor`. The sink component is not invoked directly by this supervisor; instead, it is wrapped in the `SinkRunner` and `SinkProcessor` classes. Flume supports three different sink processors, which connect channels and sinks with different semantics, but here we only consider the `DefaultSinkProcessor`, which accepts a single sink, and we will skip the concept of sink groups as well.
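To make the wrapping concrete, here is a minimal sketch of how a sink runner might drive a single sink through its processor. This is an illustration of the polling pattern, not Flume's actual `SinkRunner` code.

```java
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.SinkProcessor;

// Illustrative sketch only: a polling loop that repeatedly asks the
// processor (and thus the wrapped sink) to process a batch of events.
public class PollingRunnerSketch implements Runnable {
  private final SinkProcessor policy;      // e.g. DefaultSinkProcessor wrapping one sink
  private volatile boolean shouldRun = true;

  public PollingRunnerSketch(SinkProcessor policy) {
    this.policy = policy;
  }

  @Override
  public void run() {
    while (shouldRun) {
      try {
        // Sink.Status is either READY or BACKOFF
        if (policy.process() == Sink.Status.BACKOFF) {
          Thread.sleep(1000);              // back off when the channel is empty
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (EventDeliveryException e) {
        // log and retry on the next iteration
      }
    }
  }
}
```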
HDFS Sink Classes
The HDFS sink's source code is located in the `flume-hdfs-sink` sub-module and is composed of the following classes:
The `HDFSEventSink` class implements the lifecycle methods, including `configure`, `start`, `process`, and `stop`. It maintains a list of `BucketWriter`s, keyed by the output file paths, and delegates received events to them. With different implementations of `HDFSWriter`, a `BucketWriter` can append data to a text file, a compressed file, or a sequence file.
Configure and Start
When the Flume configuration file is loaded, the `configure` method is called on every sink component. In `HDFSEventSink#configure`, the sink reads properties prefixed with `hdfs.` from the context, provides default values, and performs some sanity checks. For instance, `batchSize` must be greater than 0, and `codeC` must be provided when `fileType` is `CompressedStream`. It also initializes a `SinkCounter` to provide various metrics for monitoring.
```java
public void configure(Context context) {
  // ...
}
```
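As a rough sketch of what this configuration step looks like: the property names below are the documented `hdfs.*` settings with their documented defaults, but the real method reads many more options, and the `sinkName` field here is a hypothetical stand-in for `getName()`.

```java
import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.SinkCounter;

// Hedged sketch of the configure step; not the full HDFSEventSink code.
public class ConfigureSketch {
  private String filePath, fileName, fileType, codeC;
  private long batchSize, rollInterval;
  private SinkCounter sinkCounter;
  private String sinkName = "access_log";   // hypothetical; the real sink uses getName()

  public void configure(Context context) {
    filePath = context.getString("hdfs.path");                     // required
    fileName = context.getString("hdfs.filePrefix", "FlumeData");  // default file prefix
    batchSize = context.getLong("hdfs.batchSize", 100L);
    rollInterval = context.getLong("hdfs.rollInterval", 30L);
    fileType = context.getString("hdfs.fileType", "SequenceFile");
    codeC = context.getString("hdfs.codeC");

    // sanity checks, as described above
    Preconditions.checkNotNull(filePath, "hdfs.path is required");
    Preconditions.checkArgument(batchSize > 0, "hdfs.batchSize must be greater than 0");
    if ("CompressedStream".equals(fileType)) {
      Preconditions.checkNotNull(codeC, "hdfs.codeC must be set for CompressedStream");
    }

    // metrics counter for monitoring
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(sinkName);
    }
  }
}
```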
`SinkProcessor` invokes the `HDFSEventSink#start` method, in which two thread pools are created. `callTimeoutPool` is used by `BucketWriter#callWithTimeout` to limit the time that HDFS calls may take, such as `FileSystem#create` or `FSDataOutputStream#hflush`. `timedRollerPool` is used to schedule time-based file rolling tasks, if the `rollInterval` property is provided. More details will be covered in the next section.
```java
public void start() {
  // ...
}
```
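A simplified sketch of this step: the pool sizes follow the `hdfs.threadsPoolSize` and `hdfs.rollTimerPoolSize` settings and their documented defaults, while the real method also names the threads and starts the sink counter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Sketch only: create the two pools used by BucketWriter.
public class StartSketch {
  private ExecutorService callTimeoutPool;
  private ScheduledExecutorService timedRollerPool;
  private int threadsPoolSize = 10;     // hdfs.threadsPoolSize default
  private int rollTimerPoolSize = 1;    // hdfs.rollTimerPoolSize default

  public void start() {
    // bounds the time HDFS calls (create, hflush, close, ...) may take
    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize);
    // schedules time-based file rolling when rollInterval > 0
    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize);
  }
}
```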
Process Events
The `process` method contains the main logic: it pulls events from the upstream channel and sends them to HDFS. Here is the flow chart of this method.
Channel Transaction
The code is wrapped in a channel transaction, with some exception handling. Take the Kafka channel for instance: when the transaction begins, it takes events without committing the consumer offset. Only after these events have been successfully written to HDFS is the consumed offset committed to Kafka, so the next transaction can consume messages from the new offset.
```java
Channel channel = getChannel();
// ...
```
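The overall transaction pattern can be sketched as follows. This is a hedged outline only: the actual HDFS write and batch bookkeeping are reduced to comments, and the real `process` method also handles idle writers and metrics.

```java
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;

// Hedged sketch of the channel-transaction pattern used by process().
public final class ProcessSketch {
  public static Sink.Status process(Channel channel, long batchSize)
      throws EventDeliveryException {
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
      int txnEventCount = 0;
      for (; txnEventCount < batchSize; txnEventCount++) {
        Event event = channel.take();
        if (event == null) {
          break;                      // channel is drained for now
        }
        // ... find or create the BucketWriter for this event and append to it
      }
      // ... flush every BucketWriter touched in this batch
      transaction.commit();           // e.g. the Kafka channel commits its consumer offset here
      return txnEventCount == 0 ? Sink.Status.BACKOFF : Sink.Status.READY;
    } catch (Throwable th) {
      transaction.rollback();         // events stay in the channel and will be re-delivered
      throw new EventDeliveryException("failed to deliver events to HDFS", th);
    } finally {
      transaction.close();
    }
  }
}
```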
Find or Create BucketWriter
A `BucketWriter` corresponds to an HDFS file, and the file path is generated from the configuration. For example:
```properties
a1.sinks.access_log.hdfs.path = /user/flume/access_log/dt=%Y%m%d
```
The generated file paths, temporary and final, will be:
```
/user/flume/access_log/dt=20180925/.events.hostname1.1537848761307.lzo.tmp
/user/flume/access_log/dt=20180925/events.hostname1.1537848761307.lzo
```
Placeholders are replaced in `BucketPath#escapeString`. It supports three kinds of placeholders:

- `%{...}`: replaced with arbitrary header values;
- `%[...]`: currently only supports `%[localhost]`, `%[ip]`, and `%[fqdn]`;
- `%x`: date-time patterns, which require a `timestamp` entry in the headers, or `useLocalTimeStamp` to be enabled (see the illustration after this list).
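To make the date placeholders concrete, here is a tiny standalone illustration of how a `timestamp` header of `1537848761307` expands `%Y%m%d` into `20180925`. It uses plain `SimpleDateFormat` instead of Flume's `BucketPath`, just to show the mapping; the time zone is an assumption.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class EscapeIllustration {
  public static void main(String[] args) {
    long timestamp = 1537848761307L;            // the "timestamp" event header
    // %Y%m%d roughly corresponds to the yyyyMMdd pattern
    SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd");
    fmt.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // assumed local zone
    System.out.println("dt=" + fmt.format(new Date(timestamp)));  // prints dt=20180925
  }
}
```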
The prefix and suffix are added in `BucketWriter#open`. `counter` is the timestamp at which this bucket is opened or re-opened, and `lzo` is the default extension of the configured compression codec.
```java
String fullFileName = fileName + "." + counter;
// ...
```
If no `BucketWriter` is associated with the file path, a new one is created. First, an `HDFSWriter` is created according to the `fileType` config. Flume supports three kinds of writers: `HDFSSequenceFile`, `HDFSDataStream`, and `HDFSCompressedDataStream`. They handle the actual writing to HDFS files, and one of them is assigned to the new `BucketWriter`.
```java
bucketWriter = sfWriters.get(lookupPath);
// ...
```
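A condensed, hedged sketch of the find-or-create logic follows. The `fileType` values are the documented config strings; the writer constructors and the `createBucketWriter` helper are stand-ins for the much longer construction code in `HDFSEventSink`.

```java
// Hedged sketch: sfWriters maps a resolved bucket path to its BucketWriter.
BucketWriter bucketWriter = sfWriters.get(lookupPath);
if (bucketWriter == null) {
  // pick the HDFSWriter implementation from the fileType setting
  HDFSWriter hdfsWriter;
  switch (fileType) {
    case "DataStream":       hdfsWriter = new HDFSDataStream(); break;
    case "CompressedStream": hdfsWriter = new HDFSCompressedDataStream(); break;
    default:                 hdfsWriter = new HDFSSequenceFile(); break;
  }
  // hypothetical helper standing in for the real BucketWriter constructor call,
  // which also receives roll settings, thread pools, and callbacks
  bucketWriter = createBucketWriter(lookupPath, hdfsWriter);
  sfWriters.put(lookupPath, bucketWriter);
}
```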
Append Data and Flush
Before appending data, `BucketWriter` first checks whether it is open. If not, it calls its underlying `HDFSWriter` to open a new file on the HDFS filesystem. Take `HDFSCompressedDataStream` for instance:
```java
public void open(String filePath, CompressionCodec codec) {
  // ...
}
```
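A rough sketch of what opening a compressed stream involves, using the standard Hadoop `FileSystem` and compression APIs. The real method also builds the event serializer from the sink settings and supports append mode; this is an outline under those assumptions.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;

// Hedged sketch: create the HDFS file and wrap it with the compression codec.
public class CompressedOpenSketch {
  private FSDataOutputStream fsOut;
  private CompressionOutputStream cmpOut;

  public void open(String filePath, CompressionCodec codec) throws IOException {
    Configuration conf = new Configuration();
    Path dstPath = new Path(filePath);
    FileSystem hdfs = dstPath.getFileSystem(conf);
    fsOut = hdfs.create(dstPath);              // or hdfs.append(dstPath) when appending
    cmpOut = codec.createOutputStream(fsOut);  // all writes go through the codec
    // ... the event serializer is then built on top of cmpOut
  }
}
```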
Flume's default `serializerType` is `TEXT`, i.e. `BodyTextEventSerializer`, which simply writes the event body to the output stream.
```java
public void write(Event e) throws IOException {
  // ...
}
```
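Conceptually, the write step for the `TEXT` serializer boils down to something like the fragment below. This is a sketch: `out` stands for the serializer's output stream, and the real `BodyTextEventSerializer` makes the trailing newline configurable via `appendNewline`.

```java
// Hedged sketch of the TEXT serializer's write step.
public void write(Event e) throws IOException {
  out.write(e.getBody());   // event payload, unmodified
  out.write('\n');          // appendNewline defaults to true, one event per line
}
```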
When `BucketWriter` is about to close or re-open, it calls `sync` on the `HDFSWriter`, which in turn calls `flush` on the serializer and the underlying output stream.
```java
public void sync() throws IOException {
  // ...
}
```
Since Hadoop 0.21.0, the `Syncable#sync` method has been split into `hflush` and `hsync`: the former only flushes data out of the client's buffer, while the latter guarantees the data is synced to the disk device. To handle both the old and new APIs, Flume uses Java reflection to determine whether `hflush` exists, and falls back to `sync` otherwise. The `flushOrSync` method invokes whichever is available.
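The reflection-based dispatch can be sketched like this. It is simplified: the real code caches the looked-up method per stream and wraps the call in `callWithTimeout`, but the lookup-then-invoke idea is the same.

```java
import java.lang.reflect.Method;
import org.apache.hadoop.fs.FSDataOutputStream;

// Sketch: prefer hflush() when it exists (Hadoop >= 0.21.0), else fall back to sync().
public final class FlushOrSyncSketch {
  private static final Method REF_HFLUSH = lookup("hflush");

  private static Method lookup(String name) {
    try {
      return FSDataOutputStream.class.getMethod(name);
    } catch (NoSuchMethodException e) {
      return null;                          // old Hadoop: no hflush available
    }
  }

  public static void flushOrSync(FSDataOutputStream os) throws Exception {
    if (REF_HFLUSH != null) {
      REF_HFLUSH.invoke(os);                // flush data out of the client buffer
    } else {
      FSDataOutputStream.class.getMethod("sync").invoke(os); // legacy API
    }
  }
}
```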
File Rotation
In the HDFS sink, files can be rotated by file size, event count, or time interval. `BucketWriter#shouldRotate` is called on every `append`:
```java
private boolean shouldRotate() {
  // ...
}
```
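A sketch of the size- and count-based check: `rollCount` and `rollSize` mirror the configuration properties, while the counter field names here are assumptions about the writer's internal state.

```java
// Hedged sketch: rotate when either the event-count or the byte-size threshold is hit.
private boolean shouldRotate() {
  boolean doRotate = false;
  if (rollCount > 0 && rollCount <= eventCounter) {
    doRotate = true;            // too many events in the current file
  }
  if (rollSize > 0 && rollSize <= processSize) {
    doRotate = true;            // current file has grown past rollSize bytes
  }
  return doRotate;
}
```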
Time-based rolling, on the other hand, is scheduled in the previously mentioned `timedRollerPool`:
```java
private void open() throws IOException, InterruptedException {
  // ...
}
```
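The time-based part of `open` can be sketched as scheduling a close on the roller pool. This is a simplified outline: the real callable re-checks the writer's state and handles exceptions, and it keeps the returned future in a field so it can be cancelled when the bucket closes early.

```java
// Hedged sketch of the time-based rolling setup inside open().
if (rollInterval > 0) {
  Callable<Void> rollAction = new Callable<Void>() {
    public Void call() throws Exception {
      close(true);   // closing rolls the file; the next append opens a fresh one
      return null;
    }
  };
  ScheduledFuture<Void> rollFuture =
      timedRollerPool.schedule(rollAction, rollInterval, TimeUnit.SECONDS);
}
```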
Close and Stop
In `HDFSEventSink#stop`, the sink iterates over every `BucketWriter` and calls its `close` method, which in turn calls the underlying `HDFSWriter`'s `close` method. This does mostly the same work as `flush`, but it also closes the output stream and invokes some callback functions, such as removing the current `BucketWriter` from the `sfWriters` hash map.
```java
public synchronized void close(boolean callCloseCallback) {
  // ...
}
```
The `onCloseCallback` is passed in from `HDFSEventSink` when the `BucketWriter` is initialized:
```java
WriterCallback closeCallback = new WriterCallback() {
  // ...
};
```
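A sketch of what that callback does, assuming `WriterCallback` exposes a single `run(filePath)` method; synchronization around the map is elided here.

```java
// Hedged sketch: when a BucketWriter finishes closing, drop it from the
// sfWriters map so a later event for the same path gets a fresh writer.
WriterCallback closeCallback = new WriterCallback() {
  @Override
  public void run(String bucketPath) {
    sfWriters.remove(bucketPath);   // locking around the map is omitted in this sketch
  }
};
```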
After all `BucketWriter`s are closed, `HDFSEventSink` then shuts down the `callTimeoutPool` and `timedRollerPool` executor services.
```java
ExecutorService[] toShutdown = { callTimeoutPool, timedRollerPool };
// ...
```
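The shutdown of the two pools follows the usual `ExecutorService` pattern, roughly like this sketch:

```java
// Sketch: stop accepting new tasks, wait for termination, interrupt if interrupted.
ExecutorService[] toShutdown = { callTimeoutPool, timedRollerPool };
for (ExecutorService execService : toShutdown) {
  execService.shutdown();
  try {
    while (!execService.isTerminated()) {
      execService.awaitTermination(500, TimeUnit.MILLISECONDS);
    }
  } catch (InterruptedException ex) {
    execService.shutdownNow();        // give up waiting and interrupt worker threads
  }
}
```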