Flume 源码解析:组件生命周期

Apache Flume 是数据仓库体系中用于做实时 ETL 的工具。它提供了丰富的数据源和写入组件,这些组件在运行时都由 Flume 的生命周期管理机制进行监控和维护。本文将对这部分功能的源码进行解析。

项目结构

Flume 的源码可以从 GitHub 上下载。它是一个 Maven 项目,我们将其导入到 IDE 中以便更好地进行源码阅读。以下是代码仓库的基本结构:

1
2
3
4
5
6
/flume-ng-node
/flume-ng-code
/flume-ng-sdk
/flume-ng-sources/flume-kafka-source
/flume-ng-channels/flume-kafka-channel
/flume-ng-sinks/flume-hdfs-sink

程序入口

Flume Agent 的入口 main 函数位于 flume-ng-node 模块的 org.apache.flume.node.Application 类中。下列代码是该函数的摘要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Application {
public static void main(String[] args) {
CommandLineParser parser = new GnuParser();
if (isZkConfigured) {
if (reload) {
PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider;
components.add(zookeeperConfigurationProvider);
} else {
StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider;
application.handleConfigurationEvent();
}
} else {
// PropertiesFileConfigurationProvider
}
application.start();
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
}
}

启动过程说明如下:

  1. 使用 commons-cli 对命令行参数进行解析,提取 Agent 名称、配置信息读取方式及其路径信息;
  2. 配置信息可以通过文件或 ZooKeeper 的方式进行读取,两种方式都支持热加载,即我们不需要重启 Agent 就可以更新配置内容:
    • 基于文件的配置热加载是通过一个后台线程对文件进行轮询实现的;
    • 基于 ZooKeeper 的热加载则是使用了 Curator 的 NodeCache 模式,底层是 ZooKeeper 原生的监听(Watch)特性。
  3. 如果配置热更新是开启的(默认开启),配置提供方 ConfigurationProvider 就会将自身注册到 Agent 程序的组件列表中,并在 Application#start 方法调用后,由 LifecycleSupervisor 类进行启动和管理,加载和解析配置文件,从中读取组件列表。
  4. 如果热更新未开启,则配置提供方将在启动时立刻读取配置文件,并由 LifecycleSupervisor 启动和管理所有组件。
  5. 最后,main 会调用 Runtime#addShutdownHook,当 JVM 关闭时(SIGTERM 或者 Ctrl+C),Application#stop 会被用于关闭 Flume Agent,使各组件优雅退出。

阅读全文

Pandas 与数据整理

Tidy Data 论文中,Wickham 博士 提出了这样一种“整洁”的数据结构:每个变量是一列,每次观测结果是一行,不同的观测类型存放在单独的表中。他认为这样的数据结构可以帮助分析师更简单高效地进行处理、建模、和可视化。他在论文中列举了 五种 不符合整洁数据的情况,并演示了如何通过 R 语言 对它们进行整理。本文中,我们将使用 Python 和 Pandas 来达到同样的目的。

文中的源代码和演示数据可以在 GitHub(链接)上找到。读者应该已经安装好 Python 开发环境,推荐各位使用 Anaconda 和 Spyder IDE。

列名称是数据值,而非变量名

1
2
3
import pandas as pd
df = pd.read_csv('data/pew.csv')
df.head(10)

宗教信仰与收入 - Pew 论坛

表中的列“<$10k”、“$10-20k”其实是“收入”变量的具体值。变量 是指某一特性的观测值,如身高、体重,本例中则是收入、宗教信仰。表中的数值数据构成了另一个变量——人数。要做到 每个变量是一列 ,我们需要进行以下变换:

1
2
3
4
5
6
df = df.set_index('religion')
df = df.stack()
df.index = df.index.rename('income', level=1)
df.name = 'frequency'
df = df.reset_index()
df.head(10)

宗教信仰与收入 - 整洁版

阅读全文

Apache Beam 快速入门(Python 版)

