Apache Beam 快速入门(Python 版)

Apache Beam 是一种大数据处理标准,由谷歌于 2016 年创建。它提供了一套统一的 DSL 用以处理离线和实时数据,并能在目前主流的大数据处理平台上使用,包括 Spark、Flink、以及谷歌自身的商业套件 Dataflow。Beam 的数据模型基于过去的几项研究成果:FlumeJavaMillwheel,适用场景包括 ETL、统计分析、实时计算等。目前,Beam 提供了两种语言的 SDK:Java、Python。本文将讲述如何使用 Python 编写 Beam 应用程序。

Apache Beam Pipeline

安装 Apache Beam

Apache Beam Python SDK 必须使用 Python 2.7.x 版本,你可以安装 pyenv 来管理不同版本的 Python,或者直接从源代码编译安装(需要支持 SSL)。之后,你便可以在 Python 虚拟环境中安装 Beam SDK 了:

1
2
3
$ virtualenv venv --distribute
$ source venv/bin/activate
(venv) $ pip install apache-beam

阅读全文

2017 Top 15 Python 数据科学类库;时间序列异常点检测;如何加入开源项目

2017 Top 15 Python 数据科学类库

Google Trends

近年来,Python 在数据科学领域得到了越来越多的关注,本文整理归类了使用率最高的数据科学类库,供大家参考。

NumPy、SciPy、Pandas 是 Python 数据科学的核心类库。NumPy 提供了 N 维数组、矩阵、向量等数据结构,能够进行高性能的数学运算;SciPy 包含了线性代数、拟合优化、统计学习的通用方法;Pandas 则一般用于数据清洗、探索型分析等工作。

可视化方面,Matplotlib 是最早流行的类库,提供了丰富的图形化接口,但 API 的使用方式偏底层,需要编写较多代码;Seaborn 构建在 Matplotlib 之上,重新定义了图表样式,更适合在报告、演示文档中使用,并且它还预置了诸多探索型分析函数,可以快速地对数据进行描述性可视化;Bokeh 主打交互性,它运行在浏览器中,让使用者可以方便地调节可视化参数;Plotly 也是一款基于页面的可视化工具,但因为是商业软件,需要授权后才能使用。

SciKit-Learn 是公认的 Python 机器学习标准类库,它提供了准确、统一的接口,可以方便地使用各种机器学习算法;深度学习领域,Theano 是比较老牌的类库之一,特点是能够运行于不同的系统架构之上(CPU、GPU);Tensorflow 则是最近较火的基础类库,使用它提供的各种算子和数据流工具,我们可以构建出多层神经网络,在集群上对大数据进行运算;Keras 则是一款较上层的工具库,底层使用 Theano 或 Tensorflow 作为引擎,可以通过快速构建实验来验证模型。

自然语言处理领域中,NLTK 提供了文本标记、分词、构建语料树等功能,用以揭示句中或句间的依赖关系;Gensim 则擅长构建向量空间模型、话题建模、挖掘大量文本中重复出现的模式,其算法都属于非监督学习,因此只需提供语料库就能得到结果。

原文:http://www.kdnuggets.com/2017/06/top-15-python-libraries-data-science.html

阅读全文

Hive 窗口与分析型函数

SQL 结构化查询语言是数据分析领域的重要工具之一。它提供了数据筛选、转换、聚合等操作,并能借助 Hive 和 Hadoop 进行大数据量的处理。但是,传统的 SQL 语句并不能支持诸如分组排名、滑动平均值等计算,原因是 GROUP BY 语句只能为每个分组的数据返回一行结果,而非每条数据一行。幸运的是,新版的 SQL 标准引入了窗口查询功能,使用 WINDOW 语句我们可以基于分区和窗口为每条数据都生成一行结果记录,这一标准也已得到了 Hive 的支持。

滑动平均值

举例来说,我们想要计算表中每只股票的两日滑动平均值,可以编写以下查询语句:

1
2
3
4
5
6
SELECT
`date`, `stock`, `close`
,AVG(`close`) OVER `w` AS `mavg`
FROM `t_stock`
WINDOW `w` AS (PARTITION BY `stock` ORDER BY `date`
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)

OVERWINDOW、以及 ROWS BETWEEN AND 都是新增的窗口查询关键字。在这个查询中,PARTITION BYORDER BY 的工作方式与 GROUP BYORDER BY 相似,区别在于它们不会将多行记录聚合成一条结果,而是将它们拆分到互不重叠的分区中进行后续处理。其后的 ROWS BETWEEN AND 语句用于构建一个 窗口帧。此例中,每一个窗口帧都包含了当前记录和上一条记录。下文会对窗口帧做进一步描述。最后,AVG 是一个窗口函数,用于计算每个窗口帧的结果。窗口帧的定义(WINDOW 语句)还可以直接附加到窗口函数之后:

1
SELECT AVG(`close`) OVER (PARTITION BY `stock`) AS `mavg` FROM `t_stock`;

阅读全文

实时计算工具库 stream-lib 使用指南

