Hadoop版本梳理

Posted on

Hadoop版本梳理

由于Hadoop版本混乱多变,因此,Hadoop的版本选择问题一直令很多初级用户苦恼。本文总结了Apache Hadoop和Cloudera Hadoop的版本衍化过程,并给出了选择Hadoop版本的一些建议。

1. Apache Hadoop

1.1 Apache版本衍化

截至目前(2012年12月23日),Apache Hadoop版本分为两代,我们将第一代Hadoop称为Hadoop 1.0,第二代Hadoop称为Hadoop 2.0。第一代Hadoop包含三个大版本,分别是0.20.x,0.21.x和0.22.x,其中,0.20.x最后演化成1.0.x,变成了稳定版,而0.21.x和0.22.x则NameNode HA等新的重大特性。第二代Hadoop包含两个版本,分别是0.23.x和2.x,它们完全不同于Hadoop 1.0,是一套全新的架构,均包含HDFS Federation和YARN两个系统,相比于0.23.x,2.x增加了NameNode HA和Wire-compatibility两个重大特性。

经过上面的大体解释,大家可能明白了Hadoop以重大特性区分各个版本的,总结起来,用于区分Hadoop版本的特性有以下几个:

(1)Append 支持文件追加功能,如果想使用HBase,需要这个特性。

(2)RAID 在保证数据可靠的前提下,通过引入校验码较少数据块数目。详细链接:

https://issues.apache.org/jira/browse/HDFS/component/12313080

(3)Symlink 支持HDFS文件链接,具体可参考: https://issues.apache.org/jira/browse/HDFS-245

(4)Security Hadoop安全,具体可参考:https://issues.apache.org/jira/browse/HADOOP-4487

(5) NameNode HA 具体可参考:https://issues.apache.org/jira/browse/HDFS-1064

(6) HDFS Federation和YARN

需要注意的是,Hadoop 2.0主要由Yahoo独立出来的hortonworks公司主持开发。

1.2 Apache版本下载

(1) 各版本说明:http://hadoop.apache.org/releases.html

(2) 下载稳定版:找到一个镜像,下载stable文件夹下的版本。

(3) Hadoop最全版本:http://svn.apache.org/repos/asf/hadoop/common/branches/,可直接导到eclipse中。

2. Cloudera Hadoop

2.1 CDH版本衍化

Apache当前的版本管理是比较混乱的,各种版本层出不穷,让很多初学者不知所措,相比之下,Cloudera公司的Hadoop版本管理的要很多。

我们知道,Hadoop遵从Apache开源协议,用户可以免费地任意使用和修改Hadoop,也正因此,市面上出现了很多Hadoop版本,其中比较出名的一是Cloudera公司的发行版,我们将该版本称为CDH(Cloudera Distribution Hadoop)。截至目前为止,CDH共有4个版本,其中,前两个已经不再更新,最近的两个,分别是CDH3(在Apache Hadoop 0.20.2版本基础上演化而来的)和CDH4在Apache Hadoop 2.0.0版本基础上演化而来的),分别对应Apache的Hadoop 1.0和Hadoop 2.0,它们每隔一段时间便会更新一次。

Cloudera以patch level划分小版本,比如patch level为923.142表示在原生态Apache Hadoop 0.20.2基础上添加了1065个patch(这些patch是各个公司或者个人贡献的,在Hadoop jira上均有记录),其中923个是最后一个beta版本添加的patch,而142个是稳定版发行后新添加的patch。由此可见,patch level越高,功能越完备且解决的bug越多。

Cloudera版本层次更加清晰,且它提供了适用于各种操作系统的Hadoop安装包,可直接使用apt-get或者yum命令进行安装,更加省事。

2.2 CDH版本下载

(1) 版本含义介绍:

https://ccp.cloudera.com/display/DOC/CDH+Version+and+Packaging+Information

(2)各版本特性查看:

https://ccp.cloudera.com/display/DOC/CDH+Packaging+Information+for+Previous+Releases

(3)各版本下载:

CDH3:http://archive.cloudera.com/cdh/3/

CDH4:http://archive.cloudera.com/cdh4/cdh/4/

注意,Hadoop压缩包在这两个链接中的最上层目录中,不在某个文件夹里,很多人进到链接还找不到安装包!

3. 如何选择Hadoop版本

当前Hadoop版本比较混乱,让很多用户不知所措。实际上,当前Hadoop只有两个版本:Hadoop 1.0和Hadoop 2.0,其中,Hadoop 1.0由一个分布式文件系统HDFS和一个离线计算框架MapReduce组成,而Hadoop 2.0则包含一个支持NameNode横向扩展的HDFS,一个资源管理系统YARN和一个运行在YARN上的离线计算框架MapReduce。相比于Hadoop 1.0,Hadoop 2.0功能更加强大,且具有更好的扩展性、性能,并支持多种计算框架。

当我们决定是否采用某个软件用于开源环境时,通常需要考虑以下几个因素:

(1)是否为开源软件,即是否免费。

(2) 是否有稳定版,这个一般软件官方网站会给出说明。

(3) 是否经实践验证,这个可通过检查是否有一些大点的公司已经在生产环境中使用知道。

(4) 是否有强大的社区支持,当出现一个问题时,能够通过社区、论坛等网络资源快速获取解决方法。

如今Hadoop 2.0已经发布了最新的稳定版2.2.0,推荐使用该版本,具体介绍可阅读:“Hadoop 2.0稳定版本2.2.0新特性剖析”,升级方法可参考:“Hadoop升级方案(二):从Hadoop 1.0升级到2.0(1)”。 来源: [http://dongxicheng.org/mapreduce-nextgen/how-to-select-hadoop-versions/](http://dongxicheng.org/mapreduce-nextgen/how-to-select-hadoop-versions/)

另外一篇:

hadoop的版本问题

现在hadoop的版本比较乱,常常搞不清楚版本之间的关系,下面简单的摘要了,apache hadoop和cloudera hadoop 的版本的演化.

apache hadoop官方给出的版本说明是:

1.0.X -current stable version, 1.0 release

1.1.X -current beta version, 1.1 release

2.X.X -current alpha version

0.23.X -simmilar to 2.X.X but missing NN HA.

0.22.X -does not include security

0.20.203.X -old legacy stable version

0.20.X -old legacy version

下图来自http://blog.cloudera.com/blog/2012/01/an-update-on-apache-hadoop-1-0/

可以简单说明apache hadoop和cloudera hadoop版本之间的变化关系

diagram-3

0.20.x版本最后演化成了现在的1.0.x版本

0.23.x版本最后演化成了现在的2.x版本

hadoop 1.0 指的是1.x(0.20.x),0.21,0.22

hadoop 2.0 指的是2.x,0.23.x

CDH3,CDH4分别对应了hadoop1.0 hadoop2.0

董的博客有2篇文章也很清晰的解释了,hadoop版本以及各自的版本特性:

http://dongxicheng.org/mapreduce-nextgen/how-to-select-hadoop-versions/

http://dongxicheng.org/mapreduce-nextgen/hadoop-2-0-terms-explained/

apache-hadoop-versions

最后给出常见的下载hadoop不同版本的地址:

http://archive.apache.org/dist/hadoop/core/

http://archive.cloudera.com/cdh/3/

http://archive.cloudera.com/cdh4/cdh/4/

另外附注一个 hadoop各商业发行版的比较:

http://www.xiaohui.org/archives/795.html 来源: <hadoop的版本问题 - 阿笨猫 - 博客园>

Hadoop集群_Hadoop安装配置

Posted on

Hadoop集群_Hadoop安装配置

Hadoop集群(第5期)_Hadoop安装配置

1、集群部署介绍

1.1 Hadoop简介

  Hadoop是Apache软件基金会旗下的一个开源分布式计算平台。以Hadoop分布式文件系统(HDFS,Hadoop Distributed Filesystem)和MapReduce(Google MapReduce的开源实现)为核心的Hadoop为用户提供了系统底层细节透明的分布式基础架构。

对于Hadoop的集群来讲,可以分成两大类角色:Master和Salve。一个HDFS集群是由一个NameNode和若干个DataNode组成的。其中NameNode作为主服务器,管理文件系统的命名空间和客户端对文件系统的访问操作;集群中的DataNode管理存储的数据。MapReduce框架是由一个单独运行在主节点上的JobTracker和运行在每个集群从节点的TaskTracker共同组成的。主节点负责调度构成一个作业的所有任务,这些任务分布在不同的从节点上。主节点监控它们的执行情况,并且重新执行之前的失败任务;从节点仅负责由主节点指派的任务。当一个Job被提交时,JobTracker接收到提交作业和配置信息之后,就会将配置信息等分发给从节点,同时调度任务并监控TaskTracker的执行。

从上面的介绍可以看出,HDFS和MapReduce共同组成了Hadoop分布式系统体系结构的核心。HDFS在集群上实现分布式文件系统,MapReduce在集群上实现了分布式计算和任务处理。HDFS在MapReduce任务处理过程中提供了文件操作和存储等支持,MapReduce在HDFS的基础上实现了任务的分发、跟踪、执行等工作,并收集结果,二者相互作用,完成了Hadoop分布式集群的主要任务。

1.2 环境说明

集群中包括4个节点:1个Master,3个Salve,节点之间局域网连接,可以相互ping通,具体集群信息可以查看"Hadoop集群(第2期)"。节点IP地址分布如下:

机器名称

IP地址Master.Hadoop

192.168.1.2Salve1.Hadoop

192.168.1.3Salve2.Hadoop

192.168.1.4Salve3.Hadoop

192.168.1.5

四个节点上均是CentOS6.0系统,并且有一个相同的用户hadoop。Master机器主要配置NameNode和JobTracker的角色,负责总管分布式数据和分解任务的执行;3个Salve机器配置DataNode和TaskTracker的角色,负责分布式数据存储以及任务的执行。其实应该还应该有1个Master机器,用来作为备用,以防止Master服务器宕机,还有一个备用马上启用。后续经验积累一定阶段后补上一台备用Master机器。

1.3 网络配置

Hadoop集群要按照1.2小节表格所示进行配置,我们在"Hadoop集群(第1期)"的CentOS6.0安装过程就按照提前规划好的主机名进行安装和配置。如果实验室后来人在安装系统时,没有配置好,不要紧,没有必要重新安装,在安装完系统之后仍然可以根据后来的规划对机器的主机名进行修改。

下面的例子我们将以Master机器为例,即主机名为"Master.Hadoop",IP为"192.168.1.2"进行一些主机名配置的相关操作。其他的Slave机器以此为依据进行修改。

1)查看当前机器名称

用下面命令进行显示机器名称,如果跟规划的不一致,要按照下面进行修改。

hostname

上图中,用"hostname"查"Master"机器的名字为"Master.Hadoop",与我们预先规划的一致。

2)修改当前机器名称

假定我们发现我们的机器的主机名不是我们想要的,通过对"/etc/sysconfig/network"文件修改其中"HOSTNAME"后面的值,改成我们规划的名称。

这个"/etc/sysconfig/network"文件是定义hostname和是否利用网络的不接触网络设备的对系统全体定义的文件。

设定形式:设定值=值

"/etc/sysconfig/network"的设定项目如下:

NETWORKING 是否利用网络

GATEWAY 默认网关

IPGATEWAYDEV 默认网关的接口名

HOSTNAME 主机名

DOMAIN 域名

用下面命令进行修改当前机器的主机名(备注:修改系统文件一般用root用户)

vim /etc/sysconfig/network

通过上面的命令我们从"/etc/sysconfig/network"中找到"HOSTNAME"进行修改,查看内容如下:

3)修改当前机器IP

假定我们的机器连IP在当时安装机器时都没有配置好,那此时我们需要对"ifcfg-eth0"文件进行配置,该文件位于"/etc/sysconfig/network-scripts"文件夹下。

在这个目录下面,存放的是网络接口(网卡)的制御脚本文件(控制文件),ifcfg- eth0是默认的第一个网络接口,如果机器中有多个网络接口,那么名字就将依此类推ifcfg-eth1,ifcfg-eth2,ifcfg- eth3,……。

这里面的文件是相当重要的,涉及到网络能否正常工作。

设定形式:设定值=值

设定项目项目如下:

DEVICE 接口名(设备,网卡)

BOOTPROTO IP的配置方法(static:固定IP, dhcpHCP, none:手动)

HWADDR MAC地址

ONBOOT 系统启动的时候网络接口是否有效(yes/no)

TYPE 网络类型(通常是Ethemet)

NETMASK 网络掩码

IPADDR IP地址

IPV6INIT IPV6是否有效(yes/no)

GATEWAY 默认网关IP地址

查看"/etc/sysconfig/network-scripts/ifcfg-eth0"内容,如果IP不复核,就行修改。

如果上图中IP与规划不相符,用下面命令进行修改:

vim /etc/sysconfig/network-scripts/ifcgf-eth0

修改完之后可以用"ifconfig"进行查看。

4)配置hosts文件(必须)

"/etc/hosts"这个文件是用来配置主机将用的DNS服务器信息,是记载LAN内接续的各主机的对应[HostName和IP]用的。当用户在进行网络连接时,首先查找该文件,寻找对应主机名(或域名)对应的IP地址。

我们要测试两台机器之间知否连通,一般用"ping 机器的IP",如果想用"ping 机器的主机名"发现找不见该名称的机器,解决的办法就是修改"/etc/hosts"这个文件,通过把LAN内的各主机的IP地址和HostName的一一对应写入这个文件的时候,就可以解决问题。

