Ji ZHANG's Blog

If I rest, I rust.

深入理解Reduce-side Join

在《MapReduce Design Patterns》一书中,作者给出了Reduce-side Join的实现方法,大致步骤如下:

  1. 使用MultipleInputs指定不同的来源表和相应的Mapper类;
  2. Mapper输出的Key为Join的字段内容,Value为打了来源表标签的记录;
  3. Reducer在接收到同一个Key的记录后,执行以下两步:
    1. 遍历Values,根据标签将来源表的记录分别放到两个List中;
    2. 遍历两个List,输出Join结果。

具体实现可以参考这段代码。但是这种实现方法有一个问题:如果同一个Key的记录数过多,存放在List中就会占用很多内存,严重的会造成内存溢出(Out of Memory, OOM)。这种方法在一对一的情况下没有问题,而一对多、多对多的情况就会有隐患。那么,Hive在做Reduce-side Join时是如何避免OOM的呢?两个关键点:

  1. Reducer在遍历Values时,会将前面的表缓存在内存中,对于最后一张表则边扫描边输出;
  2. 如果前面几张表内存中放不下,就写入磁盘。

使用git Rebase让历史变得清晰

当多人协作开发一个分支时,历史记录通常如下方左图所示,比较凌乱。如果希望能像右图那样呈线性提交,就需要学习git rebase的用法。

“Merge branch”提交的产生

我们的工作流程是:修改代码→提交到本地仓库→拉取远程改动→推送。正是在git pull这一步产生的Merge branch提交。事实上,git pull等效于get fetch origin和get merge origin/master这两条命令,前者是拉取远程仓库到本地临时库,后者是将临时库中的改动合并到本地分支中。

要避免Merge branch提交也有一个“土法”:先pull、再commit、最后push。不过万一commit和push之间远程又发生了改动,还需要再pull一次,就又会产生Merge branch提交。

使用git pull —rebase

修改代码→commit→git pull —rebase→git push。也就是将get merge origin/master替换成了git rebase origin/master,它的过程是先将HEAD指向origin/master,然后逐一应用本地的修改,这样就不会产生Merge branch提交了。具体过程见下文扩展阅读。

Spark快速入门

Apache Spark是新兴的一种快速通用的大规模数据处理引擎。它的优势有三个方面:

  • 通用计算引擎 能够运行MapReduce、数据挖掘、图运算、流式计算、SQL等多种框架;
  • 基于内存 数据可缓存在内存中,特别适用于需要迭代多次运算的场景;
  • 与Hadoop集成 能够直接读写HDFS中的数据,并能运行在YARN之上。

Spark是用Scala语言编写的,所提供的API也很好地利用了这门语言的特性。它也可以使用Java和Python编写应用。本文将用Scala进行讲解。

安装Spark和SBT

  • 官网上下载编译好的压缩包,解压到一个文件夹中。下载时需注意对应的Hadoop版本,如要读写CDH4 HDFS中的数据,则应下载Pre-built for CDH4这个版本。
  • 为了方便起见,可以将spark/bin添加到$PATH环境变量中:
1
2
export SPARK_HOME=/path/to/spark
export PATH=$PATH:$SPARK_HOME/bin
  • 在练习例子时,我们还会用到SBT这个工具,它是用来编译打包Scala项目的。Linux下的安装过程比较简单:
    • 下载sbt-launch.jar到$HOME/bin目录;
    • 新建$HOME/bin/sbt文件,权限设置为755,内容如下:
1
2
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"

离线环境下构建sbt项目

在公司网络中使用sbtMaven等项目构建工具时,我们通常会搭建一个公用的Nexus镜像服务,原因有以下几个:

  • 避免重复下载依赖,节省公司带宽;
  • 国内网络环境不理想,下载速度慢;
  • IDC服务器没有外网访问权限;
  • 用于发布内部模块。

sbt的依赖管理基于Ivy,虽然它能直接使用Maven中央仓库中的Jar包,在配置时还是有一些注意事项的。

MySQL异常UTF-8字符的处理

ETL流程中,我们会将Hive中的数据导入MySQL——先用Hive命令行将数据保存为文本文件,然后用MySQL的LOAD DATA语句进行加载。最近有一张表在加载到MySQL时会报以下错误:

1
Incorrect string value: '\xF0\x9D\x8C\x86' for column ...

经查,这个字段中保存的是用户聊天记录,因此会有一些表情符号。这些符号在UTF-8编码下需要使用4个字节来记录,而MySQL中的utf8编码只支持3个字节,因此无法导入。

根据UTF-8的编码规范,3个字节支持的Unicode字符范围是U+0000–U+FFFF,因此可以在Hive中对数据做一下清洗:

1
SELECT REGEXP_REPLACE(content, '[^\\u0000-\\uFFFF]', '') FROM ...

这样就能排除那些需要使用3个以上字节来记录的字符了,从而成功导入MySQL。