Apache Beam 是一种大数据处理标准,由谷歌于 2016 年创建。它提供了一套统一的 DSL 用以处理离线和实时数据,并能在目前主流的大数据处理平台上使用,包括 Spark、Flink、以及谷歌自身的商业套件 Dataflow。Beam 的数据模型基于过去的几项研究成果:FlumeJavaMillwheel,适用场景包括 ETL、统计分析、实时计算等。目前,Beam 提供了两种语言的 SDK:Java、Python。本文将讲述如何使用 Python 编写 Beam 应用程序。

Apache Beam Pipeline

安装 Apache Beam

Apache Beam Python SDK 必须使用 Python 2.7.x 版本,你可以安装 pyenv 来管理不同版本的 Python,或者直接从源代码编译安装(需要支持 SSL)。之后,你便可以在 Python 虚拟环境中安装 Beam SDK 了:

1
2
3
$ virtualenv venv --distribute
$ source venv/bin/activate
(venv) $ pip install apache-beam

阅读全文

2017 Top 15 Python 数据科学类库;时间序列异常点检测;如何加入开源项目

2017 Top 15 Python 数据科学类库

Google Trends

近年来,Python 在数据科学领域得到了越来越多的关注,本文整理归类了使用率最高的数据科学类库,供大家参考。

NumPy、SciPy、Pandas 是 Python 数据科学的核心类库。NumPy 提供了 N 维数组、矩阵、向量等数据结构,能够进行高性能的数学运算;SciPy 包含了线性代数、拟合优化、统计学习的通用方法;Pandas 则一般用于数据清洗、探索型分析等工作。

可视化方面,Matplotlib 是最早流行的类库,提供了丰富的图形化接口,但 API 的使用方式偏底层,需要编写较多代码;Seaborn 构建在 Matplotlib 之上,重新定义了图表样式,更适合在报告、演示文档中使用,并且它还预置了诸多探索型分析函数,可以快速地对数据进行描述性可视化;Bokeh 主打交互性,它运行在浏览器中,让使用者可以方便地调节可视化参数;Plotly 也是一款基于页面的可视化工具,但因为是商业软件,需要授权后才能使用。

SciKit-Learn 是公认的 Python 机器学习标准类库,它提供了准确、统一的接口,可以方便地使用各种机器学习算法;深度学习领域,Theano 是比较老牌的类库之一,特点是能够运行于不同的系统架构之上(CPU、GPU);Tensorflow 则是最近较火的基础类库,使用它提供的各种算子和数据流工具,我们可以构建出多层神经网络,在集群上对大数据进行运算;Keras 则是一款较上层的工具库,底层使用 Theano 或 Tensorflow 作为引擎,可以通过快速构建实验来验证模型。

自然语言处理领域中,NLTK 提供了文本标记、分词、构建语料树等功能,用以揭示句中或句间的依赖关系;Gensim 则擅长构建向量空间模型、话题建模、挖掘大量文本中重复出现的模式,其算法都属于非监督学习,因此只需提供语料库就能得到结果。

原文:http://www.kdnuggets.com/2017/06/top-15-python-libraries-data-science.html

阅读全文

Hive 窗口与分析型函数

SQL 结构化查询语言是数据分析领域的重要工具之一。它提供了数据筛选、转换、聚合等操作,并能借助 Hive 和 Hadoop 进行大数据量的处理。但是,传统的 SQL 语句并不能支持诸如分组排名、滑动平均值等计算,原因是 GROUP BY 语句只能为每个分组的数据返回一行结果,而非每条数据一行。幸运的是,新版的 SQL 标准引入了窗口查询功能,使用 WINDOW 语句我们可以基于分区和窗口为每条数据都生成一行结果记录,这一标准也已得到了 Hive 的支持。

滑动平均值

举例来说,我们想要计算表中每只股票的两日滑动平均值,可以编写以下查询语句:

1
2
3
4
5
6
SELECT
`date`, `stock`, `close`
,AVG(`close`) OVER `w` AS `mavg`
FROM `t_stock`
WINDOW `w` AS (PARTITION BY `stock` ORDER BY `date`
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)