例如:机器为"Master.Hadoop:192.168.1.2"对机器为"Salve1.Hadoop:192.168.1.3"用命令"ping"记性连接测试。测试结果如下:

从上图中的值,直接对IP地址进行测试,能够ping通,但是对主机名进行测试,发现没有ping通,提示"unknown host——未知主机",这时查看"Master.Hadoop"的"/etc/hosts"文件内容。

发现里面没有"192.168.1.3 Slave1.Hadoop"内容,故而本机器是无法对机器的主机名为"Slave1.Hadoop" 解析。

在进行Hadoop集群配置中,需要在"/etc/hosts"文件中添加集群中所有机器的IP与主机名,这样Master与所有的Slave机器之间不仅可以通过IP进行通信,而且还可以通过主机名进行通信。所以在所有的机器上的"/etc/hosts"文件末尾中都要添加如下内容:

192.168.1.2 Master.Hadoop

192.168.1.3 Slave1.Hadoop

192.168.1.4 Slave2.Hadoop

192.168.1.5 Slave3.Hadoop

用以下命令进行添加:

vim /etc/hosts

添加结果如下:

现在我们在进行对机器为"Slave1.Hadoop"的主机名进行ping通测试,看是否能测试成功。

从上图中我们已经能用主机名进行ping通了,说明我们刚才添加的内容,在局域网内能进行DNS解析了,那么现在剩下的事儿就是在其余的Slave机器上进行相同的配置。然后进行测试。(备注:当设置SSH无密码验证后,可以"scp"进行复制,然后把原来的"hosts"文件执行覆盖即可。)

1.4 所需软件

1)JDK软件

下载地址:http://www.oracle.com/technetwork/java/javase/index.html

JDK版本:jdk-6u31-linux-i586.bin

2)Hadoop软件

下载地址:http://hadoop.apache.org/common/releases.html

Hadoop版本:hadoop-1.0.0.tar.gz

1.5 VSFTP上传

在"Hadoop集群(第3期)"讲了VSFTP的安装及配置,如果没有安装VSFTP可以按照该文档进行安装。如果安装好了,就可以通过FlashFXP.exe软件把我们下载的JDK6.0和Hadoop1.0软件上传到"Master.Hadoop:192.168.1.2"服务器上。

刚才我们用一般用户(hadoop)通过FlashFXP软件把所需的两个软件上传了跟目下,我们通过命令查看下一下是否已经上传了。

从图中,我们的所需软件已经准备好了。

2、SSH无密码验证配置

Hadoop运行过程中需要管理远端Hadoop守护进程,在Hadoop启动以后,NameNode是通过SSH(Secure Shell)来启动和停止各个DataNode上的各种守护进程的。这就必须在节点之间执行指令的时候是不需要输入密码的形式,故我们需要配置SSH运用无密码公钥认证的形式,这样NameNode使用SSH无密码登录并启动DataName进程,同样原理,DataNode上也能使用SSH无密码登录到NameNode。

2.1 安装和启动SSH协议

在"Hadoop集群(第1期)"安装CentOS6.0时,我们选择了一些基本安装包,所以我们需要两个服务:ssh和rsync已经安装了。可以通过下面命令查看结果显示如下:

rpm –qa | grep openssh

rpm –qa | grep rsync

假设没有安装ssh和rsync,可以通过下面命令进行安装。

yum install ssh 安装SSH协议

yum install rsync (rsync是一个远程数据同步工具,可通过LAN/WAN快速同步多台主机间的文件)

service sshd restart 启动服务

确保所有的服务器都安装,上面命令执行完毕,各台机器之间可以通过密码验证相互登。

2.2 配置Master无密码登录所有Salve

1)SSH无密码原理

Master(NameNode | JobTracker)作为客户端,要实现无密码公钥认证,连接到服务器Salve(DataNode | Tasktracker)上时,需要在Master上生成一个密钥对,包括一个公钥和一个私钥,而后将公钥复制到所有的Slave上。当Master通过SSH连接Salve时,Salve就会生成一个随机数并用Master的公钥对随机数进行加密,并发送给Master。Master收到加密数之后再用私钥解密,并将解密数回传给Slave,Slave确认解密数无误之后就允许Master进行连接了。这就是一个公钥认证过程,其间不需要用户手工输入密码。重要过程是将客户端Master复制到Slave上。

2)Master机器上生成密码对

在Master节点上执行以下命令:

ssh-keygen –t rsa –P ''

这条命是生成其无密码密钥对,询问其保存路径时直接回车采用默认路径。生成的密钥对:id_rsa和id_rsa.pub,默认存储在"/home/hadoop/.ssh"目录下。

查看"/home/hadoop/"下是否有".ssh"文件夹,且".ssh"文件下是否有两个刚生产的无密码密钥对。

接着在Master节点上做如下配置,把id_rsa.pub追加到授权的key里面去。

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

在验证前,需要做两件事儿。第一件事儿是修改文件"authorized_keys"权限(权限的设置非常重要,因为不安全的设置安全设置,会让你不能使用RSA功能),另一件事儿是用root用户设置"/etc/ssh/sshd_config"的内容。使其无密码登录有效。

1)修改文件"authorized_keys"

chmod 600 ~/.ssh/authorized_keys

备注:如果不进行设置,在验证时,扔提示你输入密码,在这里花费了将近半天时间来查找原因。在网上查到了几篇不错的文章,把作为"Hadoop集群_第5期副刊_JDK和SSH无密码配置"来帮助额外学习之用。

2)设置SSH配置

root用户登录服务器修改SSH配置文件"/etc/ssh/sshd_config"的下列内容。

RSAAuthentication yes /# 启用 RSA 认证

PubkeyAuthentication yes /# 启用公钥私钥配对认证方式

AuthorizedKeysFile .ssh/authorized_keys /# 公钥文件路径(和上面生成的文件同)

设置完之后记得重启SSH服务,才能使刚才设置有效。

service sshd restart

退出root登录,使用hadoop普通用户验证是否成功。

ssh localhost

从上图中得知无密码登录本级已经设置完毕,接下来的事儿是把公钥复制所有的Slave机器上。使用下面的命令格式进行复制公钥:

scp ~/.ssh/id_rsa.pub 远程用户名@远程服务器IP:~/

例如:

scp ~/.ssh/id_rsa.pub hadoop@192.168.1.3:~/

上面的命令是复制文件"id_rsa.pub"到服务器IP为"192.168.1.3"的用户为"hadoop"的"/home/hadoop/"下面。

下面就针对IP为"192.168.1.3"的Slave1.Hadoop的节点进行配置。

1)把Master.Hadoop上的公钥复制到Slave1.Hadoop上


从上图中我们得知,已经把文件"id_rsa.pub"传过去了,因为并没有建立起无密码连接,所以在连接时,仍然要提示输入输入Slave1.Hadoop服务器用户hadoop的密码。为了确保确实已经把文件传过去了,用SecureCRT登录Slave1.Hadoop:192.168.1.3服务器,查看"/home/hadoop/"下是否存在这个文件。

从上面得知我们已经成功把公钥复制过去了。

2)在"/home/hadoop/"下创建".ssh"文件夹

这一步并不是必须的,如果在Slave1.Hadoop的"/home/hadoop"已经存在就不需要创建了,因为我们之前并没有对Slave机器做过无密码登录配置,所以该文件是不存在的。用下面命令进行创建。(备注:用hadoop登录系统,如果不涉及系统文件修改,一般情况下都是用我们之前建立的普通用户hadoop进行执行命令。)

mkdir ~/.ssh

然后是修改文件夹".ssh"的用户权限,把他的权限修改为"700",用下面命令执行:

chmod 700 ~/.ssh

备注:如果不进行,即使你按照前面的操作设置了"authorized_keys"权限,并配置了"/etc/ssh/sshd_config",还重启了sshd服务,在Master能用"ssh localhost"进行无密码登录,但是对Slave1.Hadoop进行登录仍然需要输入密码,就是因为".ssh"文件夹的权限设置不对。这个文件夹".ssh"在配置SSH无密码登录时系统自动生成时,权限自动为"700",如果是自己手动创建,它的组权限和其他权限都有,这样就会导致RSA无密码远程登录失败。

对比上面两张图,发现文件夹".ssh"权限已经变了。

3)追加到授权文件"authorized_keys"

到目前为止Master.Hadoop的公钥也有了,文件夹".ssh"也有了,且权限也修改了。这一步就是把Master.Hadoop的公钥追加到Slave1.Hadoop的授权文件"authorized_keys"中去。使用下面命令进行追加并修改"authorized_keys"文件权限:

cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

chmod 600 ~/.ssh/authorized_keys

4)用root用户修改"/etc/ssh/sshd_config"

**具体步骤参考前面Master.Hadoop的"设置SSH配置**",具体分为两步:第1是修改配置文件;第2是重启SSH服务。

5)用Master.Hadoop使用SSH无密码登录Slave1.Hadoop

当前面的步骤设置完毕,就可以使用下面命令格式进行SSH无密码登录了。

ssh 远程服务器IP

从上图我们主要3个地方,第1个就是SSH无密码登录命令,第2、3个就是登录前后"@"后面的机器名变了,由"Master"变为了"Slave1",这就说明我们已经成功实现了SSH无密码登录了。

最后记得把"/home/hadoop/"目录下的"id_rsa.pub"文件删除掉。

rm –r ~/id_rsa.pub

到此为止,我们经过前5步已经实现了从"Master.Hadoop"到"Slave1.Hadoop"SSH无密码登录,下面就是重复上面的步骤把剩余的两台(Slave2.Hadoop和Slave3.Hadoop)Slave服务器进行配置。这样,我们就完成了"配置Master无密码登录所有的Slave服务器"。

2.3 配置所有Slave无密码登录Master

和Master无密码登录所有Slave原理一样,就是把Slave的公钥追加到Master的".ssh"文件夹下的"authorized_keys"中,记得是追加(>>)

为了说明情况,我们现在就以"Slave1.Hadoop"无密码登录"Master.Hadoop"为例,进行一遍操作,也算是巩固一下前面所学知识,剩余的"Slave2.Hadoop"和"Slave3.Hadoop"就按照这个示例进行就可以了。

首先创建"Slave1.Hadoop"自己的公钥和私钥,并把自己的公钥追加到"authorized_keys"文件中。用到的命令如下:

ssh-keygen –t rsa –P ''

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

接着是用命令"scp"复制"Slave1.Hadoop"的公钥"id_rsa.pub"到"Master.Hadoop"的"/home/hadoop/"目录下,并追加到"Master.Hadoop"的"authorized_keys"中。

1)在"Slave1.Hadoop"服务器的操作

用到的命令如下:

scp ~/.ssh/id_rsa.pub hadoop@192.168.1.2:~/


2)在"Master.Hadoop"服务器的操作

用到的命令如下:

cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

然后删除掉刚才复制过来的"id_rsa.pub"文件。

最后是测试从"Slave1.Hadoop"到"Master.Hadoop"无密码登录。

从上面结果中可以看到已经成功实现了,再试下从"Master.Hadoop"到"Slave1.Hadoop"无密码登录。

至此"Master.Hadoop"与"Slave1.Hadoop"之间可以互相无密码登录了,剩下的就是按照上面的步骤把剩余的"Slave2.Hadoop"和"Slave3.Hadoop"与"Master.Hadoop"之间建立起无密码登录。这样,Master能无密码验证登录每个Slave,每个Slave也能无密码验证登录到Master。

3、Java环境安装

所有的机器上都要安装JDK,现在就先在Master服务器安装,然后其他服务器按照步骤重复进行即可。安装JDK以及配置环境变量,需要以"root"的身份进行。

3.1 安装JDK

首先用root身份登录"Master.Hadoop"后在"/usr"下创建"java"文件夹,再把用FTP上传到"/home/hadoop/"下的"jdk-6u31-linux-i586.bin"复制到"/usr/java"文件夹中。

mkdir /usr/java

cp /home/hadoop/ jdk-6u31-linux-i586.bin /usr/java

接着进入"/usr/java"目录通过下面命令使其JDK获得可执行权限,并安装JDK。

chmod +x jdk-6u31-linux-i586.bin

./jdk-6u31-linux-i586.bin

按照上面几步进行操作,最后点击"Enter"键开始安装,安装完会提示你按"Enter"键退出,然后查看"/usr/java"下面会发现多了一个名为"jdk1.6.0_31"文件夹,说明我们的JDK安装结束,删除"jdk-6u31-linux-i586.bin"文件,进入下一个"配置环境变量"环节。

3.2 配置环境变量

编辑"/etc/profile"文件,在后面添加Java的"JAVA_HOME"、"CLASSPATH"以及"PATH"内容。

1)编辑"/etc/profile"文件

vim /etc/profile

2)添加Java环境变量

在"/etc/profile"文件的尾部添加以下内容:

/# set java environment

export JAVA_HOME=/usr/java/jdk1.6.0_31/

export JRE_HOME=/usr/java/jdk1.6.0_31/jre

export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib

export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

或者

/# set java environment

export JAVA_HOME=/usr/java/jdk1.6.0_31

export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib

export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin

以上两种意思一样,那么我们就选择第2种来进行设置。

3)使配置生效

保存并退出,执行下面命令使其配置立即生效。

source /etc/profile

3.3 验证安装成功

配置完毕并生效后,用下面命令判断是否成功。

java -version

从上图中得知,我们以确定JDK已经安装成功。

3.4 安装剩余机器

