Extract Data from MySQL with Binlog and Canal

Data extraction is the very first step of an ETL process. We need to load data from external data stores like an RDBMS or a logging file system before we can do cleaning, transformation and summarization. In the modern website stack, MySQL is the most widely used database, and it’s common to extract data from different instances and load it into a central MySQL database, or directly into Hive. There are several query-based techniques that we can use for the extraction, including the popular open source software Sqoop, but they are not meant for real-time data ingestion. Binlog, on the other hand, is a real-time data stream that is used for replication between master and slave instances. With the help of Alibaba’s open source Canal project, we can easily use the binlog facility to extract data from MySQL databases to various destinations.

Canal

Canal Components

In brief, Canal pretends to be a MySQL slave: it dumps the binlog from the master, parses it, and sends the changes to downstream sinks. Canal consists of two major components, namely the Canal server and the Canal client. A Canal server can connect to multiple MySQL instances, and maintains an event queue for each instance. Canal clients can then subscribe to these queues and receive data changes. The following is a quick start guide to get Canal going.

Read More

How to Extract Event Time in Apache Flume

Extracting data from upstream message queues is a common task in ETL. In a Hadoop-based data warehouse, we usually use Flume to import event logs from Kafka into HDFS, and then run MapReduce jobs against them, or create Hive external tables partitioned by time. One of the keys to this process is to extract the event time from the logs, since real-time data can lag, or the system may be temporarily offline and need to catch up. Flume provides various facilities to help us do this job easily.

Apache Flume

HDFS Sink and Timestamp Header

Here is a simple HDFS Sink config:

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/ds_alog/dt=%Y%m%d

%Y%m%d is a set of placeholders supported by this sink. The sink replaces them using the millisecond value in the timestamp header. HDFS Sink also provides the hdfs.useLocalTimeStamp option, which makes it use the local time to replace these placeholders, but this is not what we intend.
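To make the placeholder substitution concrete, here is a minimal Python sketch of the idea. It is not Flume’s implementation: the function name expand_path is hypothetical, and the use of UTC is an assumption made so the result is reproducible (the time zone the sink actually applies depends on its configuration).

```python
from datetime import datetime, timezone


def expand_path(path_template: str, timestamp_ms: int) -> str:
    """Fill strftime-style placeholders from an epoch-millisecond timestamp.

    Hypothetical helper for illustration; UTC is an assumption.
    """
    event_time = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return event_time.strftime(path_template)


# 1501423748000 ms corresponds to 2017-07-30 14:09:08 UTC
print(expand_path("/user/flume/ds_alog/dt=%Y%m%d", 1501423748000))
# prints /user/flume/ds_alog/dt=20170730
```

Because the partition is derived from the event’s own timestamp rather than the arrival time, a late or replayed event still lands in the directory of the day it was produced.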

Another sink we could use is the Hive Sink, which communicates directly with the Hive metastore and loads data into HDFS as a Hive table. It supports both delimited text and JSON serializers, and also requires a timestamp header. But we don’t choose it for the following reasons:

  • It doesn’t support a regular expression serializer, so we cannot extract columns from arbitrary data formats like access logs;
  • The columns to be extracted are defined in the Hive metastore. If the upstream events add some new keys to the JSON, they will be dropped until the Hive table definition is updated. In a data warehouse, it’s better to preserve the original source data for a period of time.

Read More

How to Achieve Exactly-Once Semantics in Spark Streaming

Exactly-once semantics is one of the advanced topics of stream processing. To process every message once and only once, in spite of system or network failures, it is not enough for the stream processing framework to provide such functionality; the message delivery system, the output data store, and the way we implement the processing procedure must all cooperate to ensure exactly-once semantics. In this article, I’ll demonstrate how to use Spark Streaming, with Kafka as the data source and MySQL as the output storage, to achieve exactly-once stream processing.

Spark Streaming

An Introductory Example

First let’s implement a simple yet complete stream processing application that receives access logs from Kafka, parses them and counts the errors, then writes the errors-per-minute metric into a MySQL database.

Sample access logs:

2017-07-30 14:09:08 ERROR some message
2017-07-30 14:09:20 INFO some message
2017-07-30 14:10:50 ERROR some message

Output table, where log_time should be truncated to minutes:

create table error_log (
  log_time datetime primary key,
  log_count int not null default 0
);
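Before involving Spark or Kafka, the parsing and minute truncation can be sketched in plain Python. This is only an illustration of the per-record logic under the sample log format shown above; the function names are hypothetical and it is not the streaming application itself.

```python
from collections import Counter
from datetime import datetime
from typing import Iterable, Optional


def parse_error_minute(line: str) -> Optional[datetime]:
    """Return the log time truncated to the minute for ERROR lines, else None."""
    parts = line.split(" ", 3)  # date, time, level, message
    if len(parts) < 3 or parts[2] != "ERROR":
        return None
    log_time = datetime.strptime(f"{parts[0]} {parts[1]}", "%Y-%m-%d %H:%M:%S")
    return log_time.replace(second=0)


def count_errors_per_minute(lines: Iterable[str]) -> Counter:
    """Count ERROR lines grouped by their truncated log time."""
    minutes = (parse_error_minute(line) for line in lines)
    return Counter(minute for minute in minutes if minute is not None)


sample = [
    "2017-07-30 14:09:08 ERROR some message",
    "2017-07-30 14:09:20 INFO some message",
    "2017-07-30 14:10:50 ERROR some message",
]
for log_time, log_count in sorted(count_errors_per_minute(sample).items()):
    print(log_time, log_count)
```

Each resulting pair maps onto one row of error_log, with the truncated time as the primary key.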

Read More

Learn Pandas from a SQL Perspective

Pandas is a widely used data processing tool for Python. Along with NumPy and Matplotlib, it provides in-memory high-performance data munging, analysis, and visualization capabilities. Although Python is an easy-to-learn programming language, it still takes time to learn the Pandas APIs and their idiomatic usage. For data engineers and analysts, SQL is the de facto standard language for data queries. This article provides examples of how some common SQL queries can be rewritten with Pandas.

The installation and basic concepts of Pandas are not covered in this post. One can check out the official documentation, or read the book Python for Data Analysis. I recommend using the Anaconda Python distribution, which includes the Spyder IDE. Before diving into the code, please import Pandas and NumPy as follows:

import pandas as pd
import numpy as np

FROM - Load Data into Memory

First of all, let’s read some data into the workspace (memory). Pandas supports a variety of formats, one of which is CSV. Take the following flight delay dataset for example (link):

date,delay,distance,origin,destination
02221605,3,358,BUR,SMF
01022100,-5,239,HOU,DAL
03210808,6,288,BWI,ALB

We can use pd.read_csv to load this file:

df = pd.read_csv('flights.csv', dtype={'date': str})
df.head()

This statement loads the flights.csv file into memory, uses the first line as column names, and tries to infer each column’s type. Since the date column is in %m%d%H%M format and we don’t want to lose the leading 0 of the month, we pass an explicit dtype for it, indicating that this column should stay unparsed.
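The effect of the dtype argument is easy to see side by side. The sketch below reads the three sample rows from an in-memory string instead of flights.csv (an assumption made to keep it self-contained) and compares the inferred column with the one kept as text.

```python
import io

import pandas as pd

# The sample rows from above, held in memory instead of flights.csv.
csv_text = """date,delay,distance,origin,destination
02221605,3,358,BUR,SMF
01022100,-5,239,HOU,DAL
03210808,6,288,BWI,ALB
"""

inferred = pd.read_csv(io.StringIO(csv_text))
as_text = pd.read_csv(io.StringIO(csv_text), dtype={"date": str})

# Without dtype the column is read as an integer, so the leading zero is lost.
print(inferred["date"].iloc[0])
# With dtype=str the original text is preserved.
print(as_text["date"].iloc[0])
```

Keeping the column as text leaves the decision of how to parse it, for example with pd.to_datetime and an explicit format, to a later step.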

Read More

Log Tailer with WebSocket and Python

Tailing a log file is a common task when we deploy or maintain software in production. Instead of logging into the server and running tail -f, it would be nice if we could tail a log file in the browser. With WebSocket, this can be done easily. In this article, I’ll walk you through a simple logviewer (source) utility written in Python.

Logviewer with WebSocket

WebSocket Intro

WebSocket is a standard protocol over TCP that provides full-duplex communication between client and server, usually a browser and a web server. Before WebSocket, when we wanted to keep a browser-server connection alive, we chose from long polling, forever frame or Comet techniques. Now that WebSocket is widely supported by major browsers, we can use it to implement web chatrooms, games, real-time dashboards, etc. Besides, a WebSocket connection is established by an HTTP upgrade request and can communicate over port 80, so it has minimal impact on existing network infrastructure.
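One detail of the HTTP upgrade request is worth a quick sketch. The client sends a random Sec-WebSocket-Key header, and the server answers with a Sec-WebSocket-Accept value derived from it as specified in RFC 6455. The function name below is hypothetical; WebSocket libraries normally perform this step for you.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept_key(client_key: str) -> str:
    """Derive the Sec-WebSocket-Accept value from the client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((client_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key taken from RFC 6455
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))
# prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

After the server replies with status 101 and this header, both sides stop speaking HTTP and exchange WebSocket frames over the same TCP connection.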

Read More

Build Interactive Report with Crossfilter and dc.js

When visualizing multidimensional datasets, we often want to connect individual charts together, so that one chart’s filter applies to all the other charts. We can do it manually: filter data on the server side and update the rendered charts. Or we can filter data on the client side and let the charts update themselves. With Crossfilter and dc.js, this work becomes simple and intuitive.

Airline On-time Performance

Here’s an example taken from Crossfilter’s official website. It’s a flight delay analysis report based on the ASA Data Expo dataset. This post will introduce how to use dc.js to build the report. A runnable JSFiddle can be found here, though the dataset is reduced to 1,000 records.

Read More

Why Use Lodash When ES6 Is Available

Lodash is a well-known JavaScript utility library that makes it easy to manipulate arrays and objects, as well as functions, strings, etc. I myself enjoy its functional way of processing collections, especially chaining and lazy evaluation. But as the ECMAScript 2015 Standard (ES6) becomes widely supported by major browsers, and Babel, the JavaScript compiler that transforms ES6 code to ES5, plays a major role in today’s frontend development, it seems that most Lodash utilities can be replaced by ES6. But should we? In my opinion, Lodash will remain popular, for it still has lots of useful features that can improve the way we program.

_.map and Array#map Are Different

_.map, _.reduce, _.filter and _.forEach are frequently used functions when processing collections, and ES6 provides direct support for them:

_.map([1, 2, 3], (i) => i + 1)
_.reduce([1, 2, 3], (sum, i) => sum + i, 0)
_.filter([1, 2, 3], (i) => i > 1)
_.forEach([1, 2, 3], (i) => { console.log(i) })
// becomes
[1, 2, 3].map((i) => i + 1)
[1, 2, 3].reduce((sum, i) => sum + i, 0)
[1, 2, 3].filter((i) => i > 1)
[1, 2, 3].forEach((i) => { console.log(i) })

But Lodash’s _.map is more powerful, in that it works on objects, has iteratee / predicate shorthands, supports lazy evaluation, guards against null parameters, and has better performance.

Read More

Process Python Collections with Functional Programming

I develop Spark applications with Scala, which has a very powerful collection system in which functional programming is certainly a key. Java 8 also introduces lambda expressions and the Stream API. In JavaScript, the Lodash library provides powerful tools to process arrays and objects. Now that my primary work language has changed to Python, I wondered whether it’s possible to manipulate collections in an FP way, and fortunately Python already provides syntax and tools for functional programming. Though list comprehension is the pythonic way to deal with collections, the ideas and concepts of FP are definitely worth learning.

Wordcount Example

Let’s first write a snippet to count the word occurrences in a paragraph, in a functional way of course.

import re
import itertools
content = """
an apple orange the grape
banana an apple melon
an orange banana apple
"""
word_matches = re.finditer(r'\S+', content)
words = map(lambda m: m.group(0), word_matches)
fruits = filter(lambda s: len(s) > 3, words)
grouped_fruits = itertools.groupby(sorted(fruits))
fruit_counts = map(lambda t: (t[0], len(list(t[1]))), grouped_fruits)
print(list(fruit_counts))

Run this example and you’ll get a list of fruits, along with their counts:

[('apple', 3), ('banana', 2), ('grape', 1), ('melon', 1), ('orange', 2)]

This example includes most aspects of processing collections in FP style. For instance, re.finditer returns an iterator that is lazily evaluated; map and filter are used to do transformations; the itertools module provides various functions to cope with iterables; and last but not least, the lambda expression is an easy way to define an inline anonymous function. All of them will be described in the following sections.
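For comparison, the same count can be sketched in the comprehension-based style that is usually considered pythonic. This is an alternative shown for illustration, not part of the snippet above; it uses collections.Counter from the standard library in place of sorting and grouping.

```python
import re
from collections import Counter

content = """
an apple orange the grape
banana an apple melon
an orange banana apple
"""

# A generator expression replaces the map/filter pair,
# and Counter replaces sorted() + itertools.groupby().
words = re.findall(r'\S+', content)
fruit_counts = Counter(word for word in words if len(word) > 3)
print(sorted(fruit_counts.items()))
```

The result is the same list of pairs. One practical difference is that itertools.groupby only groups adjacent items, which is why the functional version has to sort first, while Counter does not care about input order.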

Read More

Difference Between Lodash _.assign and _.assignIn

In Lodash, both _.assign and _.assignIn copy source objects’ properties into a target object. According to the documentation, _.assign processes own enumerable string keyed properties, while _.assignIn processes both own and inherited source properties. There are also other companion functions like _.forOwn and _.forIn, _.has and _.hasIn. So what’s the difference between them?

In brief, the In in the latter methods refers to the way the for...in loop behaves: it iterates over all enumerable properties of the object itself and those the object inherits from its constructor’s prototype. JavaScript has an inheritance mechanism called the prototype chain. When iterating over an object’s properties with for...in or _.forIn, all properties that appear in the object and its prototypes are processed, until the prototype resolves to null. Here’s the example code taken from Lodash’s documentation:

function Foo() { this.a = 1; }
Foo.prototype.b = 2;
function Bar() { this.c = 3; }
Bar.prototype.d = 4;
_.assign({a: 0}, new Foo, new Bar); // => {a: 1, c: 3}
_.assignIn({a: 0}, new Foo, new Bar); // => {a:1, b:2, c:3, d:4}

Read More

Python 2 to 3 Quick Guide

A few years ago I was programming in Python 2.7, when 3.x was still not an option because of its backward incompatibility and the lack of support from popular third-party libraries. But now it’s safe to say Python 3 is totally ready, and here’s a list of references for those (including me) who are adopting Python 3 with a 2.x background.

  1. All Strings Are Unicode
  2. print Becomes a Function
  3. Less Lists More Views
  4. Integer Division Returns Float
  5. Comparison Operators Raise TypeError
  6. Set Literal Support
  7. New String Formatting
  8. Exception Handling
  9. Global Function Changes
  10. Renaming Modules and Relative Import

All Strings Are Unicode

When dealing with non-ASCII encodings in Python 2, there are str, unicode, u'...', s.encode(), etc. In Python 3, there are only text and binary data. The former is str, strings that are always represented in Unicode; the latter is bytes, which is just a sequence of byte values.

  • Conversion between str and bytes:
# str to bytes
'str'.encode('UTF-8')
bytes('str', encoding='UTF-8')
# bytes to str
b'bytes'.decode('UTF-8')
str(b'bytes', encoding='UTF-8')
  • basestring is removed; use str as the type: isinstance(s, str)
  • bytes is immutable; the corresponding mutable version is bytearray.
  • The default source file encoding is UTF-8 now.
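The points in this list can be checked with a short Python 3 session. The sample string and variable names below are made up for illustration.

```python
text = "caf\u00e9"             # str: four Unicode characters
data = text.encode("UTF-8")    # bytes: the accented letter takes two bytes

print(isinstance(text, str), isinstance(data, bytes))
print(len(text), len(data))

try:
    data[0] = 67               # bytes does not support item assignment
except TypeError as exc:
    print("bytes is immutable:", exc)

buffer = bytearray(data)       # mutable copy of the same byte values
buffer[0] = 67                 # 67 is ord("C")
print(buffer.decode("UTF-8"))
```

The lengths differ because len counts characters for str and byte values for bytes, which is exactly the distinction that Python 2’s str blurred.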

Read More