Posted on



Table of Contents

1 hadoop


1.1 FAQ

1.1.1 Hadoop可以用来做什么

Why Hadoop? http://www.cloudera.com/why-hadoop/

TODO(dirlt):translate it!!!

Simply put, Hadoop can transform the way you store and process data throughout your enterprise. According to analysts, about 80% of the data in the world is unstructured, and until Hadoop, it was essentially unusable in any systematic way. With Hadoop, for the first time you can combine all your data and look at it as one.

  • Make All Your Data Profitable. Hadoop enables you to gain insight from all the data you already have; to ingest the data flowing into your systems 24/7 and leverage it to make optimizations that were impossible before; to make decisions based on hard data, not hunches; to look at complete data, not samples; to look at years of transactions, not days or weeks. In short, Hadoop will change the way you run your organization.
  • Leverage All Types of Data, From All Types of Systems. Hadoop can handle all types of data from disparate systems: structured, unstructured, log files, pictures, audio files, communications records, email– just about anything you can think of. Even when different types of data have been stored in unrelated systems, you can dump it all into your Hadoop cluster before you even know how you might take advantage of it in the future.
  • Scale Beyond Anything You Have Today. The largest social network in the world is built on the same open-source technology as Hadoop, and now exceeds 100 petabytes. It’s unlikely your organization has that much data. As you need more capacity, you just add more commodity servers and Hadoop automatically incorporates the new storage and compute capacity.

1.1.2 Hadoop包括哪些组件

TODO(dirlt):translate it!!!

Apache Hadoop包括了下面这些组件:

和Apache Hadoop相关的组件有:

  • Avro A data serialization system.
  • Cassandra A scalable multi-master database with no single points of failure.
  • Chukwa A data collection system for managing large distributed systems.
  • HBase A scalable, distributed database that supports structured data storage for large tables.
  • Hive A data warehouse infrastructure that provides data summarization and ad hoc querying.
  • Mahout A Scalable machine learning and data mining library.
  • Pig A high-level data-flow language and execution framework for parallel computation.
  • ZooKeeper A high-performance coordination service for distributed applications.

    1.1.3 CDH和Apache Hadoop的关系

CDH Hadoop FAQ https://ccp.cloudera.com/display/SUPPORT/Hadoop+FAQ

TODO(dirlt):translate it!!!

  • What exactly is included in CDH? / Cloudera's Distribution Including Apache Hadoop (CDH) is a certified release of Apache Hadoop. We include some stable patches scheduled to be included in future releases, as well as some patches we have developed for our supported customers, and are in the process of contributing back to Apache.
  • What license is Cloudera's Distribution Including Apache Hadoop released under? / Just like Hadoop, Cloudera's Distribution Including Apache Hadoop is released under the Apache Public License version 2.
  • Is Cloudera forking Hadoop? / Absolutely not. Cloudera is committed to the Hadoop project and the principles of the Apache Software License and Foundation. We continue to work actively with current releases of Hadoop and deliver certified releases to the community as appropriate.
  • Does Cloudera contribute their changes back to Apache? / We do, and will continue to contribute all eligible changes back to Apache. We occasionally release code we know to be stable even if our contribution to Apache is still in progress. Some of our changes are not eligible for contribution, as they capture the Cloudera brand, or link to our tools and documentation, but these do not affect compatibility with core project.

1.1.4 CDH产品组件构成


从这里可以下载CDH4组件 http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDHTarballs/3.25.2013/CDH4-Downloadable-Tarballs/CDH4-Downloadable-Tarballs.html


1.1.5 CDH产品组件端口分布和配置

The CDH4 components, and third parties such as Kerberos, use the ports listed in the tables that follow. Before you deploy CDH4, make sure these ports are open on each system. Hadoop HDFS

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentDataNode50010TCPExternaldfs.datanode.addressDataNode HTTP server portDataNodeSecure1004TCPExternaldfs.datanode.addressDataNode50075TCPExternaldfs.datanode.http.addressDataNodeSecure1006TCPExternaldfs.datanode.http.addressDataNode50020TCPExternaldfs.datanode.ipc.addressNameNode8020TCPExternalfs.default.name or fs.defaultFSfs.default.name is deprecated (but still works)NameNode50070TCPExternaldfs.http.address or dfs.namenode.http-addressdfs.http.address is deprecated (but still works)NameNodeSecure50470TCPExternaldfs.https.address or dfs.namenode.https-addressdfs.https.address is deprecated (but still works)Sec NameNode50090TCPInternaldfs.secondary.http.address or dfs.namenode.secondary.http-addressdfs.secondary.http.address is deprecated (but still works)Sec NameNodeSecure50495TCPInternaldfs.secondary.https.addressJournalNode8485TCPInternaldfs.namenode.shared.edits.dirJournalNode8480TCPInternal Hadoop MRv1

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentJobTracker8021TCPExternalmapred.job.trackerJobTracker50030TCPExternalmapred.job.tracker.http.addressJobTrackerThrift Plugin9290TCPInternaljobtracker.thrift.addressRequired by Hue and Cloudera Manager Activity MonitorTaskTracker50060TCPExternalmapred.task.tracker.http.addressTaskTracker0TCPLocalhostmapred.task.tracker.report.addressCommunicating with child (umbilical) Hadoop YARN

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentResourceManager8032TCPyarn.resourcemanager.addressResourceManager8030TCPyarn.resourcemanager.scheduler.addressResourceManager8031TCPyarn.resourcemanager.resource-tracker.addressResourceManager8033TCPyarn.resourcemanager.admin.addressResourceManager8088TCPyarn.resourcemanager.webapp.addressNodeManager8040TCPyarn.nodemanager.localizer.addressNodeManager8042TCPyarn.nodemanager.webapp.addressNodeManager8041TCPyarn.nodemanager.addressMapReduce JobHistory Server10020TCPmapreduce.jobhistory.addressMapReduce JobHistory Server19888TCPmapreduce.jobhistory.webapp.address HBase

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentMaster60000TCPExternalhbase.master.portIPCMaster60010TCPExternalhbase.master.info.portHTTPRegionServer60020TCPExternalhbase.regionserver.portIPCRegionServer60030TCPExternalhbase.regionserver.info.portHTTPHQuorumPeer2181TCPhbase.zookeeper.property.clientPortHBase-managed ZK modeHQuorumPeer2888TCPhbase.zookeeper.peerportHBase-managed ZK modeHQuorumPeer3888TCPhbase.zookeeper.leaderportHBase-managed ZK modeRESTREST Service8080TCPExternalhbase.rest.portThriftServerThrift Server9090TCPExternalPass -p on CLIAvro server9090TCPExternalPass –port on CLI Hive

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentMetastore9083TCPExternalHiveServer10000TCPExternal Sqoop

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentMetastore16000TCPExternalsqoop.metastore.server.portSqoop 2 server12000TCPExternal Zookeeper

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentServer (with CDH4 and/or Cloudera Manager 4)2181TCPExternalclientPortClient portServer (with CDH4 only)2888TCPInternalX in server.N=host:X:YPeerServer (with CDH4 only)3888TCPInternalY in server.N=host:X:YPeerServer (with CDH4 and Cloudera Manager 4)3181TCPInternalX in server.N=host:X:YPeerServer (with CDH4 and Cloudera Manager 4)4181TCPInternalY in server.N=host:X:YPeerZooKeeper FailoverController (ZKFC)8019TCPInternalUsed for HAZooKeeper JMX port9010TCPInternal

As JMX port, ZooKeeper will also use another randomly selected port for RMI. In order for Cloudera Manager to monitor ZooKeeper, you must open up all ports when the connection originates from the Cloudera Manager server. Hue

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentServer8888TCPExternalBeeswax Server8002InternalBeeswax Metastore8003Internal Ozzie

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentOozie Server11000TCPExternalOOZIE_HTTP_PORT in oozie-env.shHTTPOozie Server11001TCPlocalhostOOZIE_ADMIN_PORT in oozie-env.shShutdown port Ganglia

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentganglia-gmond8649UDP/TCPInternalganglia-web80TCPExternalVia Apache httpd Kerberos

ServiceQualifierPortProtocolAccess RequirementConfigurationCommentKRB5 KDC ServerSecure88UDP/TCPExternalkdc_ports and kdc_tcp_ports in either the [kdcdefaults] or [realms] sections of kdc.confBy default only UDPKRB5 Admin ServerSecure749TCPInternalkadmind_port in the [realms] section of kdc.conf

1.2 观点