这时用普通用户hadoop通过下面命令格式把"Master.Hadoop"文件夹"/home/hadoop/"的JDK复制到其他Slave的"/home/hadoop/"下面,剩下的事儿就是在其余的Slave服务器上按照上图的步骤安装JDK。

scp /home/hadoop/jdk-6u31-linux-i586.bin 远程用户名@远程服务器IP:~/

或者

scp ~/jdk-6u31-linux-i586.bin 远程用户名@远程服务器IP:~/

备注:"~"代表当前用户的主目录,当前用户为hadoop,所以"~"代表"/home/hadoop"。

例如:把JDK从"Master.Hadoop"复制到"Slave1.Hadoop"的命令如下。

scp ~/jdk-6u31-linux-i586 hadoop@192.168.1.3:~/

然后查看"Slave1.Hadoop"的"/home/hadoop"查看是否已经复制成功了。

从上图中得知,我们已经成功复制了,现在我们就用最高权限用户root进行安装了。其他的与这个一样。

4、Hadoop集群安装

所有的机器上都要安装hadoop,现在就先在Master服务器安装,然后其他服务器按照步骤重复进行即可。安装和配置hadoop需要以"root"的身份进行。

4.1 安装hadoop

首先用root用户登录"Master.Hadoop"机器,查看我们之前用FTP上传至"/home/Hadoop"上传的"hadoop-1.0.0.tar.gz"。

接着把"hadoop-1.0.0.tar.gz"复制到"/usr"目录下面。

cp /home/hadoop/hadoop-1.0.0.tar.gz /usr

下一步进入"/usr"目录下,用下面命令把"hadoop-1.0.0.tar.gz"进行解压,并将其命名为"hadoop",把该文件夹的读权限分配给普通用户hadoop,然后删除"hadoop-1.0.0.tar.gz"安装包。

cd /usr /#进入"/usr"目录

tar –zxvf hadoop-1.0.0.tar.gz /#解压"hadoop-1.0.0.tar.gz"安装包

mv hadoop-1.0.0 hadoop /#将"hadoop-1.0.0"文件夹重命名"hadoop"

chown –R hadoop:hadoop hadoop /#将文件夹"hadoop"读权限分配给hadoop用户

rm –rf hadoop-1.0.0.tar.gz /#删除"hadoop-1.0.0.tar.gz"安装包

解压后,并重命名。

把"/usr/hadoop"读权限分配给hadoop用户(非常重要

删除"hadoop-1.0.0.tar.gz"安装包

最后在"/usr/hadoop"下面创建tmp文件夹,把Hadoop的安装路径添加到"/etc/profile"中,修改"/etc/profile"文件(配置java环境变量的文件),将以下语句添加到末尾,并使其有效:

/# set hadoop path

export HADOOP_HOME=/usr/hadoop

export PATH=$PATH :$HADOOP_HOME/bin

1)在"/usr/hadoop"创建"tmp"文件夹

mkdir /usr/hadoop/tmp

2)配置"/etc/profile"

vim /etc/profile

配置后的文件如下:

3)重启"/etc/profile"

source /etc/profile

4.2 配置hadoop

1)配置hadoop-env.sh

该"hadoop-env.sh"文件位于"/usr/hadoop/conf"目录下。

在文件的末尾添加下面内容。

/# set java environment

export JAVA_HOME=/usr/java/jdk1.6.0_31

Hadoop配置文件在conf目录下,之前的版本的配置文件主要是Hadoop-default.xml和Hadoop-site.xml。由于Hadoop发展迅速,代码量急剧增加,代码开发分为了core,hdfs和map/reduce三部分,配置文件也被分成了三个core-site.xml、hdfs-site.xml、mapred-site.xml。core-site.xml和hdfs-site.xml是站在HDFS角度上配置文件;core-site.xml和mapred-site.xml是站在MapReduce角度上配置文件。

2)配置core-site.xml文件

修改Hadoop核心配置文件core-site.xml,这里配置的是HDFS的地址和端口号。

hadoop.tmp.dir /usr/hadoop/tmp备注:请先在 /usr/hadoop 目录下建立 tmp 文件夹) A base for other temporary directories. fs.default.name hdfs://192.168.1.2:9000

备注:如没有配置hadoop.tmp.dir参数,此时系统默认的临时目录为:/tmp/hadoo-hadoop。而这个目录在每次重启后都会被干掉,必须重新执行format才行,否则会出错。

用下面命令进行编辑:

编辑结果显示如下:

3)配置hdfs-site.xml文件

修改Hadoop中HDFS的配置,配置的备份方式默认为3。

dfs.replication 1 (备注:replication 是数据副本数量,默认为3,salve少于3台就会报错) 用下面命令进行编辑: 编辑结果显示如下: 4)配置mapred-site.xml文件 修改Hadoop中MapReduce的配置文件,配置的是JobTracker的地址和端口。 mapred.job.tracker http://**192.168.1.2**:**9001**

用下面命令进行编辑:

编辑结果显示如下:

5)配置masters文件

有两种方案:

(1)第一种

修改localhost为Master.Hadoop

(2)第二种

去掉"localhost",加入Master机器的IP:192.168.1.2

为保险起见,启用第二种,因为万一忘记配置"/etc/hosts"局域网的DNS失效,这样就会出现意想不到的错误,但是一旦IP配对,网络畅通,就能通过IP找到相应主机。

用下面命令进行修改:

编辑结果显示如下:

6)配置slaves文件(Master主机特有

有两种方案:

(1)第一种

去掉"localhost",每行只添加一个主机名,把剩余的Slave主机名都填上。

例如:添加形式如下

Slave1.Hadoop

Slave2.Hadoop

Slave3.Hadoop

(2)第二种

去掉"localhost",加入集群中所有Slave机器的IP,也是每行一个。

例如:添加形式如下

192.168.1.3

192.168.1.4

192.168.1.5

原因和添加"masters"文件一样,选择第二种方式。

用下面命令进行修改:

编辑结果如下:

现在在Master机器上的Hadoop配置就结束了,剩下的就是配置Slave机器上的Hadoop。

一种方式是按照上面的步骤,把Hadoop的安装包在用普通用户hadoop通过"scp"复制到其他机器的"/home/hadoop"目录下,然后根据实际情况进行安装配置,除了第6步,那是Master特有的。用下面命令格式进行。(备注:此时切换到普通用户hadoop)

scp ~/hadoop-1.0.0.tar.gz hadoop@服务器IP:~/

例如:从"Master.Hadoop"到"Slave1.Hadoop"复制Hadoop的安装包。

另一种方式是将 Master上配置好的hadoop所在文件夹"/usr/hadoop"复制到所有的Slave的"/usr"目录下(实际上Slave机器上的slavers文件是不必要的, 复制了也没问题)。用下面命令格式进行。(备注:此时用户可以为hadoop也可以为root)

scp -r /usr/hadoop root@服务器IP:/usr/

例如:从"Master.Hadoop"到"Slave1.Hadoop"复制配置Hadoop的文件。

上图中以root用户进行复制,当然不管是用户root还是hadoop,虽然Master机器上的"/usr/hadoop"文件夹用户hadoop有权限,但是Slave1上的hadoop用户却没有"/usr"权限,所以没有创建文件夹的权限。所以无论是哪个用户进行拷贝,右面都是"root@机器IP"格式。因为我们只是建立起了hadoop用户的SSH无密码连接,所以用root进行"scp"时,扔提示让你输入"Slave1.Hadoop"服务器用户root的密码。

查看"Slave1.Hadoop"服务器的"/usr"目录下是否已经存在"hadoop"文件夹,确认已经复制成功。查看结果如下:

从上图中知道,hadoop文件夹确实已经复制了,但是我们发现hadoop权限是root,所以我们现在要给"Slave1.Hadoop"服务器上的用户hadoop添加对"/usr/hadoop"读权限。

root用户登录"Slave1.Hadoop",执行下面命令。

chown -R hadoop:hadoop(用户名:用户组) hadoop(文件夹

接着在"Slave1 .Hadoop"上修改"/etc/profile"文件(配置 java 环境变量的文件),将以下语句添加到末尾,并使其有效(source /etc/profile):

/# set hadoop environment

export HADOOP_HOME=/usr/hadoop

export PATH=$PATH :$HADOOP_HOME/bin

如果不知道怎么设置,可以查看前面"Master.Hadoop"机器的"/etc/profile"文件的配置,到此为此在一台Slave机器上的Hadoop配置就结束了。剩下的事儿就是照葫芦画瓢把剩余的几台Slave机器按照《从"Master.Hadoop"到"Slave1.Hadoop"复制Hadoop的安装包。》这个例子进行部署Hadoop。

4.3 启动及验证

1)格式化HDFS文件系统

在"Master.Hadoop"上使用普通用户hadoop进行操作。(备注:只需一次,下次启动不再需要格式化,只需 start-all.sh)

hadoop namenode -format

某些书上和网上的某些资料中用下面命令执行。

我们在看好多文档包括有些书上,按照他们的hadoop环境变量进行配置后,并立即使其生效,但是执行发现没有找见"bin/hadoop"这个命令。

其实我们会发现我们的环境变量配置的是"$HADOOP_HOME/bin",我们已经把bin包含进入了,所以执行时,加上"bin"反而找不到该命令,除非我们的hadoop坏境变量如下设置。

/# set hadoop path

export HADOOP_HOME=/usr/hadoop

export PATH=$PATH :$HADOOP_HOME:$HADOOP_HOME/bin

这样就能直接使用"bin/hadoop"也可以直接使用"hadoop",现在不管哪种情况,hadoop命令都能找见了。我们也没有必要重新在设置hadoop环境变量了,只需要记住执行Hadoop命令时不需要在前面加"bin"就可以了。

从上图中知道我们已经成功格式话了,但是美中不足就是出现了一个警告,从网上的得知这个警告并不影响hadoop执行,但是也有办法解决,详情看后面的"常见问题FAQ"。

2)启动hadoop

在启动前关闭集群中所有机器的防火墙,不然会出现datanode开后又自动关闭。

service iptables stop

使用下面命令启动。

start-all.sh

执行结果如下:

可以通过以下启动日志看出,首先启动namenode 接着启动datanode1,datanode2,…,然后启动secondarynamenode。再启动jobtracker,然后启动tasktracker1,tasktracker2,…。

启动 hadoop成功后,在 Master 中的 tmp 文件夹中生成了 dfs 文件夹,在Slave 中的 tmp 文件夹中均生成了 dfs 文件夹和 mapred 文件夹。

查看Master中"/usr/hadoop/tmp"文件夹内容

查看Slave1中"/usr/hadoop/tmp"文件夹内容。

3)验证hadoop

(1)验证方法一:用"jps"命令

在Master上用 java自带的小工具jps查看进程。

在Slave1上用jps查看进程。

如果在查看Slave机器中发现"DataNode"和"TaskTracker"没有起来时,先查看一下日志的,如果是"namespaceID"不一致问题,采用"常见问题FAQ6.2"进行解决,如果是"No route to host"问题,采用"常见问题FAQ6.3"进行解决。

(2)验证方式二:用"hadoop dfsadmin -report"

用这个命令可以查看Hadoop集群的状态。

Master服务器的状态:

Slave服务器的状态

4.4 网页查看集群

1)访问"http:192.168.1.2:50030"

2)访问"http:192.168.1.2:50070"

5、常见问题FAQ

5.1 关于 Warning: $HADOOP_HOME is deprecated.

hadoop 1.0.0版本,安装完之后敲入hadoop命令时,是提示这个警告:

Warning: $HADOOP_HOME is deprecated.

经查hadoop-1.0.0/bin/hadoop脚本和"hadoop-config.sh"脚本,发现脚本中对HADOOP_HOME的环境变量设置做了判断,笔者的环境根本不需要设置HADOOP_HOME环境变量。

解决方案一:编辑"/etc/profile"文件,去掉HADOOP_HOME的变量设定,重新输入hadoop fs命令,警告消失。

解决方案二:编辑"/etc/profile"文件,添加一个环境变量,之后警告消失:

export HADOOP_HOME_WARN_SUPPRESS=1

解决方案三:编辑"hadoop-config.sh"文件,把下面的"if - fi"功能注释掉。

我们这里本着不动Hadoop原配置文件的前提下,采用"方案二",在"/etc/profile"文件添加上面内容,并用命令"source /etc/profile"使之有效。

1)切换至root用户

2)添加内容

3)重新生效

5.2 解决"no datanode to stop"问题

当我停止Hadoop时发现如下信息:

原因:每次namenode format会重新创建一个namenodeId,而tmp/dfs/data下包含了上次format下的id,namenode format清空了namenode下的数据,但是没有清空datanode下的数据,导致启动时失败,所要做的就是每次fotmat前,清空tmp一下的所有目录。

第一种解决方案如下:

1)先删除"/usr/hadoop/tmp"

rm -rf /usr/hadoop/tmp

2)创建"/usr/hadoop/tmp"文件夹

mkdir /usr/hadoop/tmp

3)删除"/tmp"下以"hadoop"开头文件

