使用 Kubernetes 部署 Flink 应用

Kubernetes 是目前非常流行的容器编排系统,在其之上可以运行 Web 服务、大数据处理等各类应用。这些应用被打包在一个个非常轻量的容器中,我们通过声明的方式来告知 Kubernetes 要如何部署和扩容这些程序,并对外提供服务。Flink 同样是非常流行的分布式处理框架,它也可以运行在 Kubernetes 之上。将两者相结合,我们就可以得到一个健壮和高可扩的数据处理应用,并且能够更安全地和其它服务共享一个 Kubernetes 集群。

Flink on Kubernetes

在 Kubernetes 上部署 Flink 有两种方式:会话集群(Session Cluster)和脚本集群(Job Cluster)。会话集群和独立部署一个 Flink 集群类似,只是底层资源换成了 K8s 容器,而非直接运行在操作系统上。该集群可以提交多个脚本,因此适合运行那些短时脚本和即席查询。脚本集群则是为单个脚本部署一整套服务,包括 JobManager 和 TaskManager,运行结束后这些资源也随即释放。我们需要为每个脚本构建专门的容器镜像,分配独立的资源,因而这种方式可以更好地和其他脚本隔离开,同时便于扩容或缩容。文本将以脚本集群为例,演示如何在 K8s 上运行 Flink 实时处理程序,主要步骤如下:

  • 编译并打包 Flink 脚本 Jar 文件;
  • 构建 Docker 容器镜像,添加 Flink 运行时库和上述 Jar 包;
  • 使用 Kubernetes Job 部署 Flink JobManager 组件;
  • 使用 Kubernetes Service 将 JobManager 服务端口开放到集群中;
  • 使用 Kubernetes Deployment 部署 Flink TaskManager;
  • 配置 Flink JobManager 高可用,需使用 ZooKeeper 和 HDFS;
  • 借助 Flink SavePoint 机制来停止和恢复脚本。

阅读全文

深入理解 Hive ACID 事务表

Apache Hive 0.13 版本引入了事务特性,能够在 Hive 表上实现 ACID 语义,包括 INSERT/UPDATE/DELETE/MERGE 语句、增量数据抽取等。Hive 3.0 又对该特性进行了优化,包括改进了底层的文件组织方式,减少了对表结构的限制,以及支持条件下推和向量化查询。Hive 事务表的介绍和使用方法可以参考 Hive Wiki各类教程,本文将重点讲述 Hive 事务表是如何在 HDFS 上存储的,及其读写过程是怎样的。

文件结构

插入数据

1
2
3
4
5
6
7
CREATE TABLE employee (id int, name string, salary int)
STORED AS ORC TBLPROPERTIES ('transactional' = 'true');

INSERT INTO employee VALUES
(1, 'Jerry', 5000),
(2, 'Tom', 8000),
(3, 'Kate', 6000);

INSERT 语句会在一个事务中运行。它会创建名为 delta 的目录,存放事务的信息和表的数据。

1
2
3
/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000001_0000001_0000/_orc_acid_version
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

目录名称的格式为 delta_minWID_maxWID_stmtID,即 delta 前缀、写事务的 ID 范围、以及语句 ID。具体来说:

  • 所有 INSERT 语句都会创建 delta 目录。UPDATE 语句也会创建 delta 目录,但会先创建一个 delete 目录,即先删除、后插入。delete 目录的前缀是 delete_delta;
  • Hive 会为所有的事务生成一个全局唯一的 ID,包括读操作和写操作。针对写事务(INSERT、DELETE 等),Hive 还会创建一个写事务 ID(Write ID),该 ID 在表范围内唯一。写事务 ID 会编码到 deltadelete 目录的名称中;
  • 语句 ID(Statement ID)则是当一个事务中有多条写入语句时使用的,用作唯一标识。

阅读全文

使用 Apache Flink 开发实时 ETL

Apache Flink 是大数据领域又一新兴框架。它与 Spark 的不同之处在于,它是使用流式处理来模拟批量处理的,因此能够提供亚秒级的、符合 Exactly-once 语义的实时处理能力。Flink 的使用场景之一是构建实时的数据通道,在不同的存储之间搬运和转换数据。本文将介绍如何使用 Flink 开发实时 ETL 程序,并介绍 Flink 是如何保证其 Exactly-once 语义的。

Apache Flink

示例程序

让我们来编写一个从 Kafka 抽取数据到 HDFS 的程序。数据源是一组事件日志,其中包含了事件发生的时间,以时间戳的方式存储。我们需要将这些日志按事件时间分别存放到不同的目录中,即按日分桶。时间日志示例如下:

1
2
3
{"timestamp":1545184226.432,"event":"page_view","uuid":"ac0e50bf-944c-4e2f-bbf5-a34b22718e0c"}
{"timestamp":1545184602.640,"event":"adv_click","uuid":"9b220808-2193-44d1-a0e9-09b9743dec55"}
{"timestamp":1545184608.969,"event":"thumbs_up","uuid":"b44c3137-4c91-4f36-96fb-80f56561c914"}

