TensorFlow 模型如何对外提供服务

TensorFlow 是目前最为流行的机器学习框架之一,通过它我们可以便捷地构建机器学习模型。使用 TensorFlow 模型对外提供服务有若干种方式,本文将介绍如何使用 SavedModel 机制来编写模型预测接口。

鸢尾花深层神经网络分类器

首先让我们使用 TensorFlow 的深层神经网络模型来构建一个鸢尾花的分类器。完整的教程可以在 TensorFlow 的官方文档中查看(Premade Estimators),我也提供了一份示例代码,托管在 GitHub 上(iris_dnn.py),读者可以克隆到本地进行测试。以下是部分代码摘要:

1
2
3
4
5
6
7
8
9
10
11
12
13
feature_columns = [tf.feature_column.numeric_column(key=key)
for key in train_x.keys()]
classifier = tf.estimator.DNNClassifier(
feature_columns=feature_columns,
hidden_units=[10, 10],
n_classes=3)

classifier.train(
input_fn=lambda: train_input_fn(train_x, train_y, batch_size=BATCH_SIZE),
steps=STEPS)

predictions = classifier.predict(
input_fn=lambda: eval_input_fn(predict_x, labels=None, batch_size=BATCH_SIZE))

阅读全文

使用 Python 和 Thrift 连接 HBase

Apache HBase 是 Hadoop 生态环境中的键值存储系统(Key-value Store)。它构建在 HDFS 之上,可以对大型数据进行高速的读写操作。HBase 的开发语言是 Java,因此提供了原生的 Java 语言客户端。不过,借助于 Thrift 和其丰富的语言扩展,我们可以十分便捷地在任何地方调用 HBase 服务。文本将讲述的就是如何使用 Thrift 和 Python 来读写 HBase。

生成 Thrift 类定义

如果你对 Apache Thrift 并不熟悉,它提供了一套 IDL(接口描述语言),用于定义远程服务的方法签名和数据类型,并能将其转换成所需要的目标语言。举例来说,以下是用该 IDL 定义的一个数据结构:

1
2
3
4
5
struct TColumn {
1: required binary family,
2: optional binary qualifier,
3: optional i64 timestamp
}

转换后的 Python 代码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class TColumn(object):
def __init__(self, family=None, qualifier=None, timestamp=None,):
self.family = family
self.qualifier = qualifier
self.timestamp = timestamp

def read(self, iprot):
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
# ...

def write(self, oprot):
oprot.writeStructBegin('TColumn')
# ...

阅读全文

Vuex 严格模式下的表单处理

在使用 Vue 进行表单处理时,我们通常会使用 v-model 来建立双向绑定。但是,如果将表单数据交由 Vuex 管理,这时的双向绑定就会引发问题,因为在 严格模式 下,Vuex 是不允许在 Mutation 之外的地方修改状态数据的。以下用一个简单的项目举例说明,完整代码可在 GitHub(链接) 查看。

src/store/table.js

1
2
3
4
5
6
7
8
export default {
state: {
namespaced: true,
table: {
table_name: ''
}
}
}

src/components/NonStrict.vue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<b-form-group label="表名:">
<b-form-input v-model="table.table_name" />
</b-form-group>

<script>
import { mapState } from 'vuex'

export default {
computed: {
...mapState('table', [
'table'
])
}
}
</script>

当我们在“表名”字段输入文字时,浏览器会报以下错误:

1
2
3
4
错误:[vuex] 禁止在 Mutation 之外修改 Vuex 状态数据。
at assert (vuex.esm.js?358c:97)
at Vue.store._vm.$watch.deep (vuex.esm.js?358c:746)
at Watcher.run (vue.esm.js?efeb:3233)

当然,我们可以选择不开启严格模式,只是这样就无法通过工具追踪到每一次的状态变动了。下面我将列举几种解决方案,描述如何在严格模式下进行表单处理。

阅读全文

RESTful API 中的错误处理

RESTful API