rm -rf /tmp/hadoop/*

4)重新格式化hadoop

hadoop namenode -format

5)启动hadoop

start-all.sh

使用第一种方案,有种不好处就是原来集群上的重要数据全没有了。假如说Hadoop集群已经运行了一段时间。建议采用第二种。

第二种方案如下:

1)修改每个Slave的namespaceID使其与Master的namespaceID一致。

或者

2)修改Master的namespaceID使其与Slave的namespaceID一致。

该"namespaceID"位于"/usr/hadoop/tmp/dfs/data/current/VERSION"文件中,前面蓝色的可能根据实际情况变化,但后面红色是不变的。

例如:查看"Master"下的"VERSION"文件

本人建议采用第二种,这样方便快捷,而且还能防止误删。

5.3 Slave服务器中datanode启动后又自动关闭

查看日志发下如下错误。

ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Call to ... failed on local exception: java.net.NoRouteToHostException: No route to host

解决方案是:关闭防火墙

service iptables stop

5.4 从本地往hdfs文件系统上传文件

出现如下错误:

INFO hdfs.DFSClient: Exception in createBlockOutputStream java.io.IOException: Bad connect ack with firstBadLink

INFO hdfs.DFSClient: Abandoning block blk_-1300529705803292651_37023

WARN hdfs.DFSClient: DataStreamer Exception: java.io.IOException: Unable to create new block.

解决方案是:

1)关闭防火墙

service iptables stop

2)禁用selinux

编辑 "/etc/selinux/config"文件,设置"SELINUX=disabled"

5.5 安全模式导致的错误

出现如下错误:

org.apache.hadoop.dfs.SafeModeException: Cannot delete ..., Name node is in safe mode

在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。

解决方案是:关闭安全模式

hadoop dfsadmin -safemode leave

5.6 解决Exceeded MAX_FAILED_UNIQUE_FETCHES

出现错误如下:

Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out

程序里面需要打开多个文件,进行分析,系统一般默认数量是1024,(用ulimit -a可以看到)对于正常使用是够了,但是对于程序来讲,就太少了。

解决方案是:修改2个文件。

1)"/etc/security/limits.conf"

vim /etc/security/limits.conf

加上:

soft nofile 102400

hard nofile 409600

2)"/etc/pam.d/login"

vim /etc/pam.d/login

添加:

session required /lib/security/pam_limits.so

针对第一个问题我纠正下答案:

这是reduce预处理阶段shuffle时获取已完成的map的输出失败次数超过上限造成的,上限默认为5。引起此问题的方式可能会有很多种,比如网络连接不正常,连接超时,带宽较差以及端口阻塞等。通常框架内网络情况较好是不会出现此错误的。

5.7 解决"Too many fetch-failures"

出现这个问题主要是结点间的连通不够全面。

解决方案是:

1)检查"/etc/hosts"

要求本机ip 对应 服务器名

要求要包含所有的服务器ip +服务器名

2)检查".ssh/authorized_keys"

要求包含所有服务器(包括其自身)的public key

5.8 处理速度特别的慢

出现map,但是reduce,而且反复出现"reduce=0%"。

解决方案如下:

结合解决方案5.7,然后修改"conf/hadoop-env.sh"中的"export HADOOP_HEAPSIZE=4000"

5.9解决hadoop OutOfMemoryError问题

出现这种异常,明显是jvm内存不够得原因。

解决方案如下:要修改所有的datanode的jvm内存大小。

Java –Xms 1024m -Xmx 4096m

一般jvm的最大内存使用应该为总内存大小的一半,我们使用的8G内存,所以设置为4096m,这一值可能依旧不是最优的值。

5.10 Namenode in safe mode

解决方案如下:

bin/hadoop dfsadmin -safemode leave

5.11 IO写操作出现问题

0-1246359584298, infoPort=50075, ipcPort=50020):Got exception while serving blk_-5911099437886836280_1292 to /172.16.100.165:

java.net.SocketTimeoutException: 480000 millis timeout while waiting for channel to be ready for write. ch : java.nio.channels.SocketChannel[connected local=/

172.16.100.165:50010 remote=/172.16.100.165:50930]

at org.apache.hadoop.net.SocketIOWithTimeout.waitForIO(SocketIOWithTimeout.java:185)

at org.apache.hadoop.net.SocketOutputStream.waitForWritable(SocketOutputStream.java:159)

……

It seems there are many reasons that it can timeout, the example given in HADOOP-3831 is a slow reading client.

解决方案如下:

在hadoop-site.xml中设置dfs.datanode.socket.write.timeout=0

5.12 status of 255 error

错误类型:

java.io.IOException: Task process exit with nonzero status of 255.

at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:424)

错误原因:

Set mapred.jobtracker.retirejob.interval and mapred.userlog.retain.hours to higher value. By default, their values are 24 hours. These might be the reason for failure, though I'm not sure restart.

解决方案如下:单个datanode

如果一个datanode 出现问题,解决之后需要重新加入cluster而不重启cluster,方法如下:

bin/hadoop-daemon.sh start datanode

bin/hadoop-daemon.sh start jobtracker

6、用到的Linux命令

6.1 chmod命令详解

使用权限:所有使用者

使用方式:chmod [-cfvR] [--help] [--version] mode file...

说明:

Linux/Unix 的档案存取权限分为三级 : 档案拥有者、群组、其他。利用 chmod 可以藉以控制档案如何被他人所存取。

mode :权限设定字串,格式如下 :[ugoa...][[+-=][rwxX]...][,...],其中u 表示该档案的拥有者,g 表示与该档案的拥有者属于同一个群体(group)者,o 表示其他以外的人,a 表示这三者皆是。

  • 表示增加权限、- 表示取消权限、= 表示唯一设定权限。

r 表示可读取,w 表示可写入,x 表示可执行,X 表示只有当该档案是个子目录或者该档案已经被设定过为可执行。

-c : 若该档案权限确实已经更改,才显示其更改动作

-f : 若该档案权限无法被更改也不要显示错误讯息

-v : 显示权限变更的详细资料

-R : 对目前目录下的所有档案与子目录进行相同的权限变更(即以递回的方式逐个变更)

--help : 显示辅助说明

--version : 显示版本

范例:

将档案 file1.txt 设为所有人皆可读取

chmod ugo+r file1.txt

将档案 file1.txt 设为所有人皆可读取

chmod a+r file1.txt

将档案 file1.txt 与 file2.txt 设为该档案拥有者,与其所属同一个群体者可写入,但其他以外的人则不可写入

chmod ug+w,o-w file1.txt file2.txt

将 ex1.py 设定为只有该档案拥有者可以执行

chmod u+x ex1.py

将目前目录下的所有档案与子目录皆设为任何人可读取

chmod -R a+r /*

此外chmod也可以用数字来表示权限如 chmod 777 file

语法为:chmod abc file

其中a,b,c各为一个数字,分别表示User、Group、及Other的权限。

r=4,w=2,x=1

若要rwx属性则4+2+1=7;

若要rw-属性则4+2=6;

若要r-x属性则4+1=7。

范例:

chmod a=rwx file 和 chmod 777 file 效果相同

chmod ug=rwx,o=x file 和 chmod 771 file 效果相同

若用chmod 4755 filename可使此程式具有root的权限

6.2 chown命令详解

使用权限:root

使用方式:chown [-cfhvR] [--help] [--version] user[:group] file...

说明:

Linux/Unix 是多人多工作业系统,所有的档案皆有拥有者。利用 chown 可以将档案的拥有者加以改变。一般来说,这个指令只有是由系统管理者(root)所使用,一般使用者没有权限可以改变别人的档案拥有者,也没有权限可以自己的档案拥有者改设为别人。只有系统管理者(root)才有这样的权限。

user : 新的档案拥有者的使用者

IDgroup : 新的档案拥有者的使用者群体(group)

-c : 若该档案拥有者确实已经更改,才显示其更改动作

-f : 若该档案拥有者无法被更改也不要显示错误讯息

-h : 只对于连结(link)进行变更,而非该 link 真正指向的档案

-v : 显示拥有者变更的详细资料

-R : 对目前目录下的所有档案与子目录进行相同的拥有者变更(即以递回的方式逐个变更)

--help : 显示辅助说明

--version : 显示版本

范例:

将档案 file1.txt 的拥有者设为 users 群体的使用者 jessie

chown jessie:users file1.txt

将目前目录下的所有档案与子目录的拥有者皆设为 users 群体的使用者 lamport

chown -R lamport:users /*

-rw------- (600) -- 只有属主有读写权限。

-rw-r--r-- (644) -- 只有属主有读写权限;而属组用户和其他用户只有读权限。

-rwx------ (700) -- 只有属主有读、写、执行权限。

-rwxr-xr-x (755) -- 属主有读、写、执行权限;而属组用户和其他用户只有读、执行权限。

-rwx--x--x (711) -- 属主有读、写、执行权限;而属组用户和其他用户只有执行权限。

-rw-rw-rw- (666) -- 所有用户都有文件读、写权限。这种做法不可取。

-rwxrwxrwx (777) -- 所有用户都有读、写、执行权限。更不可取的做法。

以下是对目录的两个普通设定:

drwx------ (700) - 只有属主可在目录中读、写。

drwxr-xr-x (755) - 所有用户可读该目录,但只有属主才能改变目录中的内容

suid的代表数字是4,比如4755的结果是-rwsr-xr-x

sgid的代表数字是2,比如6755的结果是-rwsr-sr-x

sticky位代表数字是1,比如7755的结果是-rwsr-sr-t

6.3 scp命令详解

scp是 secure copy的缩写,scp是linux系统下基于ssh登陆进行安全的远程文件拷贝命令。linux的scp命令可以在linux服务器之间复制文件和目录。

scp命令的用处:

scp在网络上不同的主机之间复制文件,它使用ssh安全协议传输数据,具有和ssh一样的验证机制,从而安全的远程拷贝文件。

scp命令基本格式:

scp [-1246BCpqrv] [-c cipher] [-F ssh_config] [-i identity_file]

[-l limit] [-o ssh_option] [-P port] [-S program]

[[user@]host1:]file1 [...] [[user@]host2:]file2

scp命令的参数说明:

-1 强制scp命令使用协议ssh1

-2 强制scp命令使用协议ssh2

-4 强制scp命令只使用IPv4寻址

-6 强制scp命令只使用IPv6寻址

-B 使用批处理模式(传输过程中不询问传输口令或短语)

-C 允许压缩。(将-C标志传递给ssh,从而打开压缩功能)

-p 保留原文件的修改时间,访问时间和访问权限。

-q 不显示传输进度条。

-r 递归复制整个目录。

-v 详细方式显示输出。scp和ssh(1)会显示出整个过程的调试信息。这些信息用于调试连接,验证和配置问题。

-c cipher 以cipher将数据传输进行加密,这个选项将直接传递给ssh。

-F ssh_config 指定一个替代的ssh配置文件,此参数直接传递给ssh。

-i identity_file 从指定文件中读取传输时使用的密钥文件,此参数直接传递给ssh。

-l limit 限定用户所能使用的带宽,以Kbit/s为单位。

-o ssh_option 如果习惯于使用ssh_config(5)中的参数传递方式,

-P port 注意是大写的P, port是指定数据传输用到的端口号

-S program 指定加密传输时所使用的程序。此程序必须能够理解ssh(1)的选项。

scp命令的实际应用

1)从本地服务器复制到远程服务器

(1) 复制文件:

命令格式:

scp local_file remote_username@remote_ip:remote_folder

或者

scp local_file remote_username@remote_ip:remote_file

或者

scp local_file remote_ip:remote_folder

或者

scp local_file remote_ip:remote_file

第1,2个指定了用户名,命令执行后需要输入用户密码,第1个仅指定了远程的目录,文件名字不变,第2个指定了文件名

第3,4个没有指定用户名,命令执行后需要输入用户名和密码,第3个仅指定了远程的目录,文件名字不变,第4个指定了文件名

实例:

scp /home/linux/soft/scp.zip root@www.mydomain.com:/home/linux/others/soft

scp /home/linux/soft/scp.zip root@www.mydomain.com:/home/linux/others/soft/scp2.zip

scp /home/linux/soft/scp.zip www.mydomain.com:/home/linux/others/soft

scp /home/linux/soft/scp.zip www.mydomain.com:/home/linux/others/soft/scp2.zip

(2) 复制目录:

命令格式:

scp -r local_folder remote_username@remote_ip:remote_folder

或者

scp -r local_folder remote_ip:remote_folder

第1个指定了用户名,命令执行后需要输入用户密码;

第2个没有指定用户名,命令执行后需要输入用户名和密码;

例子:

scp -r /home/linux/soft/ root@www.mydomain.com:/home/linux/others/

scp -r /home/linux/soft/ www.mydomain.com:/home/linux/others/

上面 命令 将 本地 soft 目录 复制 到 远程 others 目录下,即复制后远程服务器上会有/home/linux/others/soft/ 目录。

2)从远程服务器复制到本地服务器

从远程复制到本地的scp命令与上面的命令雷同,只要将从本地复制到远程的命令后面2个参数互换顺序就行了。

例如:

scp root@www.mydomain.com:/home/linux/soft/scp.zip /home/linux/others/scp.zip

scp www.mydomain.com:/home/linux/soft/ -r /home/linux/others/

linux系统下scp命令中很多参数都和ssh1有关,还需要看到更原汁原味的参数信息,可以运行man scp 看到更细致的英文说明。

文章下载地址:http://files.cnblogs.com/xia520pi/HadoopCluster_Vol.5.rar 来源: <Hadoop集群(第5期)_Hadoop安装配置 - 虾皮 - 博客园>

Hadoop知识分享文稿 ( by quqi99 )

Posted on

Hadoop知识分享文稿 ( by quqi99 ) - 技术并艺术着

您还未登录!|登录|注册|帮助

技术并艺术着

张华的技术Blog

[置顶] Hadoop知识分享文稿 ( by quqi99 )

分类: Seach Engine 2011-03-31 15:19 1977人阅读 评论(0) 收藏 举报 hadoop任务mapreduce任务调度集群作业

目录(?)[+]

  1. 作者张华 写于2010-08-15 发表于2011-03-31 版权声明可以任意转载转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明
  2. httpblogcsdnnetquqi99
  3. hadoop 理论基础

  4. hadoop 是什么

  5. hadoop 项目
  6. MapReduce 任务的运行流程
  7. MapReduce 任务的数据流图

  8. hadoop 入门实战

  9. 测试环境

  10. 测试程序
  11. 属性配置
  12. 免密码 SSH 设置
  13. 配置 hosts
  14. 格式化 HDFS 文件系统
  15. 启动守护进程
  16. 运行程序

  17. hadoop 高级进阶

  18. hadoop 应用案例
  19. 参考文献
                           **Hadoop知识分享文稿 ( by quqi99 )**
    

作者:张华 写于:2010-08-15 发表于:2011-03-31

版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明

( http://blog.csdn.net/quqi99 )

内容目录

目 录

1 hadoop 理论基础 3

1.1 hadoop 是什么 3

1.2 hadoop 项目 3

1.3 Map/Reduce 任务的运行流程 4

1.4 Map/Reduce 任务的数据流图 5

2 hadoop 入门实战 7

2.1 测试环境 7

2.2 测试程序 7

2.3 属性配置 9

2.4 免密码SSH 设置 10

2.5 配置hosts 11

2.6 格式化HDFS 文件系统 11

2.7 启动守护进程 11

2.8 运行程序 11

3 hadoop 高级进阶 12

4 hadoop 应用案例 12

5 参考文献 12

1 hadoop 理论基础

1.1 hadoop 是什么

Hadoop 是 Doug Cutting 开发的,他是一个相当牛的哥们,他同时是大名鼎鼎的 Lucene 及 Nutch 的作者。

我是这样理解 hadoop 的,它就是用来对海量数据进行存储与分析的一个开源软件。它包括两块:

1 ) HDFS ( Hadoop Distrubuted File System ) ,可以对重要数据进行冗余存储,有点类似于冗余磁盘陈列。

2 )对 Map/Reduce 编程模型的一个实现。当然,关系型数据库( RDBMS )也能做类似的事情,但为什么不用 RDBMS 呢?我们知道,让计算移动于数据上比让数据移动到计算更有效率。这使得 Map/Reduce 适合数据被一次写入和多次读取的应用,而 RDBMS 更适合持续更新的数据集。

1.2 hadoop 项目

如今,广义上的 Hadoop 已经发展成为一个分布式计算基础架构这把“大伞”下相关子项目的集合,其技术栈如下图所示:

图:

                                     ![]()

                                                图1 hadoop 的子项目
  • Core : 一系列分布式文件系统和通用I/O 的组件和接口( 序列化、Java RPC 和持久化数据结构) 。
  • Avro : 用于数据的序列化,当然,JDK 中也有Seriable 接口,但hadoop 中有它自己的序列化方式,具说更有效率。
  • MapReduce : 分布式数据处理模式和执行环境,运行于大型商用机集群。
  • HDFS : 分布式文件系统,运行于大型商用机集群。
  • Pig : HDFS 上的数据检索语言,类似于RDBMS 中的SQL 语言。
  • Hbase : 一个分布式的、列存储数据库。HBase 使用HDFS 作为底层存储,同时支持MapReduce 的批量式计算和点查询( 随机读取) 。
  • ZooKeeper : 一个分布式的、高可用性的协调服务。ZooKeeper 提供分布式锁之类的基本服务用于构建分布式应用。
  • Hive : 分布式数据仓库。Hive 管理HDFS 中存储的数据,并提供基于SQL 的查询语言( 由运行时引擎翻译成MapReduce 作业) 用以查询数据。
  • Chukwa : 分布式数据收集和分析系统。Chukwa 运行HDFS 中存储数据的收集器,它使用MapReduce 来生成报告。

1.3 Map/Reduce 任务的运行流程

                 ![]()

JobClient 的 submitJob() 方法的作业提交过程如下:

1 )向 Jobtraker 请求一个新作业 ID

2 ) 调用 JobTracker 的 getNewJobId()

3 ) JobClient 进行作业划分,并将划分后的输入及作业的 JAR 文件、配置文件等复制到 HDFS 中去

4 ) 提交作业,会把此调用放入到一个内部的队列中,交由作业调度器进行调度。值得一提的是,针对 Map 任务与 Reduce 任务,任务调度器是优先选择 Map 任务的,另外,任务调度器在选择 Reduce 任务时并没有考虑数据的本地化。然而,针对一个 Map 任务,它考虑的是 Tasktracker 网络位置和选取一个距离其输入划分文件最近的 Tasktracker ,它可能是数据本地化的,也可能是机架本地化的,还可能得到不同的机架上取数据。

5 ) 初始化包括创建一个代表该正在运行的作业的对象,它封装任务和记录信息,以便跟踪任务的状态和进度。

6 ) JobTracker 任务调度器首先从共享文件系统中获取 JobClient 已计算好的输入划分信息,然后为每个划分创建一个 Map 任务。创建 的 reduce 任务的数量是由 JobConf 的 Mapred.reduce.tasks 属性决定,它是用 setNumReduceTask() 方法来设置的。

7 ) TaskTracker 执行一个简单的循环,定期发送心跳( Heartbeat )方法调用 Jobtracker 告诉是否还活着,同时,心跳还会报告任务运行的是否已经准备运行新的任务。

8 ) TaskTracker 已经被分配了任务,下一步是运行任务。首先它需要将它所需的全部文件从 HDFS 中复制到本地磁盘。

9 )紧接着,它要启动一个新的 Java 虚拟机来运行每个任务,这使得用户所定义的 Map 和 Reduce 函数的任务缺陷都不会影响 TaskTracker (比如导致它崩溃或者挂起)

10 )运行 Map 任务或者 Reduce 任务,值得一提的是,这些任务使用标准输入与输出流,换句话说,你可以用任务语言(如 JAVA , C++ , Shell 等)来实现 Map 和 Reduce ,只要保证它们也使用标准输入与输出流,就可以将输出的键值对传回给 JAVA 进程了。

1.4 Map/Reduce 任务的数据流图

    图3  Map/Reduce  中单一 Reduce  任务的数据流图

             图4  Map/Reduce  中多个 Reduce  任务的数据流图

            图5  MapReduce  中没有 Reduce  任务的数据流图

任务粒度 : 分片的个数,在将原始大数据切割成小数据集时,通常让小数据集小于或等于 HDFS 中的一个 Block 的大小(缺省是 64M) ,这样能够保证一个小数据集位于一台计算机上,便于本地计算。 有 M 个 小数据集 待处理,就启动 M 个 Map 任务,注意这 M 个 Map 任务分布于 N 台计算机上并行运行,Reduce 任务的数量 R 则可由用户指定 。

Map : 输入 输出 List()

Reduce : 输入 输出

分区( Partition) : 把 Map 任务输出的中间结果按 key 的范围划分成 R 份 ( R 是预先定义的 Reduce 任务的个数) ,划分时通常使用 hash 函数如: hash(key) mod R ,这样可以保证某一段范围内的 key ,一定是由一个 Reduce 任务来处理,可以简化 Reduce 的过程。

Combine : 在 partition 之前,还可以对中间结果先做 combine ,即将中间结果中有相同 key 的 对合并成一对。 combine 的过程与 Reduce 的过程类似,很多情况下就可以直接使用 Reduce 函数,但 combine 是作为 Map 任务的一部分,在执行完 Map 函数后紧接着执行的。 Combine 能够减少中间结果中 对的数目,从而减少网络流量。

下面举个例子来着重说明 Combine , hadoop 允许用户声明一个 combiner 运行在 Map 的输出上,它的输出再作为 Reduce 的输入。例如,找出每一年的最调气温:

假如用户的输入的分片数是 2 ,那么:

1 )第一个 Map 的输出如下:

( 1950 , 0 )

( 1950 , 20 )

( 1950 , 10 )

2 ) 第二个 Map 的输出如下:

( 1950 , 25 )

( 1950 , 15 )

3 ) Reduce 的输入如下:

( 1950 ,[ 0 , 20 , 10 , 25 , 15 ])

注意:如果有 combine 的话,此时 Reduce 的输入应该是:

max(0, 20, 10, 25, 15) = max(max(0,20,10), max(25,15)) = max(20,25)

combine 并不能取代 reduce, 例如,如果我们计算平均气温,便不能使用 combine ,因为:

mean(0,20,10,25,15) = 14

但是:

mean(mean(0,20,10), mean(25,15)) = mean(10,20) = 15

4 ) Reduce 的输出如下:

( 1950 , 25 )

2 hadoop 入门实战

hadoop 有三种部署模式:

  • 单机模式:没有守护进程,一切都运行在单个 JVM 上,适合测试与调试。
  • 伪集群模式:守护进程在本地运行,适合模拟集群。
  • 集群模式:守护进程运行在集群的某台机器上。

所以,在以上任一特定模式运行 hadoop 时,只需要做两件事情:

1 ) 设置适当属性

2 )启动 hadoop 的守护进程(名称节点,二级名称节名,数据节点)

hadoop 默认的是单机模式,下面,我们将着重介绍在集群模式是如何部署?

2.1 测试环境

用两台机器做为测试环境 , 通常,集群里的一台机器被指定为 NameNode ,另一台不同的机器被指定为 JobTracker ,这些机器是 masters; 余下的机器即作为 DataNode 作为 TaskTracker ,这些机器是 slaves

1 ) master (JobTracker & NameNode) :我的工作机 ( zhanghua .quqi.com)

2 ) slave (TaskTracker & DataNode) :我的开发机 ( tadev03 .quqi.com)

3) 两机均已安装 ssh 与 rsync

2.2 测试程序

1 ) /home/workspace/hadoopExample/input/file01:

Hello World Bye World

2) /home/workspace/hadoopExample/input/file02:

Hello Hadoop Goodbye Hadoop

  1. WordCount.java

package com.TripResearch.hadoop;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred. FileInputFormat ;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred. JobConf ;

import org.apache.hadoop.mapred. MapReduceBase ;

import org.apache.hadoop.mapred. Mapper ;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred. Reducer ;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred. TextInputFormat ;

import org.apache.hadoop.mapred. TextOutputFormat ;

///

/ *@author huazhang

/*/