1.2.1 Hadoop即将过时了吗?



  • 增量索引过滤器(Percolator for incremental indexing)和频繁变化数据集分析。Hadoop是一台大型“机器”,当启动并全速运转时处理数据的性能惊人,你唯一需要操心的就是硬盘的传输速度跟不上。但是每次你准备启动分析数据时,都需要把所有的数据都过一遍,当数据集越来越庞大时,这个问题将导致分析时间无限延长。那么Google是如何解决让搜索结果返回速度越来越接近实时的呢?答案是用增量处理引擎Percolator代替GMR。通过只处理新增的、改动过的或删除的文档和使用二级指数来高效率建目录,返回查询结果。Percolator论文的作者写道:“将索引系统转换成增量系统…将文档处理延迟缩短了100倍。”这意味着索引web新内容的速度比用MapReduce快100倍!类似大型强子对撞机产生的数据将不断变大,Twitter也是如此。这也是为什么HBase中会新增触发流程,而Twitter Storm正在成为实时处理流数据的热门技术。
  • 用于点对点分析的Dremel。Google和Hadoop生态系统都致力于让MapReduce成为可用的点对点分析工具。从Sawzall到Pig和Hive,创建了大量的界面层,但是尽管这让Hadoop看上去更像SQL系统,但是人们忘记了一个基本事实——MapReduce(以及Hadoop)是为组织数据处理任务开发的系统,诞生于工作流内核,而不是点对点分析。今天有大量的BI/分析查询都是点对点模式,属于互动和低延迟的分析。Hadoop的Map和Reduce工作流让很多分析师望而却步,而且工作启动和完成工作流运行的漫长周期对于很多互动性分析来说意味着糟糕的用户体验。于是,Google发明了Dremel(业界也称之为BigQuery产品)专用工具,可以让分析师数秒钟内就扫描成PB(Petabyte)的数据完成点到点查询,而且还能支持可视化。Google在Dremel的论文中声称:“Dremel能够在数秒内完成数万亿行数据的聚合查询,比MapReduce快上100倍!”
  • 分析图数据的Pregel。Google MapReduce的设计初衷是分析世界上最大的数据图谱——互联网。但是在分析人际网络、电信设备、文档和其他一些图数据时就没有那么灵光了,例如MapReduce在计算单源最短路径(SSSP)时效率非常低下,已有的并行图算法库Parallel BGL或者CGMgraph又没有容错。于是Google开发了Pregel,一个可以在分布式通用服务器上处理PB级别图数据的大型同步处理应用。与Hadoop经常在处理图数据时产生指数级数据放大相比,Pregel能够自然高效地处理SSSP或PageRank等图算法,所用时间要短得多,代码也简洁得多。目前唯一能与Pregel媲美的开源选择是Giraph,这是一个早期的Apache孵化项目,调用了HDFS和Zookeeper。Githb上还有一个项目Golden Orb可用。

1.2.2 Best Practices for Selecting Apache Hadoop Hardware