以下是一些详细说明和参考资料。

在CDH 4.5上安装Shark 0.9

Spark是一个新兴的大数据计算平台,它的优势之一是内存型计算,因此对于需要多次迭代的算法尤为适用。同时,它又能够很好地融合到现有的Hadoop生态环境中,包括直接存取HDFS上的文件,以及运行于YARN之上。对于Hive,Spark也有相应的替代项目——Shark,能做到 drop-in replacement ,直接构建在现有集群之上。本文就将简要阐述如何在CDH4.5上搭建Shark0.9集群。

准备工作

  • 安装方式:Spark使用CDH提供的Parcel,以Standalone模式启动
  • 软件版本
  • 服务器基础配置
    • 可用的软件源(如中科大的源
    • 配置主节点至子节点的root账户SSH无密码登录。
    • /etc/hosts中写死IP和主机名,或者DNS做好正反解析。

Use WebJars in Scalatra Project

As I’m working with my first Scalatra project, I automatically think of using WebJars to manage Javascript library dependencies, since it’s more convenient and seems like a good practice. Though there’s no official support for Scalatra framework, the installation process is not very complex. But this doesn’t mean I didn’t spend much time on this. I’m still a newbie to Scala, and there’s only a few materials on this subject.

Add WebJars Dependency in SBT Build File

Scalatra uses .scala configuration file instead of .sbt, so let’s add dependency into project/build.scala. Take Dojo for example.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
object DwExplorerBuild extends Build {
  ...
  lazy val project = Project (
    ...
    settings = Defaults.defaultSettings ++ ScalatraPlugin.scalatraWithJRebel ++ scalateSettings ++ Seq(
      ...
      libraryDependencies ++= Seq(
        ...
        "org.webjars" % "dojo" % "1.9.3"
      ),
      ...
    )
  )
}

To view this dependency in Eclipse, you need to run sbt eclipse again. In the Referenced Libraries section, you can see a dojo-1.9.3.jar, and the library lies in META-INF/resources/webjars/.

Hive小文件问题的处理

Hive的后端存储是HDFS,它对大文件的处理是非常高效的,如果合理配置文件系统的块大小,NameNode可以支持很大的数据量。但是在数据仓库中,越是上层的表其汇总程度就越高,数据量也就越小。而且这些表通常会按日期进行分区,随着时间的推移,HDFS的文件数目就会逐渐增加。

小文件带来的问题

关于这个问题的阐述可以读一读Cloudera的这篇文章。简单来说,HDFS的文件元信息,包括位置、大小、分块信息等,都是保存在NameNode的内存中的。每个对象大约占用150个字节,因此一千万个文件及分块就会占用约3G的内存空间,一旦接近这个量级,NameNode的性能就会开始下降了。

此外,HDFS读写小文件时也会更加耗时,因为每次都需要从NameNode获取元信息,并与对应的DataNode建立连接。对于MapReduce程序来说,小文件还会增加Mapper的个数,每个脚本只处理很少的数据,浪费了大量的调度时间。当然,这个问题可以通过使用CombinedInputFile和JVM重用来解决。

Java反射机制

原文:http://www.programcreek.com/2013/09/java-reflection-tutorial/

什么是反射?它有何用处?

1. 什么是反射?

“反射(Reflection)能够让运行于JVM中的程序检测和修改运行时的行为。”这个概念常常会和内省(Introspection)混淆,以下是这两个术语在Wikipedia中的解释:

  1. 内省用于在运行时检测某个对象的类型和其包含的属性;
  2. 反射用于在运行时检测和修改某个对象的结构及其行为。

从他们的定义可以看出,内省是反射的一个子集。有些语言支持内省,但并不支持反射,如C++。

反射和内省

抽象泄漏定律

原文:http://www.joelonsoftware.com/articles/LeakyAbstractions.html

TCP协议是互联网的基石,我们每天都需要依靠它来构建各类互联网应用。也正是在这一协议中,时刻发生着一件近乎神奇的事情。

TCP是一种 可靠的 数据传输协议,也就是说,当你通过TCP协议在网络上传输一条消息时,它一定会到达目的地,而且不会失真或毁坏。

我们可以使用TCP来做很多事情,从浏览网页信息到收发邮件。TCP的可靠性使得东非贪污受贿的新闻能够一字一句地传递到世界各地。真是太棒了!

和TCP协议相比,IP协议也是一种传输协议,但它是 不可靠的 。没有人可以保证你的数据一定会到达目的地,或者在它到达前就已经被破坏了。如果你发送了一组消息,不要惊讶为何只有一半的消息到达,有些消息的顺序会不正确,甚至消息的内容被替换成了黑猩猩宝宝的图片,或是一堆无法阅读的垃圾数据,像极了台湾人的邮件标题。

这就是TCP协议神奇的地方:它是构建在IP协议之上的。换句话说,TCP协议能够 使用一个不可靠的工具来可靠地传输数据