进行大数据处理时,计算唯一值、95% 分位数等操作非常占用空间和时间。但有时我们只是想对数据集有一个概略的了解,数值的准确性并不那么重要。实时监控系统中也是如此,可以容忍一定的错误率。目前已经有许多算法可以通过牺牲准确性来减少计算所需的空间和时间,这些算法大多支持数据结构之间的合并,因此可以方便地用在实时计算中。stream-lib 就是一个集成了很多此类算法的实时计算工具库,是对现有研究成果的 Java 实现。本文就将介绍这一工具库的使用方法。

唯一值计算 HyperLogLog

独立访客(UV)是网站的重要指标之一。我们通常会为每一个用户生成一个 UUID,并在 HTTP Cookie 中记录和跟踪,或直接使用 IP 地址做近似计算。我们可以使用一个 HashSet 来计算 UV 的准确值,但无疑会占用大量的空间。HyperLogLog 则是一种近似算法,用于解决此类唯一值计算的问题。该算法在对超过 10^9 个唯一值进行计算时可以做到 2% 的标准差,并只占用 1.5 kB 内存

1
2
3
4
5
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.9.5</version>
</dependency>
1
2
3
4
5
ICardinality card = new HyperLogLog(10);
for (int i : new int[] { 1, 2, 3, 2, 4, 3 }) {
card.offer(i);
}
System.out.println(card.cardinality()); // 4

阅读全文

使用 Binlog 和 Canal 从 MySQL 抽取数据

数据抽取是 ETL 流程的第一步。我们会将数据从 RDBMS 或日志服务器等外部系统抽取至数据仓库,进行清洗、转换、聚合等操作。在现代网站技术栈中,MySQL 是最常见的数据库管理系统,我们会从多个不同的 MySQL 实例中抽取数据,存入一个中心节点,或直接进入 Hive。市面上已有多种成熟的、基于 SQL 查询的抽取软件,如著名的开源项目 Apache Sqoop,然而这些工具并不支持实时的数据抽取。MySQL Binlog 则是一种实时的数据流,用于主从节点之间的数据复制,我们可以利用它来进行数据抽取。借助阿里巴巴开源的 Canal 项目,我们能够非常便捷地将 MySQL 中的数据抽取到任意目标存储中。

Canal

Canal 的组成部分

简单来说,Canal 会将自己伪装成 MySQL 从节点(Slave),并从主节点(Master)获取 Binlog,解析和贮存后供下游消费端使用。Canal 包含两个组成部分:服务端和客户端。服务端负责连接至不同的 MySQL 实例,并为每个实例维护一个事件消息队列;客户端则可以订阅这些队列中的数据变更事件,处理并存储到数据仓库中。下面我们来看如何快速搭建起一个 Canal 服务。

阅读全文

Apache Flume 如何解析消息中的事件时间

数据开发工作中,从上游消息队列抽取数据是一项常规的 ETL 流程。在基于 Hadoop 构建的数据仓库体系中,我们通常会使用 Flume 将事件日志从 Kafka 抽取到 HDFS,然后针对其开发 MapReduce 脚本,或直接创建以时间分区的 Hive 外部表。这项流程中的关键一环是提取日志中的事件时间,因为实时数据通常会包含延迟,且在系统临时宕机的情况下,我们需要追回遗漏的数据,因而使用的时间戳必须是事件产生的时间。Flume 提供的诸多工具能帮助我们非常便捷地实现这一点。

Apache Flume

HDFS Sink 和时间戳头信息

以下是一个基本的 HDFS Sink 配置:

1
2
3
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/ds_alog/dt=%Y%m%d

%Y%m%d 是该 Sink 支持的时间占位符,它会使用头信息中 timestamp 的值来替换这些占位符。HDFS Sink 还提供了 hdfs.useLocalTimeStamp 选项,直接使用当前系统时间来替换时间占位符,但这并不是我们想要达到的目的。

我们还可以使用 Hive Sink 直接将事件日志导入成 Hive 表,它能直接和 Hive 元数据库通信,自动创建表分区,并支持分隔符分隔和 JSON 两种序列化形式。当然,它同样需要一个 timestamp 头信息。不过,我们没有选择 Hive Sink,主要出于以下原因:

  • 它不支持正则表达式,因此我们无法从类似访问日志这样的数据格式中提取字段列表;
  • 它所提取的字段列表是根据 Hive 表信息产生的。假设上游数据源在 JSON 日志中加入了新的键值,直至我们主动更新 Hive 元信息,这些新增字段将被直接丢弃。对于数据仓库来说,完整保存原始数据是很有必要的。

阅读全文

Spark Streaming 中如何实现 Exactly-Once 语义

Exactly-once 语义是实时计算的难点之一。要做到每一条记录只会被处理一次,即使服务器或网络发生故障时也能保证没有遗漏,这不仅需要实时计算框架本身的支持,还对上游的消息系统、下游的数据存储有所要求。此外,我们在编写计算流程时也需要遵循一定规范,才能真正实现 Exactly-once。本文将讲述如何结合 Spark Streaming 框架、Kafka 消息系统、以及 MySQL 数据库来实现 Exactly-once 的实时计算流程。

Spark Streaming

引例