RAID cards, redundant power supplies and other per-component reliability features are not needed. Buy error-correcting RAM and SATA drives with good MTBF numbers. Good RAM allows you to trust the quality of your computations. Hard drives are the largest source of failures, so buy decent ones.(不需要选购RAID,冗余电源或者是一些满足高可靠性组件,但是选择带有ECC的RAM以及good MTBF的SATA硬盘却是非常需要的。ECC RAM可以让你确保计算结果的正确性,而SATA故障是大部分故障的主要原因)

  • On CPU: It helps to understand your workload, but for most systems I recommend sticking with medium clock speeds and no more than 2 sockets. Both your upfront costs and power costs rise quickly on the high-end. For many workloads, the extra performance per node is not cost-effective.(没有特别要求,普通频率,dual-socket???)
  • On Power: Power is a major concern when designing Hadoop clusters. It is worth understanding how much power the systems you are buying use and not buying the biggest and fastest nodes on the market.In years past we saw huge savings in pricing and significant power savings by avoiding the fastest CPUs, not buying redundant power supplies, etc. Nowadays, vendors are building machines for cloud data centers that are designed to reduce cost and power and that exclude a lot of the niceties that bulk up traditional servers. Spermicro, Dell and HP all have such product lines for cloud providers, so if you are buying in large volume, it is worth looking for stripped-down cloud servers. (根据自己的需要尽量减少能耗开销,撇去一些不需要的部件。而且现在很多厂商也在尽量减少不必要的部件)
  • On RAM: What you need to consider is the amount of RAM needed to keep the processors busy and where the knee in the cost curve resides. Right now 48GB seems like a pretty good number. You can get this much RAM at commodity prices on low-end server motherboards. This is enough to provide the Hadoop framework with lots of RAM (~4 GB) and still have plenty to run many processes. Don’t worry too much about RAM, you’ll find a use for it, often running more processes in parallel. If you don’t, the system will still use it to good effect, caching disk data and improving performance.(RAM方面的话越大越好,对于48GB的RAM来说普通的主板也是支持的。如果RAM用的上的话那么允许多个进程并行执行,如果暂时永不上的话可以做cache来提高速度)
  • On Disk: Look to buy high-capacity SATA drives, usually 7200RPM. Hadoop is storage hungry and seek efficient but it does not require fast, expensive hard drives. Keep in mind that with 12-drive systems you are generally getting 24 or 36 TB/node. Until recently, putting this much storage in a node was not practical because, in large clusters, disk failures are a regular occurrence and replicating 24+TB could swamp the network for long enough to really disrupt work and cause jobs to miss SLAs. The most recent release of Hadoop 0.20.204 is engineered to handle the failure of drives more elegantly, allowing machines to continue serving from their remaining drives. With these changes, we expect to see a lot of 12+ drive systems. In general, add disks for storage and not seeks. If your workload does not require huge amounts of storage, dropping disk count to 6 or 4 per box is a reasonable way to economize.(高容量SATA硬盘,最好是7.2KRPM,并且最好单机上面挂在12个硬盘。对于hadoop之前这种方式并不实际,因为磁盘非常容易损坏并且备份这24TB的数据非常耗时。而hadoop可以很好地解决这个问题。


  • On Network: This is the hardest variable to nail down. Hadoop workloads vary a lot. The key is to buy enough network capacity to allow all nodes in your cluster to communicate with each other at reasonable speeds and for reasonable cost. For smaller clusters, I’d recommend at least 1GB all-to-all bandwidth, which is easily achieved by just connecting all of your nodes to a good switch. With larger clusters this is still a good target although based on workload you can probably go lower. In the very large data centers the Yahoo! built, they are seeing 2/10GB per 20 node rack going up to a pair of central switches, with rack nodes connected with two 1GB links. As a rule of thumb, watch the ratio of network-to-computer cost and aim for network cost being somewhere around 20% of your total cost. Network costs should include your complete network, core switches, rack switches, any network cards needed, etc. We’ve been seeing InfiniBand and 10GB Ethernet networks to the node now. If you can build this cost effectively, that’s great. However, keep in mind that Hadoop grew up with commodity Ethernet, so understand your workload requirements before spending too much on the network.(这个主要还是看需求。通常来说网络整体开销占据所有开销的20%,包括核心交换机,机架之间的交换机以及网卡设备等。yahoo大集群的部署方式是rack之间使用2/10GB的核心交换机工作,而20个节点的rack之间内部使用1GB链路)。

    1.2.3 The dark side of Hadoop - BackType Technology



  • Critical configuration poorly documented 一些关键的参数和配置并没有很好地说明清楚。
  • Terrible with memory usage 内存使用上面存在问题。hadoop里面有一些非常sloppy的实现,比如chmod以及ln -s等操作,并没有调用fs API而是直接创建一个shell进程来完成。因为fork出一个shell进程需要申请同样大小的内存(虽然实现上是COW),但是这样造成jvm出现oom。解决的办法是开辟一定空间的swap The solution to these memory problems is to allocate a healthy amount of swap space for each machine to protect you from these memory glitches. We couldn't believe how much more stable everything became when we added swap space to our worker machines.

  • Thomas Jungblut's Blog: Dealing with "OutOfMemoryError" in Hadoop http://codingwiththomas.blogspot.jp/2011/07/dealing-with-outofmemoryerror-in-hadoop.html 作者给出的解决办法就是修改hadoop的代码,通过调用Java API而不是使用ProcessBuilder来解决。

  • NOTE(dirlt):出现OOM的话必须区分JVM还是Linux System本身的OOM。JVM出现OOM是抛出异常,而Linux出现OOM是会触发OOM killer
  • Zombies hadoop集群出现一些zombie进程,而这些进程会一直持有内存直到大量zombie进程存在最后需要重启。造成这些zombie进程的原因通常是因为jvm oom(增加了swap之后就没有出现这个问题了),但是奇怪的是tasktracker作为这些process的parent,并不负责cleanup这些zombie进程而是依赖这些zombie进程的自己退出,这就是hadoop设计方面的问题。

Making Hadoop easy to deploy, use, and operate should be the /#1 priority for the developers of Hadoop.

1.3 使用问题

1.3.1 CDH3u3搭建单节点集群

搭建单节点集群允许我们在单机做一些模拟或者是测试,还是非常有意义的。如何操作的话可以参考链接 http://localhost/utils/hadoop-0.20.2-cdh3u3/docs/single_node_setup.html


  • 首先安装ssh和rsync /# sudo apt-get install ssh && sudo apt-get install rsync
  • 本机建立好信任关系 /# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
  • 将{hadoop-package}/conf配置文件修改如下:
  • conf/core-site.xml

    fs.default.name hdfs://localhost:9000

  • conf/hdfs-site.xml

    dfs.replication 1

  • conf/mapred-site.xml

    mapred.job.tracker localhost:9001

1.3.2 CDH4.2.0搭建单节点集群


  • 配置文件

  • 配置文件在etc/hadoop,包括环境配置脚本比如hadoop-env.sh

  • bin/sbin目录下面有hadoop集群启动停止工具 NOTE(dirlt):不要使用它们
  • libexec目录下面是公用的配置脚本
  • mapred-site.xml中jobtracker地址配置key修改为 mapred.jobtracker.address NOTE(dirlt):this for yarn.如果是mr1那么不用修改,依然是mapred.job.tracker
  • hadoop-daemons.sh会使用/sbin/slaves.sh来在各个节点启动,但是 /不知道什么原因,很多环境变量没有设置/ ,所以在slaves.sh执行ssh命令部分最开始增加了 source ~/.shrc; 来强制设置我的环境变量
  • NOTE(dirlt):不要使用shell脚本来启动,而是直接使用类似hadoop namenode这种方式来启动单个机器上的实例
  • 公共组件

  • CDH4.2.0 native-library都放在了目录lib/native下面,而不是CDH3u3的lib/native/Linux-amd64-64下面,这点需要注意。

  • CDH4.2.0 没有自带libhadoop.so, 所以启动的时候都会出现 ”Unable to load native-hadoop library for your platform… using builtin-java classes where applicable“ 这个警告。需要自己编译放到lib/native目录下面。
  • CDH4.2.0 lib下面没有任何文件,所有的lib都在share/hadoop//*/lib下面,比如share/hadoop/common/lib. 这点和CDH3有差别,CDH3所有的jar都放在lib目录下面。使用 hadoop classpath 命令可以察看
  • 环境变量

  • JAVA_LIBRARY_PATH用来设置native library path

  • HADOOP_CLASSPATH可以用来设置hadoop相关的classpath(比如使用hadoop-lzo等)
  • 准备工作

  • 使用hdfs namenode -format来做格式化 注意如果使用sudo apt-get来安装的话,是其他用户比如hdfs,impala,mapred,yarn来启动的,所以必须确保目录对于这些用户是可写的

  • 使用命令 hadoop org/apache/hadoop/examples/QuasiMonteCarlo 1 1 确定集群是否可以正常运行。

    1.3.3 CDH4.3.0


  • 在libexec/hadoop-config.sh添加source ~/.shrc 来强制设置环境变量。
  • mr1和mr2分开存放主要有

  • etc目录,hadoop and hadoop-mapreduce1

  • bin目录,bin and bin-mapreduce1
  • lib目录。如果需要使用mr1的话,那么将cp -r share/hadoop/mapreduce1/ .

  • NOTE(dirlt):似乎只需要最顶层的一些jar文件即可

  • 在bin/hadoop-config.sh添加source ~/.shrc 来强制设置环境变量。
  • NOTE(dirlt):不要使用start-dfs.sh这些脚本启动,似乎这些脚本会去读取master,slaves这些文件然后逐个上去ssh启动。直接使用hadoop namenode这种方式可以只启动单个机器上的实例

1.3.4 Configuration .bash_profile

export HADOOP_HOME=$HOME/dirlt/hadoop-2.0.0-cdh4.3.0/

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export HBASE_HOME=/home/alium_zhanyinan/dirlt/hbase-0.94.6-cdh4.3.0

export HBASE_CLASSPATH=$HBASE_HOME/hbase-0.94.6-cdh4.3.0-security.jar:$HBASE_HOME/conf export ZK_HOME=/home/alium_zhanyinan/dirlt/zookeeper-3.4.5-cdh4.3.0


export JAVA_HOME=/usr/java/default/ core-site.xml

fs.default.name hdfs://umengds1.mob.cm3:8020 fs.trash.interval 1440 hdfs-site.xml

dfs.name.dir /disk1/data/dfs/nn dfs.data.dir /disk1/data/dfs/dn fs.checkpoint.dir /disk1/data/dfs/snn dfs.replication 3 dfs.block.size 134217728 dfs.datanode.max.xcievers 8192 dfs.datanode.du.reserved 21474836480 dfs.namenode.handler.count 64 dfs.datanode.handler.count 32 dfs.client.read.shortcircuit true mapred-site.xml

mapred.job.tracker umengds2.mob.cm3:8021 mapred.system.dir /tmp/mapred/system mapreduce.jobtracker.staging.root.dir /user mapred.local.dir /disk1/data/mapred/local mapred.submit.replication 3 true mapred.tasktracker.map.tasks.maximum 6 mapred.tasktracker.reduce.tasks.maximum 8 mapred.child.java.opts -Xmx2048M -XX:-UseGCOverheadLimit mapred.job.tracker.handler.count 64 io.sort.mb 256 io.sort.factor 64 hadoop-env.sh

/# The maximum amount of heap to use, in MB. Default is 1000.


/# Extra Java runtime options. Empty by default. /# if ["$HADOOP_OPTS" == "" ]; then export HADOOP_OPTS=-server; else HADOOP_OPTS+=" -server"

; fi


hbase.cluster.distributed true hbase.rootdir hdfs://umengds1.mob.cm3:8020/hbase hbase.zookeeper.quorum umengds1.mob.cm3,umengds2.mob.cm3 hbase.hregion.memstore.mslab.enabled true hbase.regionserver.handler.count 128 hbase.client.write.buffer 4194304 hbase.hregion.memstore.block.multiplier 8 hbase.server.thread.wakefrequency 1000 hbase.regionserver.lease.period 600000 hbase.hstore.blockingStoreFiles 15 hbase.hregion.max.filesize 2147483648 hbase.ipc.client.tcpnodelay true ipc.ping.interval 10000 hbase.hregion.majorcompaction 0 hbase.regionserver.checksum.verify true hbase-env.sh

/# The maximum amount of heap to use, in MB. Default is 1000.

export HBASE_HEAPSIZE=14000

/# Extra Java runtime options. /# Below are what we set by default. May only work with SUN JVM.

/# For more on why as well as other possible settings, /# see http://wiki.apache.org/hadoop/PerformanceTuning

/# export HBASE_OPTS= "-ea -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode"export HBASE_OPTS="-ea -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=90"

1.4 Hadoop权威指南

1.4.1 初识Hadoop


1.4.2 关于MapReduce

  • 设置HADOOP_CLASSPATH就可以直接使用hadoop CLASSNAME来在本地运行mapreduce程序。
  • hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar 可以用来启动streaming任务

  • 使用stdin/stdout来作为输入和输出

  • NOTE(dirlt):倒是可以探索一下如何使用,但是觉得能力有限

  • Input/Output Format

  • 外围环境的访问比如访问hdfs以及hbase
  • 程序打包。比如使用很多第三方库的话在其他机器上面没有部署。
  • hadoop pipes 可以用来启动pipes任务

  • Hadoop的Pipes是Hadoop MapReduce的C++接口代称

  • 使用Unix Domain Socket来作为输入和输出
  • NOTE(dirlt):可能使用上面还是没有native mr或者是streaming方式方便

    1.4.3 Hadoop分布式文件系统

  • 使用hadoop archive能够将大量小文档打包,存档文件之能够只读访问

  • 使用hadoop archive -archiveName .har -p src dst

  • 存档过程使用mapreduce完成,输出结果为目录

  • part-0 表示存档内容文件,应该是使用一个reduce做聚合。

  • _index,_masterindex 是对存档内容文件的索引文件。
  • har(hadoop archive)文件系统是建立在其他文件系统上面的,比如hdfs或者是local fs.

  • hadoop fs -ls har:///file.har 那么访问的是默认的文件系统上面的file.har

  • 如果想显示地访问hdfs文件系统的话,那么可以hadoop fs -ls har://hdfs-localhost:9000/file.har
  • 如果想显示地访问本地文件系统的话,那么可以使用hadoop fs -ls har://file-localhost/file.har
  • hadoop fs -ls har://schema-/ 是通用的访问方式

1.4.4 Hadoop IO

  • 文件系统

  • ChecksumFileSystem

  • 使用decorator设计模式,底层filesystem称为RawFileSystem

  • 对于每个文件filename都会创建.filename.crc文件存储校验和
  • 计算crc的单位大小通过io.bytes.per.checksum来进行控制
  • 读取文件如果出现错误的话,那么会抛出ChecksumException
  • 考虑到存在多副本的情况,如果读取某个副本出错的话,期间那么会调用reportChecksumFailure方法

  • NOTE(dirlt):这个部分的代码不太好读,非常绕

  • RawLocalFileSystem

  • 本地文件系统

  • LocalFileSystem

  • RawLocalFileSystem + ChecksumFileSystem

  • reportChecksumFailure实现为将校验和存在问题的文件移动到bad_files边际文件夹(side directory)
  • DistributedFileSystem

  • 分布式文件系统

  • ChecksumDistributedFileSystem

  • DistributedFileSystem + ChecksumFileSystem

  • 压缩解压

  • DEFLATE org.apache.hadoop.io.compress.DefaultCodec 扩展名.defalte

  • Gzip org.apache.hadoop.io.compress.GzipCodec 扩展名.gz 使用DEFLATE算法但是增加了额外的文件头。
  • bzip2 org.apache.hadoop.io.compress.BZip2Codec 扩展名.bz2 自身支持文件切分,内置同步点。
  • LZO com.hadoop.compression.lzo.LzopCodec 扩展名.lzo 和lzop工具兼容,LZO算法增加了额外的文件头。

  • LzopCodec则是纯lzo格式的codec,使用.lzo_deflate作为文件扩展名

  • 因为LZO代码库拥有GPL许可,因此没有办法包含在Apache的发行版本里面。
  • 运行MapReduce时候可能需要针对不同压缩文件解压读取,就需要构造CompressionCodec对象,我们可以通过CompressionCodecFactory来构造这个对象

  • CompressionCodecFactory读取变量io.compression.codecs

  • 然后根据输入文件的扩展名来选择使用何种codec.
  • getDefaultExtension
  • 压缩和解压算法可能同时存在Java实现和原生实现

  • 如果是原生实现的话通常是.so,那么需要设置java.library.path或者是在环境变量里面设置LD_LIBRARY_PATH

  • 如果同时有原生实现和Java实现,我们想只是使用原生实现的话,那么可以设置hadoop.native.lib = false来禁用原生实现。
  • 压缩算法涉及到对应的InputFormat,也就涉及到是否支持切分

  • 对于一些不支持切分的文件,可能存在一些外部工具来建立索引,从而支持切分。

  • 下面这些选项可以针对map结果以及mapreduce结果进行压缩

  • mapred.output.compress = true 将mapreduce结果做压缩

  • mapred.output.compression.codec mapreduce压缩格式
  • mapred.output.compress.type = BLOCK/RECORD 如果输出格式为SequenceFile的话,那么这个参数可以控制是块压缩还是记录压缩
  • NOTE(dirlt):我现在强烈感觉MR的中间结果存储格式为SequenceFile
  • NOTE(dirlt):应该是IFile,但是是否共享了这个配置呢?
  • mapred.compress.map.output = true 将map结果做压缩
  • mapred.map.output.compression.codec map压缩格式

  • 序列化

  • Hadoop的序列化都是基于Writable实现的,WritableComparable则是同时继承Writable,Comparable.

  • 序列化对象需要实现RawComparator,接口为public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)进行二进制比较。

  • WritableComparator简化了这个实现,继承WritableComparator就实现了这个接口

  • 但是这个接口实现起来非常naive,就是将两个byte stream反序列化然后调用对象的compareTo实现
  • 如果想要提高效率的话,可以考虑通过直接比较两个byte stream来做优化。
  • 基于文件的数据结构

  • SequenceFile 主要用来存储KV数据结构,多条记录之间会穿插一些同步标记,因此允许进行切分。

  • 使用SequenceFileInputFormat和SequenceFileOutputFormat来读取和输出SequenceFile

  • hadoop fs -text 可以用来读取文件
  • mapred.output.compress.type = BLOCK/RECORD 可以用来控制压缩方式

  • 如果没有使用压缩的话,那么格式为 recordLength(4byte) + keyLength(4byte) + key + value

  • 如果使用记录压缩的话,那么格式为 recordLnegth(4byte) + keyLength(4byte) + key + compressedValue
  • 如果使用块压缩的话,那么格式为 numberRecord(1-5byte) + keyLength(4byte) + compressedKeys + valueLength(4byte) + compressedValues.每个block之间会插入sync标记
  • 块压缩大小可以使用io.seqfile.compress.blocksize来控制,默认1MB
  • MapFile 也是用来存储KV数据结构,但是可以认为已经按照了Key进行排序 NOTE(dirlt):要求添加顺序就按照Key排序

  • 存储格式实际上也是SequenceFile,data,index都是。

  • 底层会建立index,index在搜索的时候会加载到内存里面,这样可以减少data上的随机查询次数。
  • 使用io.map.index.interval可以控制多少个item在index里面创建一个条目
  • 使用io.map.index.skip = 0/1/2/n 可以控制skip几个index的item,如果为1的话那么表示只是使用1/2的索引。
  • 从SequenceFile创建MapFile非常简单

  • 首先使用sort将SequenceFile进行排序(可以使用hadoop example的sort)

  • 然后调用hadoop MapFileFixer来建立索引

    1.4.5 MapReduce应用开发

  • Configuration用来读取配置文件,功能还是比较强大的,有变量替换的功能

  • 如果使用true标记的话那么这个变量不允许被重置
  • 变量替换可以使用${variable}
  • 通过addResource来添加读取的配置文件

  • Hadoop集群有三种工作方式,分别为

  • standalone 使用单个JVM进程来模拟

  • 如果不进行任何配置的话默认使用这个模式 NOTE(dirlt):这个模式确实不错

  • fs.default.name = file 本地文件系统
  • mapred.job.tracker = local
  • pseudo-distributed 本地启动单节点集群

  • fs.default.name = hdfs://localhost

  • mapred.job.tracker = localhost:8021
  • fully-distributed 完全分布式环境

  • fs.default.name = hdfs://

  • mapred.job.tracer = :8021

  • 使用hadoop启动MapReduce任务的常用参数

  1. -D property=value 覆盖默认配置属性
  2. -conf filename 添加配置文件
  3. -fs uri 设置默认文件系统
  4. -jt host:port 设置jobtracker
  5. -files file,file2 这些文件可以在tasktracker工作目录下面访问
  6. -archives archive,archive2 和files类似,但是是存档文件
  • 突然觉得这个差别在files只能是平级结构,而archive可以是层级结构。
  • -libjars jar1,jar2 和files类似,通常这些JAR文件是MapReduce所需要的。


  • MiniDFSCluster
  • MiniMRCluster
  • MiniHBaseCluster
  • MiniZooKeeperClutser
  • NOTE(dirlt):都称为Mini???Cluster?

