Real-time Exactly-once ETL with Apache Flink

Apache Flink is another popular big data processing framework. It differs from Apache Spark in that Flink uses stream processing to mimic batch processing, and it provides sub-second latency along with exactly-once semantics. One of its use cases is building a real-time data pipeline that moves and transforms data between different stores. This article will show you how to build such an application, and explain how Flink guarantees its correctness.


Demo ETL Application

Let us build a project that extracts data from Kafka and loads it into HDFS. The result files should be stored in bucketed directories according to event time. Source messages are encoded in JSON, and the event time is stored in the timestamp field as Unix epoch seconds. Some sample messages:

{"timestamp":1545184226.432,"event":"page_view","uuid":"ac0e50bf-944c-4e2f-bbf5-a34b22718e0c"}
{"timestamp":1545184602.640,"event":"adv_click","uuid":"9b220808-2193-44d1-a0e9-09b9743dec55"}
{"timestamp":1545184608.969,"event":"thumbs_up","uuid":"b44c3137-4c91-4f36-96fb-80f56561c914"}

The result directory structure should be:

/user/flink/event_log/dt=20181219/part-0-1
/user/flink/event_log/dt=20181220/part-1-9
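The mapping from event time to bucket directory can be sketched in a few lines of Python. This is only an illustration of the bucketing rule, not the Flink job itself: the function name bucket_path is hypothetical, and it assumes buckets follow UTC dates, which a real job may replace with a local time zone.

```python
from datetime import datetime, timezone


def bucket_path(event_timestamp: float, base_dir: str = "/user/flink/event_log") -> str:
    """Return the dt=YYYYMMDD directory for an event time given in epoch seconds."""
    # Assumption: bucket boundaries follow UTC dates.
    event_day = datetime.fromtimestamp(event_timestamp, tz=timezone.utc)
    return f"{base_dir}/dt={event_day:%Y%m%d}"


# First sample message from above.
print(bucket_path(1545184226.432))  # /user/flink/event_log/dt=20181219
```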


Spark DataSource API V2

In Spark 1.3, the team introduced a data source API to help quickly integrate various input formats with Spark SQL. But eventually this version of the API became insufficient, and the team needed to add a lot of internal code to provide more efficient solutions for Spark SQL data sources. So in Spark 2.3, the second version of the data source API was released, which is supposed to overcome the limitations of the previous version. In this article, I will demonstrate how to implement a custom data source for Spark SQL with both the V1 and V2 APIs, to help understand their differences and the new API’s advantages.

DataSource V1 API

The V1 API provides a set of abstract classes and traits. They are located in spark/sql/sources/interfaces.scala. Some basic APIs are:

trait RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
}

trait TableScan {
  def buildScan(): RDD[Row]
}

A RelationProvider defines a class that can create a relational data source for Spark SQL to work with. It can initialize itself with the provided options, such as a file path or authentication details. BaseRelation is used to define the data schema, which can be loaded from a database or a Parquet file, or specified by the user. This class also needs to mix in one of the Scan traits and implement its buildScan method, which returns an RDD.
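To make the three roles concrete, here is a language-neutral sketch in plain Python. It does not use Spark at all: Row and Schema are simple stand-ins for Spark’s Row and StructType, and RangeRelation and RangeProvider are hypothetical names invented for this illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List, Tuple

Row = Tuple  # stand-in for Spark's Row
Schema = List[Tuple[str, str]]  # stand-in for StructType: (column name, type name)


class BaseRelation(ABC):
    """Describes the schema of a data source."""

    @property
    @abstractmethod
    def schema(self) -> Schema: ...


class TableScan(ABC):
    """Scan capability: return every row of the relation."""

    @abstractmethod
    def build_scan(self) -> Iterable[Row]: ...


class RangeRelation(BaseRelation, TableScan):
    """Hypothetical relation that yields the integers [0, count)."""

    def __init__(self, count: int) -> None:
        self.count = count

    @property
    def schema(self) -> Schema:
        return [("id", "long")]

    def build_scan(self) -> Iterable[Row]:
        return ((i,) for i in range(self.count))


class RangeProvider:
    """Plays the RelationProvider role: builds a relation from string options."""

    def create_relation(self, parameters: Dict[str, str]) -> RangeRelation:
        return RangeRelation(int(parameters.get("count", "3")))


relation = RangeProvider().create_relation({"count": "3"})
print(relation.schema, list(relation.build_scan()))
```

The provider only parses options, while the relation owns both the schema and the scan; this is the same split of responsibilities that the Scala traits above express.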


Flume Source Code: HDFS Sink

The sink is the last component in an Apache Flume data flow, and it writes data to storage such as local files, HDFS, or Elasticsearch. In this article, I will illustrate how Flume’s HDFS sink works by analyzing its source code with diagrams.

Sink Component Lifecycle

In the previous article, we learnt that every Flume component implements the LifecycleAware interface, and is started and monitored by LifecycleSupervisor. A sink component is not invoked directly by this supervisor, but is wrapped in the SinkRunner and SinkProcessor classes. Flume supports three different sink processors, which connect the channel and sinks with different semantics. Here we only consider the DefaultSinkProcessor, which accepts only one sink, and we will skip the concept of sink groups as well.



How to Avoid NullPointerException

A NullPointerException happens when you dereference a possibly null object without checking it. It’s a common exception that every Java programmer may encounter in daily work. There are several strategies that can help us avoid this exception and make our code more robust. In this article, I will list both traditional approaches and those that rely on tools and new features introduced in recent versions of Java.

Runtime Check

The most obvious way is to use if (obj == null) to check every variable you need to use, whether it comes from a function argument, a return value, or an instance field. When you receive a null object, you can throw a different, more informative exception like IllegalArgumentException. There are some library functions that make this process easier, like Objects#requireNonNull:

public void testObjects(Object arg) {
  Object checked = Objects.requireNonNull(arg, "arg must not be null");
  checked.toString();
}

Or use Guava’s Preconditions class, which provides all kinds of argument-checking facilities:

public void testGuava(Object arg) {
  Object checked = Preconditions.checkNotNull(arg, "%s must not be null", "arg");
  checked.toString();
}

We can also let Lombok generate the check for us, which will throw a more meaningful NullPointerException:

public void testLombok(@NonNull Object arg) {
  arg.toString();
}

The generated code and exception message are as follows:

public void testLombokGenerated(Object arg) {
  if (arg == null) {
    throw new NullPointerException("arg is marked @NonNull but is null");
  }
  arg.toString();
}

This annotation can also be added to a class field, and Lombok will add the null check to the setters and constructors it generates for that field.


Is It Necessary to Apply ESLint jsx-no-bind Rule?

When using the ESLint React plugin, you may find a rule called jsx-no-bind. It prevents you from using .bind or an arrow function in a JSX prop. For instance, ESLint will complain about the arrow function in the onClick prop below.

class ListArrow extends React.Component {
  render() {
    return (
      <ul>
        {this.state.items.map(item => (
          <li key={item.id} onClick={() => { alert(item.id) }}>{item.text}</li>
        ))}
      </ul>
    )
  }
}

There are two reasons why this rule was introduced. First, a new function is created on every render call, which may increase the frequency of garbage collection. Second, it defeats pure rendering: when you use a PureComponent, or implement the shouldComponentUpdate method yourself with identity comparison, a new function object in the props causes an unnecessary re-render of the component.
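The second reason comes down to how a shallow comparison treats function objects. The following is a language-neutral illustration in Python, not React code: shallow_equal mimics the kind of prop-by-prop check a pure component performs, and every name in it is hypothetical.

```python
from typing import Any, Dict


def same_value(left: Any, right: Any) -> bool:
    """Rough analogue of an identity check: primitives by value, objects by identity."""
    if isinstance(left, (bool, int, float, str)) and type(left) is type(right):
        return left == right
    return left is right


def shallow_equal(prev_props: Dict[str, Any], next_props: Dict[str, Any]) -> bool:
    """Compare two prop dictionaries one level deep."""
    if prev_props.keys() != next_props.keys():
        return False
    return all(same_value(prev_props[key], next_props[key]) for key in prev_props)


def render_with_inline_handler(item_id: int) -> Dict[str, Any]:
    # A fresh function object is created on every call, like an inline arrow function.
    return {"item_id": item_id, "on_click": lambda: item_id}


def handle_click() -> None:
    """Hypothetical handler defined once and reused across renders."""


def render_with_shared_handler(item_id: int) -> Dict[str, Any]:
    return {"item_id": item_id, "on_click": handle_click}


print(shallow_equal(render_with_inline_handler(1), render_with_inline_handler(1)))  # False
print(shallow_equal(render_with_shared_handler(1), render_with_shared_handler(1)))  # True
```

Even though both inline handlers behave identically, they are distinct objects, so the comparison reports a change and the component would render again.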

But some people argue that these two reasons are not solid enough to enforce this rule on all projects, especially when the solutions introduce more code and decrease readability. In the Airbnb ESLint preset, the team only bans the usage of .bind, but allows arrow functions in both props and refs. I did some googling, and was convinced that this rule is not really necessary. Some say it’s premature optimization, and that you should measure before you optimize. I agree with that. In the following sections, I will illustrate how an arrow function affects a pure component, what solutions we can use, and talk a little bit about React rendering internals.


Serve TensorFlow Estimator with SavedModel

TensorFlow is one of the most popular machine learning frameworks, and it allows us to build various models with little effort. There are several ways to use these models in production, such as a web service API, and this article introduces how to build model prediction APIs with TensorFlow’s SavedModel mechanism.

Iris DNN Estimator

First let’s build the famous iris classifier with TensorFlow’s pre-made DNN estimator. A full walkthrough can be found on TensorFlow’s website (Premade Estimators), and I have created a repository on GitHub (iris_dnn.py) for you to fork and work with. Here’s the gist of training the model:

feature_columns = [tf.feature_column.numeric_column(key=key)
                   for key in train_x.keys()]
classifier = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,
    hidden_units=[10, 10],
    n_classes=3)

classifier.train(
    input_fn=lambda: train_input_fn(train_x, train_y, batch_size=BATCH_SIZE),
    steps=STEPS)

predictions = classifier.predict(
    input_fn=lambda: eval_input_fn(predict_x, labels=None, batch_size=BATCH_SIZE))


Connect HBase with Python and Thrift

Apache HBase is a key-value store in the Hadoop ecosystem. It is built on HDFS, and provides high-performance access to large volumes of data. HBase is written in Java, and has native support for Java clients. But with the help of Thrift and its various language bindings, we can access HBase from web services quite easily. This article describes how to read and write HBase tables with Python and Thrift.

Generate Thrift Class

For anyone who is new to Apache Thrift, it provides an IDL (Interface Description Language) that lets you describe your service methods and data types, and then generates code for them in different languages. For instance, a Thrift type definition like this:

struct TColumn {
  1: required binary family,
  2: optional binary qualifier,
  3: optional i64 timestamp
}

Will be transformed into the following Python code:

class TColumn(object):
    def __init__(self, family=None, qualifier=None, timestamp=None,):
        self.family = family
        self.qualifier = qualifier
        self.timestamp = timestamp

    def read(self, iprot):
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            # ...

    def write(self, oprot):
        oprot.writeStructBegin('TColumn')
        # ...


Form Handling in Vuex Strict Mode

When handling form inputs in Vue, we usually use v-model to achieve two-way binding. But if we want to put form data into the Vuex store, two-way binding becomes a problem, since in strict mode Vuex doesn’t allow state changes outside mutation handlers. Take the following snippet for instance; the full code can be found on GitHub (link).

src/store/table.js

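export default {
  // namespaced is a module option, so it sits beside state rather than inside it;
  // mapState('table', ...) relies on the module being namespaced.
  namespaced: true,
  state: {
    table: {
      table_name: ''
    }
  }
}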

src/components/NonStrict.vue

<b-form-group label="Table Name:">
  <b-form-input v-model="table.table_name" />
</b-form-group>

<script>
import { mapState } from 'vuex'

export default {
  computed: {
    ...mapState('table', [
      'table'
    ])
  }
}
</script>

When we type something into the “Table Name” field, an error is thrown in the browser’s console:

Error: [vuex] Do not mutate vuex store state outside mutation handlers.
    at assert (vuex.esm.js?358c:97)
    at Vue.store._vm.$watch.deep (vuex.esm.js?358c:746)
    at Watcher.run (vue.esm.js?efeb:3233)

Apart from not using strict mode at all, which is fine if you’re ready to lose the benefit of tracking every mutation to the store, there are several ways to fix this error. In this article, we’ll explore these solutions, and explain how they work.


Error Handling in RESTful API


A RESTful API is a common way to build web services, especially in applications with a separated front end and back end. It is based on the HTTP protocol, which is simple, text-oriented, and well supported by various languages, browsers, and clients. However, REST is not standardized, so developers need to decide how to design their APIs. One of these decisions is error handling: should I use HTTP status codes? How should I handle form validation errors? This article proposes an error handling mechanism for RESTful APIs, based on my daily work and understanding of this technique.

Types of Errors

I tend to categorize errors into two types, global and local. Global errors include requesting an unknown API URL, not being authorized to access the API, or an unexpected, fatal fault in the server code. These errors should be caught by the web framework, with no customized handling in individual API functions.

Local errors, on the other hand, are closely related to the current API. Examples are form validation failures, violations of a unique constraint, or other expected errors. We need to write specific code to catch these errors, and raise a global error with a message and payload for the framework to catch and respond with.

Flask, for instance, provides a mechanism to catch exceptions globally:

from flask import Flask, jsonify, request

app = Flask(__name__)


class BadRequest(Exception):
    """Custom exception class to be thrown when local error occurs."""
    def __init__(self, message, status=400, payload=None):
        super().__init__(message)
        self.message = message
        self.status = status
        self.payload = payload


@app.errorhandler(BadRequest)
def handle_bad_request(error):
    """Catch BadRequest exception globally, serialize into JSON, and respond with 400."""
    payload = dict(error.payload or ())
    payload['status'] = error.status
    payload['message'] = error.message
    return jsonify(payload), 400


@app.route('/person', methods=['POST'])
def person_post():
    """Create a new person object and return its ID"""
    if not request.form.get('username'):
        raise BadRequest('username cannot be empty', 40001, {'ext': 1})
    return jsonify(last_insert_id=1)
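
The same pattern can be tried without a web framework. The sketch below is a simplified stand-in and not Flask’s API: the dispatch function is hypothetical and plays the role of the framework, running a handler and turning local and unexpected errors into (HTTP status, JSON body) pairs.

```python
import json
from typing import Any, Callable, Dict, Optional, Tuple


class BadRequest(Exception):
    """Local error carrying an application status code and an extra payload."""

    def __init__(self, message: str, status: int = 400,
                 payload: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.message = message
        self.status = status
        self.payload = payload


def dispatch(handler: Callable[[Dict[str, str]], Dict[str, Any]],
             form: Dict[str, str]) -> Tuple[int, str]:
    """Hypothetical stand-in for the framework: run a handler, map errors to responses."""
    try:
        return 200, json.dumps(handler(form))
    except BadRequest as error:
        # Local error: expose its message, status, and payload to the client.
        body = dict(error.payload or {})
        body["status"] = error.status
        body["message"] = error.message
        return 400, json.dumps(body)
    except Exception:
        # Global error: unexpected failures get a generic response.
        return 500, json.dumps({"status": 500, "message": "internal server error"})


def person_post(form: Dict[str, str]) -> Dict[str, Any]:
    if not form.get("username"):
        raise BadRequest("username cannot be empty", 40001, {"ext": 1})
    return {"last_insert_id": 1}


print(dispatch(person_post, {}))
print(dispatch(person_post, {"username": "jerry"}))
```

The handler only knows about its own validation rules, while the mapping from exceptions to responses lives in one place.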


Flume Source Code: Component Lifecycle

Apache Flume is a real-time ETL tool for data warehouse platforms. It consists of different types of components, and at runtime all of them are managed by Flume’s lifecycle and supervisor mechanism. This article will walk you through the source code of Flume’s component lifecycle management.

Repository Structure

Flume’s source code can be downloaded from GitHub. It’s a Maven project, so we can import it into an IDE for efficient code reading. The following is the main structure of the project:

/flume-ng-node
/flume-ng-core
/flume-ng-sdk
/flume-ng-sources/flume-kafka-source
/flume-ng-channels/flume-kafka-channel
/flume-ng-sinks/flume-hdfs-sink

Application Entrance

The main entry point of the Flume agent is the org.apache.flume.node.Application class in the flume-ng-node module. The following is an abridged version of its source code:

public class Application {
  public static void main(String[] args) {
    CommandLineParser parser = new GnuParser();
    if (isZkConfigured) {
      if (reload) {
        PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider;
        components.add(zookeeperConfigurationProvider);
      } else {
        StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider;
        application.handleConfigurationEvent();
      }
    } else {
      // PropertiesFileConfigurationProvider
    }
    application.start();
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
        appReference.stop();
      }
    });
  }
}

The process can be illustrated as follows:

  1. Parse command line arguments with commons-cli, including the Flume agent’s name, configuration method and path.
  2. Configurations can be provided via a properties file or ZooKeeper. Both providers support live-reload, i.e. we can update component settings without restarting the agent.
    • File-based live-reload is implemented by using a background thread that checks the last modification time of the file.
    • ZooKeeper-based live-reload is provided by Curator’s NodeCache recipe, which uses ZooKeeper’s watch functionality underneath.
  3. If live-reload is on (the default), configuration providers will add themselves to the application’s component list, and after calling Application#start, a LifecycleSupervisor will start the provider, and trigger the reload event to parse the configuration and load all defined components.
  4. If live-reload is off, configuration providers will parse the file immediately and start all components, also supervised by LifecycleSupervisor.
  5. Finally, add a JVM shutdown hook with Runtime#addShutdownHook, which in turn invokes Application#stop to shut down the Flume agent.
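
The file-based live-reload idea from step 2 can be sketched in Python as a check on the file’s last modification time. This is an illustration of the technique and not Flume’s code: FileChangeDetector and the flume.conf file name are hypothetical, and a real provider would call poll periodically from a background thread.

```python
import os
import tempfile
from typing import Optional


class FileChangeDetector:
    """Hypothetical helper that reports when a file's modification time moves forward."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.last_seen: Optional[float] = None

    def poll(self) -> bool:
        """Return True when the file is new to us or has changed since the last poll."""
        modified = os.path.getmtime(self.path)
        if self.last_seen is None or modified > self.last_seen:
            self.last_seen = modified
            return True
        return False


with tempfile.TemporaryDirectory() as directory:
    config_path = os.path.join(directory, "flume.conf")
    with open(config_path, "w") as config_file:
        config_file.write("a1.sinks = k1\n")

    detector = FileChangeDetector(config_path)
    print(detector.poll())  # True: the first sighting triggers the initial load
    print(detector.poll())  # False: nothing has changed
    # Simulate an edit by moving the modification time forward.
    newer = detector.last_seen + 10
    os.utime(config_path, (newer, newer))
    print(detector.poll())  # True: a reload would be triggered here
```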
