TL;DR
Compose the dependency graph with Guice:

```java
public class DatabaseModule extends AbstractModule {
    // providers for the data sources and repositories
}
```

Create a singleton injector:

```java
public class AppInjector {
    // lazily creates and holds a single Guice Injector
}
```

Use it in a Flink function:

```java
public class UserMapper extends RichMapFunction<Long, User> {
    // obtain dependencies from AppInjector in open(), keep the fields transient
}
```
Motivation
Dependency injection, or DI, is a common practice in Java programming, especially if you come from a Spring background. The most direct benefit is testability: you can replace a class implementation with a test stub. Other benefits include separation of concerns, a better class hierarchy, inversion of control, and so on. A component declares its dependencies via its constructor or annotated members, and the DI framework creates a container, or context, that wires these components together. This context is usually created at startup and lives through the application lifecycle. Some examples are Spring's ApplicationContext and Guice's Injector.
Flink is a distributed computing framework, and it is desirable to decouple business logic from it with dependency injection. However, a Flink application is composed of functional classes that are instantiated in the driver class, or main method, then serialized and sent to the distributed task managers. We cannot inject dependencies into these classes unless all of our components are serializable. Fortunately, Flink provides a lifecycle hook, open, that is called when the job starts. Combined with another common pattern, Singleton, we can make a DI framework play well with Flink.
Guice crash course
The dependency injection framework I chose is Guice, because it is simple, lightweight, and effective. Usually we declare class dependencies in the constructor, add all components to a module, and let Guice do the rest.
Declare dependencies
There are three ways to declare dependencies for a class: constructor, field, and setter injection, all marked with the com.google.inject.Inject annotation. The constructor approach is preferable.
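A minimal sketch of the preferred style, using the repository implementation that appears throughout this article; field and setter injection differ only in where @Inject is placed:

```java
import com.google.inject.Inject;

import javax.sql.DataSource;

public class UserRepositoryImpl implements UserRepository {

    private final DataSource dataSource;

    // Constructor injection: Guice calls this constructor and supplies the
    // arguments. Field and setter injection work the same way, with @Inject
    // placed on a field or on a setter method instead.
    @Inject
    public UserRepositoryImpl(DataSource dataSource) {
        this.dataSource = dataSource;
    }
}
```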
Add components to a module
A module is Guice's mechanism for configuring components: how to initialize them, which concrete class implements an interface, what to do when there are multiple implementations, and so on. Components are grouped into modules, and modules can themselves be grouped together. There are plenty of topics here; refer to the Guice documentation for details, while I cover some basic usage below.
First, components can be created implicitly, as long as Guice can figure out the dependency graph solely from class types and annotations. For instance, consider a class that needs a dataSource and a UserRepository.
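A sketch of such a class together with the annotated interface; UserService and findById are illustrative names, while the field types match the discussion below:

```java
import com.google.inject.ImplementedBy;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;

public class UserService {

    @Inject
    private HikariDataSource dataSource;     // concrete class, Guice can build it

    @Inject
    private UserRepository userRepository;   // interface, resolved via @ImplementedBy

    // ... business methods using the two dependencies ...
}

@ImplementedBy(UserRepositoryImpl.class)
interface UserRepository {
    User findById(long id);   // illustrative method
}
```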
dataSource is typed HikariDataSource, which is a concrete class, so Guice knows how to create it; if it were typed DataSource instead, Guice would raise a missing-implementation error. For UserRepository, however, Guice knows the implementation because we declare it with the ImplementedBy annotation. Otherwise, we need to declare this relationship in a module, i.e. a subclass of com.google.inject.AbstractModule.
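A minimal sketch of such a module, using a provider method; the method simply passes the implementation through:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;

public class DatabaseModule extends AbstractModule {

    @Provides
    UserRepository provideUserRepository(UserRepositoryImpl impl) {
        // Guice builds the concrete UserRepositoryImpl for us; we return it
        // as the interface type, optionally adjusting it first.
        return impl;
    }
}
```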
These two methods are equivalent. The second approach can be interpreted in this way:
- The user requests a UserRepository instance.
- Guice sees the provideUserRepository method, due to its annotation and return type.
- The method requires a UserRepositoryImpl parameter.
- Guice creates the implementation instance implicitly, because it is a concrete class.
- The method gets the instance, possibly modifies it, and returns it to the user.
The second approach can also look a little different from what we used before: the parameter is a DataSource, and we create the UserRepositoryImpl manually.
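A sketch of that variant, again inside DatabaseModule; it assumes a DataSource binding exists elsewhere, for instance from a data-source provider like the one shown later:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;

import javax.sql.DataSource;

public class DatabaseModule extends AbstractModule {

    @Provides
    UserRepository provideUserRepository(DataSource dataSource) {
        // We construct the implementation ourselves; Guice only needs to know
        // how to supply the DataSource parameter.
        return new UserRepositoryImpl(dataSource);
    }
}
```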
In this case, the Inject annotation in UserRepositoryImpl can be omitted, because Guice is not responsible for creating the instance, unless you deliberately try to get a UserRepositoryImpl instance from the injector.
In a provider method, we can also configure the instance before returning it.
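For example, a provider for the connection pool might tune it before handing it out; the specific settings here are only illustrative:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class DatabaseModule extends AbstractModule {

    @Provides
    HikariDataSource provideDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/app");  // illustrative URL
        config.setMaximumPoolSize(10);                         // illustrative setting
        return new HikariDataSource(config);
    }
}
```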
Lastly, modules can be grouped together, for instance into an EtlModule that installs the others.
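A sketch of such a grouping module; install is the standard Guice way to compose modules:

```java
import com.google.inject.AbstractModule;

public class EtlModule extends AbstractModule {

    @Override
    protected void configure() {
        install(new DatabaseModule());
        // install(...) further modules of the application here
    }
}
```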
Named and scoped components
When there are multiple instances of a type with different configurations, use the Named annotation to tell them apart. It is also possible to create custom binding annotations, or to use bindings in AbstractModule#configure instead of provider methods. A DatabaseModule in this style is sketched below.
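The sketch provides two named data sources and a singleton repository; the names "primary" and "replica" and the JDBC URLs are only illustrative:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.zaxxer.hikari.HikariDataSource;

public class DatabaseModule extends AbstractModule {

    @Provides @Singleton @Named("primary")
    HikariDataSource providePrimaryDataSource() {
        return createDataSource("jdbc:mysql://primary:3306/app");
    }

    @Provides @Singleton @Named("replica")
    HikariDataSource provideReplicaDataSource() {
        return createDataSource("jdbc:mysql://replica:3306/app");
    }

    @Provides @Singleton
    UserRepository provideUserRepository(@Named("primary") HikariDataSource dataSource) {
        return new UserRepositoryImpl(dataSource);
    }

    private static HikariDataSource createDataSource(String jdbcUrl) {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl(jdbcUrl);
        return dataSource;
    }
}
```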
Both data sources and the implementation instance are annotated with Singleton, meaning Guice returns the same instance every time one is requested. Without it, the binding works like the prototype scope in Spring: a new instance per request.
Flink pipeline serialization
Consider this simple pipeline that transforms a stream of IDs into user models and prints them to the console.
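A sketch of the pipeline; the ID source and the job name are illustrative:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UserEtlJob {

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1L, 2L, 3L)   // stream of user IDs; use a real source in practice
           .map(new UserMapper())      // Long -> User
           .print();                   // write the users to stdout

        env.execute("user-etl");       // illustrative job name
    }
}
```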
Under the hood, Flink builds this pipeline into a job graph, serializes it, and sends it to the remote task managers. The map operator takes a MapFunction implementation, in this case a UserMapper instance. This instance is wrapped in a SimpleUdfStreamOperatorFactory and gets serialized with the plain Java object serialization mechanism (see org.apache.flink.util.InstantiationUtil).
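As a rough, standalone illustration of that mechanism (a sketch, not the actual Flink internals):

```java
import org.apache.flink.util.InstantiationUtil;

public class SerializationDemo {

    public static void main(String[] args) throws Exception {
        // The user function is turned into bytes with plain Java serialization...
        byte[] bytes = InstantiationUtil.serializeObject(new UserMapper());

        // ...and restored on the task manager side with the user-code class loader.
        UserMapper restored = InstantiationUtil.deserializeObject(
                bytes, Thread.currentThread().getContextClassLoader());
        System.out.println(restored);
    }
}
```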
Pipeline operators become a series of configuration hash maps (org.apache.flink.configuration.Configuration) and are sent to the job manager by a remote call.
For ObjectOutputStream to work, every class in the pipeline, as well as its member fields, must implement the Serializable interface. UserMapper extends RichMapFunction, which already implements Serializable. However, if we add a non-serializable dependency to UserMapper, an error occurs.
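One way the failing version might look: the repository is handed over at construction time in the driver, so it gets pulled into Java serialization (findById is the illustrative method from earlier):

```java
import org.apache.flink.api.common.functions.RichMapFunction;

public class UserMapper extends RichMapFunction<Long, User> {

    // Not transient: Java serialization tries to include this field and,
    // transitively, the HikariDataSource behind it, which is not serializable.
    private final UserRepository userRepository;

    public UserMapper(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @Override
    public User map(Long userId) throws Exception {
        return userRepository.findById(userId);
    }
}
```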
This is because HikariDataSource is not serializable. As a result, we cannot carry userRepository through serialization; instead, we set it after UserMapper is deserialized and opened, as demonstrated at the beginning of this article. We add the transient keyword to tell Java not to include this field when serializing.
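A sketch of the working version, matching the pattern from the TL;DR; the AppInjector.get() accessor and the findById method are assumptions made for illustration:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class UserMapper extends RichMapFunction<Long, User> {

    // transient: excluded from serialization, re-created on the task manager
    transient UserRepository userRepository;   // package-private so tests can set it

    @Override
    public void open(Configuration parameters) {
        // Called once per task after deserialization, when the job starts.
        userRepository = AppInjector.get().getInstance(UserRepository.class);
    }

    @Override
    public User map(Long userId) throws Exception {
        return userRepository.findById(userId);
    }
}
```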
In AppInjector, we use the Singleton pattern to ensure there is only one Guice injector per JVM, and Guice itself is thread-safe, so heavyweight resources like the connection pool can be shared among the different user-defined functions.
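A sketch of such a holder; the get() method name and the module it installs are assumptions for illustration:

```java
import com.google.inject.Guice;
import com.google.inject.Injector;

public final class AppInjector {

    private static volatile Injector injector;

    private AppInjector() {}

    public static Injector get() {
        // Lazily create the injector once per JVM; double-checked locking keeps
        // concurrent open() calls from building it twice.
        if (injector == null) {
            synchronized (AppInjector.class) {
                if (injector == null) {
                    injector = Guice.createInjector(new EtlModule());
                }
            }
        }
        return injector;
    }
}
```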
Unit testing
As mentioned earlier, dependency injection improves testability. To test UserMapper with JUnit 5, we can mock the dependency and exercise it like a plain function. Other testing techniques can be found in the documentation.
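A sketch of such a test, using Mockito for the mock; the findById method, the no-arg User constructor, and the package-private field from the sketch above are assumptions for illustration:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;

class UserMapperTest {

    @Test
    void mapsIdToUser() throws Exception {
        UserRepository repository = mock(UserRepository.class);
        User expected = new User();                      // assuming a no-arg constructor
        when(repository.findById(42L)).thenReturn(expected);

        UserMapper mapper = new UserMapper();
        mapper.userRepository = repository;              // bypass open() and AppInjector

        assertEquals(expected, mapper.map(42L));
    }
}
```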