产生的目录结构为:

1
2
/user/flink/event_log/dt=20181219/part-0-1
/user/flink/event_log/dt=20181220/part-1-9

阅读全文

Spark DataSource API V2

Spark 1.3 引入了第一版的数据源 API,我们可以使用它将常见的数据格式整合到 Spark SQL 中。但是,随着 Spark 的不断发展,这一 API 也体现出了其局限性,故而 Spark 团队不得不加入越来越多的专有代码来编写数据源,以获得更好的性能。Spark 2.3 中,新一版的数据源 API 初见雏形,它克服了上一版 API 的种种问题,原来的数据源代码也在逐步重写。本文将演示这两版 API 的使用方法,比较它们的不同之处,以及新版 API 的优势在哪里。

DataSource V1 API

V1 API 由一系列的抽象类和接口组成,它们位于 spark/sql/sources/interfaces.scala 文件中。主要的内容有:

1
2
3
4
5
6
7
8
9
10
11
12
trait RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
def sqlContext: SQLContext
def schema: StructType
}

trait TableScan {
def buildScan(): RDD[Row]
}

通过实现 RelationProvider 接口,表明该类是一种新定义的数据源,可以供 Spark SQL 取数所用。传入 createRelation 方法的参数可以用来做初始化,如文件路径、权限信息等。BaseRelation 抽象类则用来定义数据源的表结构,它的来源可以是数据库、Parquet 文件等外部系统,也可以直接由用户指定。该类还必须实现某个 Scan 接口,Spark 会调用 buildScan 方法来获取数据源的 RDD,我们将在下文看到。

阅读全文

Flume 源码解析:HDFS Sink

Apache Flume 数据流程的最后一部分是 Sink,它会将上游抽取并转换好的数据输送到外部存储中去,如本地文件、HDFS、ElasticSearch 等。本文将通过分析源码来展现 HDFS Sink 的工作流程。

Sink 组件的生命周期

在上一篇文章中, 我们了解到 Flume 组件都会实现 LifecycleAware 接口,并由 LifecycleSupervisor 实例管理和监控。不过,Sink 组件并不直接由它管理,而且被包装在了 SinkRunnerSinkProcessor 这两个类中。Flume 支持三种 Sink 处理器,该处理器会将 Channel 和 Sink 以不同的方式连接起来。这里我们只讨论 DefaultSinkProcessor 的情况,即一个 Channel 只会连接一个 Sink。同时,我们也将略过对 Sink 分组的讨论。

Sink Component LifeCycle

阅读全文

Java 空指针异常的若干解决方案

Java 中任何对象都有可能为空,当我们调用空对象的方法时就会抛出 NullPointerException 空指针异常,这是一种非常常见的错误类型。我们可以使用若干种方法来避免产生这类异常,使得我们的代码更为健壮。本文将列举这些解决方案,包括传统的空值检测、编程规范、以及使用现代 Java 语言引入的各类工具来作为辅助。

运行时检测

最显而易见的方法就是使用 if (obj == null) 来对所有需要用到的对象来进行检测,包括函数参数、返回值、以及类实例的成员变量。当你检测到 null 值时,可以选择抛出更具针对性的异常类型,如 IllegalArgumentException,并添加消息内容。我们可以使用一些库函数来简化代码,如 Java 7 开始提供的 Objects#requireNonNull 方法:

1
2
3
4
public void testObjects(Object arg) {
Object checked = Objects.requireNonNull(arg, "arg must not be null");
checked.toString();
}

Guava 的 Preconditions 类中也提供了一系列用于检测参数合法性的工具函数,其中就包含空值检测:

1
2
3
4
public void testGuava(Object arg) {
Object checked = Preconditions.checkNotNull(arg, "%s must not be null", "arg");
checked.toString();
}

我们还可以使用 Lombok 来生成空值检测代码,并抛出带有提示信息的空指针异常:

1
2
3
public void testLombok(@NonNull Object arg) {
arg.toString();
}

生成的代码如下:

1
2
3
4
5
6
public void testLombokGenerated(Object arg) {
if (arg == null) {
throw new NullPointerException("arg is marked @NonNull but is null");
}
arg.toString();
}

这个注解还可以用在类实例的成员变量上,所有的赋值操作会自动进行空值检测。

阅读全文

是否需要使用 ESLint jsx-no-bind 规则?

在使用 ESLint React 插件时,有一条名为 jsx-no-bind 的检测规则,它会禁止我们在 JSX 属性中使用 .bind 方法和箭头函数。比如下列代码,ESLint 会提示 onClick 属性中的箭头函数不合法:

1
2
3
4
5
6
7
8
9
10
11
class ListArrow extends React.Component {
render() {
return (
<ul>
{this.state.items.map(item => (
<li key={item.id} onClick={() => { alert(item.id) }}>{item.text}</li>
))}
</ul>
)
}
}

