Spark DataSource API V2

Spark 1.3 引入了第一版的数据源 API,我们可以使用它将常见的数据格式整合到 Spark SQL 中。但是,随着 Spark 的不断发展,这一 API 也体现出了其局限性,故而 Spark 团队不得不加入越来越多的专有代码来编写数据源,以获得更好的性能。Spark 2.3 中,新一版的数据源 API 初见雏形,它克服了上一版 API 的种种问题,原来的数据源代码也在逐步重写。本文将演示这两版 API 的使用方法,比较它们的不同之处,以及新版 API 的优势在哪里。

DataSource V1 API

V1 API 由一系列的抽象类和接口组成,它们位于 spark/sql/sources/interfaces.scala 文件中。主要的内容有:

1
2
3
4
5
6
7
8
9
10
11
12
trait RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
def sqlContext: SQLContext
def schema: StructType
}

trait TableScan {
def buildScan(): RDD[Row]
}

通过实现 RelationProvider 接口,表明该类是一种新定义的数据源,可以供 Spark SQL 取数所用。传入 createRelation 方法的参数可以用来做初始化,如文件路径、权限信息等。BaseRelation 抽象类则用来定义数据源的表结构,它的来源可以是数据库、Parquet 文件等外部系统,也可以直接由用户指定。该类还必须实现某个 Scan 接口,Spark 会调用 buildScan 方法来获取数据源的 RDD,我们将在下文看到。

阅读全文

Flume 源码解析:HDFS Sink

Apache Flume 数据流程的最后一部分是 Sink,它会将上游抽取并转换好的数据输送到外部存储中去,如本地文件、HDFS、ElasticSearch 等。本文将通过分析源码来展现 HDFS Sink 的工作流程。

Sink 组件的生命周期

在上一篇文章中, 我们了解到 Flume 组件都会实现 LifecycleAware 接口,并由 LifecycleSupervisor 实例管理和监控。不过,Sink 组件并不直接由它管理,而且被包装在了 SinkRunnerSinkProcessor 这两个类中。Flume 支持三种 Sink 处理器,该处理器会将 Channel 和 Sink 以不同的方式连接起来。这里我们只讨论 DefaultSinkProcessor 的情况,即一个 Channel 只会连接一个 Sink。同时,我们也将略过对 Sink 分组的讨论。

Sink Component LifeCycle

阅读全文

Java 空指针异常的若干解决方案

Java 中任何对象都有可能为空,当我们调用空对象的方法时就会抛出 NullPointerException 空指针异常,这是一种非常常见的错误类型。我们可以使用若干种方法来避免产生这类异常,使得我们的代码更为健壮。本文将列举这些解决方案,包括传统的空值检测、编程规范、以及使用现代 Java 语言引入的各类工具来作为辅助。

运行时检测

最显而易见的方法就是使用 if (obj == null) 来对所有需要用到的对象来进行检测,包括函数参数、返回值、以及类实例的成员变量。当你检测到 null 值时,可以选择抛出更具针对性的异常类型,如 IllegalArgumentException,并添加消息内容。我们可以使用一些库函数来简化代码,如 Java 7 开始提供的 Objects#requireNonNull 方法:

1
2
3
4
public void testObjects(Object arg) {
Object checked = Objects.requireNonNull(arg, "arg must not be null");
checked.toString();
}

Guava 的 Preconditions 类中也提供了一系列用于检测参数合法性的工具函数,其中就包含空值检测:

1
2
3
4
public void testGuava(Object arg) {
Object checked = Preconditions.checkNotNull(arg, "%s must not be null", "arg");
checked.toString();
}

我们还可以使用 Lombok 来生成空值检测代码,并抛出带有提示信息的空指针异常:

1
2
3
public void testLombok(@NonNull Object arg) {
arg.toString();
}

生成的代码如下:

1
2
3
4
5
6
public void testLombokGenerated(Object arg) {
if (arg == null) {
throw new NullPointerException("arg is marked @NonNull but is null");
}
arg.toString();
}

这个注解还可以用在类实例的成员变量上,所有的赋值操作会自动进行空值检测。

阅读全文

是否需要使用 ESLint jsx-no-bind 规则?

在使用 ESLint React 插件时,有一条名为 jsx-no-bind 的检测规则,它会禁止我们在 JSX 属性中使用 .bind 方法和箭头函数。比如下列代码,ESLint 会提示 onClick 属性中的箭头函数不合法:

1
2
3
4
5
6
7
8
9
10
11
class ListArrow extends React.Component {
render() {
return (
<ul>
{this.state.items.map(item => (
<li key={item.id} onClick={() => { alert(item.id) }}>{item.text}</li>
))}
</ul>
)
}
}

这条规则的引入原因有二。首先,每次执行 render 方法时都会生成一个新的匿名函数对象,这样就会对垃圾回收器造成负担;其次,属性中的箭头函数会影响渲染过程:当你使用了 PureComponent,或者自己实现了 shouldComponentUpdate 方法,使用对象比较的方式来决定是否要重新渲染组件,那么组件属性中的箭头函数就会让该方法永远返回真值,引起不必要的重复渲染。

然而,反对的声音认为这两个原因还不足以要求我们在所有代码中应用该规则,特别是当需要引入更多代码、并牺牲一定可读性的情况下。在 Airbnb ESLint 预设规则集中,只禁止了 .bind 方法的使用,而允许在属性(props)或引用(refs)中使用箭头函数。对此我翻阅了文档,阅读了一些关于这个话题的博客,也认为这条规则有些过于严格。甚至还有博主称该规则是一种过早优化(premature optimization),我们需要先做基准测试,再着手修改代码。下文中,我将简要叙述箭头函数是如何影响渲染过程的,有哪些可行的解决方案,以及它为何不太重要。

阅读全文

TensorFlow 模型如何对外提供服务

TensorFlow 是目前最为流行的机器学习框架之一,通过它我们可以便捷地构建机器学习模型。使用 TensorFlow 模型对外提供服务有若干种方式,本文将介绍如何使用 SavedModel 机制来编写模型预测接口。

鸢尾花深层神经网络分类器

首先让我们使用 TensorFlow 的深层神经网络模型来构建一个鸢尾花的分类器。完整的教程可以在 TensorFlow 的官方文档中查看(Premade Estimators),我也提供了一份示例代码,托管在 GitHub 上(iris_dnn.py),读者可以克隆到本地进行测试。以下是部分代码摘要:

1
2
3
4
5
6
7
8
9
10
11
12
13
feature_columns = [tf.feature_column.numeric_column(key=key)
for key in train_x.keys()]
classifier = tf.estimator.DNNClassifier(
feature_columns=feature_columns,
hidden_units=[10, 10],
n_classes=3)

classifier.train(
input_fn=lambda: train_input_fn(train_x, train_y, batch_size=BATCH_SIZE),
steps=STEPS)

predictions = classifier.predict(
input_fn=lambda: eval_input_fn(predict_x, labels=None, batch_size=BATCH_SIZE))

阅读全文

使用 Python 和 Thrift 连接 HBase

Apache HBase 是 Hadoop 生态环境中的键值存储系统(Key-value Store)。它构建在 HDFS 之上,可以对大型数据进行高速的读写操作。HBase 的开发语言是 Java,因此提供了原生的 Java 语言客户端。不过,借助于 Thrift 和其丰富的语言扩展,我们可以十分便捷地在任何地方调用 HBase 服务。文本将讲述的就是如何使用 Thrift 和 Python 来读写 HBase。

生成 Thrift 类定义

如果你对 Apache Thrift 并不熟悉,它提供了一套 IDL(接口描述语言),用于定义远程服务的方法签名和数据类型,并能将其转换成所需要的目标语言。举例来说,以下是用该 IDL 定义的一个数据结构:

1
2
3
4
5
struct TColumn {
1: required binary family,
2: optional binary qualifier,
3: optional i64 timestamp
}

转换后的 Python 代码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class TColumn(object):
def __init__(self, family=None, qualifier=None, timestamp=None,):
self.family = family
self.qualifier = qualifier
self.timestamp = timestamp

def read(self, iprot):
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
# ...

def write(self, oprot):
oprot.writeStructBegin('TColumn')
# ...

阅读全文

Vuex 严格模式下的表单处理

在使用 Vue 进行表单处理时,我们通常会使用 v-model 来建立双向绑定。但是,如果将表单数据交由 Vuex 管理,这时的双向绑定就会引发问题,因为在 严格模式 下,Vuex 是不允许在 Mutation 之外的地方修改状态数据的。以下用一个简单的项目举例说明,完整代码可在 GitHub(链接) 查看。

src/store/table.js

1
2
3
4
5
6
7
8
export default {
state: {
namespaced: true,
table: {
table_name: ''
}
}
}

src/components/NonStrict.vue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<b-form-group label="表名:">
<b-form-input v-model="table.table_name" />
</b-form-group>

<script>
import { mapState } from 'vuex'

export default {
computed: {
...mapState('table', [
'table'
])
}
}
</script>

当我们在“表名”字段输入文字时,浏览器会报以下错误:

1
2
3
4
错误:[vuex] 禁止在 Mutation 之外修改 Vuex 状态数据。
at assert (vuex.esm.js?358c:97)
at Vue.store._vm.$watch.deep (vuex.esm.js?358c:746)
at Watcher.run (vue.esm.js?efeb:3233)

当然,我们可以选择不开启严格模式,只是这样就无法通过工具追踪到每一次的状态变动了。下面我将列举几种解决方案,描述如何在严格模式下进行表单处理。

阅读全文

RESTful API 中的错误处理

RESTful API

构建 Web 服务时,我们会使用 RESTful API 来实现组件间的通信,特别是在现今前后端分离的技术背景下。REST 是一种基于 HTTP 协议的通信方式,它简单、基于文本、且在各种语言、浏览器及客户端软件中能得到很好的支持。然而,REST 目前并没有一个普遍接受的标准,因此开发者需要自行决定 API 的设计,其中一项决策就是错误处理。比如我们是否应该使用 HTTP 状态码来标识错误?如何返回表单验证的结果等等。以下这篇文章是基于日常使用中的经验总结的一套错误处理流程,供读者们参考。

错误的分类

错误可以分为两种类型:全局错误和本地错误。全局错误包括:请求了一个不存在的 API、无权请求这个 API、数据库连接失败、或其他一些没有预期到的、会终止程序运行的服务端错误。这类错误应该由 Web 框架捕获,无需各个 API 处理。

本地错误则和 API 密切相关,例如表单验证、唯一性检查、或其他可预期的错误。我们需要编写特定代码来捕获这类错误,并抛出一个包含提示信息的全局异常,供 Web 框架捕获并返回给客户端。

例如,Flask 框架就提供了此类全局异常处理机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class BadRequest(Exception):
"""将本地错误包装成一个异常实例供抛出"""
def __init__(self, message, status=400, payload=None):
self.message = message
self.status = status
self.payload = payload


@app.errorhandler(BadRequest)
def handle_bad_request(error):
"""捕获 BadRequest 全局异常,序列化为 JSON 并返回 HTTP 400"""
payload = dict(error.payload or ())
payload['status'] = error.status
payload['message'] = error.message
return jsonify(payload), 400


@app.route('/person', methods=['POST'])
def person_post():
"""创建用户的 API,成功则返回用户 ID"""
if not request.form.get('username'):
raise BadRequest('用户名不能为空', 40001, { 'ext': 1 })
return jsonify(last_insert_id=1)