另外还有自带的ClusterMapReduceTestCase以及HBaseTestingUtility来帮助进行mapreduce的testcase. 这些类散步在hadoop,hbase,hadoop-test以及hbase-test里面。


job,task and attempt

  • jobID常见格式为 job_200904110811_0002

  • 其中200904110811表示jobtracker从2009.04.11的08:11启动的

  • 0002 表示第三个job,从0000开始计数。超过10000的话就不能够很好地排序
  • taskID常见格式为 task_200904110811_0002_m_000003

  • 前面一串数字和jobID匹配,表示从属于这个job

  • m表示map任务,r表示reduce任务
  • 000003表示这是第4个map任务。顺序是在初始化时候指定的,并不反应具体的执行顺序。
  • attemptID常见格式为 attempt_200904110811_0002_m_000003_0

  • 前面一串数字和taskID匹配,表示从属与这个task

  • attempt出现的原因是因为一个task可能会因为失败重启或者是预测执行而执行多次
  • 如果jobtracker重启而导致作业重启的话,那么做后面id从1000开始避免和原来的attempt冲突。


  • 相关配置

  • mapred.jobtracker.completeuserjobs.maximum 表示web页面下面展示completed jobs的个数,默认是100,超过的部分放到历史信息页。

  • mapred.jobtracker.restart.recover = true jobtracker重启之后自动恢复作业
  • hadoop.job.history.location 历史作业信息存放位置,超过30天删除,默认在_logs/history
  • hadoop.job.history.user.location 如果不为none那么历史作业信息在这里也会存在一份,不会删除。
  • 相关命令

  • hadoop fs -getmerge 能够将hdfs的src下面所有的文件merge合并成为一份文件并且copy到本地

  • hadoop job -history 察看作业历史
  • hadoop job -counter 察看作业计数器
  • 相关日志

  • 系统守护进程日志 写入HADOOP_LOG_DIR里面,可以用来监控namenode以及datanode的运行情况

  • MapReduce作业历史日志 _logs/history
  • MapReduce任务日志 写入HADOOP_LOG_DIR/userlogs里面,可以用来监控每个job的运行情况
  • 分析任务

  • JobConf允许设置profile参数 NOTE(dirlt):新的接口里面JobConf->JobContext->Job,Job没有这些接口,但是可以通过Configuration来设置

  • setProfileEnabled 打开profile功能,默认false,属性 mapred.task.profile

  • setProfileParams 设置profile参数

  • 属性 mapred.task.profile.params

  • 默认使用hprof -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
  • 其中%s会替换成为profile输出文件
  • NOTE(dirlt):其实这里似乎也可以设置成为jmxremote来通过jvisualvm来调试
  • setProfileTaskRange(boolean,String)

  • 参数1表示针对map还是reduce task做profile, true表示map, false表示reduce

  • 参数2表示针对哪些tasks做优化,"0-2"表示针对0,1,2三个任务,默认也是"0-2"
  • map task对应属性mapred.task.profile.maps,reduce task对应属性mapred.task.profile.reduces
  • 任务重现

  • 首先将keep.failed.task.files设置为true,这样如果任务失败的话,那么这个任务的输入和输出都会保留下来

  • 如果是map任务的话,那么输入分别会在本地保留

  • 如果是reduce任务的话,那么对应的map任务输出会在本地保留
  • 然后我们使用hadoop IsolationRunner job.xml来重新运行这个任务
  • 可以修改HADOOP_OPTS添加远程调试选项来启动这个任务。
  • 如果希望任务都保留而不仅仅是失败任务保留的话,那么可以设置 keep.task.files.pattern 为正则表达式(与保留的任务ID匹配)