首先让我们实现一个简单而完整的实时计算流程。我们从 Kafka 接收用户访问日志,解析并提取其中的时间和日志级别,并统计每分钟错误日志的数量,结果保存到 MySQL 中。

示例日志:

1
2
3
2017-07-30 14:09:08 ERROR some message
2017-07-30 14:09:20 INFO some message
2017-07-30 14:10:50 ERROR some message

结果表结构,其中 log_time 字段会截取到分钟级别:

1
2
3
4
create table error_log (
log_time datetime primary key,
log_count int not null default 0
);

阅读全文

通过 SQL 查询学习 Pandas 数据处理

Pandas 是一款广泛使用的数据处理工具。结合 NumPy 和 Matplotlib 类库,我们可以在内存中进行高性能的数据清洗、转换、分析及可视化工作。虽然 Python 本身是一门非常容易学习的语言,但要熟练掌握 Pandas 丰富的 API 接口及正确的使用方式,还是需要投入一定时间的。对于数据开发工程师或分析师而言,SQL 语言是标准的数据查询工具。本文提供了一系列的示例,如何将常见的 SQL 查询语句使用 Pandas 来实现。

Pandas 的安装和基本概念并不在本文讲述范围内,请读者到官网上阅读相关文档,或者阅读《利用 Python 进行数据分析》一书。我推荐大家使用 Anaconda Python 套件,其中集成了 Spyder 集成开发环境。在运行下文的代码之前,请先引入 Pandas 和 NumPy 包:

1
2
import pandas as pd
import numpy as np

FROM - 读取数据

首先,我们需要将数据加载到工作区(内存)。Pandas 原生支持非常多的数据格式,CSV 是较常见的一种。我们以航班延误时间数据集为例(下载地址):

1
2
3
4
date,delay,distance,origin,destination
02221605,3,358,BUR,SMF
01022100,-5,239,HOU,DAL
03210808,6,288,BWI,ALB

我们可以使用 pd.read_csv 函数加载它:

1
2
df = pd.read_csv('flights.csv', dtype={'date': str})
df.head()

这条命令会将 flights.csv 文件读入内存,使用首行作为列名,并自动检测每一列的数据类型。其中,由于 date 一列的日期格式是 %m%d%H%M,自动转换成数字后会失去月份的前异零(02 月的 0),因此我们显式指定了该列的 dtype,告知 Pandas 保留原值。

阅读全文

使用 WebSocket 和 Python 编写日志查看器

在生产环境运维工作中,查看线上服务器日志是一项常规工作。如果这项工作可以在浏览器中进行,而无需登录服务器执行 tail -f 命令,就太方便了。我们可以使用 WebSocket 技术轻松实现这一目标。在本文中,我将带各位一起使用 Python 编写一个日志查看工具。

基于 WebSocket 的日志查看器

WebSocket 简介

WebSocket 是一个标准化协议,构建在 TCP 之上,能够在客户端和服务端之间建立一个全双工的通信渠道。这里的客户端和服务端通常是用户浏览器和 Web 服务器。在 WebSocket 诞生之前,如果我们想保持这样的一个长连接,就需要使用诸如长轮询、永久帧、Comet 等技术。而现今 WebSocket 已经得到了所有主流浏览器的支持,我们可以使用它开发出在线聊天室、游戏、实时仪表盘等软件。此外,WebSocket 可以通过 HTTP Upgrade 请求来建立连接,并使用 80 端口通信,从而降低对现有网络环境的影响,如无需穿越防火墙。

阅读全文

为什么不用 ES6 完全替换 Lodash

Lodash 是一款非常知名的 JavaScript 工具库,能够让开发者十分便捷地操纵数组和对象。我则是非常喜欢用它提供的函数式编程风格来操作集合类型,特别是链式调用和惰性求值。然而,随着 ECMAScript 2015 Standard (ES6) 得到越来越多主流浏览器的支持,以及像 Babel 这样,能够将 ES6 代码编译成 ES5 从而在旧浏览器上运行的工具日渐流行,人们会发现许多 Lodash 提供的功能已经可以用 ES6 来替换了。然而真的如此吗?我认为,Lodash 仍然会非常流行,因为它可以为程序员提供更多的便利,并且优化我们编程的方式。

_.mapArray#map 是有区别的

在处理集合对象时,_.map_.reduce_.filter、以及 _.forEach 的使用频率很高,而今 ES6 已经能够原生支持这些操作:

1
2
3
4
5
6
7
8
9
10
_.map([1, 2, 3], (i) => i + 1)
_.reduce([1, 2, 3], (sum, i) => sum + i, 0)
_.filter([1, 2, 3], (i) => i > 1)
_.forEach([1, 2, 3], (i) => { console.log(i) })

// 使用 ES6 改写
[1, 2, 3].map((i) => i + 1)
[1, 2, 3].reduce((sum, i) => sum + i, 0)
[1, 2, 3].filter((i) => i > 1)
[1, 2, 3].forEach((i) => { console.log(i) })

但是,Lodash 的 _.map 函数功能更强大,它能够操作对象类型,提供了遍历和过滤的快捷方式,能够惰性求值,对 null 值容错,并且有着更好的性能。

阅读全文