@SuppressWarnings ( "deprecation" )

public class WordCount {

public static class MyMap extends MapReduceBase implements

Mapper {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value,

OutputCollector output, Reporter reporter)

throws IOException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word .set(tokenizer.nextToken());

output.collect( word , one );

}

}

}

public static class MyReduce extends MapReduceBase implements

Reducer {

public void reduce(Text key, Iterator values,

OutputCollector output, Reporter reporter)

throws IOException {

int sum = 0;

while (values.hasNext()) {

sum += values.next().get();

}

output.collect(key, new IntWritable(sum));

}

}

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(WordCount. class );

conf.setJobName( "wordcount" );

conf.setOutputKeyClass(Text. class );

conf.setOutputValueClass(IntWritable. class );

conf.setMapperClass(MyMap. class );

conf.setCombinerClass(MyReduce. class );

conf.setReducerClass(MyReduce. class );

conf.setInputFormat( TextInputFormat . class );

conf.setOutputFormat( TextOutputFormat . class );

FileInputFormat . setInputPaths (conf, new Path(args[0]));

FileOutputFormat. setOutputPath (conf, new Path(args[1]));

JobClient.runJob (conf);

}

}

2.3 属性配置

按下图所示修改至少 3 个属性, 如下图所示:

  1. conf/core-site.xml

fs.default.name hdfs://zhanghua .quqi.com:9000

注意:此处如果是伪集群模式可配置为 hdfs://localhost:9000 , 是本地模式则为: localhost:9000 。另外,其他输入输入路径,是本地模式是本地文件系统的路径,是非地模式,用 hdfs 文件系统的路径格式。

  1. conf/hdfs-site.xml
dfs.replication 1

  1. conf/mapred-site.xml
mapred.job.tracker zhanghua .quqi.com:8021

  1. masters

zhanghua .quqi.com ( 伪分布模式就配成 localhost)

  1. slaves

tadev03 .quqi.com ( 伪分布模式就配成 localhost)

  1. 将以上配置好的 hadoop 文件夹拷到所有机器的相同目录下:

scp -r /home/soft/hadoop-0.20.2 root@tadev03 .quqi.com :/home/soft/hadoop-0.20.2

注意:确保两台机器的 JAVA_HOME 的路径一致,如果不一致,就要改 。

hadoop 所有可配置的配置文件说明如下:

hadoop-env.sh 运行 hadoop 的脚本中使用的环境变量

core-site.xml hadoop 的核心配置,如 HDFS 和 MapReduce 中很普遍的 I/O 设置

hdfs-site.xml HDFS 后台程序设置的配置:名称节点,第二名称节点及数据节点

mapred-site.xml MapReduce 后台程序设置的配置: jobtracker 和 tasktracker

masters 记录运行第二名称节点 的机器(一行一个)的列表

slaves 记录运行数据节点的机器(一行一个)的列表

2.4 免密码 SSH 设置

免密码 ssh 设置, 保证至少从 master 可以不用口令登陆所有的 slaves 。

1 )生成密钥对: ssh-keygen -t rsa -P '' -f /root/.ssh/id_rsa ( 这样密钥就留在了客户端 )