构建 Web 服务时,我们会使用 RESTful API 来实现组件间的通信,特别是在现今前后端分离的技术背景下。REST 是一种基于 HTTP 协议的通信方式,它简单、基于文本、且在各种语言、浏览器及客户端软件中能得到很好的支持。然而,REST 目前并没有一个普遍接受的标准,因此开发者需要自行决定 API 的设计,其中一项决策就是错误处理。比如我们是否应该使用 HTTP 状态码来标识错误?如何返回表单验证的结果等等。以下这篇文章是基于日常使用中的经验总结的一套错误处理流程,供读者们参考。

错误的分类

错误可以分为两种类型:全局错误和本地错误。全局错误包括:请求了一个不存在的 API、无权请求这个 API、数据库连接失败、或其他一些没有预期到的、会终止程序运行的服务端错误。这类错误应该由 Web 框架捕获,无需各个 API 处理。

本地错误则和 API 密切相关,例如表单验证、唯一性检查、或其他可预期的错误。我们需要编写特定代码来捕获这类错误,并抛出一个包含提示信息的全局异常,供 Web 框架捕获并返回给客户端。

例如,Flask 框架就提供了此类全局异常处理机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class BadRequest(Exception):
"""将本地错误包装成一个异常实例供抛出"""
def __init__(self, message, status=400, payload=None):
self.message = message
self.status = status
self.payload = payload


@app.errorhandler(BadRequest)
def handle_bad_request(error):
"""捕获 BadRequest 全局异常,序列化为 JSON 并返回 HTTP 400"""
payload = dict(error.payload or ())
payload['status'] = error.status
payload['message'] = error.message
return jsonify(payload), 400


@app.route('/person', methods=['POST'])
def person_post():
"""创建用户的 API,成功则返回用户 ID"""
if not request.form.get('username'):
raise BadRequest('用户名不能为空', 40001, { 'ext': 1 })
return jsonify(last_insert_id=1)

阅读全文

Flume 源码解析:组件生命周期

Apache Flume 是数据仓库体系中用于做实时 ETL 的工具。它提供了丰富的数据源和写入组件,这些组件在运行时都由 Flume 的生命周期管理机制进行监控和维护。本文将对这部分功能的源码进行解析。

项目结构

Flume 的源码可以从 GitHub 上下载。它是一个 Maven 项目,我们将其导入到 IDE 中以便更好地进行源码阅读。以下是代码仓库的基本结构:

1
2
3
4
5
6
/flume-ng-node
/flume-ng-code
/flume-ng-sdk
/flume-ng-sources/flume-kafka-source
/flume-ng-channels/flume-kafka-channel
/flume-ng-sinks/flume-hdfs-sink

程序入口

Flume Agent 的入口 main 函数位于 flume-ng-node 模块的 org.apache.flume.node.Application 类中。下列代码是该函数的摘要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Application {
public static void main(String[] args) {
CommandLineParser parser = new GnuParser();
if (isZkConfigured) {
if (reload) {
PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider;
components.add(zookeeperConfigurationProvider);
} else {
StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider;
application.handleConfigurationEvent();
}
} else {
// PropertiesFileConfigurationProvider
}
application.start();
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
}
}

启动过程说明如下:

  1. 使用 commons-cli 对命令行参数进行解析,提取 Agent 名称、配置信息读取方式及其路径信息;
  2. 配置信息可以通过文件或 ZooKeeper 的方式进行读取,两种方式都支持热加载,即我们不需要重启 Agent 就可以更新配置内容:
    • 基于文件的配置热加载是通过一个后台线程对文件进行轮询实现的;
    • 基于 ZooKeeper 的热加载则是使用了 Curator 的 NodeCache 模式,底层是 ZooKeeper 原生的监听(Watch)特性。
  3. 如果配置热更新是开启的(默认开启),配置提供方 ConfigurationProvider 就会将自身注册到 Agent 程序的组件列表中,并在 Application#start 方法调用后,由 LifecycleSupervisor 类进行启动和管理,加载和解析配置文件,从中读取组件列表。
  4. 如果热更新未开启,则配置提供方将在启动时立刻读取配置文件,并由 LifecycleSupervisor 启动和管理所有组件。
  5. 最后,main 会调用 Runtime#addShutdownHook,当 JVM 关闭时(SIGTERM 或者 Ctrl+C),Application#stop 会被用于关闭 Flume Agent,使各组件优雅退出。