OVERWINDOW、以及 ROWS BETWEEN AND 都是新增的窗口查询关键字。在这个查询中,PARTITION BYORDER BY 的工作方式与 GROUP BYORDER BY 相似,区别在于它们不会将多行记录聚合成一条结果,而是将它们拆分到互不重叠的分区中进行后续处理。其后的 ROWS BETWEEN AND 语句用于构建一个 窗口帧。此例中,每一个窗口帧都包含了当前记录和上一条记录。下文会对窗口帧做进一步描述。最后,AVG 是一个窗口函数,用于计算每个窗口帧的结果。窗口帧的定义(WINDOW 语句)还可以直接附加到窗口函数之后:

1
SELECT AVG(`close`) OVER (PARTITION BY `stock`) AS `mavg` FROM `t_stock`;

阅读全文

实时计算工具库 stream-lib 使用指南

进行大数据处理时,计算唯一值、95% 分位数等操作非常占用空间和时间。但有时我们只是想对数据集有一个概略的了解,数值的准确性并不那么重要。实时监控系统中也是如此,可以容忍一定的错误率。目前已经有许多算法可以通过牺牲准确性来减少计算所需的空间和时间,这些算法大多支持数据结构之间的合并,因此可以方便地用在实时计算中。stream-lib 就是一个集成了很多此类算法的实时计算工具库,是对现有研究成果的 Java 实现。本文就将介绍这一工具库的使用方法。

唯一值计算 HyperLogLog

独立访客(UV)是网站的重要指标之一。我们通常会为每一个用户生成一个 UUID,并在 HTTP Cookie 中记录和跟踪,或直接使用 IP 地址做近似计算。我们可以使用一个 HashSet 来计算 UV 的准确值,但无疑会占用大量的空间。HyperLogLog 则是一种近似算法,用于解决此类唯一值计算的问题。该算法在对超过 10^9 个唯一值进行计算时可以做到 2% 的标准差,并只占用 1.5 kB 内存

1
2
3
4
5
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.9.5</version>
</dependency>
1
2
3
4
5
ICardinality card = new HyperLogLog(10);
for (int i : new int[] { 1, 2, 3, 2, 4, 3 }) {
card.offer(i);
}
System.out.println(card.cardinality()); // 4

阅读全文

使用 Binlog 和 Canal 从 MySQL 抽取数据

数据抽取是 ETL 流程的第一步。我们会将数据从 RDBMS 或日志服务器等外部系统抽取至数据仓库,进行清洗、转换、聚合等操作。在现代网站技术栈中,MySQL 是最常见的数据库管理系统,我们会从多个不同的 MySQL 实例中抽取数据,存入一个中心节点,或直接进入 Hive。市面上已有多种成熟的、基于 SQL 查询的抽取软件,如著名的开源项目 Apache Sqoop,然而这些工具并不支持实时的数据抽取。MySQL Binlog 则是一种实时的数据流,用于主从节点之间的数据复制,我们可以利用它来进行数据抽取。借助阿里巴巴开源的 Canal 项目,我们能够非常便捷地将 MySQL 中的数据抽取到任意目标存储中。

Canal

Canal 的组成部分

简单来说,Canal 会将自己伪装成 MySQL 从节点(Slave),并从主节点(Master)获取 Binlog,解析和贮存后供下游消费端使用。Canal 包含两个组成部分:服务端和客户端。服务端负责连接至不同的 MySQL 实例,并为每个实例维护一个事件消息队列;客户端则可以订阅这些队列中的数据变更事件,处理并存储到数据仓库中。下面我们来看如何快速搭建起一个 Canal 服务。

阅读全文

Apache Flume 如何解析消息中的事件时间

数据开发工作中,从上游消息队列抽取数据是一项常规的 ETL 流程。在基于 Hadoop 构建的数据仓库体系中,我们通常会使用 Flume 将事件日志从 Kafka 抽取到 HDFS,然后针对其开发 MapReduce 脚本,或直接创建以时间分区的 Hive 外部表。这项流程中的关键一环是提取日志中的事件时间,因为实时数据通常会包含延迟,且在系统临时宕机的情况下,我们需要追回遗漏的数据,因而使用的时间戳必须是事件产生的时间。Flume 提供的诸多工具能帮助我们非常便捷地实现这一点。

Apache Flume

HDFS Sink 和时间戳头信息

