Apache Flume is a real-time ETL tool for data warehouse platforms. It consists of different types of components, and at runtime all of them are managed by Flume's lifecycle and supervisor mechanism. This article walks you through the source code of Flume's component lifecycle management.
Flume’s source code can be downloaded from GitHub. It’s a Maven project, so we can import it into an IDE for efficient code reading. The following is the main structure of the project:
The main entry point of the Flume agent is the `org.apache.flume.node.Application` class in the `flume-ng-node` module. The following is an abridged version of its source code:
The process can be illustrated as follows:
- Parse command-line arguments with `commons-cli`, including the Flume agent's name and the configuration method and path.
- Configurations can be provided via a properties file or ZooKeeper. Both providers support live-reload, i.e. we can update component settings without restarting the agent.
- File-based live-reload is implemented by a background thread that periodically checks the file's last modification time.
- ZooKeeper-based live-reload is provided by Curator's `NodeCache` recipe, which uses ZooKeeper's watch functionality underneath.
- If live-reload is on (the default), configuration providers add themselves to the application's component list. Once the application starts, `LifecycleSupervisor` starts the provider, which triggers a reload event that parses the configuration and loads all defined components.
- If live-reload is off, configuration providers parse the file immediately and start all components, which are then supervised in the same way.
- Finally, register a JVM shutdown hook with `Runtime#addShutdownHook`, which in turn invokes `Application#stop` to shut down the Flume agent.
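The file-based live-reload in the list above can be sketched in plain Java. This is a minimal illustration under assumptions, not Flume's actual `PollingPropertiesFileConfigurationProvider`. The `FilePoller` class, its `checkOnce` method, and the change callback are hypothetical names. A background task compares `File#lastModified` with the last timestamp it saw and notifies a listener whenever the value changes.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical file watcher illustrating polling-based live reload.
class FilePoller {
    private final File file;
    private final Consumer<File> onChange;
    private long lastSeen = 0L; // File#lastModified returns 0 if the file is missing
    private ScheduledExecutorService executor;

    FilePoller(File file, Consumer<File> onChange) {
        this.file = file;
        this.onChange = onChange;
    }

    // One polling step: fire the callback if the timestamp changed.
    synchronized boolean checkOnce() {
        long current = file.lastModified();
        if (current != 0L && current != lastSeen) {
            lastSeen = current;
            onChange.accept(file);
            return true;
        }
        return false;
    }

    // Run checkOnce() on a background thread every intervalSeconds.
    void start(long intervalSeconds) {
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> checkOnce(), 0, intervalSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
```

In this sketch the first successful check counts as a change, so it doubles as the initial load. `checkOnce()` is exposed separately so that a single polling step can be exercised without waiting on the timer.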
When `PollingPropertiesFileConfigurationProvider` detects file changes, it invokes the `AbstractConfigurationProvider#getConfiguration` method to parse the configuration file into a `MaterializedConfiguration` instance, which contains the source, sink, and channel definitions. The polling thread then sends an event to `Application` via a Guava `EventBus` instance, which effectively invokes the `Application#handleConfigurationEvent` method to reload all components.
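The reload path from the polling provider through the event bus to `Application#handleConfigurationEvent` is a publish/subscribe pattern. The sketch below replaces Guava's `EventBus` with a tiny hand-written bus so that it runs without dependencies. `SimpleEventBus`, `ConfigEvent`, and `App` are hypothetical stand-ins. Treating "reload" as "stop the current components, then start the new set" is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the parsed configuration
// (the real type in Flume is MaterializedConfiguration).
class ConfigEvent {
    final List<String> components;

    ConfigEvent(List<String> components) {
        this.components = components;
    }
}

// Minimal synchronous publish/subscribe bus standing in for Guava's EventBus.
class SimpleEventBus {
    private final List<Consumer<ConfigEvent>> subscribers = new ArrayList<>();

    void register(Consumer<ConfigEvent> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver the event to every subscriber on the posting thread.
    void post(ConfigEvent event) {
        for (Consumer<ConfigEvent> s : subscribers) {
            s.accept(event);
        }
    }
}

// Hypothetical application that reloads everything on each event.
class App {
    final List<String> running = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    // Plays the role of Application#handleConfigurationEvent:
    // stop the current components, then start the new set.
    synchronized void handleConfigurationEvent(ConfigEvent event) {
        for (String c : running) {
            log.add("stop " + c);
        }
        running.clear();
        for (String c : event.components) {
            log.add("start " + c);
            running.add(c);
        }
    }
}
```

Registering with `bus.register(app::handleConfigurationEvent)` means each `bus.post(...)` from the polling side triggers a full reload. The polling code never needs a direct reference to the application.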
The starting process lies in `Application#startAllComponents`. The method accepts a new set of components and starts the channels first, followed by
`LifecycleSupervisor` manages instances that implement the `LifecycleAware` interface. The supervisor schedules a `MonitorRunnable` instance with a fixed delay (3 seconds), which tries to bring a `LifecycleAware` instance to its `desiredState` by calling
When the JVM shuts down, the hook invokes `Application#stop`, which calls `LifecycleSupervisor#stop`. That method first shuts down the executor pool running the `MonitorRunnable`s, then changes every component's desired state to `STOP` and waits for them to shut down completely.
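The supervision loop and shutdown order described above can be approximated in a few dozen lines. This is a simplified sketch, not Flume's `LifecycleSupervisor`. The `Lifecycle` interface, the `State` enum, the `Supervisor` and `DemoComponent` classes, and the `monitorOnce` helper are all hypothetical, and restart policies and error handling are omitted. Each supervised component gets a task scheduled with `scheduleWithFixedDelay` that nudges it toward its desired state. `stop()` shuts the pool down first, then drives every component to `STOP`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified state model: only the two states the monitor cares about.
enum State { START, STOP }

// Hypothetical, cut-down counterpart of Flume's LifecycleAware.
interface Lifecycle {
    void start();
    void stop();
    State getState();
}

// Hypothetical supervisor: one periodic monitor task per component.
class Supervisor {
    private final Map<Lifecycle, State> desired = new ConcurrentHashMap<>();
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    private final long delayMillis; // Flume uses 3 seconds; configurable here

    Supervisor(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Record the desired state and schedule a monitor task with a fixed delay.
    void supervise(Lifecycle component, State desiredState) {
        desired.put(component, desiredState);
        pool.scheduleWithFixedDelay(() -> monitorOnce(component),
                0, delayMillis, TimeUnit.MILLISECONDS);
    }

    // One monitor pass: call start()/stop() if the current state differs from the desired one.
    void monitorOnce(Lifecycle component) {
        synchronized (component) {
            State want = desired.get(component);
            if (want == null || component.getState() == want) {
                return;
            }
            if (want == State.START) {
                component.start();
            } else {
                component.stop();
            }
        }
    }

    // Shutdown order: stop the monitor pool first,
    // then set every desired state to STOP and drive components there.
    void stop() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (Lifecycle c : desired.keySet()) {
            desired.put(c, State.STOP);
            monitorOnce(c); // synchronous here, so the component is stopped when this returns
        }
    }
}

// Toy component used to exercise the supervisor.
class DemoComponent implements Lifecycle {
    private volatile State state = State.STOP;
    public void start() { state = State.START; }
    public void stop() { state = State.STOP; }
    public State getState() { return state; }
}
```

To mirror the shutdown path, the sketch could be wired up as `Supervisor supervisor = new Supervisor(3000); supervisor.supervise(component, State.START); Runtime.getRuntime().addShutdownHook(new Thread(supervisor::stop));`. The JVM hook then ends up calling the supervisor's `stop()`.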
Take `KafkaSource` as an example: we shall see how the agent supervises source components, and sinks and channels are handled in the same way.
`KafkaSource` is a pollable source, which means it needs a runner thread to constantly poll for more data to process.
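The split between a pollable source and the runner thread that drives it can be sketched as follows. This is a minimal illustration under assumptions. `Pollable`, `Status`, and `PollingRunner` are hypothetical names, not Flume's `PollableSource`/`SourceRunner` API. The fixed backoff sleep is also an assumption, added so that the loop does not busy-spin when no data is available. `start()` spawns a thread that keeps calling `process()` until `stop()` is called.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical result of one poll: data was processed, or nothing was available.
enum Status { READY, BACKOFF }

// Hypothetical pollable source: each call to process() handles one batch.
interface Pollable {
    Status process();
}

// Runner that owns the polling thread, so the source itself has no threading logic.
class PollingRunner {
    private final Pollable source;
    private final long backoffMillis;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread worker;

    PollingRunner(Pollable source, long backoffMillis) {
        this.source = source;
        this.backoffMillis = backoffMillis;
    }

    // Start the loop that keeps polling the source.
    void start() {
        running.set(true);
        worker = new Thread(() -> {
            while (running.get()) {
                if (source.process() == Status.BACKOFF) {
                    try {
                        Thread.sleep(backoffMillis); // nothing to do; wait before polling again
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "polling-runner");
        worker.start();
    }

    // Signal the loop to end, wake it if sleeping, and wait for the thread to exit.
    void stop() {
        running.set(false);
        if (worker != null) {
            worker.interrupt();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With this split, only the runner needs `start`/`stop` semantics that a supervisor can call. The source just answers "did I find data?" on each `process()` call.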
`SourceRunner` and its subclasses implement `LifecycleAware`, which means they expose lifecycle methods such as `stop` for the supervisor to call. In this case, `SourceRunner` is the component that the Flume agent actually supervises, while the `PollableSource` is instantiated and managed by the `SourceRunner`. Details lie in