1.4.6 MapReduce的工作机制




  • 计算分片信息是在本地完成的,分片信息和其他resouce(包括jars,files,archives等)一起copy到HDFS上面,然后jobtracker直接读取分片信息。
  • 提交的资源可以设置replication数目,高副本数目可以缓解tasktracker获取resource的压力。参数是mapred.submit.replication.
  • 对于streaming以及pipes的实现,无非就是task并不直接执行任务,而是开辟另外一个子进程来运行streaming或者是pipes的程序。



  • map任务进度是已经处理输入的比例
  • reduce任务进度分为三个部分

  • shuffle 1/3

  • sort 1/3
  • reduce 1/3
  • 也就是说如果刚运行完成sort的话,那么进度是2/3
  • 状态的更新

  • 触发事件

  • 读取记录

  • 输出记录
  • 修改状态 reporter的setStatus
  • 计数器修改
  • reporter的progress
  • 子进程有单独线程每隔3秒检查progress位是否设置,如果设置的话那么和tasktracker发起心跳

  • 通过mapred.task.timeout控制

  • tasktracker每隔5秒和jobtracker做心跳

  • 心跳时间通过 mapred.tasktracker.expircy.interval 设置

  • jobClient定期会去jobtracker询问job是否完成

  • jobClient也可以设置属性job.end.notification.url,任务完成jobtracker会调用这个url

  • 可以认为就是推拉方式的结合。


  • 任务失败

  • 子进程抛出异常的话,tasktracker将异常信息记录到日志文件然后标记失败

  • 对于streaming任务的话非0退出表示出现问题,也可以使用stream.non.zero.exit.is.failure = false来规避( 这样是否就没有办法判断是否正常退出了?
  • 如果长时间没有响应的话,没有和tasktracker有交互,那么也会认为失败。这个时间使用mapred.task.timeout控制,默认10min
  • 如果任务失败的话,jobtracker会尝试进行多次重试

  • map重试次数通过 mapred.map.max.attempts 配置

  • reduce重试次数通过 mapre.reduce.max.attempts 配置
  • 任何任务重试超过4次的话那么会认为整个job失败
  • 另外需要区分KILLED状态和FAILED状态,对于KILLED状态可能是因为推测执行造成的,不会记录到failed attempts里面
  • 如果我们希望允许少量任务失败的话,那么可以配置

  • mapred.max.map.failures.percent 允许map失败的最大比率

  • mapred.max.reduce.failures.percent 允许reduce失败的最大比率
  • 如果一个job超过一定的task在某个tt上面运行失败的话,那么就会将这个tt加入到这个job的blacklist. mapred.max.tracker.failures = 4
  • 如果job成功的话,检查运行task失败的tt并且标记,如果超过一定阈值的话,那么会将tt加入到全局的blacklist. mapred.max.tracker.blacklists = 4


  • fifo scheduler

  • 可以通过mapred.job.priority或者是setJobPriority设置

  • 当队列中有空闲的槽位需要执行任务时,从等待队列中选择优先级最高的作业
  • fair scheduler
  • capacity scheduler




有下面这些参数控制shuffle和sort的过程 NOTE(dirlt):书上倒是有很多参数,但是好多还是不太理解

  • io.sort.mb map输出缓存空间大小,默认是100MB. 建议设置10/* io.sort.factor.
  • io.sort.spill.percent 如果map输出超过了缓存空间大小的这个阈值的话,那么就会spill,默认是0.8

  • 每次spill之前先会对这个文件进行排序,如果有combiner的话那么会在上面调用combiner

  • 写磁盘是按照轮询的方式写到mapred.local.dir属性指定的目录下面
  • 如果spill速度太慢的话,那么往缓存空间写入进程就会阻塞,直到spill腾出空间。
  • io.sort.factor 多路归并的数量,默认是10. 建议设置在25-32.

  • 在map阶段,因为最终会存在多个spill文件,所以需要做多路归并。 TODO(dirlt):如果归并数量少的话是否可能会多次merge?

  • 在reduce阶段的话,因为可能存在多路map输出的结果,所以需要做多路归并。
  • min.num.spill.for.combine 如果指定combiner并且spill次数超过这个值的话就会调用combine,默认为3
  • tasktracker.http.threads reduce通过HTTP接口来发起数据请求,这个就是HTTP接口相应线程数目,默认为40。 mapper as server
  • mapred.reduce.parallel.copies reduce启动多少个线程去请求map输出,默认为5。 reducer as client

  • NOTE(dirlt):如果reduce和每个map都使用一个线程去请求输出结果的话,只要shuffle阶段没有出现network congestion,那么提高线程数量是有效果的

  • NOTE(dirlt):可以设置到15-50
  • mapred.reduce.copy.backoff = 300(s) reduce下载线程最大等待时间
  • mapred.job.shuffle.input.buffer.percent = 0.7 用来缓存shuffle数据的reduce task heap百分比
  • mapred.job.shuffle.merge.percent = 0.66 缓存的内存中多少百分比后开始做merge操作
  • mapred.job.reduce.input.buffer.percent = 0.0 sort完成后reduce计算阶段用来缓存数据的百分比. 默认来说不会使用任何内存来缓存,因此完全从磁盘上进行读取。


  • 推测执行参数

  • 如果某个任务执行缓慢的话会执行另外一个备份任务

  • mapred.map.tasks.speculative.execution true
  • mapred.reduce.tasks.speculative.execution true
  • JVM重用

  • 一个JVM实例可以用来执行多个task.

  • mapred.job.reuse.jvm.num.tasks/setNumTasksToExecutePerJvm 单个JVM运行任务的最大数目
  • -1表示没有限制
  • 任务执行环境

  • 程序自身可以知道执行环境对于开发还是比较有帮助的

  • 这些属性对于streaming可以通过环境变量获得

  • 对于streaming来说.替换成为_

  • mapred.job.id string jobID
  • mapred.tip.id string taskID
  • mapred.task.id string attemptID
  • mapred.task.partition int 作业中任务编号
  • mapred.task.is.map boolean 是否为map
  • mapred.work.output.dir / FileOutputFormat.getWorkOutputPath 当前工作目录
  • 杂项 NOTE(dirlt):from misc articles

  • mapred.job.map.capacity /# 最大同时运行map数量

  • mapred.job.reduce.capacity /# 最大同时运行reduce数量
  • mapred.job.queue.name /# 选择执行queue

    1.4.7 MapReduce的类型与格式


老API里面还有MapRunner这个类,这个类主要的作用是可以用来控制Mapper运行的方法,比如可以多线程来控制Mapper的运行。 但是在新API里面已经完全集成到Mapper实现里面来了,用户可以重写两个方法来完全控制mapper的运行

  • map 如何处理kv
  • run 如何从context里面读取kv protected void map(KEYIN key, VALUEIN value,

                 Context context) throws IOException, InterruptedException {

    context.write((KEYOUT) key, (VALUEOUT) value);

} public void run(Context context) throws IOException, InterruptedException {

setup(context); while (context.nextKeyValue()) {

map(context.getCurrentKey(), context.getCurrentValue(), context);


cleanup(context); }


  • mapred.input.format.class setInputFormat
  • mapred.mapoutput.key.class setMapOutputKeyClass
  • mapred.mapoutput.value.class setMapOutputValueClass
  • mapred.output.key.class setOutputKeyClass
  • mapred.output.value.class setOutputValueClass
  • mapred.mapper.class setMapperClass
  • mapred.map.runner.class setMapRunnerClass
  • mapred.combiner.class setCombinerClass
  • mapred.partitioner.class setPartitionerClass
  • mapred.output.key.comparator.class setOutputKeyComparatorClass
  • mapred.output.value.groupfn.class setOutputValueGroupingComparator
  • mapred.reducer.class setReducerClass
  • mapred.output.format.class setOutputFormat



  • 根据job描述来对输入进行切片(InputSplit)
  • 根据切片信息来读取记录(RecordReader) public abstract class InputFormat {

    public abstract List getSplits(JobContext context

                             ) throws IOException, InterruptedException;

public abstract RecordReader createRecordReader(InputSplit split,

                                     TaskAttemptContext context
                                    ) throws IOException,



public abstract class InputSplit { public abstract long getLength() throws IOException, InterruptedException;

public abstract

String[] getLocations() throws IOException, InterruptedException;


public abstract class RecordReader implements Closeable {

public abstract void initialize(InputSplit split, TaskAttemptContext context

                              ) throws IOException, InterruptedException;

public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract

KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

public abstract float getProgress() throws IOException, InterruptedException;

public abstract void close() throws IOException;



  • FileInputFormat

  • addInputPath或者是setInputPaths修改输入路径 mapred.input.dir

  • setInputPathFilter可以修改过滤器 mapred.input.path.Filter.class

  • 基本实现会排除隐藏.或者是_开头文件。

  • 自定义的过滤器是建立在默认过滤器的基础上的。
  • 分片大小由下面三个参数控制

  • mapred.min.split.size 1

  • mapred.max.split.size MAX
  • dfs.block.size 64MB
  • 算法是max(minSplitSize,min(maxSplitSize,blockSize))
  • isSplitable可以控制输入文件是否需要分片
  • CombineFileInputFormat 可以处理多个小文件输入,抽象类需要继承实现。
  • TextInputFormat

  • 输入单位是行,key是LongWritable表示行偏移,value是Text表示行内容

  • KeyValueTextInputFormat

  • 输入单位是行,按照key.value.seperator.in.input.line来进行分隔默认是\t

  • key和value的格式都是Text
  • NLineInputFormat

  • 和TextInputFormat非常类似,大师使用多行输入默认为1行

  • 通过mapred.line.input.format.linespermap来控制行数
  • XML

  • InputFormat使用StreamInputFormat,

  • 设置RecordReader使用stream.recordreader.class来设置
  • RecordReader使用org.apache.hadoop.streaming.StreamXmlRecordReader
  • NOTE(dirlt):也有现成的XmlInputFormat的实现
  • SequenceFileInputFormat
  • SequenceFileAsTextInputFormat

  • 将输入的kv转换成为text对象适合streaming处理方式

  • SequenceFileAsBinaryInputFormat NOTE(dirlt):似乎没有什么用!
  • MultipleInputs
  • DBInputFormat/DBOutputFormat JDBC数据库输入输出
  • TableInputFormat/TableOutputFormat HBase输入输出


  • TextOutputFormat

  • 使用mpared.textoutputformat.seperator来控制kv的分隔,默认是\t

  • 对应的输入格式为KeyValueTextInputFormat
  • 可以使用NullWritable来忽略输出的k或者是v
  • SequenceFileOutputFormat
  • SequenceFileAsBinaryOutpuFormat NOTE(dirlt):似乎没有什么用!
  • MapFileOutputFormat
  • MultipleOutputFormat
  • MultipleOutputs

  • 如果不像生成那写part-r-00000这些空文件的话,那么可以将OutputFormat设置成为NullOutputFormat

  • 但是使用NullOutputFormat的话会没有输出目录,如果想保留目录的话那么可以使用LazyOutputFormat

1.4.8 MapReduce的特性

  • 计数器

  • streaming计数器和可以通过写stderr来提交

  • reporter:counter:,,

  • reporter:status:
  • 连接

  • map端连接

  • 必须确保多路输入文件的reduce数量相同以及键相同。

  • 使用CompositeInputFormat来运行map端连接。
  • NOTE(dirlt);不过我稍微看了一下代码,实现上其实也是针对输入文件对每条记录读取,然后进行join包括inner或者是outer。感觉场景会有限,而且效率不会太高
  • 分布式缓存

  • 使用-files以及-archives来添加缓存文件

  • 也可以使用DistributedAPI来完成之间事情

  • addCacheFile/addCacheArchive

  • 然后在task里面通过configuration的getLocalCacheFiles以及getLocalCacheArchives来获得这些缓存文件
  • 工作原理

  • 缓存文件首先被放到hdfs上面

  • task需要的话那么会尝试下载,之后会对这个缓存文件进行引用计数,如果为0那么删除

  • 这也就意味着缓存文件可能会被多次下载

  • 但是运气好的话多个task在一个node上面的话那么就不用重复下载
  • 缓存文件存放在${mapred.local.dir}/taskTracker/archive下面,但是通过软连接指向工作目录
  • 缓存大小通过local.cache.size来配置
  • MapReduce库类

  • ChainMapper/ChainReducer 能够在一个mapper以及reducer里面运行多次mapper以及reducer

  • ChainMapper 允许在Map阶段,多个mapper组成一个chain,然后连续进行调用

  • ChainReducer 允许在Reuduce阶段,reducer完成之后执行一个mapper chain.
  • 最终达到的效果就是 M+ -> R -> M/* (1个或者是多个mapper, 一个reducer,然后0个或者是多个mapper)
  • TODO(dirlt):这样做倒是可以将各个mapper组合起来用作adapter.

    1.4.9 构建Hadoop集群

  • 很多教程说hadoop集群需要配置ssh,但是配置这个前提是你希望使用start-all.sh这个脚本来启动集群

  • 我现在的公司使用apt-get来安装,使用cssh来登陆到所有的节点上面进行配置,因此没有配置这个信任关系

  • Hadoop配置

  • 配置文件

  • hadoop-env.sh 环境变量脚本

  • core-site.xml core配置,包括hdfs以及mapred的IO配置等
  • hdfs-site.xml hadoop进程配置比如namenode以及datanode以及secondary namenode
  • mapred-site.xml mapred进程配置比如jobtracker以及tasktracker
  • masters 运行namenode(secondary namenode)的机器列表,每行一个, 无需分发到各个节点

  • 在本地启动primary namenode

  • slaves 运行datanode以及tasktracker的机器列表,每行一个 无需分发到各个节点

  • 在本地启动jobtracker

  • hadoop-metrics.properties 对hadoop做监控的配置文件
  • log4j.properties 日志配置文件
  • 这些文件在conf目录下面有,如果想使用不同的文件也可以使用-config来另行指定
  • NOTE(dirlt):所以从上面这个脚本来看,还是具有一定的局限性的
  • hadoop-env.sh

  • HADOOP_HEAPSIZE = 1000MB 守护进程大小

  • HADOOP_LOG_DIR hadoop日志文件,默认是HADOOP_INSTALL/logs
  • core-site.xml

  • io.file.buffer.size IO操作缓冲区大小,默认是4KB 这个需要提高

  • hdfs-site.xml

  • fs.default.name

  • hadoop.tmp.dir hadoop临时目录,默认是在/tmp/hadoop-${user.name}
  • dfs.name.dir namenode数据目录,一系列的目录,namenode内容会同时备份在所有指定的目录中。默认为${hadoop.tmp.dir}/dfs/name
  • dfs.data.dir datanode数据目录,一系列的目录,循环将数据写在各个目录里面。默认是${hadoop.tmp.dir}/dfs/data
  • fs.checkpoint.dir secondarynamenode数据目录,一系列目录,所有目录都会写一份。默认为${hadoop.tmp.dir}/dfs/namesecondary
  • dfs.namenode.handler.count namenode上用来处理请求的线程数目
  • dfs.datanode.ipc.address datanode的RPC接口,主要和namenode交互
  • dfs.datanode.address datanode的data block传输接口,主要和client交互
  • dfs.datanode.http.address datanode的HTTP接口,和user交互
  • dfs.datanode.handler.count datanode上用来处理请求的线程数目
  • dfs.datanode.max.xcievers datanode允许最多同时打开的文件数量
  • dfs.http.address namenode的HTTP接口
  • dfs.secondary.http.address secondard namenode的HTTP接口
  • dfs.datanode.dns.interface default 绑定的NIC,默认是绑定默认的NIC比如eth0
  • dfs.hosts / dfs.hosts.exclude 加入的datanode以及排除的datanode
  • dfs.replication = 3 副本数目
  • dfs.block.size = 64MB
  • dfs.datanode.du.reserved 默认datanode会使用目录所在磁盘所有空间,这个值可以保证有多少空间被reserved的
  • fs.trash.interval 单位分钟,如果不为0的话,那么删除文件会移动到回收站,超过这个单位时间的文件才会完全删除。

  • 回收站位置/home/${user]/.Trash NOTE(dirlt):回收站这个功能只是对fs shell有效。fs shell remove时候会构造Trash这个类来处理删除文件的请求。如果调用Java API的话那么会直接删除文件

  • haddop fs -expunge 强制删除
  • NOTE(dirlt):grep代码发现只有NameNode在TrashEmptier里面构造了Trash这个类,因此这个配置之需要在nn上配置即可,决定多久定期删除垃圾文件
  • fs.trash.checkpoint.interval 单位分钟,namenode多久检查一次文件是否需要删除。

  • NOTE(dirlt):似乎没有这个参数。如果没有这个参数的话,那么两次检查时长应该是由参数fs.trasn.interval来决定

  • mapred-site.xml

  • mapred.job.tracker

  • mapred.local.dir MR中间数据存储,一系列目录,分散写到各个目录下面,默认为${hadoop.tmp.dir}/mapred/local
  • mapred.system.dir MR运行期间存储,比如存放jar或者是缓存文件等。默认${hadoop.tmp.dir}/mapred/system
  • mapred.tasktracker.map.tasks.maximum = 2 单个tasktracker最多多少map任务
  • mapred.tasktracker.reduce.tasks.maximum = 2 单个tasktracker最多多少个reduce任务
  • mapred.tasktracker.dns.interface default 绑定的NIC,默认是绑定默认的NIC比如eth0
  • mapred.child.ulimit 单个tasktracker允许子进程占用的最大内存空间。通常为2-3/* mapred.child.java.opts.
  • mapred.child.java.opts = -Xmx200m 每个子JVM进程200M. NOTE(dirlt):这个是在提交机器上面设置的,而不是每个tasktracker上面设置的,每个job可以不同

  • 不一定支持将map/reduce的jvm参数分开设置 http://hadoop-common.472056.n3.nabble.com/separate-JVM-flags-for-map-and-reduce-tasks-td743351.html

  • NOTE(dirlt):个人折中思路是限制内存大小为1G,然后大内存机器允许同时执行map/reduce数量上限提高,通过增加job的map/reduce数量来提高并发增加性能
  • NOTE(dirlt):我grep了一下cdh3u3的代码,应该是将map/reduce的jvm参数分开进行了设置

  • mapred.map.child.java.opts

  • mapred.reduce.child.java.opts
  • mapred.task.tracker.report.address tasktracker启动子进程通信的端口,0表示使用任意端口
  • mapred.task.tracker.expiry.interval 600(sec) tt和jt之间的心跳间隔
  • mapred.job.tracker.handler.count. jobtracker用来处理请求的线程数目。
  • mapred.job.tracker.http.address jobtracker的HTTP接口
  • mapred.task.tracker.http.address tasktrackder的HTTP接口
  • mapred.hosts / mapred.hosts.exclude 加入的tasktracker以及排除的tasktracker.
  • Hadoop Benchmark NOTE(dirlt):try it out

  • 在hadoop安装目录下面有jar可以来做基准测试

  • TestDFSIO测试HDFS的IO性能
  • Sort测试MapReduce性能
  • MRBench多次运行一个小作业来检验小作业能否快速相应
  • NNBench测试namenode硬件的负载

1.4.10 管理Hadoop

  • 永久性数据结构

  • namenode的目录结构

  • current表示当前的namenode数据(对于辅助节点上这个数据并不是最新的)

  • previous.checkpoint表示secondarynamenode完成checkpoint的数据(和current可能存在一些编辑差距)

  • hadoop dfsadmin -saveNamespace 可以强制创建检查点,仅仅在安全模式下面运行

  • 辅助namenode每隔5分钟会检查

  • 如果超过fs.checkpoint.period = 3600(sec),那么会创建检查点

  • 如果编辑日志大小超过fs.checkpoint.size = 64MB,同样也会创建检查点
  • 除了将文件copy到namenode之外,在辅助节点上面可以使用选项-importCheckpoint来载入
  • VERSION Java属性文件

  • namespaceID 每次格式化都会重新生成一个ID,这样可以防止错误的datanode加入

  • cTime namenode存储系统创建时间,对于刚格式化的存储系统为0.对于升级的话会更新到最新的时间戳
  • storageType NAME_NODE or DATA_NODE
  • layoutVersion 负整数表示hdfs文件系统布局版本号,对于hadoop升级的话这个版本号可能不会变化
  • edits 编辑日志文件
  • fsimage 镜像文件
  • fstime ???
  • datanode的目录结构

  • blk以及blk.meta 表示块数据以及对应的元信息,元数据主要包括校验和等内容

  • 如果datanode文件非常多的话,超过dfs.datanode.numblocks = 64的话,那么会创建一个目录单独存放,最终结果就是形成树存储结构。
  • dfs.data.dir目录是按照round-robin的算法选择的。
  • 安全模式

  • namenode启动的时候会尝试合并edit数据并且新建一个checkpoint,然后进入安全模式,在这个模式内文件系统是只读的

  • 可以通过hadoop dfsadmin -safemode来操作安全模式
  • 当达到下面几个条件的时候会离开安全模式

  • 整个系统的副本数目大于某个阈值的副本数目比率超过一个阈值之后,然后继续等待一段时间就会离开安全模式

  • dfs.replication.min = 1 副本数目阈值
  • dfs.safemode.threshold.pct = 0.999 比率阈值
  • dfs.safemode.extension = 30000(ms) 等待时间
  • 工具

  • dfsadmin

  • fsck
  • scanner

  • DataBlockScanner每隔一段时间会扫描本地的data block检查是否出现校验和问题

  • 时间间隔是dfs.datanode.scan.period.hours = 504默认三周
  • 可以通过页面访问每个datanode的block情况 http://localhost:50075/blockScannerReport
  • 加上listblocks参数可以看每个block情况 http://localhost:50075/blockScannerReport?listblocks NOTE(dirlt):可能会很大
  • balancer

  • 通过start-balancer.sh来启动,集群中只允许存在一个均衡器

  • 均衡的标准是datanode的利用率和集群平均利用率的插值,如果超过某个阈值就会进行block movement
  • -threshold可以执行阈值,默认为10%
  • dfs.balance.bandwidthPerSec = 1024 /* 1024 用于balance的带宽上限。
  • 监控

  • 日志

  • jobtracker的stack信息(thread-dump)http://localhost:50030/stacks

  • 度量

  • 度量从属于特性的上下文(context),包括下面几个

  • dfs

  • mapred
  • rpc
  • jvm
  • 下面是几种常见的context

  • FileContext 度量写到文件

  • GangliaContext 度量写到ganglia (这个似乎比较靠谱)
  • CompositeContext 组合context
  • 度量可以从hadoop-metrics.properties进行配置

1.5 Benchmark

1.5.1 TestDFSIO

测试hdfs吞吐 hdfs@hadoop1:~$ hadoop jar /usr/lib/hadoop/hadoop-test-0.20.2-cdh3u3.jar TestDFSIO

Usage: TestDFSIO [genericOptions] -read | -write | -append | -clean [-nrFiles N] [-fileSize Size[B|KB|MB|GB|TB]] [-resFile resultFileName] [-bufferSize Bytes] [-rootDir]%

  • read / write / append / clean 操作类型 append和write执行效率差别不大,但是write会创建新文件所以使用比较方便 (default read)
  • nrFiles 文件数目(default 1) 启动相同数量的map
  • fileSize 每个文件大小(1MB)
  • resFile 结果报告文件(TestDFSIO_results.log)
  • bufferSize write buffer size(单次write写入大小)(1000000 bytes)
  • rootDir 操作文件根目录(/benchmarks/TestDFSIO/) ----- TestDFSIO ----- : write

         Date & time: Thu Apr 25 19:14:21 CST 2013
     Number of files: 2

Total MBytes processed: 2.0 Throughput mb/sec: 7.575757575757576

Average IO rate mb/sec: 7.61113977432251 IO rate std deviation: 0.5189420757292891

Test exec time sec: 14.565

----- TestDFSIO ----- : read Date & time: Thu Apr 25 19:15:13 CST 2013

   Number of files: 2

Total MBytes processed: 2.0

 Throughput mb/sec: 27.77777777777778

Average IO rate mb/sec: 28.125

IO rate std deviation: 3.125 Test exec time sec: 14.664

  • throughtput = sum(filesize) / sum(time)
  • avaerage io rate = sum(filesize/time) / n
  • io rate std deviation

    1.5.2 TeraSort

通过排序测试MR执行效率 我看了一下代码map/reduce都有CPU操作,并且这个也非常依靠shuffle/copy.因此这个测试应该会是比较全面的 hdfs@hadoop1:~$ hadoop jar /usr/lib/hadoop/hadoop-examples-0.20.2-cdh3u3.jar

  • teragen 产生排序数据

  • 10 bytes key(random characters)

  • 10 bytes rowid(right justified row id as a int)
  • 78 bytes filler
  • \r\n
  • terasort 对数据排序

  • teravalidate 对排序数据做验证

可以使用hadoop job -history all 来观察程序运行数据,也可以通过web page来分析。

1.5.3 nnbench

测试nn负载能力 ➜ ~HADOOP_HOME hadoop jar hadoop-test-0.20.2-cdh3u3.jar nnbench

NameNode Benchmark 0.4 Usage: nnbench

Options: -operation

     /* NOTE: The open_read, rename and delete operations assume that the files they operate on, are already available. The create_write operation must be run before running the other operations.
    -maps <number of maps. default is 1. This is not mandatory>

    -reduces <number of reduces. default is 1. This is not mandatory>
    -startTime <time to start, given in seconds from the epoch. Make sure this is far enough into the future, so all maps (operations) will start at the same time>. default is launch time + 2 mins. This is not mandatory

    -blockSize <Block size in bytes. default is 1. This is not mandatory>
    -bytesToWrite <Bytes to write. default is 0. This is not mandatory>

    -bytesPerChecksum <Bytes per checksum for the files. default is 1. This is not mandatory>
    -numberOfFiles <number of files to create. default is 1. This is not mandatory>

    -replicationFactorPerFile <Replication factor for the files. default is 1. This is not mandatory>
    -baseDir <base DFS path. default is /becnhmarks/NNBench. This is not mandatory>

    -readFileAfterOpen <true or false. if true, it reads the file and reports the average time to read. This is valid with the open_read operation. default is false. This is not mandatory>
    -help: Display the help statement
  • startTime 作用是为了能够让所有的map同时启动以便对nn造成压力 ➜ ~HADOOP_HOME hadoop jar hadoop-test-0.20.2-cdh3u3.jar nnbench -operation create_write -bytesToWrite 0 -numberOfFiles 1200

➜ ~HADOOP_HOME hadoop jar hadoop-test-0.20.2-cdh3u3.jar nnbench -operation open_read

结果报告文件是 NNBench_results.log

-------------- NNBench -------------- :

                           Version: NameNode Benchmark 0.4
                       Date & time: 2013-04-25 19:41:02,873

                    Test Operation: create_write

                        Start time: 2013-04-25 19:40:21,70
                       Maps to run: 1

                    Reduces to run: 1
                Block Size (bytes): 1

                    Bytes to write: 0
                Bytes per checksum: 1

                   Number of files: 1200
                Replication factor: 1

        Successful file operations: 1200

    /# maps that missed the barrier: 0
                      /# exceptions: 0

           TPS: Create/Write/Close: 75

Avg exec time (ms): Create/Write/Close: 26.526666666666667 Avg Lat (ms): Create/Write: 13.236666666666666

               Avg Lat (ms): Close: 13.164166666666667

             RAW DATA: AL Total /#1: 15884
             RAW DATA: AL Total /#2: 15797

          RAW DATA: TPS Total (ms): 31832
   RAW DATA: Longest Map Time (ms): 31832.0

               RAW DATA: Late maps: 0
         RAW DATA: /# of exceptions: 0

-------------- NNBench -------------- :

                           Version: NameNode Benchmark 0.4
                       Date & time: 2013-04-25 19:44:42,354

                    Test Operation: open_read

                        Start time: 2013-04-25 19:44:31,921
                       Maps to run: 1

                    Reduces to run: 1
                Block Size (bytes): 1

                    Bytes to write: 0
                Bytes per checksum: 1

                   Number of files: 1
                Replication factor: 1

        Successful file operations: 1

    /# maps that missed the barrier: 0
                      /# exceptions: 0

                    TPS: Open/Read: 500

     Avg Exec time (ms): Open/Read: 2.0
                Avg Lat (ms): Open: 2.0

             RAW DATA: AL Total /#1: 2

             RAW DATA: AL Total /#2: 0
          RAW DATA: TPS Total (ms): 2

   RAW DATA: Longest Map Time (ms): 2.0
               RAW DATA: Late maps: 0

         RAW DATA: /# of exceptions: 0
  • maps that missed the barrier 从代码上分析是,在等待到start time期间中,如果sleep出现异常的话。
  • exceptions 表示在操作文件系统时候的exception数量
  • TPS transactions per second
  • exec(execution) 执行时间
  • lat(latency) 延迟时间
  • late maps 和 maps missed the barrier是一个概念。

对于后面RAW DATA部分的话,从代码上看,就是为了计算出上面那些指标的,所以没有必要关注。

1.5.4 mrbench

测试运行small mr jobs执行效率,主要关注响应时间。 MRBenchmark.0.0.2

Usage: mrbench [-baseDir ] [-jar ] [-numRuns ] [-maps ] [-reduces ] [-inputLines ] [-inputType ] [-verbose]

  • baseDir 输入输出目录
  • jar 通常不需要指定,用默认即可。
  • inputLines 输入条数
  • inputType 输入是否有序 hdfs@hadoop1:~$ hadoop jar /usr/lib/hadoop/hadoop-test-0.20.2-cdh3u3.jar mrbench -verbose


Total MapReduce jobs executed: 1

Total lines of data per job: 1 Maps per job: 2

Reduces per job: 1 Total milliseconds for task: 1 = 16452

DataLines Maps Reduces AvgTime (milliseconds) 1 2 1 16452


1.5.5 hbase.PerformanceEvaluation

hdfs@hadoop1:~$ hbase org.apache.hadoop.hbase.PerformanceEvaluation

Usage: java org.apache.hadoop.hbase.PerformanceEvaluation \ [--miniCluster] [--nomapred] [--rows=ROWS]


miniCluster Run the test on an HBaseMiniCluster nomapred Run multiple clients using threads (rather than use mapreduce)

rows Rows each client runs. Default: One million flushCommits Used to determine if the test should flush the table. Default: false

writeToWAL Set writeToWAL on puts. Default: True

Command: filterScan Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)

randomRead Run random read test randomSeekScan Run random seek and scan 100 test

randomWrite Run random write test scan Run scan test (read every row)

scanRange10 Run random seek scan with both start and stop row (max 10 rows) scanRange100 Run random seek scan with both start and stop row (max 100 rows)

scanRange1000 Run random seek scan with both start and stop row (max 1000 rows) scanRange10000 Run random seek scan with both start and stop row (max 10000 rows)

sequentialRead Run sequential read test sequentialWrite Run sequential write test


nclients Integer. Required. Total number of clients (and HRegionServers) running: 1 <= value <= 500

Examples: To run a single evaluation client:

$ bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 1

从参数上看还是比较直接的。benchmark每个client通常对应10个mapper, 每个client操作个row,因此每个mapper操作/10个row,每个row大约1000bytes.

  • filterScan 随机生成value,然后从头开始scan直到equal
  • randomRead 随机选取key读取
  • randomSeekScan 从某个随机位置开始scan最多100个
  • randomWrite 随即生成key写入
  • scan 每次scan 1个row,start随机
  • scan 每次scan num个row,start随机
  • seqRead 顺序地读取每个key
  • seqWrite 顺序地写入每个key
  • NOTE(dirlt):这里的key都非常简单,10个字符的数字,printf("%010d",row) hdfs@hadoop1:~$ time hbase org.apache.hadoop.hbase.PerformanceEvaluation --rows=1000 sequentialWrite 2

13/04/25 23:47:56 INFO mapred.JobClient: HBase Performance Evaluation 13/04/25 23:47:56 INFO mapred.JobClient: Row count=2000

13/04/25 23:47:56 INFO mapred.JobClient: Elapsed time in milliseconds=258

输出结果是在counter里面,这里面row count = 2000, 占用时间为258 ms. Date: 2013-12-15T10:28+0800

Org version 7.9.2 with Emacs version 24 Validate XHTML 1.0 来源: [http://dirlt.com/hadoop.html/#sec-1-1-4](http://dirlt.com/hadoop.html#sec-1-1-4)

转载请注明作者(RobinChia)和出处 It so life ,请勿用于任何商业用途
本文链接: hadoop