以下是一个基本的 HDFS Sink 配置:

1
2
3
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/ds_alog/dt=%Y%m%d

%Y%m%d 是该 Sink 支持的时间占位符,它会使用头信息中 timestamp 的值来替换这些占位符。HDFS Sink 还提供了 hdfs.useLocalTimeStamp 选项,直接使用当前系统时间来替换时间占位符,但这并不是我们想要达到的目的。

我们还可以使用 Hive Sink 直接将事件日志导入成 Hive 表,它能直接和 Hive 元数据库通信,自动创建表分区,并支持分隔符分隔和 JSON 两种序列化形式。当然,它同样需要一个 timestamp 头信息。不过,我们没有选择 Hive Sink,主要出于以下原因:

  • 它不支持正则表达式,因此我们无法从类似访问日志这样的数据格式中提取字段列表;
  • 它所提取的字段列表是根据 Hive 表信息产生的。假设上游数据源在 JSON 日志中加入了新的键值,直至我们主动更新 Hive 元信息,这些新增字段将被直接丢弃。对于数据仓库来说,完整保存原始数据是很有必要的。

阅读全文

Spark Streaming 中如何实现 Exactly-Once 语义

Exactly-once 语义是实时计算的难点之一。要做到每一条记录只会被处理一次,即使服务器或网络发生故障时也能保证没有遗漏,这不仅需要实时计算框架本身的支持,还对上游的消息系统、下游的数据存储有所要求。此外,我们在编写计算流程时也需要遵循一定规范,才能真正实现 Exactly-once。本文将讲述如何结合 Spark Streaming 框架、Kafka 消息系统、以及 MySQL 数据库来实现 Exactly-once 的实时计算流程。

Spark Streaming

引例

首先让我们实现一个简单而完整的实时计算流程。我们从 Kafka 接收用户访问日志,解析并提取其中的时间和日志级别,并统计每分钟错误日志的数量,结果保存到 MySQL 中。

示例日志:

1
2
3
2017-07-30 14:09:08 ERROR some message
2017-07-30 14:09:20 INFO some message
2017-07-30 14:10:50 ERROR some message

结果表结构,其中 log_time 字段会截取到分钟级别:

1
2
3
4
create table error_log (
log_time datetime primary key,
log_count int not null default 0
);

阅读全文

通过 SQL 查询学习 Pandas 数据处理

Pandas 是一款广泛使用的数据处理工具。结合 NumPy 和 Matplotlib 类库,我们可以在内存中进行高性能的数据清洗、转换、分析及可视化工作。虽然 Python 本身是一门非常容易学习的语言,但要熟练掌握 Pandas 丰富的 API 接口及正确的使用方式,还是需要投入一定时间的。对于数据开发工程师或分析师而言,SQL 语言是标准的数据查询工具。本文提供了一系列的示例,如何将常见的 SQL 查询语句使用 Pandas 来实现。

Pandas 的安装和基本概念并不在本文讲述范围内,请读者到官网上阅读相关文档,或者阅读《利用 Python 进行数据分析》一书。我推荐大家使用 Anaconda Python 套件,其中集成了 Spyder 集成开发环境。在运行下文的代码之前,请先引入 Pandas 和 NumPy 包:

1
2
import pandas as pd
import numpy as np

FROM - 读取数据

首先,我们需要将数据加载到工作区(内存)。Pandas 原生支持非常多的数据格式,CSV 是较常见的一种。我们以航班延误时间数据集为例(下载地址):

1
2
3
4
date,delay,distance,origin,destination
02221605,3,358,BUR,SMF
01022100,-5,239,HOU,DAL
03210808,6,288,BWI,ALB

我们可以使用 pd.read_csv 函数加载它:

1
2
df = pd.read_csv('flights.csv', dtype={'date': str})
df.head()

这条命令会将 flights.csv 文件读入内存,使用首行作为列名,并自动检测每一列的数据类型。其中,由于 date 一列的日期格式是 %m%d%H%M,自动转换成数字后会失去月份的前异零(02 月的 0),因此我们显式指定了该列的 dtype,告知 Pandas 保留原值。

阅读全文