Serve TensorFlow Estimator with SavedModel

TensorFlow is one of the most popular machine learning frameworks, allowing us to build various models with little effort. There are several ways to use these models in production, such as a web service API, and this article introduces how to build model prediction APIs with TensorFlow’s SavedModel mechanism.

Iris DNN Estimator

First, let’s build the famous iris classifier with TensorFlow’s pre-made DNN estimator. A full walkthrough can be found on TensorFlow’s website (Premade Estimators), and I have created a repository on GitHub (iris_dnn.py) for you to fork and work with. Here’s the gist of training the model:

feature_columns = [tf.feature_column.numeric_column(key=key)
                   for key in train_x.keys()]
classifier = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,
    hidden_units=[10, 10],
    n_classes=3)

classifier.train(
    input_fn=lambda: train_input_fn(train_x, train_y, batch_size=BATCH_SIZE),
    steps=STEPS)

predictions = classifier.predict(
    input_fn=lambda: eval_input_fn(predict_x, labels=None, batch_size=BATCH_SIZE))

Read More

Connect HBase with Python and Thrift

Apache HBase is a key-value store in the Hadoop ecosystem. It is built on HDFS and provides high-performance access to large volumes of data. HBase is written in Java and has native support for Java clients, but with the help of Thrift and its various language bindings, we can access HBase from web services quite easily. This article describes how to read and write HBase tables with Python and Thrift.

Generate Thrift Class

For anyone who is new to Apache Thrift, it provides an IDL (Interface Description Language) to let you describe your service methods and data types and then transform them into different languages. For instance, a Thrift type definition like this:

struct TColumn {
  1: required binary family,
  2: optional binary qualifier,
  3: optional i64 timestamp
}

will be transformed into the following Python code:

class TColumn(object):
    def __init__(self, family=None, qualifier=None, timestamp=None,):
        self.family = family
        self.qualifier = qualifier
        self.timestamp = timestamp

    def read(self, iprot):
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            # ...

    def write(self, oprot):
        oprot.writeStructBegin('TColumn')
        # ...
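
To make the mapping from IDL to Python a little more tangible, here is a plain-Python sketch of the data shape that the struct describes. It does not use Thrift at all: the class name TColumnSketch, the validate method, and the sample values are my own assumptions for illustration, and the real generated class also carries the serialization logic shown above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TColumnSketch:
    """Hypothetical stand-in for the generated TColumn class (no Thrift involved)."""
    family: Optional[bytes] = None      # 1: required binary
    qualifier: Optional[bytes] = None   # 2: optional binary
    timestamp: Optional[int] = None     # 3: optional i64

    def validate(self) -> None:
        # Only `family` is declared `required` in the IDL, so it must be set.
        if self.family is None:
            raise ValueError("required field 'family' is unset")


# Made-up sample values; optional fields may simply be left out.
column = TColumnSketch(family=b"cf", qualifier=b"name")
column.validate()
print(column)
```

Optional fields default to None, which matches the keyword defaults in the generated constructor.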

Read More

Form Handling in Vuex Strict Mode

When handling form inputs in Vue, we usually use v-model to achieve two-way binding. But if we want to keep form data in a Vuex store, two-way binding becomes a problem, because in strict mode Vuex doesn’t allow state changes outside mutation handlers. Take the following snippet for instance; the full code can be found on GitHub (link).

src/store/table.js

export default {
  namespaced: true,
  state: {
    table: {
      table_name: ''
    }
  }
}

src/components/NonStrict.vue

<b-form-group label="Table Name:">
  <b-form-input v-model="table.table_name" />
</b-form-group>

<script>
import { mapState } from 'vuex'

export default {
  computed: {
    ...mapState('table', [
      'table'
    ])
  }
}
</script>

When we type something into the “Table Name” field, an error is thrown in the browser’s console:

Error: [vuex] Do not mutate vuex store state outside mutation handlers.
    at assert (vuex.esm.js?358c:97)
    at Vue.store._vm.$watch.deep (vuex.esm.js?358c:746)
    at Watcher.run (vue.esm.js?efeb:3233)

Apart from not using strict mode at all, which is fine if you’re ready to lose the benefit of tracking every mutation to the store, there are several ways to solve this error. In this article, we’ll explore these solutions and explain how they work.

Read More

Error Handling in RESTful API

RESTful API

A RESTful API is a common way to build web services, especially in applications with a separated front end and back end. It is based on the HTTP protocol, which is simple, text-oriented, and well supported by various languages, browsers, and clients. However, REST is not standardized, so developers need to decide how to design their APIs. One of these decisions is error handling: should I use HTTP status codes? How should form validation errors be handled? This article proposes an error handling mechanism for RESTful APIs, based on my daily work and understanding of this technique.

Types of Errors

I tend to categorize errors into two types, global and local. Global errors include requesting an unknown API URL, not being authorized to access the API, or an unexpected and fatal failure in the server code. These errors should be caught by the web framework, with no customized handling in individual API functions.

Local errors, on the other hand, are closely related to the current API. Examples are form validation failures, violations of a unique constraint, and other expected errors. We need to write specific code to catch these errors and raise a global error with a message and payload for the framework to catch and respond with.

Flask, for instance, provides a mechanism to catch exceptions globally:

from flask import Flask, jsonify, request

app = Flask(__name__)


class BadRequest(Exception):
    """Custom exception class to be thrown when local error occurs."""
    def __init__(self, message, status=400, payload=None):
        self.message = message
        self.status = status
        self.payload = payload


@app.errorhandler(BadRequest)
def handle_bad_request(error):
    """Catch BadRequest exception globally, serialize into JSON, and respond with 400."""
    payload = dict(error.payload or ())
    payload['status'] = error.status
    payload['message'] = error.message
    return jsonify(payload), 400


@app.route('/person', methods=['POST'])
def person_post():
    """Create a new person object and return its ID"""
    if not request.form.get('username'):
        raise BadRequest('username cannot be empty', 40001, {'ext': 1})
    return jsonify(last_insert_id=1)
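
To see the shape of the response body without running a web server, here is a framework-free sketch of the same serialization step. The helper name serialize_error is hypothetical, and json.dumps stands in for Flask’s jsonify, so treat it as an illustration of the idea rather than the exact output of the handler above.

```python
import json


class BadRequest(Exception):
    """Same shape as the exception above: message, status code, extra payload."""
    def __init__(self, message, status=400, payload=None):
        super().__init__(message)
        self.message = message
        self.status = status
        self.payload = payload


def serialize_error(error):
    """Hypothetical helper mirroring the error handler: returns (JSON body, HTTP status)."""
    payload = dict(error.payload or ())
    payload['status'] = error.status    # application-level error code
    payload['message'] = error.message
    return json.dumps(payload, sort_keys=True), 400


try:
    # A local error raised inside an API function...
    raise BadRequest('username cannot be empty', 40001, {'ext': 1})
except BadRequest as e:
    # ...is caught in one place and turned into a uniform response.
    body, http_status = serialize_error(e)

print(http_status, body)
```

The HTTP status stays at 400 for every local error, while the finer-grained code (40001 here) travels inside the JSON body where the client can branch on it.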

Read More

Flume Source Code: Component Lifecycle

Apache Flume is a real-time ETL tool for data warehouse platforms. It consists of different types of components, and at runtime all of them are managed by Flume’s lifecycle and supervisor mechanism. This article walks you through the source code of Flume’s component lifecycle management.

Repository Structure

Flume’s source code can be downloaded from GitHub. It’s a Maven project, so we can import it into an IDE for efficient code reading. The following is the main structure of the project:

/flume-ng-node
/flume-ng-core
/flume-ng-sdk
/flume-ng-sources/flume-kafka-source
/flume-ng-channels/flume-kafka-channel
/flume-ng-sinks/flume-hdfs-sink

Application Entrance

The main entrance of the Flume agent is the org.apache.flume.node.Application class in the flume-ng-node module. The following is an abridged version of its source code:

public class Application {
  public static void main(String[] args) {
    CommandLineParser parser = new GnuParser();
    if (isZkConfigured) {
      if (reload) {
        PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider;
        components.add(zookeeperConfigurationProvider);
      } else {
        StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider;
        application.handleConfigurationEvent();
      }
    } else {
      // PropertiesFileConfigurationProvider
    }
    application.start();
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
        appReference.stop();
      }
    });
  }
}

The process can be illustrated as follows:

  1. Parse command line arguments with commons-cli, including the Flume agent’s name, configuration method, and path.
  2. Configurations can be provided via a properties file or ZooKeeper. Both providers support live-reload, i.e. we can update component settings without restarting the agent.
    • File-based live-reload is implemented with a background thread that checks the last modification time of the file.
    • ZooKeeper-based live-reload is provided by Curator’s NodeCache recipe, which uses ZooKeeper’s watch functionality underneath.
  3. If live-reload is on (the default), the configuration provider adds itself to the application’s component list. After Application#start is called, a LifecycleSupervisor starts the provider and triggers the reload event, which parses the configuration and loads all defined components.
  4. If live-reload is off, the configuration provider parses the file immediately and starts all components, which are also supervised by LifecycleSupervisor.
  5. Finally, a JVM shutdown hook is added via Runtime#addShutdownHook, which in turn invokes Application#stop to shut down the Flume agent.
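
The file-based live-reload in step 2 boils down to comparing modification times. Below is a toy Python model of that idea, not Flume’s actual code: the class name PollingFileWatcher, the file name agent.properties, and its contents are made up for illustration, and a real implementation would call check() from a background thread on a timer.

```python
import os
import tempfile


class PollingFileWatcher:
    """Toy model of file-based live-reload: reload when the mtime changes."""

    def __init__(self, path, on_reload):
        self.path = path
        self.on_reload = on_reload
        self.last_mtime = None  # nothing loaded yet

    def check(self):
        """One polling step; a background thread would call this periodically."""
        mtime = os.path.getmtime(self.path)
        if mtime != self.last_mtime:
            self.last_mtime = mtime
            with open(self.path) as f:
                self.on_reload(f.read())
            return True
        return False


reloads = []
with tempfile.TemporaryDirectory() as tmp:
    conf = os.path.join(tmp, "agent.properties")
    with open(conf, "w") as f:
        f.write("a1.sources = r1\n")
    watcher = PollingFileWatcher(conf, reloads.append)

    first = watcher.check()    # initial load
    second = watcher.check()   # file untouched, so no reload
    with open(conf, "w") as f:
        f.write("a1.sources = r1 r2\n")
    # Force a visibly newer mtime so the example does not depend on clock resolution.
    newer = watcher.last_mtime + 10
    os.utime(conf, (newer, newer))
    third = watcher.check()    # modification detected

print(first, second, third, len(reloads))
```

Polling is simple and needs no OS-specific file notification API, at the cost of a delay of up to one polling interval before a change is picked up.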

Read More

Pandas and Tidy Data

In the paper Tidy Data, Dr. Wickham proposed a specific form of data structure: each variable is a column, each observation is a row, and each type of observational unit is a table. He argued that with tidy data, data analysts can manipulate, model, and visualize data more easily and effectively. He lists five common data structures that are untidy, and demonstrates how to use R language to tidy them. In this article, we’ll use Python and Pandas to achieve the same tidiness.

Source code and demo data can be found on GitHub (link). Readers are expected to have a Python environment installed, preferably with Anaconda and the Spyder IDE.

Column headers are values, not variable names

import pandas as pd
df = pd.read_csv('data/pew.csv')
df.head(10)

Religion and Income - Pew Forum

Column names such as “<$10k” and “$10-20k” are really income ranges that constitute a variable. Variables are measurements of attributes, like height, weight, and in this case, income and religion. The values within the table form another variable, frequency. To make each variable a column, we do the following transformation:

df = df.set_index('religion')
df = df.stack()
df.index = df.index.rename('income', level=1)
df.name = 'frequency'
df = df.reset_index()
df.head(10)

Religion and Income - Tidy
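
If the reshaping above feels opaque, the following dependency-free sketch performs the same wide-to-long transformation on a tiny made-up table. The two rows and their numbers are invented for illustration (they are not the Pew data), and the melt helper is my own stand-in for what Pandas does internally.

```python
# Made-up wide table: one row per religion, one column per income range.
wide = [
    {"religion": "A", "<$10k": 1, "$10-20k": 2},
    {"religion": "B", "<$10k": 3, "$10-20k": 4},
]


def melt(rows, id_var, var_name, value_name):
    """Turn every non-id column into its own (variable, value) row."""
    tidy = []
    for row in rows:
        for column, value in row.items():
            if column != id_var:
                tidy.append({id_var: row[id_var], var_name: column, value_name: value})
    return tidy


tidy = melt(wide, "religion", "income", "frequency")
for record in tidy:
    print(record)
```

Each output record now holds exactly one observation: a religion, an income range, and a frequency. Pandas offers DataFrame.melt with id_vars, var_name, and value_name parameters for the same purpose, although its row order differs from the stack-based version.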

Read More

Apache Beam Quick Start with Python

Apache Beam is a big data processing standard created by Google in 2016. It provides a unified DSL to process both batch and stream data, and can be executed on popular platforms like Spark, Flink, and of course Google’s commercial product Dataflow. Beam’s model is based on previous works known as FlumeJava and MillWheel, and addresses data processing tasks like ETL, analysis, and stream processing. At the time of writing, it provides SDKs in two languages, Java and Python. This article introduces how to use Python to write Beam applications.

Apache Beam Pipeline

Installation

At the time of writing, the Apache Beam Python SDK requires Python 2.7.x. You can use pyenv to manage different Python versions, or compile Python from source (make sure you have SSL installed). Then you can install the Beam SDK from PyPI, preferably in a virtual environment:

$ virtualenv venv --distribute
$ source venv/bin/activate
(venv) $ pip install apache-beam

Read More

Hive Window and Analytical Functions

SQL is one of the major tools of data analysis. It provides filtering, transformation, and aggregation functionality, and with the help of Hive and Hadoop we can use it to process large volumes of data. However, legacy SQL does not support operations like grouped ranking and moving averages, because the GROUP BY clause can only produce one aggregation result for each group, not for each row. Fortunately, newer SQL standards add the WINDOW clause, which lets us compute aggregations over a set of rows and return a result for each row.

Moving Average

For instance, if we want to calculate the two-day moving average for each stock, we can write the following query:

SELECT
  `date`, `stock`, `close`
  ,AVG(`close`) OVER `w` AS `mavg`
FROM `t_stock`
WINDOW `w` AS (PARTITION BY `stock` ORDER BY `date`
               ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)

OVER, WINDOW, and ROWS BETWEEN ... AND are all newly added SQL keywords that support windowing operations. In this query, PARTITION BY and ORDER BY work like the GROUP BY and ORDER BY that follow a WHERE clause, except that they don’t collapse the rows; they only divide them into non-overlapping partitions to work on. ROWS BETWEEN ... AND constructs a window frame; in this case, each frame contains the previous row and the current row. We’ll discuss frames in more detail later. Finally, AVG is a window function that computes a result on each frame. Note that the window specification can also be written inline, directly after OVER:

SELECT AVG(`close`) OVER (PARTITION BY `stock`) AS `mavg` FROM `t_stock`;
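
To make the frame semantics of the moving average query concrete, here is a small Python emulation of PARTITION BY, ORDER BY, and ROWS BETWEEN 1 PRECEDING AND CURRENT ROW. The rows are made up for illustration and the function name moving_average is my own; it sketches the semantics only, not how Hive executes the query.

```python
from collections import defaultdict

# Made-up rows of (date, stock, close); not data from this article.
rows = [
    ("2017-01-01", "AAA", 10.0),
    ("2017-01-02", "AAA", 12.0),
    ("2017-01-03", "AAA", 11.0),
    ("2017-01-01", "BBB", 20.0),
    ("2017-01-02", "BBB", 22.0),
]


def moving_average(rows, preceding=1):
    """Emulate AVG(close) OVER (PARTITION BY stock ORDER BY date
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW)."""
    partitions = defaultdict(list)
    for date, stock, close in rows:
        partitions[stock].append((date, close))    # PARTITION BY stock

    result = []
    for stock, partition in partitions.items():
        partition.sort()                           # ORDER BY date
        for i, (date, close) in enumerate(partition):
            # The frame is the current row plus up to `preceding` rows before it.
            frame = partition[max(0, i - preceding): i + 1]
            mavg = sum(c for _, c in frame) / len(frame)
            result.append((date, stock, close, mavg))
    return result


result = moving_average(rows)
for row in result:
    print(row)
```

Note that the output has one row per input row, which is exactly what GROUP BY cannot give us. The first row of each partition has no preceding row, so its frame contains only itself.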

Read More

An Introduction to stream-lib, the Stream Processing Utilities

When processing a large amount of data, certain operations cost a lot of time and space, such as counting distinct values or finding the 95th percentile of a sequence of numbers. But sometimes accuracy is not that important. Maybe you just want a brief summary of the dataset, or it’s a monitoring system where a limited error rate is tolerable. There are plenty of algorithms that trade accuracy for huge savings in time and space. What’s more, most of their data structures can be merged, which makes them usable in stream processing applications. stream-lib is a collection of these algorithms, implemented in Java and based on academic research papers. This article gives a brief introduction to this utility library.

Count Cardinality with HyperLogLog

Unique visitors (UV) is a major metric for websites. We usually generate a UUID for each user and track it with an HTTP cookie, or roughly use the IP address. We can use a HashSet to count the exact value of UV, but that takes a lot of memory. With HyperLogLog, an algorithm for the count-distinct problem, we are able to estimate cardinalities of > 10^9 with a typical accuracy of 2%, using 1.5 kB of memory.

<dependency>
  <groupId>com.clearspring.analytics</groupId>
  <artifactId>stream</artifactId>
  <version>2.9.5</version>
</dependency>

ICardinality card = new HyperLogLog(10);
for (int i : new int[] { 1, 2, 3, 2, 4, 3 }) {
  card.offer(i);
}
System.out.println(card.cardinality()); // 4
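
For readers curious about what happens inside such a counter, here is a minimal textbook-style HyperLogLog in Python. It is a sketch under my own assumptions (SHA-1 as the hash function, 2^10 registers, the class name TinyHyperLogLog) and is not how stream-lib implements it. It also shows why these structures can be merged: combining two sketches is just an element-wise max of their registers.

```python
import hashlib
import math


class TinyHyperLogLog:
    """Textbook HyperLogLog with 2**p registers (illustrative, not stream-lib's code)."""

    def __init__(self, p=10):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m

    def offer(self, item):
        # 64-bit hash: the first p bits pick a register, the rest feed the rank.
        h = int.from_bytes(hashlib.sha1(str(item).encode()).digest()[:8], "big")
        index = h >> (64 - self.p)
        rest = h & ((1 << (64 - self.p)) - 1)
        rank = (64 - self.p) - rest.bit_length() + 1  # leading zeros + 1
        self.registers[index] = max(self.registers[index], rank)

    def merge(self, other):
        # Merging two sketches is an element-wise max of their registers.
        self.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]

    def cardinality(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)  # bias constant, valid for m >= 128
        estimate = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * self.m and zeros:
            estimate = self.m * math.log(self.m / zeros)  # small-range correction
        return round(estimate)


# Feed odd and even numbers into separate sketches, then merge them.
left, right, both = TinyHyperLogLog(), TinyHyperLogLog(), TinyHyperLogLog()
for i in range(10000):
    (left if i % 2 else right).offer(i)
    both.offer(i)
left.merge(right)
print(left.cardinality(), both.cardinality())
```

The merged sketch ends up with exactly the same registers as the one that saw every item, so partial counts computed on different machines or time windows can be combined without revisiting the raw data.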

Read More

Extract Data from MySQL with Binlog and Canal

Data extraction is the very first step of an ETL process. We need to load data from external data stores like an RDBMS or a logging file system, and then we can do cleaning, transformation, and summarization. In the modern website stack, MySQL is the most widely used database, and it’s common to extract data from different instances and load it into a central MySQL database, or directly into Hive. There are several query-based techniques that we can use for the extraction, including the popular open source software Sqoop, but they are not meant for real-time data ingestion. Binlog, on the other hand, is a real-time data stream that is used for replication between master and slave instances. With the help of Alibaba’s open source Canal project, we can easily use the binlog facility to extract data from a MySQL database to various destinations.

Canal

Canal Components

In brief, Canal poses as a MySQL slave, dumps the binlog from the master, parses it, and sends it to downstream sinks. Canal consists of two major components, the Canal server and the Canal client. A Canal server can connect to multiple MySQL instances, and maintains an event queue for each instance. Canal clients can then subscribe to these queues and receive data changes. The following is a quick start guide to get Canal going.

Read More