阅读全文

Pandas 与数据整理

Tidy Data 论文中,Wickham 博士 提出了这样一种“整洁”的数据结构:每个变量是一列,每次观测结果是一行,不同的观测类型存放在单独的表中。他认为这样的数据结构可以帮助分析师更简单高效地进行处理、建模、和可视化。他在论文中列举了 五种 不符合整洁数据的情况,并演示了如何通过 R 语言 对它们进行整理。本文中,我们将使用 Python 和 Pandas 来达到同样的目的。

文中的源代码和演示数据可以在 GitHub(链接)上找到。读者应该已经安装好 Python 开发环境,推荐各位使用 Anaconda 和 Spyder IDE。

列名称是数据值,而非变量名

1
2
3
import pandas as pd
df = pd.read_csv('data/pew.csv')
df.head(10)

宗教信仰与收入 - Pew 论坛

表中的列“<$10k”、“$10-20k”其实是“收入”变量的具体值。变量 是指某一特性的观测值,如身高、体重,本例中则是收入、宗教信仰。表中的数值数据构成了另一个变量——人数。要做到 每个变量是一列 ,我们需要进行以下变换:

1
2
3
4
5
6
df = df.set_index('religion')
df = df.stack()
df.index = df.index.rename('income', level=1)
df.name = 'frequency'
df = df.reset_index()
df.head(10)

宗教信仰与收入 - 整洁版

阅读全文

Apache Beam 快速入门(Python 版)

Apache Beam 是一种大数据处理标准,由谷歌于 2016 年创建。它提供了一套统一的 DSL 用以处理离线和实时数据,并能在目前主流的大数据处理平台上使用,包括 Spark、Flink、以及谷歌自身的商业套件 Dataflow。Beam 的数据模型基于过去的几项研究成果:FlumeJavaMillwheel,适用场景包括 ETL、统计分析、实时计算等。目前,Beam 提供了两种语言的 SDK:Java、Python。本文将讲述如何使用 Python 编写 Beam 应用程序。

Apache Beam Pipeline

安装 Apache Beam

Apache Beam Python SDK 必须使用 Python 2.7.x 版本,你可以安装 pyenv 来管理不同版本的 Python,或者直接从源代码编译安装(需要支持 SSL)。之后,你便可以在 Python 虚拟环境中安装 Beam SDK 了:

1
2
3
$ virtualenv venv --distribute
$ source venv/bin/activate
(venv) $ pip install apache-beam

阅读全文

2017 Top 15 Python 数据科学类库;时间序列异常点检测;如何加入开源项目

2017 Top 15 Python 数据科学类库

Google Trends

近年来,Python 在数据科学领域得到了越来越多的关注,本文整理归类了使用率最高的数据科学类库,供大家参考。

NumPy、SciPy、Pandas 是 Python 数据科学的核心类库。NumPy 提供了 N 维数组、矩阵、向量等数据结构,能够进行高性能的数学运算;SciPy 包含了线性代数、拟合优化、统计学习的通用方法;Pandas 则一般用于数据清洗、探索型分析等工作。

可视化方面,Matplotlib 是最早流行的类库,提供了丰富的图形化接口,但 API 的使用方式偏底层,需要编写较多代码;Seaborn 构建在 Matplotlib 之上,重新定义了图表样式,更适合在报告、演示文档中使用,并且它还预置了诸多探索型分析函数,可以快速地对数据进行描述性可视化;Bokeh 主打交互性,它运行在浏览器中,让使用者可以方便地调节可视化参数;Plotly 也是一款基于页面的可视化工具,但因为是商业软件,需要授权后才能使用。