这条规则的引入原因有二。首先,每次执行 render 方法时都会生成一个新的匿名函数对象,这样就会对垃圾回收器造成负担;其次,属性中的箭头函数会影响渲染过程:当你使用了 PureComponent,或者自己实现了 shouldComponentUpdate 方法,使用对象比较的方式来决定是否要重新渲染组件,那么组件属性中的箭头函数就会让该方法永远返回真值,引起不必要的重复渲染。

然而,反对的声音认为这两个原因还不足以要求我们在所有代码中应用该规则,特别是当需要引入更多代码、并牺牲一定可读性的情况下。在 Airbnb ESLint 预设规则集中,只禁止了 .bind 方法的使用,而允许在属性(props)或引用(refs)中使用箭头函数。对此我翻阅了文档,阅读了一些关于这个话题的博客,也认为这条规则有些过于严格。甚至还有博主称该规则是一种过早优化(premature optimization),我们需要先做基准测试,再着手修改代码。下文中,我将简要叙述箭头函数是如何影响渲染过程的,有哪些可行的解决方案,以及它为何不太重要。

阅读全文

TensorFlow 模型如何对外提供服务

TensorFlow 是目前最为流行的机器学习框架之一,通过它我们可以便捷地构建机器学习模型。使用 TensorFlow 模型对外提供服务有若干种方式,本文将介绍如何使用 SavedModel 机制来编写模型预测接口。

鸢尾花深层神经网络分类器

首先让我们使用 TensorFlow 的深层神经网络模型来构建一个鸢尾花的分类器。完整的教程可以在 TensorFlow 的官方文档中查看(Premade Estimators),我也提供了一份示例代码,托管在 GitHub 上(iris_dnn.py),读者可以克隆到本地进行测试。以下是部分代码摘要:

1
2
3
4
5
6
7
8
9
10
11
12
13
feature_columns = [tf.feature_column.numeric_column(key=key)
for key in train_x.keys()]
classifier = tf.estimator.DNNClassifier(
feature_columns=feature_columns,
hidden_units=[10, 10],
n_classes=3)

classifier.train(
input_fn=lambda: train_input_fn(train_x, train_y, batch_size=BATCH_SIZE),
steps=STEPS)

predictions = classifier.predict(
input_fn=lambda: eval_input_fn(predict_x, labels=None, batch_size=BATCH_SIZE))

阅读全文

使用 Python 和 Thrift 连接 HBase

Apache HBase 是 Hadoop 生态环境中的键值存储系统(Key-value Store)。它构建在 HDFS 之上,可以对大型数据进行高速的读写操作。HBase 的开发语言是 Java,因此提供了原生的 Java 语言客户端。不过,借助于 Thrift 和其丰富的语言扩展,我们可以十分便捷地在任何地方调用 HBase 服务。文本将讲述的就是如何使用 Thrift 和 Python 来读写 HBase。

生成 Thrift 类定义

如果你对 Apache Thrift 并不熟悉,它提供了一套 IDL(接口描述语言),用于定义远程服务的方法签名和数据类型,并能将其转换成所需要的目标语言。举例来说,以下是用该 IDL 定义的一个数据结构:

1
2
3
4
5
struct TColumn {
1: required binary family,
2: optional binary qualifier,
3: optional i64 timestamp
}

转换后的 Python 代码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class TColumn(object):
def __init__(self, family=None, qualifier=None, timestamp=None,):
self.family = family
self.qualifier = qualifier
self.timestamp = timestamp

def read(self, iprot):
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
# ...

def write(self, oprot):
oprot.writeStructBegin('TColumn')
# ...

阅读全文

Vuex 严格模式下的表单处理

在使用 Vue 进行表单处理时,我们通常会使用 v-model 来建立双向绑定。但是,如果将表单数据交由 Vuex 管理,这时的双向绑定就会引发问题,因为在 严格模式 下,Vuex 是不允许在 Mutation 之外的地方修改状态数据的。以下用一个简单的项目举例说明,完整代码可在 GitHub(链接) 查看。

src/store/table.js

1
2
3
4
5
6
7
8
export default {
state: {
namespaced: true,
table: {
table_name: ''
}
}
}

src/components/NonStrict.vue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<b-form-group label="表名:">
<b-form-input v-model="table.table_name" />
</b-form-group>

<script>
import { mapState } from 'vuex'

export default {
computed: {
...mapState('table', [
'table'
])
}
}
</script>

当我们在“表名”字段输入文字时,浏览器会报以下错误:

1
2
3
4
错误:[vuex] 禁止在 Mutation 之外修改 Vuex 状态数据。
at assert (vuex.esm.js?358c:97)
at Vue.store._vm.$watch.deep (vuex.esm.js?358c:746)
at Watcher.run (vue.esm.js?efeb:3233)

当然,我们可以选择不开启严格模式,只是这样就无法通过工具追踪到每一次的状态变动了。下面我将列举几种解决方案,描述如何在严格模式下进行表单处理。

阅读全文