Apache Flume is a real-time ETL tool for data warehouse platforms. It consists of different types of components, and at runtime all of them are managed by Flume's lifecycle and supervisor mechanism. This article walks you through the source code of Flume's component lifecycle management.
Repository Structure
Flume’s source code can be downloaded from GitHub. It’s a Maven project, so we can import it into an IDE for efficient code reading. The following is the main structure of the project:
```
/flume-ng-node
```
Application Entrance
The main entry point of a Flume agent is the org.apache.flume.node.Application class in the flume-ng-node module. The following is an abridged version of its source code:
```java
public class Application {
  // ...
}
```
The process can be illustrated as follows:
- Parse command-line arguments with commons-cli, including the Flume agent's name, the configuration method, and the configuration path.
- Configurations can be provided via a properties file or ZooKeeper. Both providers support live-reload, i.e. component settings can be updated without restarting the agent.
- File-based live-reload is implemented by a background thread that periodically checks the last modification time of the file.
- ZooKeeper-based live-reload is provided by Curator's NodeCache recipe, which uses ZooKeeper's watch functionality underneath.
- If live-reload is on (the default), the configuration provider adds itself to the application's component list. After Application#start is called, a LifecycleSupervisor starts the provider, which in turn triggers a reload event that parses the configuration and loads all defined components.
- If live-reload is off, the configuration provider parses the file immediately and starts all components, which are also supervised by LifecycleSupervisor.
- Finally, a JVM shutdown hook is registered via Runtime#addShutdownHook; it invokes Application#stop to shut down the Flume agent.
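File-based live-reload boils down to comparing modification times on a timer. The following is a minimal sketch under that assumption; PollingFileWatcher and its methods are hypothetical names, not Flume's PollingPropertiesFileConfigurationProvider, which additionally parses the file and publishes the result.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of polling-based live-reload; not Flume's actual class.
class PollingFileWatcher {
    private final File file;
    private final Runnable onChange;
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private long lastModified = 0L; // modification time we last reacted to

    PollingFileWatcher(File file, Runnable onChange) {
        this.file = file;
        this.onChange = onChange;
    }

    // One polling step: run the callback if the file's modification time moved
    // forward since the previous check. File#lastModified returns 0L for a
    // missing file, so a missing file never triggers a reload.
    synchronized boolean checkOnce() {
        long current = file.lastModified();
        if (current > lastModified) {
            lastModified = current;
            onChange.run();
            return true;
        }
        return false;
    }

    // Check immediately, then intervalSeconds after each previous check finishes.
    void start(long intervalSeconds) {
        executor.scheduleWithFixedDelay(this::checkOnce, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        executor.shutdownNow();
    }
}
```

Polling is coarse (a change is noticed only at the next tick) but needs nothing beyond the file system, which is presumably why it suits a plain properties file; the ZooKeeper provider can rely on watches instead.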
Configuration Reload
When PollingPropertiesFileConfigurationProvider detects a change to the file, it invokes AbstractConfigurationProvider#getConfiguration to parse the configuration file into a MaterializedConfiguration instance, which contains the source, sink, and channel definitions. The polling thread then sends an event to Application through a Guava EventBus instance, which effectively invokes Application#handleConfigurationEvent to reload all components.
```java
// Application class
// ...
```
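To make the hand-off concrete, here is a stdlib-only sketch of the publish/subscribe flow. SimpleEventBus is a hypothetical stand-in for Guava's EventBus (which instead dispatches to methods annotated with @Subscribe), ConfigSnapshot is a simplified stand-in for MaterializedConfiguration, and the stop-then-start sequence in handleConfigurationEvent is an assumption about what "reload all components" involves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Guava's EventBus: post() delivers the event
// synchronously, on the caller's thread, to every registered subscriber.
class SimpleEventBus<E> {
    private final List<Consumer<E>> subscribers = new ArrayList<>();

    synchronized void register(Consumer<E> subscriber) { subscribers.add(subscriber); }

    synchronized void post(E event) {
        for (Consumer<E> s : subscribers) s.accept(event);
    }
}

// Hypothetical, simplified stand-in for MaterializedConfiguration.
final class ConfigSnapshot {
    final List<String> channels, sinks, sources;

    ConfigSnapshot(List<String> channels, List<String> sinks, List<String> sources) {
        this.channels = channels;
        this.sinks = sinks;
        this.sources = sources;
    }
}

class ReloadingApp {
    private ConfigSnapshot current;                 // components currently running
    final List<String> actions = new ArrayList<>(); // trace, for illustration only

    // Plays the role of Application#handleConfigurationEvent: tear down the
    // old component set (if any), then start the new one.
    synchronized void handleConfigurationEvent(ConfigSnapshot conf) {
        if (current != null) {
            actions.add("stop " + describe(current));
        }
        current = conf;
        actions.add("start " + describe(conf));
    }

    private static String describe(ConfigSnapshot c) {
        return c.channels.size() + " channel(s), " + c.sinks.size() + " sink(s), "
                + c.sources.size() + " source(s)";
    }
}
```

Usage: the polling thread would call bus.post(snapshot) after parsing, and the application registers app::handleConfigurationEvent once at start-up. Decoupling through a bus means the provider never needs a direct reference to Application.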
Start Components
The start-up logic lives in Application#startAllComponents. The method accepts a new set of components and starts the Channels first, followed by the Sinks and then the Sources.
```java
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
  // ...
}
```
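A sketch of that ordering, using a hypothetical simplified Component type rather than Flume's interfaces. One plausible rationale: sources write into channels and sinks read from them, so channels must exist first, and starting sinks before sources means a consumer is in place by the time data starts flowing in. The wait for every channel to report START is an extra assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified component with a lifecycle state (not Flume's API).
class Component {
    enum State { IDLE, START, STOP }

    final String name;
    volatile State state = State.IDLE;

    Component(String name) { this.name = name; }

    void start() { state = State.START; }
}

class StartOrder {
    // Starts channels, then sinks, then sources; returns names in start order.
    static List<String> startAll(List<Component> channels, List<Component> sinks,
                                 List<Component> sources) throws InterruptedException {
        List<String> order = new ArrayList<>();
        for (Component c : channels) { c.start(); order.add(c.name); }
        // Assumption of this sketch: hold back sinks and sources until every
        // channel reports START (trivially true here, since start() is synchronous).
        for (Component c : channels) {
            while (c.state != Component.State.START) Thread.sleep(10);
        }
        for (Component k : sinks) { k.start(); order.add(k.name); }
        for (Component r : sources) { r.start(); order.add(r.name); }
        return order;
    }
}
```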
LifecycleSupervisor manages instances that implement the LifecycleAware interface. For each supervised instance, it schedules a MonitorRunnable with a fixed delay (3 seconds), which tries to bring the instance into its desiredState by calling LifecycleAware#start or stop.
```java
public static class MonitorRunnable implements Runnable {
  // ...
}
```
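The reconciliation loop can be sketched as follows. Lifecycle and State are hypothetical, simplified stand-ins for LifecycleAware and LifecycleState, and MiniSupervisor is not Flume's class; each supervised component gets its own fixed-delay task that compares the current state with the desired one and calls start or stop when they differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-ins for LifecycleState and LifecycleAware.
enum State { IDLE, START, STOP }

interface Lifecycle {
    void start();
    void stop();
    State getState();
}

class MiniSupervisor {
    private final ScheduledExecutorService monitorService = Executors.newScheduledThreadPool(2);
    private final Map<Lifecycle, State> desired = new ConcurrentHashMap<>();

    // What one monitor tick does: move the component toward its desired state.
    void reconcile(Lifecycle c) {
        State want = desired.get(c);
        if (want == null || c.getState() == want) return; // already converged
        if (want == State.START) c.start();
        else if (want == State.STOP) c.stop();
    }

    // Register a component and schedule its monitor: run now, then every delaySeconds.
    void supervise(Lifecycle c, State want, long delaySeconds) {
        desired.put(c, want);
        monitorService.scheduleWithFixedDelay(() -> {
            try {
                reconcile(c);
            } catch (RuntimeException e) {
                // An escaping exception would cancel all future runs of this task.
                System.err.println("monitor failed, retrying next tick: " + e);
            }
        }, 0, delaySeconds, TimeUnit.SECONDS);
    }

    void setDesiredState(Lifecycle c, State want) { desired.put(c, want); }

    void shutdown() { monitorService.shutdownNow(); }
}
```

Usage: supervise(component, State.START, 3) mirrors the 3-second delay mentioned above. The try/catch is deliberate: per the ScheduledExecutorService contract, once a periodic task throws, its subsequent executions are suppressed, so an unguarded monitor would silently stop supervising after the first failure.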
Stop Components
When the JVM is shutting down, the hook invokes Application#stop, which calls LifecycleSupervisor#stop. That method first shuts down the executor pool that runs the MonitorRunnables, then changes the desired state of every component to STOP and waits for them to shut down completely.
```java
public class LifecycleSupervisor implements LifecycleAware {
  // ...
}
```
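The shutdown order can be sketched as below. StoppableSupervisor and its nested types are hypothetical simplifications, and the 10-second termination timeout is an arbitrary value chosen for the sketch. Stopping the monitors first matters because a still-running monitor whose desired state is START could otherwise restart a component that is being torn down.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a supervisor's shutdown path, with simplified types.
class StoppableSupervisor {
    enum State { IDLE, START, STOP }

    interface Lifecycle {
        void stop();
        State getState();
    }

    // Executor that would run the per-component monitor tasks.
    private final ScheduledExecutorService monitorService =
            Executors.newSingleThreadScheduledExecutor();
    // Desired state per component; insertion order gives a predictable stop order.
    private final Map<Lifecycle, State> desired = new LinkedHashMap<>();

    synchronized void register(Lifecycle c, State want) { desired.put(c, want); }

    synchronized State desiredStateOf(Lifecycle c) { return desired.get(c); }

    synchronized void stop() throws InterruptedException {
        // 1. Stop the monitors first, so nothing can restart a component behind our back.
        monitorService.shutdown();
        if (!monitorService.awaitTermination(10, TimeUnit.SECONDS)) {
            monitorService.shutdownNow();
        }
        // 2. Record STOP as the desired state and stop anything still running.
        for (Map.Entry<Lifecycle, State> e : desired.entrySet()) {
            e.setValue(State.STOP);
            if (e.getKey().getState() == State.START) {
                e.getKey().stop(); // assumed to block until the component has shut down
            }
        }
    }
}
```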
Source and Source Runner
Taking KafkaSource as an example, let's see how the agent supervises source components; the same mechanism applies to sinks and channels.
```java
public class KafkaSource extends AbstractPollableSource {
  // ...
}
```
KafkaSource is a pollable source, which means it needs a runner thread to constantly poll for more data to process.
```java
public class PollableSourceRunner extends SourceRunner {
  // ...
}
```
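The runner idea can be sketched as a dedicated thread that calls process() in a loop. SimplePollableSource and SimplePollingRunner are hypothetical names, and the linear backoff policy (the n-th consecutive empty poll sleeps n times an increment, capped at a maximum) is an assumption of this sketch rather than a description of Flume's exact code. Starting and stopping the wrapped source itself is omitted for brevity.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a pollable source: each call handles one batch and
// reports whether there was data (READY) or not (BACKOFF).
interface SimplePollableSource {
    enum Status { READY, BACKOFF }

    Status process() throws InterruptedException;
}

// Hypothetical polling runner: one thread drives the source in a loop.
class SimplePollingRunner implements Runnable {
    private final SimplePollableSource source;
    private final long backoffIncrementMs;
    private final long maxBackoffMs;
    private final AtomicBoolean shouldStop = new AtomicBoolean(false);
    private Thread thread;

    SimplePollingRunner(SimplePollableSource source, long backoffIncrementMs, long maxBackoffMs) {
        this.source = source;
        this.backoffIncrementMs = backoffIncrementMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // The n-th consecutive BACKOFF sleeps n * increment, but never more than max.
    static long backoffDelay(long consecutive, long incrementMs, long maxMs) {
        return Math.min(consecutive * incrementMs, maxMs);
    }

    @Override
    public void run() {
        long consecutiveBackoffs = 0;
        while (!shouldStop.get()) {
            try {
                if (source.process() == SimplePollableSource.Status.BACKOFF) {
                    consecutiveBackoffs++;
                    Thread.sleep(backoffDelay(consecutiveBackoffs, backoffIncrementMs, maxBackoffMs));
                } else {
                    consecutiveBackoffs = 0; // data arrived: poll again immediately
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // stop() interrupts the thread to cut a backoff sleep short
            }
        }
    }

    synchronized void start() {
        shouldStop.set(false);
        thread = new Thread(this, "polling-runner");
        thread.start();
    }

    synchronized void stop() throws InterruptedException {
        if (thread == null) return; // never started
        shouldStop.set(true);
        thread.interrupt();
        thread.join();
    }
}
```

Backing off on empty polls keeps an idle source from spinning a CPU core, while resetting the counter on READY keeps latency low when data is flowing.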
Both AbstractPollableSource and SourceRunner implement LifecycleAware, which means they have start and stop methods for the supervisor to call. In this case, SourceRunner is the component that the Flume agent actually supervises, while the PollableSource is wrapped in, and managed by, its SourceRunner. The details are in AbstractConfigurationProvider#loadSources:
```java
private void loadSources(Map<String, SourceRunner> sourceRunnerMap) {
  // ...
}
```
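The wrapping step can be sketched as a factory that picks a runner type from the kind of source and a loader that keys the runners by source name. Every type here (Source, PollingSource, PushSource, Runner, SourceLoader) is a hypothetical simplification, not Flume's API; the point is only that the supervisor ends up holding runners, and each runner owns its source.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified source types (not Flume's interfaces).
interface Source {
    String getName();
    void start();
    void stop();
}
interface PollingSource extends Source {} // needs a thread that polls it
interface PushSource extends Source {}    // delivers data on its own

// Hypothetical runner: the object a supervisor manages; it owns the source.
abstract class Runner {
    final Source source;

    Runner(Source source) { this.source = source; }

    abstract String kind();

    void start() { source.start(); } // a polling runner would also spawn its poll thread here
    void stop() { source.stop(); }

    // Choose a runner type from the kind of source, rejecting unknown kinds.
    static Runner forSource(Source source) {
        if (source instanceof PollingSource) return new PollingRunner(source);
        if (source instanceof PushSource) return new PushRunner(source);
        throw new IllegalArgumentException("No runner type for source " + source.getName());
    }
}

class PollingRunner extends Runner {
    PollingRunner(Source s) { super(s); }
    String kind() { return "polling"; }
}

class PushRunner extends Runner {
    PushRunner(Source s) { super(s); }
    String kind() { return "push"; }
}

class SourceLoader {
    // Wrap every configured source in a runner, keyed by source name.
    static Map<String, Runner> loadSources(List<Source> sources) {
        Map<String, Runner> runners = new LinkedHashMap<>();
        for (Source s : sources) runners.put(s.getName(), Runner.forSource(s));
        return runners;
    }
}
```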