Deploy Flink Job Cluster on Kubernetes

Kubernetes is currently the leading container orchestration system, capable of hosting applications ranging from web services to data processing jobs. Applications are packaged in self-contained yet lightweight containers, and we declare how they should be deployed, how they scale, and how they are exposed as services. Flink is a popular distributed computing framework that can run on a variety of platforms, Kubernetes included. Combining the two gives us robust, scalable deployments of data processing jobs, and lets Flink safely share a Kubernetes cluster with other services.

Flink on Kubernetes

When deploying Flink on Kubernetes, there are two options: a session cluster or a job cluster. A session cluster is like a standalone Flink cluster running on Kubernetes that can accept multiple jobs; it is suitable for short-running tasks or ad-hoc queries. A job cluster, on the other hand, deploys a full Flink cluster for each individual job. We build a container image for each job and provide it with dedicated resources, so that jobs have less chance of interfering with each other and can scale out independently. This article illustrates how to run a Flink job cluster on Kubernetes; the steps are:

  • Compile and package the Flink job jar.
  • Build a Docker image containing the Flink runtime and the job jar.
  • Create a Kubernetes Job for Flink JobManager.
  • Create a Kubernetes Service for this Job.
  • Create a Kubernetes Deployment for Flink TaskManagers.
  • Enable Flink JobManager HA with ZooKeeper.
  • Correctly stop and resume the Flink job with the savepoint facility.

Read More

Understanding Hive ACID Transactional Table

Apache Hive introduced transactions in version 0.13 to fully support ACID semantics on Hive tables, including INSERT/UPDATE/DELETE/MERGE statements, streaming data ingestion, and so on. In Hive 3.0, this feature was further improved by optimizing the underlying data file structure, reducing constraints on the table schema, and supporting predicate push-down and vectorized queries. Examples and setup instructions can be found on the Hive wiki and in other tutorials, while this article focuses on how a transactional table is laid out on HDFS, taking a closer look at the read-write process.

File Structure

Insert Data

CREATE TABLE employee (id int, name string, salary int)
STORED AS ORC TBLPROPERTIES ('transactional' = 'true');

INSERT INTO employee VALUES
(1, 'Jerry', 5000),
(2, 'Tom', 8000),
(3, 'Kate', 6000);

An INSERT statement is executed in a single transaction. It will create a delta directory containing information about this transaction and its data.

/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000001_0000001_0000/_orc_acid_version
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

The naming scheme of this folder is delta_minWID_maxWID_stmtID: the "delta" prefix, the transactional write range (minimum and maximum write ID), and the statement ID. In detail:

  • All INSERT statements create a delta directory. An UPDATE statement also creates a delta directory, right after a delete directory; delete directories are prefixed with "delete_delta" (see the listing after this list).
  • Hive assigns a globally unique ID to every transaction, both read and write. For transactional writes like INSERT and DELETE, it also assigns a table-wise unique ID, a.k.a. a write ID. The write ID range is encoded in the names of the delta and delete directories.
  • The statement ID is used when one transaction performs multiple writes into the same table.
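
To make the naming concrete, here is a hypothetical listing of what the table directory could look like if a subsequent UPDATE ran in its own transaction with write ID 2: the UPDATE produces a delete directory for the removed row versions and a delta directory for the re-inserted ones.

/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000
/user/hive/warehouse/employee/delta_0000002_0000002_0000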

Read More

Real-time Exactly-once ETL with Apache Flink

Apache Flink is another popular big data processing framework, which differs from Apache Spark in that Flink uses stream processing to mimic batch processing, providing sub-second latency along with exactly-once semantics. One of its use cases is building real-time data pipelines that move and transform data between different stores. This article shows how to build such an application and explains how Flink guarantees its correctness.

Apache Flink

Demo ETL Application

Let us build a project that extracts data from Kafka and loads it into HDFS. The result files should be stored in directories bucketed by event time. Source messages are encoded in JSON, and the event time is stored in the timestamp field. Some samples:

{"timestamp":1545184226.432,"event":"page_view","uuid":"ac0e50bf-944c-4e2f-bbf5-a34b22718e0c"}
{"timestamp":1545184602.640,"event":"adv_click","uuid":"9b220808-2193-44d1-a0e9-09b9743dec55"}
{"timestamp":1545184608.969,"event":"thumbs_up","uuid":"b44c3137-4c91-4f36-96fb-80f56561c914"}

The result directory structure should be:

/user/flink/event_log/dt=20181219/part-0-1
/user/flink/event_log/dt=20181220/part-1-9
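
As a preview of the pipeline we will build, here is a minimal sketch in Flink's Java API, under some assumptions: Flink 1.7+ with the universal Kafka connector, a hypothetical topic name (event_log) and broker address, and the built-in DateTimeBucketAssigner, which buckets by processing time; the event-time bucketing described above would require a custom BucketAssigner that parses the timestamp field.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToHdfsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once delivery relies on periodic checkpoints.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "etl-demo");

        // Read raw JSON strings from the (hypothetical) event_log topic.
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("event_log", new SimpleStringSchema(), props));

        // Write records into dt=yyyyMMdd directories under the target path.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/user/flink/event_log"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>("'dt='yyyyMMdd"))
                .build();

        events.addSink(sink);
        env.execute("kafka-to-hdfs-etl");
    }
}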

Read More

Spark DataSource API V2

In Spark 1.3, the team introduced the data source API to help integrate various input formats with Spark SQL quickly. But eventually this version of the API proved insufficient, and the team had to add a lot of internal code to provide more efficient solutions for Spark SQL data sources. So in Spark 2.3 the second version of the data source API came out, which is supposed to overcome the limitations of the previous version. In this article, I will demonstrate how to implement a custom data source for Spark SQL with both the V1 and V2 APIs, to help illustrate their differences and the new API's advantages.

DataSource V1 API

The V1 API provides a set of abstract classes and traits, located in spark/sql/sources/interfaces.scala. Some of the basic APIs are:

trait RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
}

trait TableScan {
  def buildScan(): RDD[Row]
}

A RelationProvider defines a class that can create a relational data source for Spark SQL to manipulate. It can initialize itself with the provided options, such as a file path or authentication credentials. BaseRelation is used to define the data schema, which can be loaded from a database or a Parquet file, or specified by the user. This class also needs to mix in one of the Scan traits, implement the buildScan method, and return an RDD.

Read More

Flume Source Code: HDFS Sink

Sink is the last component in an Apache Flume data flow; it outputs data into storage such as local files, HDFS, or ElasticSearch. In this article, I will illustrate how Flume's HDFS sink works by analyzing its source code, with the help of diagrams.

Sink Component Lifecycle

In the previous article, we learned that every Flume component implements the LifecycleAware interface and is started and monitored by LifecycleSupervisor. The sink component, however, is not invoked directly by this supervisor; it is wrapped in the SinkRunner and SinkProcessor classes. Flume supports three different sink processors, which connect channels and sinks with different semantics, but here we only consider the DefaultSinkProcessor, which accepts a single sink, and we will skip the concept of sink groups as well. The sketch below shows, in simplified form, how SinkRunner drives the processor.
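
The following is a simplified paraphrase, not the actual Flume source: a background thread repeatedly calls process() on the sink processor, and backs off briefly whenever the sink reports that no data was available.

import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.SinkProcessor;

// Simplified sketch of SinkRunner's polling loop (names are illustrative).
public class PollingRunnerSketch implements Runnable {
    private final SinkProcessor policy; // e.g. DefaultSinkProcessor wrapping one sink
    private volatile boolean shouldStop = false;

    public PollingRunnerSketch(SinkProcessor policy) {
        this.policy = policy;
    }

    @Override
    public void run() {
        while (!shouldStop) {
            try {
                // The processor delegates to its sink; BACKOFF means the
                // channel had no events to deliver in this round.
                if (policy.process() == Sink.Status.BACKOFF) {
                    Thread.sleep(1000); // the real code uses an increasing backoff
                }
            } catch (EventDeliveryException e) {
                // The real runner logs the failure and retries after a pause.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void stopRunner() {
        shouldStop = true;
    }
}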

Sink Component LifeCycle

Read More

How to Avoid NullPointerException

A NullPointerException happens when you dereference a possibly null object without checking it first. It's a common exception that every Java programmer encounters in daily work. There are several strategies that can help us avoid this exception and make our code more robust. In this article, I will cover both traditional approaches and those using tools and new features introduced by recent versions of Java.

Runtime Check

The most obvious way is to use if (obj == null) to check every variable you need to use, whether it comes from a function argument, a return value, or an instance field. When you receive a null object, you can throw a different, more informative exception like IllegalArgumentException. There are some library functions that make this process easier, like Objects#requireNonNull:

public void testObjects(Object arg) {
    Object checked = Objects.requireNonNull(arg, "arg must not be null");
    checked.toString();
}

Or use Guava's Preconditions class, which provides all kinds of argument-checking facilities:

public void testGuava(Object arg) {
    Object checked = Preconditions.checkNotNull(arg, "%s must not be null", "arg");
    checked.toString();
}

We can also let Lombok generate the check for us, which will throw a more meaningful NullPointerException:

public void testLombok(@NonNull Object arg) {
    arg.toString();
}

The generated code and exception message are as follows:

public void testLombokGenerated(Object arg) {
    if (arg == null) {
        throw new NullPointerException("arg is marked @NonNull but is null");
    }
    arg.toString();
}

This annotation can also be added to a class field, and Lombok will insert the null check into the generated methods that assign it, such as setters and constructors. For instance:
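
Here is a minimal sketch of that usage (the class and field names are just for illustration):

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

@RequiredArgsConstructor
public class Person {
    // Lombok includes @NonNull fields in the generated required-args
    // constructor and adds a null check to the generated setter.
    @Setter
    @NonNull
    private String name;
}

Both new Person(null) and person.setName(null) will then throw the same descriptive NullPointerException shown above.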

Read More

Is It Necessary to Apply ESLint jsx-no-bind Rule?

When using the ESLint React plugin, you may find a rule called jsx-no-bind. It prevents you from using .bind or an arrow function in a JSX prop. For instance, ESLint will complain about the arrow function in the onClick prop below.

class ListArrow extends React.Component {
  render() {
    return (
      <ul>
        {this.state.items.map(item => (
          <li key={item.id} onClick={() => { alert(item.id) }}>{item.text}</li>
        ))}
      </ul>
    )
  }
}

There are two reasons why this rule was introduced. First, a new function is created on every render call, which may increase the frequency of garbage collection. Second, it disables the pure rendering process: when you're using a PureComponent, or implementing the shouldComponentUpdate method yourself with identity comparison, a new function object in the props will cause an unnecessary re-render of the component.

But some people argue that these two reasons are not solid enough to enforce this rule on all projects, especially when the workarounds introduce more code and decrease readability. In the Airbnb ESLint preset, the team only bans the usage of .bind, but allows arrow functions in both props and refs. I did some googling and was convinced that this rule is not quite necessary. Some say it's premature optimization, and that you should measure before you optimize. I agree with that. In the following sections, I will illustrate how an arrow function affects a pure component, what solutions we can use, and talk a little bit about React's rendering internals.

Read More

Serve TensorFlow Estimator with SavedModel

TensorFlow is one of the most popular machine learning frameworks, allowing us to build various models with minor effort. There are several ways to utilize these models in production, such as a web service API, and this article will introduce how to build model prediction APIs with TensorFlow's SavedModel mechanism.

Iris DNN Estimator

First let's build the famous iris classifier with TensorFlow's pre-made DNN estimator. A full walkthrough can be found on TensorFlow's website (Premade Estimators), and I have created a repository on GitHub (iris_dnn.py) for you to fork and work with. Here's the gist of training the model:

feature_columns = [tf.feature_column.numeric_column(key=key)
                   for key in train_x.keys()]
classifier = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,
    hidden_units=[10, 10],
    n_classes=3)

classifier.train(
    input_fn=lambda: train_input_fn(train_x, train_y, batch_size=BATCH_SIZE),
    steps=STEPS)

predictions = classifier.predict(
    input_fn=lambda: eval_input_fn(predict_x, labels=None, batch_size=BATCH_SIZE))

Read More

Connect HBase with Python and Thrift

Apache HBase is a key-value store in the Hadoop ecosystem. It is built on HDFS and can provide high-performance access to large volumes of data. HBase is written in Java and has native support for Java clients, but with the help of Thrift and its various language bindings, we can access HBase from web services quite easily. This article describes how to read and write HBase tables with Python and Thrift.

Generate Thrift Class

For anyone who is new to Apache Thrift: it provides an IDL (Interface Description Language) to let you describe your service methods and data types, which are then transformed into different languages. For instance, a Thrift type definition like this:

struct TColumn {
  1: required binary family,
  2: optional binary qualifier,
  3: optional i64 timestamp
}

will be transformed into the following Python code:

class TColumn(object):
    def __init__(self, family=None, qualifier=None, timestamp=None,):
        self.family = family
        self.qualifier = qualifier
        self.timestamp = timestamp

    def read(self, iprot):
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            # ...

    def write(self, oprot):
        oprot.writeStructBegin('TColumn')
        # ...

Read More

Form Handling in Vuex Strict Mode

When handling form inputs in Vue, we usually use v-model to achieve two-way binding. But if we want to put form data into the Vuex store, two-way binding becomes a problem, since in strict mode Vuex doesn't allow state to be changed outside of mutation handlers. Take the following snippet for instance; the full code can be found on GitHub (link).

src/store/table.js

export default {
  namespaced: true,
  state: {
    table: {
      table_name: ''
    }
  }
}

src/components/NonStrict.vue

<template>
  <b-form-group label="Table Name:">
    <b-form-input v-model="table.table_name" />
  </b-form-group>
</template>

<script>
import { mapState } from 'vuex'

export default {
  computed: {
    ...mapState('table', [
      'table'
    ])
  }
}
</script>

When we type something into the "Table Name" field, an error is thrown in the browser's console:

Error: [vuex] Do not mutate vuex store state outside mutation handlers.
at assert (vuex.esm.js?358c:97)
at Vue.store._vm.$watch.deep (vuex.esm.js?358c:746)
at Watcher.run (vue.esm.js?efeb:3233)

Apart from not using strict mode at all, which is fine if you're prepared to lose some of the benefits of tracking every mutation to the store, there are several ways to solve this error. In this article, we'll explore these solutions and explain how they work.

Read More