阅读全文

Flume 源码解析:组件生命周期

Apache Flume 是数据仓库体系中用于做实时 ETL 的工具。它提供了丰富的数据源和写入组件,这些组件在运行时都由 Flume 的生命周期管理机制进行监控和维护。本文将对这部分功能的源码进行解析。

项目结构

Flume 的源码可以从 GitHub 上下载。它是一个 Maven 项目,我们将其导入到 IDE 中以便更好地进行源码阅读。以下是代码仓库的基本结构:

1
2
3
4
5
6
/flume-ng-node
/flume-ng-code
/flume-ng-sdk
/flume-ng-sources/flume-kafka-source
/flume-ng-channels/flume-kafka-channel
/flume-ng-sinks/flume-hdfs-sink

程序入口

Flume Agent 的入口 main 函数位于 flume-ng-node 模块的 org.apache.flume.node.Application 类中。下列代码是该函数的摘要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Application {
public static void main(String[] args) {
CommandLineParser parser = new GnuParser();
if (isZkConfigured) {
if (reload) {
PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider;
components.add(zookeeperConfigurationProvider);
} else {
StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider;
application.handleConfigurationEvent();
}
} else {
// PropertiesFileConfigurationProvider
}
application.start();
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
}
}

启动过程说明如下:

  1. 使用 commons-cli 对命令行参数进行解析,提取 Agent 名称、配置信息读取方式及其路径信息;
  2. 配置信息可以通过文件或 ZooKeeper 的方式进行读取,两种方式都支持热加载,即我们不需要重启 Agent 就可以更新配置内容:
    • 基于文件的配置热加载是通过一个后台线程对文件进行轮询实现的;
    • 基于 ZooKeeper 的热加载则是使用了 Curator 的 NodeCache 模式,底层是 ZooKeeper 原生的监听(Watch)特性。
  3. 如果配置热更新是开启的(默认开启),配置提供方 ConfigurationProvider 就会将自身注册到 Agent 程序的组件列表中,并在 Application#start 方法调用后,由 LifecycleSupervisor 类进行启动和管理,加载和解析配置文件,从中读取组件列表。
  4. 如果热更新未开启,则配置提供方将在启动时立刻读取配置文件,并由 LifecycleSupervisor 启动和管理所有组件。
  5. 最后,main 会调用 Runtime#addShutdownHook,当 JVM 关闭时(SIGTERM 或者 Ctrl+C),Application#stop 会被用于关闭 Flume Agent,使各组件优雅退出。

阅读全文

Pandas 与数据整理

Tidy Data 论文中,Wickham 博士 提出了这样一种“整洁”的数据结构:每个变量是一列,每次观测结果是一行,不同的观测类型存放在单独的表中。他认为这样的数据结构可以帮助分析师更简单高效地进行处理、建模、和可视化。他在论文中列举了 五种 不符合整洁数据的情况,并演示了如何通过 R 语言 对它们进行整理。本文中,我们将使用 Python 和 Pandas 来达到同样的目的。

文中的源代码和演示数据可以在 GitHub(链接)上找到。读者应该已经安装好 Python 开发环境,推荐各位使用 Anaconda 和 Spyder IDE。

列名称是数据值,而非变量名

1
2
3
import pandas as pd
df = pd.read_csv('data/pew.csv')
df.head(10)

宗教信仰与收入 - Pew 论坛

表中的列“<$10k”、“$10-20k”其实是“收入”变量的具体值。变量 是指某一特性的观测值,如身高、体重,本例中则是收入、宗教信仰。表中的数值数据构成了另一个变量——人数。要做到 每个变量是一列 ,我们需要进行以下变换:

1
2
3
4
5
6
df = df.set_index('religion')
df = df.stack()
df.index = df.index.rename('income', level=1)
df.name = 'frequency'
df = df.reset_index()
df.head(10)

宗教信仰与收入 - 整洁版

阅读全文