SciKit-Learn 是公认的 Python 机器学习标准类库,它提供了准确、统一的接口,可以方便地使用各种机器学习算法;深度学习领域,Theano 是比较老牌的类库之一,特点是能够运行于不同的系统架构之上(CPU、GPU);Tensorflow 则是最近较火的基础类库,使用它提供的各种算子和数据流工具,我们可以构建出多层神经网络,在集群上对大数据进行运算;Keras 则是一款较上层的工具库,底层使用 Theano 或 Tensorflow 作为引擎,可以通过快速构建实验来验证模型。

自然语言处理领域中,NLTK 提供了文本标记、分词、构建语料树等功能,用以揭示句中或句间的依赖关系;Gensim 则擅长构建向量空间模型、话题建模、挖掘大量文本中重复出现的模式,其算法都属于非监督学习,因此只需提供语料库就能得到结果。

原文:http://www.kdnuggets.com/2017/06/top-15-python-libraries-data-science.html

阅读全文

Hive 窗口与分析型函数

SQL 结构化查询语言是数据分析领域的重要工具之一。它提供了数据筛选、转换、聚合等操作,并能借助 Hive 和 Hadoop 进行大数据量的处理。但是,传统的 SQL 语句并不能支持诸如分组排名、滑动平均值等计算,原因是 GROUP BY 语句只能为每个分组的数据返回一行结果,而非每条数据一行。幸运的是,新版的 SQL 标准引入了窗口查询功能,使用 WINDOW 语句我们可以基于分区和窗口为每条数据都生成一行结果记录,这一标准也已得到了 Hive 的支持。

滑动平均值

举例来说,我们想要计算表中每只股票的两日滑动平均值,可以编写以下查询语句:

1
2
3
4
5
6
SELECT
`date`, `stock`, `close`
,AVG(`close`) OVER `w` AS `mavg`
FROM `t_stock`
WINDOW `w` AS (PARTITION BY `stock` ORDER BY `date`
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)

OVERWINDOW、以及 ROWS BETWEEN AND 都是新增的窗口查询关键字。在这个查询中,PARTITION BYORDER BY 的工作方式与 GROUP BYORDER BY 相似,区别在于它们不会将多行记录聚合成一条结果,而是将它们拆分到互不重叠的分区中进行后续处理。其后的 ROWS BETWEEN AND 语句用于构建一个 窗口帧。此例中,每一个窗口帧都包含了当前记录和上一条记录。下文会对窗口帧做进一步描述。最后,AVG 是一个窗口函数,用于计算每个窗口帧的结果。窗口帧的定义(WINDOW 语句)还可以直接附加到窗口函数之后:

1
SELECT AVG(`close`) OVER (PARTITION BY `stock`) AS `mavg` FROM `t_stock`;

阅读全文

实时计算工具库 stream-lib 使用指南

进行大数据处理时,计算唯一值、95% 分位数等操作非常占用空间和时间。但有时我们只是想对数据集有一个概略的了解,数值的准确性并不那么重要。实时监控系统中也是如此,可以容忍一定的错误率。目前已经有许多算法可以通过牺牲准确性来减少计算所需的空间和时间,这些算法大多支持数据结构之间的合并,因此可以方便地用在实时计算中。stream-lib 就是一个集成了很多此类算法的实时计算工具库,是对现有研究成果的 Java 实现。本文就将介绍这一工具库的使用方法。

唯一值计算 HyperLogLog

独立访客(UV)是网站的重要指标之一。我们通常会为每一个用户生成一个 UUID,并在 HTTP Cookie 中记录和跟踪,或直接使用 IP 地址做近似计算。我们可以使用一个 HashSet 来计算 UV 的准确值,但无疑会占用大量的空间。HyperLogLog 则是一种近似算法,用于解决此类唯一值计算的问题。该算法在对超过 10^9 个唯一值进行计算时可以做到 2% 的标准差,并只占用 1.5 kB 内存

1
2
3
4
5
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.9.5</version>
</dependency>
1
2
3
4
5
ICardinality card = new HyperLogLog(10);
for (int i : new int[] { 1, 2, 3, 2, 4, 3 }) {
card.offer(i);
}
System.out.println(card.cardinality()); // 4

阅读全文