2) 将公钥拷到要连接的服务器,

scp /root/.ssh/id_rsa.pub root@tadev03 .quqi.com:/tmp

ssh -l root tadev03 .quqi.com

more /tmp/id_rsa.pub >> /root/.ssh/authorized_keys

  1. ssh tadev03 .quqi.com 不需要输入密码即为成功。

(注意:伪分布模式也要配置 ssh localhost 无密码登录,如果是 mac ,请将 ssh 打开)

( 另外,在 mac 中请在 hadoop-config.sh 文件中配置 export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home)

三条控制线线:

SSH → 这样就可以直接从主节点远程启动从节点上的脚本,如 ssh tadev03 .quqi.com '/var/aa.sh'

NameNode (http://localhost:50070 ) → DataNode

JobTracker ( http://localhost:50030 )→ TaskTracker (http://localhost:50060 )

2.5 配置 hosts

必须配置 master 和 slaves 之间的双向 hosts. 修改 /etc/hosts 进行配置,略。

2.6 格式化 HDFS 文件系统

和我们常见的 NTFS , FAT32 文件系统一样, NDFS 最开始也是需要格式化的。格式化过程用来创建存储目录以及名称节点的永久数据结构的初始版本来创建一个空的文件系统。命令如下:

hadoop namenode -format

已知问题:在重新格式化时,可能会报: SHUTDOWN_MSG: Shutting down NameNode

解决办法: rm -rf /tmp/hadoop-root/dfs/name

2.7 启动守护进程

1 )启动 HDFS 守护进程: start-dfs.sh

( start-dfs.sh 脚本会参照 NameNode 上 ${HADOOP_CONF_DIR}/slaves 文件的内容,在所有列出的 slave 上启动 DataNode 守护进程。 )

已知问题:在已设置 JAVA_HOME 的情况下仍会报: Error: JAVA_HOME is not set

解决办法:我是在 hadoop.sh 文件中加下面一句解决的:

JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home

2 )启动 Map/Reduce 守护进程: start-mapred.sh

( start-mapred.sh 脚本会参照 JobTracker 上 ${HADOOP_CONF_DIR}/slaves 文件的内容,在所有列出的 slave 上启动 TaskTracker 守护进程 )

3) 启动成功后,可以通过访问 http://localhost:50030 验证。

注意:也可直接使用 start-all.sh 与 stop-all.sh 脚本 , 在主节点 master 上面启动 hadoop ,主节点会启动 / 停止所有从节点的 hadoop 。会启动 5 个 java 进程 , 同时会在 /tmp 目录下创建五个 pid 文件记录这些进程 ID 号。通过这五个文件,可以得知 namenode, datanode, secondary namenode, jobtracker, tasktracker 分别对应于哪一个 Java 进程。

已知问题:启动后,日志中报: java.io.IOException: File /tmp/hadoop-root/mapred/system/jobtracker.info could only be replicated to 0 nodes, instead of 1

解决办法:原因是 从 tadev03 .quqi.com 机器上无法 ping zhanghua .quqi.com

2.8 运行程序

先将测试数据及其他输入由本地文件系统拷到 HFDS 文件系统中去(注意: jar 除外 )

  1. hadoop fs -mkdir input

  2. hadoop fs -ls .
  3. hadoop fs -copyFromLocal /home/workspace/hadoopExample/input/file01 input/file01
  4. hadoop fs -copyFromLocal /home/workspace/hadoopExample/input/file02 input/file02

这时候就可以执行下列命令运行程序了,注意:后面的input , output 等目录都是HDFS 文件系统的路径。( 如果是本地模式,就用本地文件系统的绝对路径)

hadoop jar /home/workspace/hadoopExample/hadoopExample.jar com.TripResearch.hadoop.WordCount input/ output

已知问题:在集群模式下运行时任务会Pending

最后,运行下列命令查看结果:

/home/soft/hadoop-0.20.2/bin/hadoop fs -cat output/part-00000

也可访问下列地址查看状态:

NameNode – http://zhanghua .quqi.com :50070/

JobTracker - http://zhanghua .quqi.com :50030/

常用命令说明如下:

hadoop dfs –ls 查看 /usr/root 目录下的内容径; hadoop dfs –rmr xxx xxx 就是删除目录; hadoop dfsadmin -report 这个命令可以全局的查看 DataNode 的情况; hadoop job -list 后面增加参数是对于当前运行的 Job 的操作,例如 list,kill 等; hadoop balancer 均衡磁盘负载的命令。

3 hadoop 高级进阶

4 hadoop 应用案例

5 参考文献

  1. http://hadoop.apache.org/common/docs/r0.18.2/cn/
  2. hadoop 0.20.2 集群配置入门 http://dev.firnow.com/course/3_program/java/javajs/
  3. Hadoop 分布式文件系统(HDFS )初步实践 http://huatai.me/?p=352
  4. Hadoop 分布式部署实验2_ 格式化分布式文件系统 http://hi.baidu.com/thinke365/blog/item/15602aa8f9074cf41e17a235.html
  5. hadoop 安装出现问题(紧急),请前辈指教 http://forum.hadoop.tw/viewtopic.php?f=4&t=90
  6. 用 Hadoop 进行分布式并行编程 http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop1/index.html
  7. 用 Hadoop 进行分布式数据处理 http://tech.ddvip.com/2010-06/1275983295155033.html

分享到:

  1. 上一篇:Lucene Scoring 评分机制 ( by quqi99 )
  2. 下一篇:深入理解各JEE服务器Web层集群原理 ( by quqi99 ) 查看评论

    暂无评论 您还没有登录,请[登录][注册]

/* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场 TOP

个人资料

quqi99

  • 访问:198660次
  • 积分:3337分
  • 排名:第1895名

  • 原创:146篇

  • 转载:23篇
  • 译文:0篇
  • 评论:123条

文章搜索

文章分类

展开

阅读排行

推荐文章 最新评论

quqi99: hi whoeversucks, 谢谢你的实时信息,非常有用,我已经更新到博客里了。另外,问个问题,...

whoeversucks: 注意,OpenDayLight Controller和OSCP实际上2个独立的SDN控制器项目(分别...

quqi99: hi dalinhuang, 谢谢你的回复,你给的这个方法是只适合LVM场景的啊,我没有使用LVM。

dalinhuang: 给根(/)扩充的步骤:(以你的virtualbox并使用LVM为例)1. 新增一块虚拟硬盘,给虚机。...

piaochenping: 你好,为什么我安装时老是出现这个错误呢? Failed to execute goal org.co...

quqi99: @sunyilong2012: 这种错误应该是差模块吧,可以单独安装一下试试, sudo pip i...

quqi99: openstack因为用到了一些linux特有的东西,如iptables,所以目前只能跑在linux...

javaerss: 大神...看哭了,为此特地跑去下载fedora 16来做实验。之前用ubuntu下用eclipse ...

nanfu08: 你能看得清,如果只是自己看的话我没话说,这样的文字叫人怎么读??

dragonsun: 您好,我在做这个测试的时候遇到了无法导入statsd的问题,请问您有解决的方法吗?+ /home/j...

公司简介|招贤纳士|广告服务|银行汇款帐号|联系方式|版权声明|法律顾问|问题报告QQ客服 微博客服 论坛反馈 联系邮箱:webmaster@csdn.net 服务热线:400-600-2320京 ICP 证 070598 号北京创新乐知信息技术有限公司 版权所有世纪乐知(北京)网络技术有限公司 提供技术支持江苏乐知网络技术有限公司 提供商务支持Copyright © 1999-2012, CSDN.NET, All Rights Reserved GongshangLogo

JAVA线程池管理及分布式HADOOP调度框架搭建

Posted on

JAVA线程池管理及分布式HADOOP调度框架搭建

平时的开发中线程是个少不了的东西,比如tomcat里的servlet就是线程,没有线程我们如何提供多用户访问呢?不过很多刚开始接触线程的开发攻城师却在这个上面吃了不少苦头。怎么做一套简便的线程开发模式框架让大家从单线程开发快速转入多线程开发,这确实是个比较难搞的工程。

那具体什么是线程呢?首先看看进程是什么,进程就是系统中执行的一个程序,这个程序可以使用内存、处理器、文件系统等相关资源。例如 QQ软件、eclipse、tomcat等就是一个exe程序,运行启动起来就是一个进程。为什么需要多线程?如果每个进程都是单独处理一件事情不能多个任务同时处理,比如我们打开qq只能和一个人聊天,我们用eclipse开发代码的时候不能编译代码,我们请求tomcat服务时只能服务一个用户请求,那我想我们还在原始社会。多线程的目的就是让一个进程能够同时处理多件事情或者请求。比如现在我们使用的QQ软件可以同时和多个人聊天,我们用eclipse开发代码时还可以编译代码,tomcat可以同时服务多个用户请求。

线程这么多好处,怎么把单进程程序变成多线程程序呢?不同的语言有不同的实现,这里说下java语言的实现多线程的两种方式:扩展java.lang.Thread类、实现java.lang.Runnable接口。 先看个例子,假设有100个数据需要分发并且计算。看下单线程的处理速度: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package thread; import java.util.Vector; public class OneMain { public static void main(String[] args) throws InterruptedException { Vector list = new Vector(100); for (int i = 0; i < 100; i++) { list.add(i); } long start = System.currentTimeMillis(); while (list.size() > 0) { int val = list.remove(0); Thread. sleep(100);//模拟处理 System. out.println(val); } long end = System.currentTimeMillis(); System. out.println("消耗 " + (end - start) + " ms"); } // 消耗 10063 ms }

再看一下多线程的处理速度,采用了10个线程分别处理:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package thread; import java.util.Vector; import java.util.concurrent.CountDownLatch; public class MultiThread extends Thread { static Vector list = new Vector(100); static CountDownLatch count = new CountDownLatch(10); public void run() { while (list.size() > 0) { try { int val = list.remove(0); System.out.println(val); Thread.sleep(100);//模拟处理 } catch (Exception e) { // 可能数组越界,这个地方只是为了说明问题,忽略错误 } }

      count.countDown(); // 删除成功减一
 }
 public static void main([String](http://www.google.com/search?hl=en&q=allinurl%3Astring+java.sun.com&btnI=I%27m%20Feeling%20Lucky)[] args) throws [InterruptedException](http://www.google.com/search?hl=en&q=allinurl%3Ainterruptedexception+java.sun.com&btnI=I%27m%20Feeling%20Lucky) {

      for (int i = 0; i < 100; i++) {
           list.add(i);
      }

      long start = [System](http://www.google.com/search?hl=en&q=allinurl%3Asystem+java.sun.com&btnI=I%27m%20Feeling%20Lucky).currentTimeMillis();
      for (int i = 0; i < 10; i++) {
           new MultiThread().start();
      }

      count.await();
      long end = [System](http://www.google.com/search?hl=en&q=allinurl%3Asystem+java.sun.com&btnI=I%27m%20Feeling%20Lucky).currentTimeMillis();
      [System](http://www.google.com/search?hl=en&q=allinurl%3Asystem+java.sun.com&btnI=I%27m%20Feeling%20Lucky).out.println("消耗 " + (end - start) + " ms");
 }
 // 消耗 1001 ms

}

大家看到了线程的好处了吧!单线程需要10S,10个线程只需要1S。充分利用了系统资源实现并行计算。也许这里会产生一个误解,是不是增加的线程个数越多效率越高。线程越多处理性能越高这个是错误的,范式都要合适,过了就不好了。需要普及一下计算机硬件的一些知识。我们的cpu是个运算器,线程执行就需要这个运算器来运行。不过这个资源只有一个,大家就会争抢。一般通过以下几种算法实现争抢cpu的调度:

1、队列方式,先来先服务。不管是什么任务来了都要按照队列排队先来后到。 2、时间片轮转,这也是最古老的cpu调度算法。设定一个时间片,每个任务使用cpu的时间不能超过这个时间。如果超过了这个时间就把任务暂停保存状态,放到队列尾部继续等待执行。 3、优先级方式:给任务设定优先级,有优先级的先执行,没有优先级的就等待执行。

这三种算法都有优缺点,实际操作系统是结合多种算法,保证优先级的能够先处理,但是也不能一直处理优先级的任务。硬件方面为了提高效率也有多核cpu、多线程cpu等解决方案。目前看得出来线程增多了会带来cpu调度的负载增加,cpu需要调度大量的线程,包括创建线程、销毁线程、线程是否需要换出cpu、是否需要分配到cpu。这些都是需要消耗系统资源的,由此,我们需要一个机制来统一管理这一堆线程资源。线程池的理念提出解决了频繁创建、销毁线程的代价。线程池指预先创建好一定大小的线程等待随时服务用户的任务处理,不必等到用户需要的时候再去创建。特别是在java开发中,尽量减少垃圾回收机制的消耗就要减少对象的频繁创建和销毁。

之前我们都是自己实现的线程池,不过随之jdk1.5的推出,jdk自带了 java.util.concurrent并发开发框架,解决了我们大部分线程池框架的重复工作。可以使用Executors来建立线程池,列出以下大概的,后面再介绍。 newCachedThreadPool 建立具有缓存功能线程池 newFixedThreadPool 建立固定数量的线程 newScheduledThreadPool 建立具有时间调度的线程

有了线程池后有以下几个问题需要考虑: 1、线程怎么管理,比如新建任务线程。 2、线程如何停止、启动。 3、线程除了scheduled模式的间隔时间定时外能否实现精确时间启动。比如晚上1点启动。 4、线程如何监控,如果线程执行过程中死掉了,异常终止我们怎么知道。

考虑到这几点,我们需要把线程集中管理起来,用java.util.concurrent是做不到的。需要做以下几点: 1、将线程和业务分离,业务的配置单独做成一个表。 2、构建基于concurrent的线程调度框架,包括可以管理线程的状态、停止线程的接口、线程存活心跳机制、线程异常日志记录模块。 3、构建灵活的timer组件,添加quartz定时组件实现精准定时系统。 4、和业务配置信息结合构建线程池任务调度系统。可以通过配置管理、添加线程任务、监控、定时、管理等操作。 组件图为: 分布式调度框架-lanceyan.com

构建好线程调度框架是不是就可以应对大量计算的需求了呢?答案是否定的。因为一个机器的资源是有限的,上面也提到了cpu是时间周期的,任务一多了也会排队,就算增加cpu,一个机器能承载的cpu也是有限的。所以需要把整个线程池框架做成分布式的任务调度框架才能应对横向扩展,比如一个机器上的资源呢达到瓶颈了,马上增加一台机器部署调度框架和业务就可以增加计算能力了。好了,如何搭建?如下图: 分布式调度框架-lanceyan.com

基于jeeframework我们封装spring、ibatis、数据库等操作,并且可以调用业务方法完成业务处理。主要组件为: 1、任务集中存储到数据库服务器 2、控制中心负责管理集群中的节点状态,任务分发 3、线程池调度集群负责控制中心分发的任务执行 4、web服务器通过可视化操作任务的分派、管理、监控。

一般这个架构可以应对常用的分布式处理需求了,不过有个缺陷就是随着开发人员的增多和业务模型的增多,单线程的编程模型也会变得复杂。比如需要对1000w数据进行分词,如果这个放到一个线程里来执行,不算计算时间消耗光是查询数据库就需要耗费不少时间。有人说,那我把1000w数据打散放到不同机器去运算,然后再合并不就行了吗?因为这是个特例的模式,专为了这个需求去开发相应的程序没有问题,但是以后又有其他的海量需求如何办?比如把倒退3年的所有用户发的帖子中发帖子最多的粉丝转发的最高的用户作息时间取出来。又得编一套程序实现,太麻烦!分布式云计算架构要解决的就是这些问题,减少开发复杂度并且要高性能,大家会不会想到一个最近很热的一个框架,hadoop,没错就是这个玩意。hadoop解决的就是这个问题,把大的计算任务分解、计算、合并,这不就是我们要的东西吗?不过玩过这个的人都知道他是一个单独的进程。不是!他是一堆进程,怎么和我们的调度框架结合起来?看图说话: task31

基本前面的分布式调度框架组件不变,增加如下组件和功能: 1、改造分布式调度框架,可以把本身线程任务变成mapreduce任务并提交到hadoop集群。 2、hadoop集群能够调用业务接口的spring、ibatis处理业务逻辑访问数据库。 3、hadoop需要的数据能够通过hive查询。 4、hadoop可以访问hdfs/hbase读写操作。 5、业务数据要及时加入hive仓库。 6、hive处理离线型数据、hbase处理经常更新的数据、hdfs是hive和hbase的底层结构也可以存放常规文件。

这样,整个改造基本完成。不过需要注意的是架构设计一定要减少开发程序的复杂度。这里虽然引入了hadoop模型,但是框架上开发者还是隐藏的。业务处理类既可以在单机模式下运行也可以在hadoop上运行,并且可以调用spring、ibatis。减少了开发的学习成本,在实战中慢慢体会就学会了一项新技能。

界面截图: task4 来源: [http://www.lanceyan.com/category/tech/hadoop](http://www.lanceyan.com/category/tech/hadoop)

Notes for Hadoop the definitive guide

Posted on

Notes for Hadoop the definitive guide

1. Introduction to HDFS

1.1. HDFS Concepts

1.1.1. Blocks

l HDFS too has the concept of a block, but it is a much larger unit 64 MB by default.

l Like in a filesystem for a single disk, files in HDFS are broken into block-sized chunks, which are stored as independent units.

l Unlike a filesystem for a single disk, a file in HDFS that is smaller than a single block does not occupy a full block’s worth of underlying storage.

1.1.2. Namenodes and Datanodes

l The namenode manages the filesystem namespace.

n It maintains the filesystem tree and the metadata for all the files and directories in the tree.

n This information is stored persistently on the local disk in the form of two files: the namespace image and the edit log.

n The namenode also knows the datanodes on which all the blocks for a given file are located, however, it does not store block locations persistently, since this information is reconstructed from datanodes when the system starts.

l Datanodes are the work horses of the filesystem.

n They store and retrieve blocks when they are told to (by clients or the namenode)

n They report back to the namenode periodically with lists of blocks that they are storing.

l secondary namenode

n It does not act as a namenode.

n Its main role is to periodically merge the namespace image with the edit log to prevent the edit log from becoming too large.

n It keeps a copy of the merged name space image, which can be used in the event of the namenode failing.

Namenode directory structure

clip_image002

l The VERSION file is a Java properties file that contains information about the version of HDFS that is running

n The layoutVersion is a negative integer that defines the version of HDFS’s persistent data structures.

n The namespaceID is a unique identifier for the filesystem, which is created when the filesystem is first formatted.

n The cTime property marks the creation time of the namenode’s storage.

n The storageType indicates that this storage directory contains data structures for a namenode.

clip_image004

The filesystem image and edit log

l When a filesystem client performs a write operation, it is first recorded in the edit log.

l The namenode also has an in-memory representation of the filesystem metadata, which it updates after the edit log has been modified.

l The edit log is flushed and synced after every write before a success code is returned to the client.

l The fsimage file is a persistent checkpoint of the filesystem metadata. it is not updated for every filesystem write operation.

l If the namenode fails, then the latest state of its metadata can be reconstructed by loading the fsimage from disk into memory, then applying each of the operations in the edit log.

l This is precisely what the namenode does when it starts up.

l The fsimage file contains a serialized form of all the directory and file inodes in the filesystem.

l The secondary namenode is to produce checkpoints of the primary’s in-memory filesystem metadata.

l The checkpointing process proceeds as follows :

n The secondary asks the primary to roll its edits file, so new edits go to a new file.

n The secondary retrieves fsimage and edits from the primary (using HTTP GET).

n The secondary loads fsimage into memory, applies each operation from edits, then creates a new consolidated fsimage file.

n The secondary sends the new fsimage back to the primary (using HTTP POST).

n The primary replaces the old fsimage with the new one from the secondary, and the old edits file with the new one it started in step 1. It also updates the fstime file to record the time that the checkpoint was taken.

n At the end of the process, the primary has an up-to-date fsimage file, and a shorter edits file.

clip_image006

Secondary namenode directory structure

clip_image008

Datanode directory structure

clip_image010

l A datanode’s VERSION file

clip_image012

l The other files in the datanode’s current storage directory are the files with the blk_ prefix.

n There are two types: the HDFS blocks themselves (which just consist of the file’s raw bytes) and the metadata for a block (with a .meta suffix).

n A block file just consists of the raw bytes of a portion of the file being stored;

n the metadata file is made up of a header with version and type information, followed by a series of checksums for sections of the block.

l When the number of blocks in a directory grows to a certain size, the datanode creates a new subdirectory in which to place new blocks and their accompanying metadata.

1.2. Data Flow

1.2.1. Anatomy of a File Read

clip_image014

l The client opens the file it wishes to read by calling open() on the FileSystem object (step 1).

l DistributedFileSystem calls the namenode, using RPC, to determine the locations of the blocks for the first few blocks in the file (step 2).

l For each block, the namenode returns the addresses of the datanodes that have a copy of that block.

l The datanodes are sorted according to their proximity to the client.

l The DistributedFileSystem returns a FSDataInputStream to the client for it to read data from.

l The client then calls read() on the stream (step 3).

l DFSInputStream connects to the first (closest) datanode for the first block in the file.

l Data is streamed from the datanode back to the client (step 4).

l When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block (step 5).

l When the client has finished reading, it calls close() on the FSDataInputStream (step 6).

l During reading, if the client encounters an error while communicating with a datanode, then it will try the next closest one for that block.

l It will also remember datanodes that have failed so that it doesn’t needlessly retry them for later blocks.

l The client also verifies checksums for the data transferred to it from the datanode. If a corrupted block is found, it is reported to the namenode.

1.2.2. Anatomy of a File Write

clip_image016

l The client creates the file by calling create() (step 1).

l DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it (step 2).

l The namenode performs various checks to make sure the file doesn’t already exist, and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException.

l The DistributedFileSystem returns a FSDataOutputStream for the client to start writing data to.

l As the client writes data (step 3), DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue.

l The data queue is consumed by the Data Streamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. The list of datanodes forms apipeline.

l The DataStreamer streams the packets to the first datanode in the pipeline, which stores the packet and forwards it to the second datanode in the pipeline. Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipe line (step 4).

l DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline (step 5).

l If a datanode fails while data is being written to it,

n First the pipeline is closed, and any packets in the ack queue are added to the front of the data queue.

n The current block on the good datanodes is given a new identity by the namenode, so that the partial block on the failed datanode will be deleted if the failed data node recovers later on.

n The failed datanode is removed from the pipeline and the remainder of the block’s data is written to the two good datanodes in the pipeline.

n The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node.

l When the client has finished writing data it calls close() on the stream (step 6). This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete (step7).

2. Meet Map/Reduce

l MapReduce has two phases: the map phase and the reduce phase.

l Each phase has key-value pairs as input and output (the types can be specified).

n The input key-value types of the map phase is determined by the input format

n The output key-value types of the map phase should match the input key value types of the reduce phase

n The output key-value types of the reduce phase can be set in the JobConf interface.

l The programmer specifies two functions: the map function and the reduce function.

2.1. MapReduce logical data flow

clip_image018

2.2. MapReduce Code

2.2.1. The map function is represented by an implementation of the Mapper interface, which declares a map() method.

clip_image020

2.2.2. The reduce function is defined using a Reducer

l The input types of the reduce function must match the output type of the map function.

clip_image022

2.2.3. The code runs the MapReduce job

l An input path is specified by calling the static addInputPath() method on FileInputFormat

n It can be a single file, a directory, or a file pattern.

n addInputPath() can be called more than once to use input from multiple paths.

l The output path is specified by the static setOutputPath() method on FileOutputFormat.

n It specifies a directory where the output files from the reducer functions are written.

n The directory shouldn’t exist before running the job

l The map and reduce types can be specified via the setMapperClass() and setReducerClass() methods.

l The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same.

n If they are different, then the map output types can be set using the methods setMapOutputKeyClass() and setMapOutputValueClass().

l The input types are controlled via the input format, which we have not explicitly set since we are using the default TextInputFormat.

clip_image024

2.3. Scaling Out

2.3.1. MapReduce data flow with a single reduce task

l A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information.

l Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.

l There are two types of nodes that control the job execution process: a jobtracker and a number of tasktrackers.

n The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers.

n Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job.

n If a tasks fails, the jobtracker can reschedule it on a different tasktracker.

l Hadoop divides the input to a MapReduce job into fixed-size input splits.

l Hadoop creates one map task for each split, which runs the user defined map function for each record in the split.

l Hadoop does its best to run the map task on a node where the input data resides in HDFS.

n This is called the data locality optimization.

n This is why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node.

l Reduce tasks don’t have the advantage of data locality

n The input to a single reduce task is normally the output from all mappers.

n The output of the reduce is normally stored in HDFS for reliability.

clip_image026

2.3.2. MapReduce data flow with multiple reduce tasks

The number of reduce tasks is not governed by the size of the input, but is specified independently.

l When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task.

l There can be many keys (and their associated values) in each partition, but the records for every key are all in a single partition.

l The partitioning can be controlled by a user-defined partitioning function

n Normally the default partitioner which buckets keys using a hash function.

n conf.setPartitionerClass(HashPartitioner.class);

n conf.setNumReduceTasks(1);

l The data flow between map and reduce tasks is “the shuffle,” as each reduce task is fed by many map tasks.

clip_image028

l It’s also possible to have zero reduce tasks. This can be appropriate when you don’t need the shuffle since the processing can be carried out entirely in parallel

3. MapReduce Types and Formats

3.1. MapReduce Types

l The map and reduce functions in Hadoop MapReduce have the following general form:

clip_image030

clip_image032

l The partition function operates on the intermediate key and value types (K2 and V2), and returns the partition index.

clip_image034

clip_image036

3.1.1. Configuration of MapReduce types

clip_image038

l Input types are set by the input format.

n For instance, a TextInputFormat generates keys of type LongWritable and values of type Text.

l A minimal MapReduce driver, with the defaults explicitly set

clip_image040

l The default input format is TextInputFormat, which produces keys of type LongWritable (the offset of the beginning of the line in the file) and values of type Text (the line of text).

l The setNumMapTasks() call does not necessarily set the number of map tasks to one

n The actual number of map tasks depends on the size of the input

l The default mapper is IdentityMapper

clip_image042

l Map tasks are run by MapRunner, the default implementation of MapRunnable that calls the Mapper’s map() method sequentially with each record.

l The default partitioner is HashPartitioner, which hashes a record’s key to determine which partition the record belongs in.

n Each partition is processed by a reduce task, so the number of partitions is equal to the number of reduce tasks for the job

clip_image044

l The default reducer is IdentityReducer

clip_image046

l Records are sorted by the MapReduce system before being presented to the reducer.

l The default output format is TextOutputFormat, which writes out records, one per line, by converting keys and values to strings and separating them with a tab character.

3.2. Input Formats

3.2.1. Input Splits and Records

l An input split is a chunk of the input that is processed by a single map.

l Each split is divided into records, and the map processes each record—a key-value pair—in turn.

clip_image048

l An InputSplit has a length in bytes, and a set of storage locations, which are just hostname strings.

l A split doesn’t contain the input data; it is just a reference to the data.

l The storage locations are used by the MapReduce system to place map tasks as close to the split’s data as possible

l The size is used to order the splits so that the largest get processed first

l An InputFormat is responsible for creating the input splits, and dividing them into records.

clip_image050

l The JobClient calls the getSplits() method, passing the desired number of map tasks as the numSplits argument.

l Having calculated the splits, the client sends them to the jobtracker, which uses their storage locations to schedule map tasks to process them on the tasktrackers.

l On a tasktracker, the map task passes the split to the getRecordReader() method on InputFormat to obtain a RecordReader for that split.

l A RecordReader is little more than an iterator over records, and the map task uses one to generate record key-value pairs, which it passes to the map function.

clip_image052

l The same key and value objects are used on each invocation of the map() method—only their contents are changed. If you need to change the value out of map, make a copy of the object you want to hold on to.

3.2.2. FileInputFormat

l FileInputFormat is the base class for all implementations of InputFormat that use files as their data source.

l It provides two things: a place to define which files are included as the input to a job, and an implementation for generating splits for the input files.

clip_image054

l FileInputFormat input paths may represent a file, a directory, or, by using a glob, a collection of files and directories.

clip_image056

l To exclude certain files from the input, you can set a filter using the setInputPathFilter() method on FileInputFormat

clip_image058

l FileInputFormat splits only large files. Here “large” means larger than an HDFS block.

l Properties for controlling split size

n The minimum split size is usually 1 byte, by setting this to a value larger than the block size, they can force splits to be larger than a block.

n The maximum split size defaults to the maximum value that can be represented by a Java long type. It has an effect only when it is less than the block size, forcing splits to be smaller than a block.

Small files and CombineFileInputFormat

l Hadoop works better with a small number of large files than a large number of small files.

l Where FileInputFormat creates a split per file, CombineFileInputFormat packs many files into each split so that each mapper has more to process.

l One technique for avoiding the many small files case is to merge small files into larger files by using a SequenceFile: the keys can act as filenames and the values as file contents.

3.2.3. Text Input

l TextInputFormat is the default InputFormat.

n Each record is a line of input.

n The key, a LongWritable, is the byte offset within the file of the beginning of the line.

n The value is the contents of the line, excluding any line terminators, and is packaged as a Text object.

l The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks.

l A single file is broken into lines, and the line boundaries do not correspond with the HDFS block boundaries.

l Splits honor logical record boundaries

n The first split contains line 5, even though it spans the first and second block.

n The second split starts at line 6.

l Data-local maps will perform some remote reads.

clip_image060

KeyValueTextInputFormat

l It is common for each line in a file to be a key-value pair, separated by a delimiter such as a tab character.

l You can specify the separator via the key.value.separator.in.input.line property.

NLineInputFormat

l If you want your mappers to receive a fixed number of lines of input, then NLineInputFormat is the InputFormat to use.

l Like TextInputFormat, the keys are the byte offsets within the file and the values are the lines themselves.

l N refers to the number of lines of input that each mapper receives.

3.2.4. Binary Input

SequenceFileInputFormat

l Hadoop’s sequence file format stores sequences of binary key-value pairs.

l To use data from sequence files as the input to MapReduce, you use SequenceFileInputFormat.

l The keys and values are determined by the sequence file, and you need to make sure that your map input types correspond.

l For example, if your sequence file has IntWritable keys and Text values, then the map signature would be Mapper.

SequenceFileAsTextInputFormat

l SequenceFileAsTextInputFormat is a variant of SequenceFileInputFormat that converts the sequence file’s keys and values to Text objects.

SequenceFileAsBinaryInputFormat

l SequenceFileAsBinaryInputFormat is a variant of SequenceFileInputFormat that retrieves the sequence file’s keys and values as opaque binary objects.

l They are encapsulated as BytesWritable objects

SequenceFile

l Writing a SequenceFile

n To create a SequenceFile, use one of its createWriter() static methods, which returns a SequenceFile.Writer instance.

n specify a stream to write to (either a FSDataOutputStream or a FileSystem and Path pairing), a Configuration object, and the key and value types.

n Once you have a SequenceFile.Writer, you then write key-value pairs, using the append() method.

n Then when you’ve finished you call the close() method

clip_image062

l Reading a SequenceFile

n Reading sequence files from beginning to end is a matter of creating an instance of SequenceFile.Reader, and iterating over records by repeatedly invoking one of the next() methods.

clip_image064

l The SequenceFile Format

n A sequence file consists of a header followed by one or more records.

n The first three bytes of a sequence file are the bytes SEQ, which acts a magic number, followed by a single byte representing the version number.

n The header contains other fields including the names of the key and value classes, compression details, user-defined metadata, and the sync marker.

n The sync marker is used to allow a reader to synchronize to a record boundary from any position in the file.

clip_image066

3.2.5. Multiple Inputs

l The MultipleInputs class allows you to specify the InputFormat and Mapper to use on a per-path basis.

clip_image068

3.3. Output Formats

3.3.1. Text Output

l The default output format, TextOutputFormat, writes records as lines of text.

l Its keys and values may be of any type, since TextOutputFormat turns them to strings by calling toString() on them.

l Each key-value pair is separated by a tab character, although that may be changed using the mapred.textoutputformat.separator property.

3.3.2. Binary Output

l SequenceFileOutputFormat

l SequenceFileAsBinaryOutputFormat

l MapFileOutputFormat

Writing a MapFile

l You create an instance of MapFile.Writer, then call the append() method to add entries in order.

l Keys must be instances of WritableComparable, and values must be Writable

clip_image070

l If we look at the MapFile, we see it’s actually a directory containing two files called data and index:

clip_image072

l Both files are SequenceFiles. The data file contains all of the entries, in order:

clip_image074

l The index file contains a fraction of the keys, and contains a mapping from the key to that key’s offset in the data file:

clip_image076

Reading a MapFile

l you create a MapFile.Reader, then call the next() method until it returns false

3.3.3. Multiple Outputs

MultipleOutputFormat

l MultipleOutputFormat allows you to write data to multiple files whose names are derived from the output keys and values.

n conf.setOutputFormat(StationNameMultipleTextOutputFormat.class);

clip_image078

MultipleOutputs

l MultipleOutputs can emit different types for each output.

clip_image080

4. Developing a MapReduce Application

4.1. The Configuration API

l An instance of the Configuration class (found in the org.apache.hadoop.conf package) represents a collection of configuration properties and their values.

l Configurations read their properties from resources—XML files

clip_image082

l we can access its properties using a piece of code like this:

clip_image084

4.2. Configuring the Development Environment

4.2.1. Managing Configuration

l When developing Hadoop applications, it is common to switch between running the application locally and running it on a cluster.

l hadoop-local.xml

clip_image086

l hadoop-localhost.xml

clip_image088

l hadoop-cluster.xml

clip_image090

clip_image092

l With this setup, it is easy to use any configuration with the -conf command-line switch.

l For example, the following command shows a directory listing on the HDFS server running in pseudo-distributed mode on localhost:

clip_image094

4.2.2. GenericOptionsParser, Tool, and ToolRunner

clip_image096

clip_image098

5. How MapReduce Works

5.1. Anatomy of a MapReduce Job Run

l There are four independent entities:

n The client, which submits the MapReduce job.

n The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.

n The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.

n The distributed filesystem, which is used for sharing job files between the other entities.

clip_image100

5.1.1. Job Submission

l The runJob() method on JobClient creates a new JobClient instance and calls submitJob() on it.

l Having submitted the job, runJob() polls the job’s progress once a second, and reports the progress to the console if it has changed since the last report.

l When the job is complete, if it was successful, the job counters are displayed. Otherwise, the error that caused the job to fail is logged to the console.

The job submission process

l Asks the jobtracker for a new job ID (by calling getNewJobId() on JobTracker)

l Checks the output specification of the job.

l Computes the input splits for the job.

l Copies the resources needed to run the job, including the job JAR file, the configuration file and the computed input splits, to the jobtracker’s filesystem in a directory named after the job ID.

l Tells the jobtracker that the job is ready for execution (by calling submitJob() on JobTracker)

5.1.2. Job Initialization

l When the JobTracker receives a call to its submitJob() method, it puts it into an internal queue from where the job scheduler will pick it up and initialize it.

l Initialization involves creating an object to represent the job being run, which encapsulates its tasks, and bookkeeping information to keep track of the tasks’ status and progress.

l To create the list of tasks to run, the job scheduler first retrieves the input splits computed by the JobClient from the shared filesystem.

l It then creates one map task for each split.

l Tasks are given IDs at this point.

5.1.3. Task Assignment

l Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker.

l As a part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value

l Before it can choose a task for the tasktracker, the jobtracker must choose a job to select the task from according to priority.(setJobPriority() and FIFO)

l Tasktrackers have a fixed number of slots for map tasks and for reduce tasks.

l The default scheduler fills empty map task slots before reduce task slots

l To choose a reduce task the jobtracker simply takes the next in its list of yet-to-be-run reduce tasks, since there are no data locality considerations.

5.1.4. Task Execution

l Now the tasktracker has been assigned a task, the next step is for it to run the task.

l First, it localizes the job JAR by copying it from the shared filesystem to the tasktracker’s filesystem.

l It also copies any files needed from the distributed cache by the application to the local disk

l Second, it creates a local working directory for the task, and un-jars the contents of the JAR into this directory.

l Third, it creates an instance of TaskRunner to run the task.

l TaskRunner launches a new Java Virtual Machine to run each task in

l It is however possible to reuse the JVM between tasks;

l The child process communicates with its parent through the umbilical interface.

5.1.5. Job Completion

l When the jobtracker receives a notification that the last task for a job is complete, it changes the status for the job to “successful.” T

l hen, when the JobClient polls for status, it learns that the job has completed successfully, so it prints a message to tell the user, and then returns from the runJob() method.

clip_image102

5.2. Failures

5.2.1. Task Failure

l The most common way is when user code in the map or reduce task throws a runtime exception.

n the child JVM reports the error back to its parent tasktracker, before it exits.

n The error ultimately makes it into the user logs.

n The tasktracker marks the task attempt as failed, freeing up a slot to run another task.

l Another failure mode is the sudden exit of the child JVM

n the tasktracker notices that the process has exited, and marks the attempt as failed.

l Hanging tasks are dealt with differently.

n The tasktracker notices that it hasn’t received a progress update for a while, and proceeds to mark the task as failed.

n The child JVM process will be automatically killed after this period

l When the jobtracker is notified of a task attempt that has failed (by the tasktracker’s heartbeat call) it will reschedule execution of the task.

n The jobtracker will try to avoid rescheduling the task on a tasktracker where it has previously failed.

n If a task fails more than four times, it will not be retried further.

5.2.2. Tasktracker Failure

l If a tasktracker fails by crashing, or running very slowly, it will stop sending heartbeats to the jobtracker (or send them very infrequently).

l The jobtracker will notice a tasktracker that has stopped sending heartbeats and remove it from its pool of tasktrackers to schedule tasks on.

l The jobtracker arranges for map tasks that were run and completed successfully on that tasktracker to be rerun if they belong to incomplete jobs, since their intermediate output residing on the failed tasktracker’s local filesystem may not be accessible to the reduce task. Any tasks in progress are also rescheduled.

5.2.3. Jobtracker Failure

5.3. Shuffle and Sort

clip_image104

5.3.1. The Map Side

l When the map function starts producing output, it is not simply written to disk.

l Each map task has a circular memory buffer that it writes the output to.

l When the contents of the buffer reach a certain threshold size, a background thread will start to spill the contents to disk.

l Spills are written in round-robin fashion to the directories specified by the mapred.local.dir property

l Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to.

l Within each partition, the background thread performs an in-memory sort by key.

l Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files.

l Before the task is finished, the spill files are merged into a single partitioned and sorted output file.

l The output file’s partitions are made available to the reducers over HTTP.

l The number of worker threads used to serve the file partitions is controlled by the task tracker.http.threads property

5.3.2. The Reduce Side

l As map tasks complete successfully, they notify their parent tasktracker of the status update, which in turn notifies the jobtracker.

l for a given job, the jobtracker knows the mapping between map outputs and tasktrackers.

l A thread in the reducer periodically asks the jobtracker for map output locations until it has retrieved them all.

l The reduce task needs the map output for its particular partition from several map tasks across the cluster.

l The map tasks may finish at different times, so the reduce task starts copying their outputs as soon as each completes. This is known as the copy phase of the reduce task.

l The reduce task has a small number of copier threads so that it can fetch map outputs in parallel.

l As the copies accumulate on disk, a background thread merges them into larger, sorted files.

l When all the map outputs have been copied, the reduce task moves into the sort phase (which should properly be called the merge phase, as the sorting was carried out on the map side), which merges the map outputs, maintaining their sort ordering.

l During the reduce phase the reduce function is invoked for each key in the sorted output. The output of this phase is written directly to the output filesystem, typically HDFS. 来源: <Notes for Hadoop the definitive guide - 觉先 - 博客园>