That Little Thing About Foreign IT Companies (1): A Foreign Company Is Nothing Special


By 觉先 (posted 2010-04-30 21:30 on 博客园)


Foreign companies: a name that seems wrapped in a halo, the place that crowds of university graduates dream of every year.

Mention foreign companies and a string of tempting words comes to mind: high salary, humane management, flexible working hours, annual leave, well-defined processes, and perks of every kind: company trips, indoor ping-pong tables, a gym, massage chairs, snacks, yogurt...

Yet once you actually join one and enough time passes, you discover that a foreign company is nothing special after all.

High Salary

The "high salary" is, strictly speaking, a high starting salary: the number every company leaks as an open secret at graduation time. Students can always get it from senior schoolmates, and some companies even trumpet a startling jump over last year's figure as publicity. One shiny number after another draws in students who have yet to graduate; attendance at a campus recruiting talk is roughly proportional to that number.

But because most foreign companies are large and organizationally stable, behind the high starting salary lie equally stable raises: 7% to 10% a year is the norm, and 20% counts as imperial grace, reserved for achievements the whole team applauds. If you are unlucky enough to land on a product that has been around for years, the most you can do is fix bugs or bolt on peripheral features, and how many chances for glory does that offer? At least this is what I have seen; it may not describe every foreign company.

So while the stars of the graduating class are lucky enough to join the big foreign companies, classmates who placed lower quietly join domestic private firms of no great size.

For a long time this is your badge of pride. At class reunions you hold forth about your company's salary and benefits, nodding along as classmates complain about overtime at their private employers, an unaccountable sense of superiority rising in your chest.

That sense of superiority keeps you soaking in the good life of the foreign company, until you notice it is less and less superior. Three years, five years: again and again you hear that a classmate has been promoted, and promoted again, while you are still an ordinary engineer, because promotion in a foreign company basically runs on fixed years of service, with more than a whiff of seniority queuing. Again and again you hear that a classmate got a raise, then another, until their salary closes in on yours and, around the five-year mark, passes it.

More and more you see those classmates mastering a system's modules from front to back and taking full charge of projects, while you remain a cog, taking instructions from foreigners every day and, amid choruses of "yes", "ok", "no problem", "I am 100% agree", carrying on with your cog-sized little features.

What things will look like after ten years I cannot say. But after attending many developer conferences, I noticed that nearly every speaker from a foreign company is a foreigner, while the Chinese speakers mostly come from local startups. Listening to them recount, like family treasures, how their startups grew step by step and how their systems evolved into today's architecture, I wondered: will their classmates in the foreign companies ever get such a chance?

Humane Management

"Humane management", in foreign-company language, means "we are very Open."

Openness shows in many ways. For instance, the executives' office doors are always open, and you may walk into any executive's office at any moment to air your views. You only have to guarantee that when you stride in full of passion, close the door, and stride out half an hour later just as full of passion, your direct manager forms no opinion of you, even if you actually said nothing and only chatted about lunch.

So unless an executive arranges the conversation, do not drop by the executive's office for a private talk with your boss's boss, outside your direct manager's sphere of control. Know that there is a kind of relationship called support on the surface, a grudge in the heart. Even when the executive initiates the talk, it is best to brief your direct manager beforehand. Nothing formal; a casual grumble during small talk will do: "The big boss wants me for a one-on-one again this afternoon. The project is this busy, and I have no idea what there is to talk about." A small bit of craft, offered for what it is worth.

The person who matters most to you is always your direct manager. Once the executive has heard your suggestion and delivered the "OK, I will take it into consideration," the matter no longer concerns you. There is no scenario in which, as your direct manager settles on a 7% raise for you, the executive steps in with "I think he is doing rather well; make it 10%."

Of course, company policy also has your direct manager sit down with you for a one-on-one every so often, asking how things are going and whether you have any feedback. This is not the moment to pour out your heart; it calls for a careful reading of the temperature. Sound too satisfied and you seem insincere; too dissatisfied and the manager will not enjoy hearing it; "no comments" looks like indifference to the team; too many comments and you come across as a risk.

So the general principles are:

  • Offer incremental suggestions ("we should budget more time for code review"), not sweeping ones ("the current project process is deeply flawed");
  • Offer concrete, evidence-backed opinions ("we have dozens of bugs; one week may genuinely not be enough"), not abstract ones ("communication between teams is a problem");
  • Talk about the project rather than about yourself (above all, do not be too candid about your own career plans);
  • Raise points the manager already expects (it gives them a sense of control over the team; for example, everyone has been staying until 10 p.m. and the manager has seen it, so that is safe to mention), and avoid surprises (if you must raise one, communicate beforehand so the manager is prepared before the one-on-one).

Openness also shows in other ways. Managers will join employees in activities outside work: ball games, annual-party performances, workouts at the gym, usually amid plenty of laughter. But never forget that a manager is still the manager, even away from the project. Do not assume that because you were the school's basketball ace or arts organizer you may play the leader in these activities and gesture instructions in front of your project manager. During the event he will praise you ("I had no idea you had this talent"), but acting the big shot in front of your manager is a debt that comes due sooner or later: late in the project, when the tasks sent from the US cannot be finished, you will be tagged as the one who "organized a fine event a while back but let the schedule slip", and your performance rating will feel it.

If you run into your manager at the gym and work out together, the chat may be delightful, but the first thought in the manager's head is guaranteed to be: has this fellow finished his work, that he has time to exercise during office hours? And it will surface later in the job: closer attention to your progress, a heavier workload, and so on.

Flexible Working Hours

Flexible working hours: a lovely name meaning you may come in late and leave early, and so long as the work gets done, even six hours a day is fine.

Fresh into the company, I watched veterans saunter in at ten or even eleven and thought flexible hours were wonderful. So I worked furiously, scheming to finish ten hours of work in six and spend the rest studying or resting. In the end I discovered the work is never finished. The capitalists paid good money for you; do you imagine they will let you take it easy?

Flexible hours are really another name for unpaid overtime, which is why the contract may state that "all overtime pay has been folded into the salary." Since the work "only" has to get done, twelve-hour days are equally fine. Staying very late, or being hauled out of bed early for a call with the Americans, all falls within the "flexible" hours, and you have nothing to say. Working deep into the night to fix a bug for a US customer: nothing to say. Being sent to the US over a Chinese public holiday that is a working day there, with no compensatory leave and no triple pay: nothing to say.

Annual Leave

Annual leave at foreign companies is comparatively generous, and campus recruiters love to brag about it. But how much of it is actually taken? In practice most of it never is: the project does not allow it, the manager does not allow it, and the foreigners do not allow it.

The disallowing is never explicit; it works by unwritten rule. The project is always tight, and even when it is not, people shout until everyone feels it is. If a team takes a lot of leave, the manager senses it will be hard to explain upstairs.

If you alone take leave, you will be reminded: everyone is racing the schedule; do not let your module block the project.

If the whole team wants leave together, the manager says: with everyone out at once there is no backup; if something breaks, who do we call?

If you want an ordinary day off, the manager says: is something the matter? If not, wait for a lull in the project and take a proper rest; come in late tomorrow morning, you have probably been running yourself down lately.

If you want to extend a public holiday, the manager says: it is already a full week; rather than tacking more on, take a day off when you are tired, it does you more good.

If the Americans go on holiday (Christmas, say) while China does not, a pile of tasks arrives before they leave, to be caught up on during exactly that window.

If the US is working while China is on holiday (Chinese New Year, say), well, we cannot have the American boss unable to find anyone.

Of course, these excuses surface only in a consultative tone when you ask for leave. If you insist, the manager approves without hesitation, because we are Open, after all. Yet the excuses are enough to keep most employees from daring to ask, for everyone understands: there is a kind of relationship called support on the surface, a grudge in the heart.

And even approved leave carries conditions, such as: "No problem, rest well; just send out the document (report, email, code) (commit it to svn) before you go." That little condition always costs time. For a day off tomorrow, you leave at nine or ten tonight; ask in the morning and you get away at noon; ask at noon and you get away past three.

Well-Defined Processes

The processes of a foreign company are thorough. Extremely thorough. Excessively thorough.

Hence every foreign company has a meeting-room booking system, and the rooms are forever occupied: meetings and discussions, day after day.

Regular meetings alone come at every level: the module team, the development team (several modules), the project team (development plus test), the group (several projects under one big boss), and all-hands (the whole company).

A single document takes a module-team review, a development-team review, a test-team review, a review meeting with the US, then a second round after revisions. Plus code reviews and bug reviews.

After each phase a project team demos to the whole project, sometimes to the whole group and the foreigners as well, in the name of increasing "visibility".

Quiet coding time usually arrives only late in the afternoon; the real coding peak comes after dinner.

This is why what a small company ships in half a year takes a big company several years. The big company has its reasons: it is stable, never short of customers or cash. Ship this year or ship next year, the customers have nowhere else to go and the payroll is covered. A small company can afford none of that; it must satisfy the customer quickly and land the next project before the money runs out.

Whether this is good for a programmer's career I dare not judge. I only know that many friends, talking it over, found that after all the busyness, when they changed jobs and tried to sum up what they had done, there was not much to show: not much product, not much technology. Interviewing at startups, they were often asked: all that time, and this is what you built?

Big-company processes have another trait: they are custom-built for that one company. A big company can afford to roll its own everything from end to end, using neither the mainstream tools nor the open-source ones, whether for development, testing, or source control. This makes employees remarkably sticky. Step outside the company and it is another world: what you know, nobody else uses; what everyone else uses, you barely know. So in the end you stay and grow old there; happily the salary is decent and so are the benefits.

Facilities

Last come the various delightful facilities, which are genuinely attractive. Still, for the sake of your prospects, if you cannot quite keep a respectful distance, at least mind when you enjoy them: lunchtime, evenings.

Try not to amuse yourself during working hours, let alone make a racket. The people's eyes are sharp, and so are the manager's, especially in software, where output is notoriously hard to quantify and visible effort and attitude become a stand-in metric. Sales is different: it brings in hard cash, so play all you like as long as the deal closes. In software, do you have ironclad proof that your results beat everyone else's?

Hence an amusing phenomenon in foreign companies: a team's seats are best placed as close to the food as possible and as far from the recreation equipment as possible. Near the food, helping yourself is easy and the manager will not mind seeing you grab a snack; near the games, with the executives' doors standing open, who dares play for long? The people at the recreation equipment are generally those whose seats are far away.

I will stop here. Years in foreign companies produce plenty of curious episodes, and after passing through a few such companies you find many of the same unwritten rules.

A foreign company in China is really a foreign company with Chinese characteristics. Chinese culture is strong enough that everything becomes sinicized the moment it arrives, sometimes changing flavor entirely. Many peoples, the Manchus and the Hui among them, have lost much of their original distinctiveness here. Only in China could one speak of Confucianism, Buddhism, and Daoism merging into one; I wonder how Shakyamuni would feel about it. Back in school there was a physics teacher I greatly admired, quite old and a staunch Marxist, who once said: "I was terribly ill last week; I very nearly went to see Marx." I laughed: Marx was a materialist who did not believe in ghosts after death. If going to see the King of Hell after death is superstition, how is going to see Marx not?

When I find the time, I will go on telling you stories about foreign companies.


Comments

#1 2010-04-30 21:54 土星的狗狗

Very well written; I learned something again, haha. It maps directly onto the Japanese company I used to work for.

#2 2010-04-30 23:57 温景良(Jason)

Never worked at one; looking forward to more.

#3 2010-05-01 10:41 ilovedotnet

@土星的狗狗 I don't think Japanese and Korean companies count as real foreign companies; what people usually mean by the term is European and American companies only.

#4 2010-05-01 21:59 DarroldYang

Sounds exactly right. I joined such an environment not long ago; one of their projects has likewise been running for years, and lately it has been nothing but bug fixing.

#5 2010-05-04 09:26 加菲猫

The author truly knows foreign companies inside and out. That said, if you put your mind to it, out-earning most of your classmates at domestic private companies is not hard, Huawei and Tencent excepted.

#6 2010-05-05 09:28 Jerry Qian

You're demoralizing me; I'm in the middle of studying English!

#7 2010-05-05 11:43 Bob Liu

Just having a look to learn about foreign companies.

#8 2010-05-05 12:01 Aaron_Aanubis

Excellent! Spelling all this out helps us understand foreign companies, so we can plan our careers around our own situations. Thanks!

#9 2010-05-05 12:25 Caspar Jiong

I feel exactly the same!

#10 2010-05-05 12:51 鸟瞰

For friends who aim to be the boss someday, parts of how foreign companies are run are still worth borrowing.

#11 2010-05-05 15:13 撞破南墙

Reading this, it does all sound very sinicized. I wonder whether GG is like this too?

#12 2010-05-05 15:46 Arthas-Cui

You know too much...

#13 2010-05-05 16:11 卡通一下

People may not realize that in the US your direct manager is your real boss. If he wants you gone, the board does not hold a vote, haha!

#14 2010-05-05 16:15 卡通一下

You can read the author's state of mind from the title: "that little thing about foreign companies", "nothing special". Like those people who say "oops, I got rich by accident, succeeded by accident"... heh!

#15 [author] 2010-05-05 18:36 觉先

@卡通一下 The title just borrows the popular formula: 明朝那点儿事, Java那点儿事, 历史是什么玩意儿, and so on. A hook to catch attention, nothing more.

#16 2010-05-05 19:20 卡通一下

Quoting 觉先: "@卡通一下 The title just borrows the popular formula... a hook to catch attention, nothing more." We say it all the time when chatting among friends too; it's just not said on formal occasions, heh!

#17 2010-05-09 21:17 dytes

Don't over-generalize. In my own experience things are actually quite relaxed.

#18 2010-05-10 13:02 Koy

Well said; upvoted.

#19 2010-05-10 22:53 小飞哥

Haha, you know too much.

#20 2010-05-13 10:56 SnowToday

That's basically how it is, though foreign companies differ among themselves, as do departments and projects. Take leave and holidays: where I work, requests almost never run into trouble; you just take the leave, without all those second thoughts, and we also get the foreigners' big holidays, such as Christmas and Easter.

#21 2010-05-13 11:11 足球王子

In a foreign company the chances to drive a project are far fewer, but never having been inside one feels like never having seen the world.

#22 2010-05-13 15:46 Abbott zhao

Re "This is why what a small company ships in half a year takes a big company several years...": that passage is a bit one-sided. The products small companies turn out really leave a lot to be desired.

#23 2010-05-13 21:28 Forrest Liu

I work at a small foreign company too, and something recent has me down. A couple of days ago my girlfriend had an errand near the office, so I let her wait inside for a while so we could have lunch together. She hadn't been there ten minutes when the boss walked past my desk and saw her; a moment later my team leader told me over Communicator to send her out, which I did. Afterwards the leader said the boss had seen my girlfriend and was very angry. My previous company was casual about this; I had brought friends and classmates in before and thought nothing of it. Then this morning the leader talked to me again, saying the client had reported being unsatisfied with my work. That baffled me: I stay in constant contact with the client, and I have completed every task assigned to me well. A month ago I even asked the client outright whether they had any complaints about my work and where I could do better, and they replied that they were very satisfied. I don't know whom I have offended or what I did wrong. I'm quite lost; could you senior folks give me some pointers?

#24 2010-05-14 18:32 开心每一天ㄨ

@Forrest Liu It does sound rather cold; everything strictly by the book. At a private company or a small one, maybe even a state-owned one, it might well be different. Foreign companies vary too, but the one you are in does feel pretty heartless.

#25 2010-05-14 23:16 无待

Interesting; much obliged.

#26 2010-05-22 21:08 fyljf

Some of these points really hit home.


HDFS File System Shell Guide


Table of contents

1 Overview
  1.1 cat
  1.2 chgrp
  1.3 chmod
  1.4 chown
  1.5 copyFromLocal
  1.6 copyToLocal
  1.7 count
  1.8 cp
  1.9 du
  1.10 dus
  1.11 expunge
  1.12 get
  1.13 getmerge
  1.14 ls
  1.15 lsr
  1.16 mkdir
  1.17 moveFromLocal
  1.18 moveToLocal
  1.19 mv
  1.20 put
  1.21 rm
  1.22 rmr
  1.23 setrep
  1.24 stat
  1.25 tail
  1.26 test
  1.27 text
  1.28 touchz

1. Overview

The FileSystem (FS) shell is invoked by bin/hadoop fs <args>. All FS shell commands take path URIs as arguments. The URI format is scheme://authority/path. For HDFS the scheme is hdfs, and for the local filesystem the scheme is file. The scheme and authority are optional. If not specified, the default scheme specified in the configuration is used. An HDFS file or directory such as /parent/child can be specified as hdfs://namenodehost/parent/child or simply as /parent/child (given that your configuration is set to point to hdfs://namenodehost). Most of the commands in the FS shell behave like the corresponding Unix commands. Differences are described with each of the commands. Error information is sent to stderr and the output is sent to stdout.

1.1. cat

Usage: hadoop fs -cat URI [URI …]

Copies source paths to stdout.

Example:
• hadoop fs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
• hadoop fs -cat file:///file3 /user/hadoop/file4

Exit Code: Returns 0 on success and -1 on error.

1.2. chgrp

Usage: hadoop fs -chgrp [-R] GROUP URI [URI …]

Change the group association of files. With -R, make the change recursively through the directory structure. The user must be the owner of the files, or else a super-user. Additional information is in the HDFS Admin Guide: Permissions.

1.3. chmod

Usage: hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI …]

Change the permissions of files. With -R, make the change recursively through the directory structure. The user must be the owner of the file, or else a super-user. Additional information is in the HDFS Admin Guide: Permissions.

1.4. chown

Usage: hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI …]

Change the owner of files. With -R, make the change recursively through the directory structure. The user must be a super-user. Additional information is in the HDFS Admin Guide: Permissions.

1.5. copyFromLocal

Usage: hadoop fs -copyFromLocal <localsrc> URI

Similar to the put command, except that the source is restricted to a local file reference.

1.6. copyToLocal

Usage: hadoop fs -copyToLocal [-ignorecrc] [-crc] URI <localdst>

Similar to the get command, except that the destination is restricted to a local file reference.

1.7. count

Usage: hadoop fs -count [-q] <paths>

Count the number of directories, files and bytes under the paths that match the specified file pattern. The output columns are: DIR_COUNT, FILE_COUNT, CONTENT_SIZE, FILE_NAME. The output columns with -q are: QUOTA, REMAINING_QUOTA, SPACE_QUOTA, REMAINING_SPACE_QUOTA, DIR_COUNT, FILE_COUNT, CONTENT_SIZE, FILE_NAME.

Example:
• hadoop fs -count hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
• hadoop fs -count -q hdfs://nn1.example.com/file1

Exit Code: Returns 0 on success and -1 on error.

1.8. cp

Usage: hadoop fs -cp URI [URI …] <dest>

Copy files from source to destination. This command allows multiple sources as well, in which case the destination must be a directory.

Example:
• hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2
• hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

Exit Code: Returns 0 on success and -1 on error.

1.9. du

Usage: hadoop fs -du URI [URI …]

Displays the aggregate length of files contained in the directory, or the length of a file in case it's just a file.

Example:
• hadoop fs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://nn.example.com/user/hadoop/dir1

Exit Code: Returns 0 on success and -1 on error.

1.10. dus

Usage: hadoop fs -dus <args>

Displays a summary of file lengths.

1.11. expunge

Usage: hadoop fs -expunge

Empty the Trash. Refer to HDFS Architecture for more information on the Trash feature.

1.12. get

Usage: hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>

Copy files to the local file system. Files that fail the CRC check may be copied with the -ignorecrc option. Files and CRCs may be copied using the -crc option.

Example:
• hadoop fs -get /user/hadoop/file localfile
• hadoop fs -get hdfs://nn.example.com/user/hadoop/file localfile

Exit Code: Returns 0 on success and -1 on error.

1.13. getmerge

Usage: hadoop fs -getmerge <src> <localdst> [addnl]

Takes a source directory and a destination file as input and concatenates the files in src into the destination local file. Optionally addnl can be set to enable adding a newline character at the end of each file.

1.14. ls

Usage: hadoop fs -ls <args>

For a file, returns stat on the file with the following format:
permissions number_of_replicas userid groupid filesize modification_date modification_time filename
For a directory, it returns the list of its direct children, as in Unix. A directory is listed as:
permissions userid groupid modification_date modification_time dirname

Example:
• hadoop fs -ls /user/hadoop/file1

Exit Code: Returns 0 on success and -1 on error.

1.15. lsr

Usage: hadoop fs -lsr <args>

Recursive version of ls. Similar to Unix ls -R.

1.16. mkdir

Usage: hadoop fs -mkdir <paths>

Takes path URIs as arguments and creates directories. The behavior is much like Unix mkdir -p, creating parent directories along the path.

Example:
• hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
• hadoop fs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir

Exit Code: Returns 0 on success and -1 on error.

1.17. moveFromLocal

Usage: hadoop fs -moveFromLocal <localsrc> <dst>

Similar to the put command, except that the source localsrc is deleted after it's copied.

1.18. moveToLocal

Usage: hadoop fs -moveToLocal [-crc] <src> <dst>

Displays a "Not implemented yet" message.

1.19. mv

Usage: hadoop fs -mv URI [URI …] <dest>

Moves files from source to destination. This command allows multiple sources as well, in which case the destination needs to be a directory. Moving files across filesystems is not permitted.

Example:
• hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
• hadoop fs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1

Exit Code: Returns 0 on success and -1 on error.

1.20. put

Usage: hadoop fs -put <localsrc> ... <dst>

Copy a single src, or multiple srcs, from the local file system to the destination filesystem. Also reads input from stdin and writes to the destination filesystem.

• hadoop fs -put localfile /user/hadoop/hadoopfile
• hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
• hadoop fs -put localfile hdfs://nn.example.com/hadoop/hadoopfile
• hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile (reads the input from stdin)

Exit Code: Returns 0 on success and -1 on error.

1.21. rm

Usage: hadoop fs -rm [-skipTrash] URI [URI …]

Delete the files specified as args. Only deletes files and empty directories. If the -skipTrash option is specified, the trash, if enabled, will be bypassed and the specified file(s) deleted immediately. This can be useful when it is necessary to delete files from an over-quota directory. Refer to rmr for recursive deletes.

Example:
• hadoop fs -rm hdfs://nn.example.com/file /user/hadoop/emptydir

Exit Code: Returns 0 on success and -1 on error.

1.22. rmr

Usage: hadoop fs -rmr [-skipTrash] URI [URI …]

Recursive version of delete. If the -skipTrash option is specified, the trash, if enabled, will be bypassed and the specified file(s) deleted immediately. This can be useful when it is necessary to delete files from an over-quota directory.

Example:
• hadoop fs -rmr /user/hadoop/dir
• hadoop fs -rmr hdfs://nn.example.com/user/hadoop/dir

Exit Code: Returns 0 on success and -1 on error.

1.23. setrep

Usage: hadoop fs -setrep [-R] <path>

Changes the replication factor of a file. The -R option is for recursively increasing the replication factor of files within a directory.

Example:
• hadoop fs -setrep -w 3 -R /user/hadoop/dir1

Exit Code: Returns 0 on success and -1 on error.

1.24. stat

Usage: hadoop fs -stat URI [URI …]

Returns the stat information on the path.

Example:
• hadoop fs -stat path

Exit Code: Returns 0 on success and -1 on error.

1.25. tail

Usage: hadoop fs -tail [-f] URI

Displays the last kilobyte of the file to stdout. The -f option can be used as in Unix.

Example:
• hadoop fs -tail pathname

Exit Code: Returns 0 on success and -1 on error.

1.26. test

Usage: hadoop fs -test -[ezd] URI

Options:
  -e  check to see if the file exists. Return 0 if true.
  -z  check to see if the file is zero length. Return 0 if true.
  -d  check to see if the path is a directory. Return 0 if true.

Example:
• hadoop fs -test -e filename

1.27. text

Usage: hadoop fs -text <src>

Takes a source file and outputs the file in text format. The allowed formats are zip and TextRecordInputStream.

1.28. touchz

Usage: hadoop fs -touchz URI [URI …]

Create a file of zero length.

Example:
• hadoop fs -touchz pathname

Exit Code: Returns 0 on success and -1 on error.

Copyright © 2008 The Apache Software Foundation. All rights reserved.
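All of these shell commands have close equivalents in the FileSystem Java API. The following sketch is not part of the guide; the paths are placeholders, and it assumes a reachable HDFS configured as the default filesystem. It mirrors mkdir, put, and ls programmatically.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsShellEquivalents {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up the site configuration
        FileSystem fs = FileSystem.get(conf);       // the default (here: HDFS) filesystem

        // like: hadoop fs -mkdir /user/hadoop/dir1
        fs.mkdirs(new Path("/user/hadoop/dir1"));

        // like: hadoop fs -put localfile /user/hadoop/hadoopfile
        fs.copyFromLocalFile(new Path("localfile"), new Path("/user/hadoop/hadoopfile"));

        // like: hadoop fs -ls /user/hadoop
        for (FileStatus stat : fs.listStatus(new Path("/user/hadoop"))) {
          System.out.println(stat.getPath() + "\t" + stat.getLen());
        }
      }
    }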

HDFS Architecture


by Dhruba Borthakur

Table of contents

1 Introduction
2 Assumptions and Goals
  2.1 Hardware Failure
  2.2 Streaming Data Access
  2.3 Large Data Sets
  2.4 Simple Coherency Model
  2.5 “Moving Computation is Cheaper than Moving Data”
  2.6 Portability Across Heterogeneous Hardware and Software Platforms
3 NameNode and DataNodes
4 The File System Namespace
5 Data Replication
  5.1 Replica Placement: The First Baby Steps
  5.2 Replica Selection
  5.3 Safemode
6 The Persistence of File System Metadata
7 The Communication Protocols
8 Robustness
  8.1 Data Disk Failure, Heartbeats and Re-Replication
  8.2 Cluster Rebalancing
  8.3 Data Integrity
  8.4 Metadata Disk Failure
  8.5 Snapshots
9 Data Organization
  9.1 Data Blocks
  9.2 Staging
  9.3 Replication Pipelining
10 Accessibility
  10.1 FS Shell
  10.2 DFSAdmin
  10.3 Browser Interface
11 Space Reclamation
  11.1 File Deletes and Undeletes
  11.2 Decrease Replication Factor
12 References

1. Introduction

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets. HDFS relaxes a few POSIX requirements to enable streaming access to file system data. HDFS was originally built as infrastructure for the Apache Nutch web search engine project. HDFS is part of the Apache Hadoop Core project. The project URL is http://hadoop.apache.org/core/.
2. Assumptions and Goals

2.1. Hardware Failure

Hardware failure is the norm rather than the exception. An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system's data. The fact that there are a huge number of components and that each component has a non-trivial probability of failure means that some component of HDFS is always non-functional. Therefore, detection of faults and quick, automatic recovery from them is a core architectural goal of HDFS.

2.2. Streaming Data Access

Applications that run on HDFS need streaming access to their data sets. They are not general purpose applications that typically run on general purpose file systems. HDFS is designed more for batch processing rather than interactive use by users. The emphasis is on high throughput of data access rather than low latency of data access. POSIX imposes many hard requirements that are not needed for applications that are targeted for HDFS. POSIX semantics in a few key areas has been traded to increase data throughput rates.

2.3. Large Data Sets

Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to terabytes in size. Thus, HDFS is tuned to support large files. It should provide high aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should support tens of millions of files in a single instance.

2.4. Simple Coherency Model

HDFS applications need a write-once-read-many access model for files. A file once created, written, and closed need not be changed. This assumption simplifies data coherency issues and enables high throughput data access. A Map/Reduce application or a web crawler application fits perfectly with this model. There is a plan to support appending-writes to files in the future.

2.5. “Moving Computation is Cheaper than Moving Data”

A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge. This minimizes network congestion and increases the overall throughput of the system. The assumption is that it is often better to migrate the computation closer to where the data is located rather than moving the data to where the application is running. HDFS provides interfaces for applications to move themselves closer to where the data is located.

2.6. Portability Across Heterogeneous Hardware and Software Platforms

HDFS has been designed to be easily portable from one platform to another. This facilitates widespread adoption of HDFS as a platform of choice for a large set of applications.
3. NameNode and DataNodes

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system's clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.

The NameNode and DataNode are pieces of software designed to run on commodity machines. These machines typically run a GNU/Linux operating system (OS). HDFS is built using the Java language; any machine that supports Java can run the NameNode or the DataNode software. Usage of the highly portable Java language means that HDFS can be deployed on a wide range of machines. A typical deployment has a dedicated machine that runs only the NameNode software. Each of the other machines in the cluster runs one instance of the DataNode software. The architecture does not preclude running multiple DataNodes on the same machine but in a real deployment that is rarely the case.

The existence of a single NameNode in a cluster greatly simplifies the architecture of the system. The NameNode is the arbitrator and repository for all HDFS metadata. The system is designed in such a way that user data never flows through the NameNode.
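As a small illustration of the single-NameNode design, a client needs to know only the NameNode's address; the DataNodes are discovered through it. This sketch is not from the paper: fs.default.name is the configuration key used by Hadoop of this era, and namenodehost:9000 is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class ConnectToNameNode {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The default filesystem: for HDFS this names the NameNode that
        // manages the namespace; user data never flows through it.
        conf.set("fs.default.name", "hdfs://namenodehost:9000");
        FileSystem fs = FileSystem.get(conf);
        System.out.println("Default filesystem: " + fs.getUri());
      }
    }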
4. The File System Namespace

HDFS supports a traditional hierarchical file organization. A user or an application can create directories and store files inside these directories. The file system namespace hierarchy is similar to most other existing file systems; one can create and remove files, move a file from one directory to another, or rename a file. HDFS does not yet implement user quotas or access permissions. HDFS does not support hard links or soft links. However, the HDFS architecture does not preclude implementing these features.

The NameNode maintains the file system namespace. Any change to the file system namespace or its properties is recorded by the NameNode. An application can specify the number of replicas of a file that should be maintained by HDFS. The number of copies of a file is called the replication factor of that file. This information is stored by the NameNode.
5. Data Replication

HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file. An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once and have strictly one writer at any time.

The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode.

5.1. Replica Placement: The First Baby Steps

The placement of replicas is critical to HDFS reliability and performance. Optimizing replica placement distinguishes HDFS from most other distributed file systems. This is a feature that needs lots of tuning and experience. The purpose of a rack-aware replica placement policy is to improve data reliability, availability, and network bandwidth utilization. The current implementation for the replica placement policy is a first effort in this direction. The short-term goals of implementing this policy are to validate it on production systems, learn more about its behavior, and build a foundation to test and research more sophisticated policies.

Large HDFS instances run on a cluster of computers that commonly spread across many racks. Communication between two nodes in different racks has to go through switches. In most cases, network bandwidth between machines in the same rack is greater than network bandwidth between machines in different racks. The NameNode determines the rack id each DataNode belongs to via the process outlined in Rack Awareness. A simple but non-optimal policy is to place replicas on unique racks. This prevents losing data when an entire rack fails and allows use of bandwidth from multiple racks when reading data. This policy evenly distributes replicas in the cluster which makes it easy to balance load on component failure. However, this policy increases the cost of writes because a write needs to transfer blocks to multiple racks.

For the common case, when the replication factor is three, HDFS's placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

The current, default replica placement policy described here is a work in progress.

5.2. Replica Selection

To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from a replica that is closest to the reader. If there exists a replica on the same rack as the reader node, then that replica is preferred to satisfy the read request. If an HDFS cluster spans multiple data centers, then a replica that is resident in the local data center is preferred over any remote replica.

5.3. Safemode

On startup, the NameNode enters a special state called Safemode. Replication of data blocks does not occur when the NameNode is in the Safemode state. The NameNode receives Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of data blocks that a DataNode is hosting. Each block has a specified minimum number of replicas. A block is considered safely replicated when the minimum number of replicas of that data block has checked in with the NameNode. After a configurable percentage of safely replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the NameNode exits the Safemode state. It then determines the list of data blocks (if any) that still have fewer than the specified number of replicas. The NameNode then replicates these blocks to other DataNodes.
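Since the replication factor is per file and changeable after creation, the corresponding client call can be sketched as follows; the path and the factor of 2 are illustrative only (setReplication is the FileSystem API call this document itself mentions in section 11.2).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ChangeReplication {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Raise (or lower) the replication factor of one file; the NameNode
        // then schedules re-replication or deletion of excess replicas.
        boolean ok = fs.setReplication(new Path("/user/hadoop/file1"), (short) 2);
        System.out.println("setReplication accepted: " + ok);
      }
    }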
6. The Persistence of File System Metadata

The HDFS namespace is stored by the NameNode. The NameNode uses a transaction log called the EditLog to persistently record every change that occurs to file system metadata. For example, creating a new file in HDFS causes the NameNode to insert a record into the EditLog indicating this. Similarly, changing the replication factor of a file causes a new record to be inserted into the EditLog. The NameNode uses a file in its local host OS file system to store the EditLog. The entire file system namespace, including the mapping of blocks to files and file system properties, is stored in a file called the FsImage. The FsImage is stored as a file in the NameNode's local file system too.

The NameNode keeps an image of the entire file system namespace and file Blockmap in memory. This key metadata item is designed to be compact, such that a NameNode with 4 GB of RAM is plenty to support a huge number of files and directories. When the NameNode starts up, it reads the FsImage and EditLog from disk, applies all the transactions from the EditLog to the in-memory representation of the FsImage, and flushes out this new version into a new FsImage on disk. It can then truncate the old EditLog because its transactions have been applied to the persistent FsImage. This process is called a checkpoint. In the current implementation, a checkpoint only occurs when the NameNode starts up. Work is in progress to support periodic checkpointing in the near future.

The DataNode stores HDFS data in files in its local file system. The DataNode has no knowledge about HDFS files. It stores each block of HDFS data in a separate file in its local file system. The DataNode does not create all files in the same directory. Instead, it uses a heuristic to determine the optimal number of files per directory and creates subdirectories appropriately. It is not optimal to create all local files in the same directory because the local file system might not be able to efficiently support a huge number of files in a single directory. When a DataNode starts up, it scans through its local file system, generates a list of all HDFS data blocks that correspond to each of these local files and sends this report to the NameNode: this is the Blockreport.
7. The Communication Protocols

All HDFS communication protocols are layered on top of the TCP/IP protocol. A client establishes a connection to a configurable TCP port on the NameNode machine. It talks the ClientProtocol with the NameNode. The DataNodes talk to the NameNode using the DataNode Protocol. A Remote Procedure Call (RPC) abstraction wraps both the Client Protocol and the DataNode Protocol. By design, the NameNode never initiates any RPCs. Instead, it only responds to RPC requests issued by DataNodes or clients.
8. Robustness

The primary objective of HDFS is to store data reliably even in the presence of failures. The three common types of failures are NameNode failures, DataNode failures and network partitions.

8.1. Data Disk Failure, Heartbeats and Re-Replication

Each DataNode sends a Heartbeat message to the NameNode periodically. A network partition can cause a subset of DataNodes to lose connectivity with the NameNode. The NameNode detects this condition by the absence of a Heartbeat message. The NameNode marks DataNodes without recent Heartbeats as dead and does not forward any new IO requests to them. Any data that was registered to a dead DataNode is not available to HDFS any more. DataNode death may cause the replication factor of some blocks to fall below their specified value. The NameNode constantly tracks which blocks need to be replicated and initiates replication whenever necessary. The necessity for re-replication may arise due to many reasons: a DataNode may become unavailable, a replica may become corrupted, a hard disk on a DataNode may fail, or the replication factor of a file may be increased.

8.2. Cluster Rebalancing

The HDFS architecture is compatible with data rebalancing schemes. A scheme might automatically move data from one DataNode to another if the free space on a DataNode falls below a certain threshold. In the event of a sudden high demand for a particular file, a scheme might dynamically create additional replicas and rebalance other data in the cluster. These types of data rebalancing schemes are not yet implemented.

8.3. Data Integrity

It is possible that a block of data fetched from a DataNode arrives corrupted. This corruption can occur because of faults in a storage device, network faults, or buggy software. The HDFS client software implements checksum checking on the contents of HDFS files. When a client creates an HDFS file, it computes a checksum of each block of the file and stores these checksums in a separate hidden file in the same HDFS namespace. When a client retrieves file contents it verifies that the data it received from each DataNode matches the checksum stored in the associated checksum file. If not, then the client can opt to retrieve that block from another DataNode that has a replica of that block.

8.4. Metadata Disk Failure

The FsImage and the EditLog are central data structures of HDFS. A corruption of these files can cause the HDFS instance to be non-functional. For this reason, the NameNode can be configured to support maintaining multiple copies of the FsImage and EditLog. Any update to either the FsImage or EditLog causes each of the FsImages and EditLogs to get updated synchronously. This synchronous updating of multiple copies of the FsImage and EditLog may degrade the rate of namespace transactions per second that a NameNode can support. However, this degradation is acceptable because even though HDFS applications are very data intensive in nature, they are not metadata intensive. When a NameNode restarts, it selects the latest consistent FsImage and EditLog to use.

The NameNode machine is a single point of failure for an HDFS cluster. If the NameNode machine fails, manual intervention is necessary. Currently, automatic restart and failover of the NameNode software to another machine is not supported.
8.5. Snapshots

Snapshots support storing a copy of data at a particular instant of time. One usage of the snapshot feature may be to roll back a corrupted HDFS instance to a previously known good point in time. HDFS does not currently support snapshots but will in a future release.
9. Data Organization

9.1. Data Blocks

HDFS is designed to support very large files. Applications that are compatible with HDFS are those that deal with large data sets. These applications write their data only once but they read it one or more times and require these reads to be satisfied at streaming speeds. HDFS supports write-once-read-many semantics on files. A typical block size used by HDFS is 64 MB. Thus, an HDFS file is chopped up into 64 MB chunks, and if possible, each chunk will reside on a different DataNode.

9.2. Staging

A client request to create a file does not reach the NameNode immediately. In fact, initially the HDFS client caches the file data into a temporary local file. Application writes are transparently redirected to this temporary local file. When the local file accumulates data worth over one HDFS block size, the client contacts the NameNode. The NameNode inserts the file name into the file system hierarchy and allocates a data block for it. The NameNode responds to the client request with the identity of the DataNode and the destination data block. Then the client flushes the block of data from the local temporary file to the specified DataNode. When a file is closed, the remaining un-flushed data in the temporary local file is transferred to the DataNode. The client then tells the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into a persistent store. If the NameNode dies before the file is closed, the file is lost.

The above approach has been adopted after careful consideration of target applications that run on HDFS. These applications need streaming writes to files. If a client writes to a remote file directly without any client side buffering, the network speed and the congestion in the network impacts throughput considerably. This approach is not without precedent. Earlier distributed file systems, e.g. AFS, have used client side caching to improve performance. A POSIX requirement has been relaxed to achieve higher performance of data uploads.

9.3. Replication Pipelining

When a client is writing data to an HDFS file, its data is first written to a local file as explained in the previous section. Suppose the HDFS file has a replication factor of three. When the local file accumulates a full block of user data, the client retrieves a list of DataNodes from the NameNode. This list contains the DataNodes that will host a replica of that block. The client then flushes the data block to the first DataNode. The first DataNode starts receiving the data in small portions (4 KB), writes each portion to its local repository and transfers that portion to the second DataNode in the list. The second DataNode, in turn starts receiving each portion of the data block, writes that portion to its repository and then flushes that portion to the third DataNode. Finally, the third DataNode writes the data to its local repository. Thus, a DataNode can be receiving data from the previous one in the pipeline and at the same time forwarding data to the next one in the pipeline. Thus, the data is pipelined from one DataNode to the next.
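From the application's point of view, all of the staging and pipelining above hides behind an ordinary output stream. A minimal write might look like this (a sketch only, with a placeholder path):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WriteOnce {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream out = fs.create(new Path("/user/hadoop/notes.txt"));
        out.writeBytes("hello hdfs\n"); // buffered client-side until a block's worth accumulates
        out.close();                    // flushes the rest and commits the file at the NameNode
      }
    }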
10. Accessibility

HDFS can be accessed from applications in many different ways. Natively, HDFS provides a FileSystem Java API for applications to use. A C language wrapper for this Java API is also available. In addition, an HTTP browser can also be used to browse the files of an HDFS instance. Work is in progress to expose HDFS through the WebDAV protocol.

10.1. FS Shell

HDFS allows user data to be organized in the form of files and directories. It provides a commandline interface called FS shell that lets a user interact with the data in HDFS. The syntax of this command set is similar to other shells (e.g. bash, csh) that users are already familiar with. Here are some sample action/command pairs:

• Create a directory named /foodir: bin/hadoop dfs -mkdir /foodir
• Remove a directory named /foodir: bin/hadoop dfs -rmr /foodir
• View the contents of a file named /foodir/myfile.txt: bin/hadoop dfs -cat /foodir/myfile.txt

FS shell is targeted for applications that need a scripting language to interact with the stored data.

10.2. DFSAdmin

The DFSAdmin command set is used for administering an HDFS cluster. These are commands that are used only by an HDFS administrator. Here are some sample action/command pairs:

• Put the cluster in Safemode: bin/hadoop dfsadmin -safemode enter
• Generate a list of DataNodes: bin/hadoop dfsadmin -report
• Recommission or decommission DataNode(s): bin/hadoop dfsadmin -refreshNodes

10.3. Browser Interface

A typical HDFS install configures a web server to expose the HDFS namespace through a configurable TCP port. This allows a user to navigate the HDFS namespace and view the contents of its files using a web browser.
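The FileSystem Java API mentioned above can reproduce the -cat action from the FS shell list; here is a hedged sketch (the file name reuses the /foodir/myfile.txt from the examples):

    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class CatFile {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        InputStream in = fs.open(new Path("/foodir/myfile.txt"));
        try {
          IOUtils.copyBytes(in, System.out, 4096, false); // stream the file to stdout
        } finally {
          IOUtils.closeStream(in);
        }
      }
    }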
11. Space Reclamation

11.1. File Deletes and Undeletes

When a file is deleted by a user or an application, it is not immediately removed from HDFS. Instead, HDFS first renames it to a file in the /trash directory. The file can be restored quickly as long as it remains in /trash. A file remains in /trash for a configurable amount of time. After the expiry of its life in /trash, the NameNode deletes the file from the HDFS namespace. The deletion of a file causes the blocks associated with the file to be freed. Note that there could be an appreciable time delay between the time a file is deleted by a user and the time of the corresponding increase in free space in HDFS.

A user can Undelete a file after deleting it as long as it remains in the /trash directory. If a user wants to undelete a file that he/she has deleted, he/she can navigate the /trash directory and retrieve the file. The /trash directory contains only the latest copy of the file that was deleted. The /trash directory is just like any other directory with one special feature: HDFS applies specified policies to automatically delete files from this directory. The current default policy is to delete files from /trash that are more than 6 hours old. In the future, this policy will be configurable through a well defined interface.

11.2. Decrease Replication Factor

When the replication factor of a file is reduced, the NameNode selects excess replicas that can be deleted. The next Heartbeat transfers this information to the DataNode. The DataNode then removes the corresponding blocks and the corresponding free space appears in the cluster. Once again, there might be a time delay between the completion of the setReplication API call and the appearance of free space in the cluster.
12. References

Hadoop JavaDoc API.
HDFS source code: http://hadoop.apache.org/core/version_control.html

Copyright © 2008 The Apache Software Foundation. All rights reserved.

Map/Reduce Tutorial


Table of contents

1 Purpose
2 Pre-requisites
3 Overview
4 Inputs and Outputs
5 Example: WordCount v1.0
  5.1 Source Code
  5.2 Usage
  5.3 Walk-through
6 Map/Reduce - User Interfaces
  6.1 Payload
  6.2 Job Configuration
  6.3 Task Execution & Environment
  6.4 Job Submission and Monitoring
  6.5 Job Input
  6.6 Job Output
  6.7 Other Useful Features
7 Example: WordCount v2.0
  7.1 Source Code
  7.2 Sample Runs
  7.3 Highlights

1. Purpose

This document comprehensively describes all user-facing facets of the Hadoop Map/Reduce framework and serves as a tutorial.
2. Pre-requisites

Ensure that Hadoop is installed, configured and is running. More details:
• Hadoop Quick Start for first-time users.
• Hadoop Cluster Setup for large, distributed clusters.
3. Overview

Hadoop Map/Reduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A Map/Reduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the Map/Reduce framework and the Hadoop Distributed File System (see HDFS Architecture) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The Map/Reduce framework consists of a single master JobTracker and one slave TaskTracker per cluster-node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the master.

Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration. The Hadoop job client then submits the job (jar/executable etc.) and configuration to the JobTracker which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.

Although the Hadoop framework is implemented in Java™, Map/Reduce applications need not be written in Java.
• Hadoop Streaming is a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
• Hadoop Pipes is a SWIG-compatible C++ API to implement Map/Reduce applications (non JNI™ based).
4. Inputs and Outputs

The Map/Reduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.

Input and Output types of a Map/Reduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
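To make the Writable/WritableComparable contract concrete, here is a minimal sketch of a custom key class; the class name and fields are hypothetical, not part of the tutorial's examples:

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import org.apache.hadoop.io.WritableComparable;

  // Hypothetical composite key: serializable via write/readFields,
  // sortable via compareTo, as the framework requires of key classes.
  public class WordPair implements WritableComparable<WordPair> {
    private String first = "";
    private String second = "";

    public void write(DataOutput out) throws IOException {
      out.writeUTF(first);
      out.writeUTF(second);
    }

    public void readFields(DataInput in) throws IOException {
      first = in.readUTF();
      second = in.readUTF();
    }

    public int compareTo(WordPair other) {
      int cmp = first.compareTo(other.first);
      return (cmp != 0) ? cmp : second.compareTo(other.second);
    }

    // hashCode() should also be overridden so that the default
    // HashPartitioner distributes these keys sensibly.
    public int hashCode() {
      return first.hashCode() * 163 + second.hashCode();
    }
  }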
5. Example: WordCount v1.0

Before we jump into the details, let's walk through an example Map/Reduce application to get a flavour of how they work.

WordCount is a simple application that counts the number of occurrences of each word in a given input set.

This works with a local-standalone, pseudo-distributed or fully-distributed Hadoop installation (see Hadoop Quick Start).

5.1. Source Code

WordCount.java

1. package org.myorg;
2.
3. import java.io.IOException;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.conf.*;
8. import org.apache.hadoop.io.*;
9. import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14.   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.     private final static IntWritable one = new IntWritable(1);
16.     private Text word = new Text();
17.
18.     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.       String line = value.toString();
20.       StringTokenizer tokenizer = new StringTokenizer(line);
21.       while (tokenizer.hasMoreTokens()) {
22.         word.set(tokenizer.nextToken());
23.         output.collect(word, one);
24.       }
25.     }
26.   }
27.
28.   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.       int sum = 0;
31.       while (values.hasNext()) {
32.         sum += values.next().get();
33.       }
34.       output.collect(key, new IntWritable(sum));
35.     }
36.   }
37.
38.   public static void main(String[] args) throws Exception {
39.     JobConf conf = new JobConf(WordCount.class);
40.     conf.setJobName("wordcount");
41.
42.     conf.setOutputKeyClass(Text.class);
43.     conf.setOutputValueClass(IntWritable.class);
44.
45.     conf.setMapperClass(Map.class);
46.     conf.setCombinerClass(Reduce.class);
47.     conf.setReducerClass(Reduce.class);
48.
49.     conf.setInputFormat(TextInputFormat.class);
50.     conf.setOutputFormat(TextOutputFormat.class);
51.
52.     FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.     JobClient.runJob(conf);
56.   }
57. }

5.2. Usage

Assuming HADOOP_HOME is the root of the installation and HADOOP_VERSION is the Hadoop version installed, compile WordCount.java and create a jar:

$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .

Assuming that:
• /usr/joe/wordcount/input - input directory in HDFS
• /usr/joe/wordcount/output - output directory in HDFS

Sample text-files as input:

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

Run the application:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

Output:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

Applications can specify a comma-separated list of paths which will be present in the current working directory of the task using the option -files. The -libjars option allows applications to add jars to the classpaths of the maps and reduces. The -archives option allows them to pass archives as arguments; these are unzipped/unjarred and a link with the name of the jar/zip is created in the current working directory of tasks. More details about the command line options are available in the Hadoop Command Guide.

Running the wordcount example with -libjars and -files:

$ hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output

5.3. Walk-through

The WordCount application is quite straight-forward.

The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes one line at a time, as provided by the specified TextInputFormat (line 49). It then splits the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value pair of < <word>, 1>.

For the given sample input the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

We'll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later in the tutorial.

WordCount also specifies a combiner (line 46). Hence, the output of each map is passed through the local combiner (which is the same as the Reducer, as per the job configuration) for local aggregation, after being sorted on the keys.

The output of the first map:
< Bye, 1>
< Hello, 1>
< World, 2>

The output of the second map:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>

The Reducer implementation (lines 28-36), via the reduce method (lines 29-35), just sums up the values, which are the occurrence counts for each key (i.e. words in this example).
Thus the output of the job is:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, input/output formats etc., in the JobConf. It then calls JobClient.runJob (line 55) to submit the job and monitor its progress.

We'll learn more about JobConf, JobClient, Tool and other interfaces and classes a bit later in the tutorial.
6. Map/Reduce - User Interfaces

This section provides a reasonable amount of detail on every user-facing aspect of the Map/Reduce framework. This should help users implement, configure and tune their jobs in a fine-grained manner. However, please note that the javadoc for each class/interface remains the most comprehensive documentation available; this is only meant to be a tutorial.

Let us first take the Mapper and Reducer interfaces. Applications typically implement them to provide the map and reduce methods.

We will then discuss other core interfaces including JobConf, JobClient, Partitioner, OutputCollector, Reporter, InputFormat, OutputFormat, OutputCommitter and others.

Finally, we will wrap up by discussing some useful features of the framework such as the DistributedCache, IsolationRunner etc.

6.1. Payload

Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.

6.1.1. Mapper

Mapper maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop Map/Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.

Overall, Mapper implementations are passed the JobConf for the job via the JobConfigurable.configure(JobConf) method and override it to initialize themselves. The framework then calls map(WritableComparable, Writable, OutputCollector, Reporter) for each key/value pair in the InputSplit for that task. Applications can then override the Closeable.close() method to perform any required cleanup.

Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to OutputCollector.collect(WritableComparable, Writable).

Applications can use the Reporter to report progress, set application-level status messages and update Counters, or just indicate that they are alive.

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via JobConf.setOutputKeyComparatorClass(Class).

The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

Users can optionally specify a combiner, via JobConf.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed, and the CompressionCodec to be used, via the JobConf.
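As a minimal sketch of that life-cycle (the class and property names here are hypothetical, not part of the WordCount examples): configure runs once before any map call, and close once after the last.

  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.*;

  // Hypothetical mapper illustrating the configure/map/close life-cycle.
  public class LifecycleMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, LongWritable> {

    private Text tag = new Text();

    public void configure(JobConf job) {
      // Called once per task, before any map() call; read job parameters here.
      tag.set(job.get("myjob.tag", "untagged"));   // "myjob.tag" is an assumed property
    }

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
      output.collect(tag, key);
      reporter.progress();   // tell the framework the task is still alive
    }

    public void close() throws IOException {
      // Called once after the last map() call; release resources here.
    }
  }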
6.1.1.1. How Many Maps?

The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.

The right level of parallelism for maps seems to be around 10-100 maps per node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.

Thus, if you expect 10TB of input data and have a blocksize of 128MB, you'll end up with 82,000 maps, unless setNumMapTasks(int) (which only provides a hint to the framework) is used to set it even higher.

6.1.2. Reducer

Reducer reduces a set of intermediate values which share a key to a smaller set of values.

The number of reduces for the job is set by the user via JobConf.setNumReduceTasks(int).

Overall, Reducer implementations are passed the JobConf for the job via the JobConfigurable.configure(JobConf) method and can override it to initialize themselves. The framework then calls the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the Closeable.close() method to perform any required cleanup.

Reducer has 3 primary phases: shuffle, sort and reduce.

6.1.2.1. Shuffle

Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

6.1.2.2. Sort

The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.

The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.

Secondary Sort

If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator via JobConf.setOutputValueGroupingComparator(Class). Since JobConf.setOutputKeyComparatorClass(Class) can be used to control how intermediate keys are sorted, these can be used in conjunction to simulate a secondary sort on values.

6.1.2.3. Reduce

In this phase the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method is called for each <key, (list of values)> pair in the grouped inputs.

The output of the reduce task is typically written to the FileSystem via OutputCollector.collect(WritableComparable, Writable).

Applications can use the Reporter to report progress, set application-level status messages and update Counters, or just indicate that they are alive.

The output of the Reducer is not sorted.

6.1.2.4. How Many Reduces?

The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces, doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but improves load balancing and lowers the cost of failures.

The scaling factors above are slightly less than whole numbers to reserve a few reduce slots in the framework for speculative tasks and failed tasks.

6.1.2.5. Reducer NONE

It is legal to set the number of reduce-tasks to zero if no reduction is desired.

In this case the outputs of the map-tasks go directly to the FileSystem, into the output path set by setOutputPath(Path). The framework does not sort the map-outputs before writing them out to the FileSystem.
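A minimal sketch of applying the 0.95 heuristic when configuring a job; the node count and per-node slot count are assumptions about a particular cluster:

  import org.apache.hadoop.mapred.JobConf;

  public class ReduceCountSketch {
    public static void main(String[] args) {
      int nodes = 20;              // task-tracker nodes in the cluster (assumed)
      int reduceSlotsPerNode = 2;  // mapred.tasktracker.reduce.tasks.maximum (assumed)

      JobConf conf = new JobConf(ReduceCountSketch.class);
      conf.setNumReduceTasks((int) (0.95 * nodes * reduceSlotsPerNode));  // 38 reduces

      // Or, for a map-only job whose output should go straight to the FileSystem:
      // conf.setNumReduceTasks(0);
    }
  }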
6.1.3. Partitioner

Partitioner partitions the key space.

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.

HashPartitioner is the default Partitioner.

6.1.4. Reporter

Reporter is a facility for Map/Reduce applications to report progress, set application-level status messages and update Counters.

Mapper and Reducer implementations can use the Reporter to report progress or just indicate that they are alive. In scenarios where the application takes a significant amount of time to process individual key/value pairs, this is crucial, since otherwise the framework might assume that the task has timed out and kill it. Another way to avoid this is to set the configuration parameter mapred.task.timeout to a high-enough value (or even set it to zero for no time-outs).

Applications can also update Counters using the Reporter.

6.1.5. OutputCollector

OutputCollector is a generalization of the facility provided by the Map/Reduce framework to collect data output by the Mapper or the Reducer (either the intermediate outputs or the output of the job).

Hadoop Map/Reduce comes bundled with a library of generally useful mappers, reducers, and partitioners.
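A minimal sketch of a custom Partitioner (the class is hypothetical): it routes each word to a reduce task by its first character, rather than by the full-key hash that HashPartitioner uses:

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.Partitioner;

  // Hypothetical partitioner: all keys with the same first character
  // are sent to the same reduce task.
  public class FirstCharPartitioner implements Partitioner<Text, IntWritable> {
    public void configure(JobConf job) { }

    public int getPartition(Text key, IntWritable value, int numPartitions) {
      if (key.getLength() == 0) {
        return 0;
      }
      // Mask the sign bit so the index stays non-negative.
      return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
  }

A job would select it with conf.setPartitionerClass(FirstCharPartitioner.class).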
6.2. Job Configuration

JobConf represents a Map/Reduce job configuration.

JobConf is the primary interface for a user to describe a Map/Reduce job to the Hadoop framework for execution. The framework tries to faithfully execute the job as described by JobConf, however:
• Some configuration parameters may have been marked as final by administrators and hence cannot be altered.
• While some job parameters are straight-forward to set (e.g. setNumReduceTasks(int)), other parameters interact subtly with the rest of the framework and/or job configuration and are more complex to set (e.g. setNumMapTasks(int)).

JobConf is typically used to specify the Mapper, combiner (if any), Partitioner, Reducer, InputFormat, OutputFormat and OutputCommitter implementations. JobConf also indicates the set of input files (setInputPaths(JobConf, Path...)/addInputPath(JobConf, Path) and setInputPaths(JobConf, String)/addInputPaths(JobConf, String)) and where the output files should be written (setOutputPath(Path)).

Optionally, JobConf is used to specify other advanced facets of the job such as the Comparator to be used, files to be put in the DistributedCache, whether intermediate and/or job outputs are to be compressed (and how), debugging via user-provided scripts (setMapDebugScript(String)/setReduceDebugScript(String)), whether job tasks can be executed in a speculative manner (setMapSpeculativeExecution(boolean)/setReduceSpeculativeExecution(boolean)), the maximum number of attempts per task (setMaxMapAttempts(int)/setMaxReduceAttempts(int)), the percentage of task failures which can be tolerated by the job (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) etc.

Of course, users can use set(String, String)/get(String, String) to set/get arbitrary parameters needed by applications. However, use the DistributedCache for large amounts of (read-only) data.

6.3. Task Execution & Environment

The TaskTracker executes the Mapper/Reducer task as a child process in a separate jvm.

The child-task inherits the environment of the parent TaskTracker. The user can specify additional options to the child-jvm via the mapred.child.java.opts configuration parameter in the JobConf, such as non-standard paths for the run-time linker to search shared libraries via -Djava.library.path=<> etc. If the mapred.child.java.opts string contains the symbol @taskid@ it is interpolated with the value of the taskid of the map/reduce task.

Here is an example with multiple arguments and substitutions, showing jvm GC logging, and the start of a passwordless JVM JMX agent so that it can connect with jconsole and the like to watch child memory, threads and get thread dumps. It also sets the maximum heap-size of the child jvm to 512MB and adds an additional path to the java.library.path of the child-jvm.

<property>
  <name>mapred.child.java.opts</name>
  <value>
    -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

6.3.1. Memory management

Users/admins can also specify the maximum virtual memory of the launched child-task, and any sub-process it launches recursively, using mapred.child.ulimit. Note that the value set here is a per-process limit. The value for mapred.child.ulimit should be specified in kilobytes (KB), and it must be greater than or equal to the -Xmx passed to the JavaVM, else the VM might not start.

Note: mapred.child.java.opts is used only for configuring the child tasks launched by the task tracker. Configuring the memory options for daemons is documented in cluster_setup.html.
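A minimal sketch of setting these limits per job from Java rather than in the site configuration; the values are illustrative assumptions:

  import org.apache.hadoop.mapred.JobConf;

  public class ChildMemorySketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf(ChildMemorySketch.class);
      conf.set("mapred.child.java.opts",
               "-Xmx512M -verbose:gc -Xloggc:/tmp/@taskid@.gc");
      // ulimit is in KB and must be >= the -Xmx above: 1 GB here.
      conf.set("mapred.child.ulimit", "1048576");
    }
  }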
The memory available to some parts of the framework is also configurable. In map and reduce tasks, performance may be influenced by adjusting parameters influencing the concurrency of operations and the frequency with which data will hit disk. Monitoring the filesystem counters for a job - particularly relative to byte counts from the map and into the reduce - is invaluable to the tuning of these parameters.

6.3.2. Map Parameters

A record emitted from a map will be serialized into a buffer, and metadata will be stored into accounting buffers. As described in the following options, when either the serialization buffer or the metadata exceed a threshold, the contents of the buffers will be sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread will block. When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.

• io.sort.mb (int): The cumulative size of the serialization and accounting buffers storing records emitted from the map, in megabytes.
• io.sort.record.percent (float): The ratio of serialization to accounting space can be adjusted. Each serialized record requires 16 bytes of accounting information in addition to its serialized size to effect the sort. This percentage of space allocated from io.sort.mb affects the probability of a spill to disk being caused by either exhaustion of the serialization buffer or the accounting space. Clearly, for a map outputting small records, a higher value than the default will likely decrease the number of spills to disk.
• io.sort.spill.percent (float): This is the threshold for the accounting and serialization buffers. When this percentage of either buffer has filled, their contents will be spilled to disk in the background. Let io.sort.record.percent be r, io.sort.mb be x, and this value be q. The maximum number of records collected before the collection thread will spill is r * x * q * 2^16. Note that a higher value may decrease the number of - or even eliminate - merges, but will also increase the probability of the map task getting blocked. The lowest average map times are usually obtained by accurately estimating the size of the map output and preventing multiple spills.

Other notes
• If either spill threshold is exceeded while a spill is in progress, collection will continue until the spill is finished. For example, if io.sort.spill.percent is set to 0.33, and the remainder of the buffer is filled while the spill runs, the next spill will include all the collected records, or 0.66 of the buffer, and will not generate additional spills. In other words, the thresholds are defining triggers, not blocking.
• A record larger than the serialization buffer will first trigger a spill, then be spilled to a separate file. It is undefined whether or not this record will first pass through the combiner.
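A minimal sketch of tuning the map-side sort buffer for a job; the values are illustrative assumptions, not recommendations:

  import org.apache.hadoop.mapred.JobConf;

  public class MapSortTuningSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf(MapSortTuningSketch.class);
      conf.setInt("io.sort.mb", 200);               // bigger serialization/accounting buffer
      conf.set("io.sort.spill.percent", "0.90");    // spill when 90% of either buffer fills
      conf.set("io.sort.record.percent", "0.10");   // more accounting space for small records
    }
  }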
6.3.3. Shuffle/Reduce Parameters

As described previously, each reduce fetches the output assigned to it by the Partitioner via HTTP into memory and periodically merges these outputs to disk. If intermediate compression of map outputs is turned on, each output is decompressed into memory. The following options affect the frequency of these merges to disk prior to the reduce and the memory allocated to map output during the reduce.

• io.sort.factor (int): Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during the merge. If the number of files exceeds this limit, the merge will proceed in several passes. Though this limit also applies to the map, most jobs should be configured so that hitting this limit is unlikely there.
• mapred.inmem.merge.threshold (int): The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds in the preceding note, this is not defining a unit of partition, but a trigger. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk (see notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle.
• mapred.job.shuffle.merge.percent (float): The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of memory allocated to storing map outputs in memory. Since map outputs that can't fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle.
• mapred.job.shuffle.input.buffer.percent (float): The percentage of memory - relative to the maximum heapsize as typically specified in mapred.child.java.opts - that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.
• mapred.job.reduce.input.buffer.percent (float): The percentage of memory, relative to the maximum heapsize, in which map outputs may be retained during the reduce. When the reduce begins, map outputs will be merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins, to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.

Other notes
• If a map output is larger than 25 percent of the memory allocated to copying map outputs, it will be written directly to disk without first staging through memory.
• When running with a combiner, the reasoning about high merge thresholds and large buffers may not hold. For merges started before all map outputs have been fetched, the combiner is run while spilling to disk. In some cases, one can obtain better reduce times by spending resources combining map outputs - making disk spills small and parallelizing spilling and fetching - rather than aggressively increasing buffer sizes.
• When merging in-memory map outputs to disk to begin the reduce, if an intermediate merge is necessary because there are segments to spill and at least io.sort.factor segments already on disk, the in-memory map outputs will be part of the intermediate merge.
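And a matching sketch for the reduce side, again with assumed values, for a job whose reducers are memory-light enough to keep map outputs in the heap through the reduce:

  import org.apache.hadoop.mapred.JobConf;

  public class ReduceBufferSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf(ReduceBufferSketch.class);
      conf.setInt("io.sort.factor", 50);                           // merge up to 50 segments per pass
      conf.set("mapred.job.reduce.input.buffer.percent", "0.70");  // retain outputs in heap during reduce
    }
  }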
6.3.4. Directory Structure

The task tracker has a local directory, ${mapred.local.dir}/taskTracker/, in which to create the localized cache and localized job directories. It can define multiple local directories (spanning multiple disks), and then each filename is assigned to a semi-random local directory. When the job starts, the task tracker creates a localized job directory relative to the local directory specified in the configuration. Thus the task tracker directory structure looks as follows:

• ${mapred.local.dir}/taskTracker/archive/ : The distributed cache. This directory holds the localized distributed cache; it is thus shared among all the tasks and jobs.
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : The localized job directory.
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ : The job-specific shared directory. The tasks can use this space as scratch space and share files among them. This directory is exposed to the users through the configuration property job.local.dir. It can be accessed through the api JobConf.getJobLocalDir(). It is also available as a system property, so users (streaming etc.) can call System.getProperty("job.local.dir") to access the directory.
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/ : The jars directory, which has the job jar file and the expanded jar. The job.jar is the application's jar file that is automatically distributed to each machine. It is expanded in the jars directory before the tasks for the job start. The job.jar location is accessible to the application through the api JobConf.getJar(). To access the unjarred directory, JobConf.getJar().getParent() can be called.
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml : The job.xml file, the generic job configuration, localized for the job.
• ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid : The task directory for each task attempt. Each task directory again has the following structure:
  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml : A job.xml file, the task-localized job configuration. Task localization means that properties have been set that are specific to this particular task within the job. The properties localized for each task are described below.
  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output : A directory for intermediate output files. This contains the temporary map/reduce data generated by the framework, such as map output files.
  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work : The current working directory of the task. With jvm reuse enabled for tasks, this directory will be the directory in which the jvm was started.
  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp : The temporary directory for the task. (The user can specify the property mapred.child.tmp to set the value of the temporary directory for map and reduce tasks. This defaults to ./tmp. If the value is not an absolute path, it is prepended with the task's working directory; otherwise, it is used directly. The directory will be created if it doesn't exist. The child java tasks are then executed with the option -Djava.io.tmpdir='the absolute path of the tmp dir'. Pipes and streaming are set with the environment variable TMPDIR='the absolute path of the tmp dir'.) This directory is created if mapred.child.tmp has the value ./tmp.

6.3.5. Task JVM Reuse

Jobs can enable task JVMs to be reused by specifying the job configuration mapred.job.reuse.jvm.num.tasks. If the value is 1 (the default), then JVMs are not reused (i.e. 1 task per JVM). If it is -1, there is no limit to the number of tasks (of the same job) a JVM can run. One can also specify some value greater than 1 using the api JobConf.setNumTasksToExecutePerJvm(int).

The following properties are localized in the job configuration for each task's execution:

• mapred.job.id (String): The job id
• mapred.jar (String): job.jar location in the job directory
• job.local.dir (String): The job-specific shared scratch space
• mapred.tip.id (String): The task id
• mapred.task.id (String): The task attempt id
• mapred.task.is.map (boolean): Is this a map task
• mapred.task.partition (int): The id of the task within the job
• map.input.file (String): The filename that the map is reading from
• map.input.start (long): The offset of the start of the map input split
• map.input.length (long): The number of bytes in the map input split
• mapred.work.output.dir (String): The task's temporary output directory

The standard output (stdout) and error (stderr) streams of the task are read by the TaskTracker and logged to ${HADOOP_LOG_DIR}/userlogs.

The DistributedCache can also be used to distribute both jars and native libraries for use in the map and/or reduce tasks. The child-jvm always has its current working directory added to the java.library.path and LD_LIBRARY_PATH, and hence the cached libraries can be loaded via System.loadLibrary or System.load. More details on how to load shared libraries through the distributed cache are documented in native_libraries.html.
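A one-line sketch of enabling JVM reuse for a job with many short tasks (the job class is hypothetical):

  import org.apache.hadoop.mapred.JobConf;

  public class JvmReuseSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf(JvmReuseSketch.class);
      conf.setNumTasksToExecutePerJvm(-1);   // unlimited tasks per JVM, within one job
    }
  }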
6.4. Job Submission and Monitoring

JobClient is the primary interface by which a user-job interacts with the JobTracker.

JobClient provides facilities to submit jobs, track their progress, access component-tasks' reports and logs, get the Map/Reduce cluster's status information and so on.

The job submission process involves:
1. Checking the input and output specifications of the job.
2. Computing the InputSplit values for the job.
3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
4. Copying the job's jar and configuration to the Map/Reduce system directory on the FileSystem.
5. Submitting the job to the JobTracker and optionally monitoring its status.

Job history files are also logged to the user-specified directory hadoop.job.history.user.location, which defaults to the job output directory. The files are stored in "_logs/history/" in the specified directory. Hence, by default they will be in mapred.output.dir/_logs/history. The user can stop logging by giving the value none for hadoop.job.history.user.location.

The user can view a summary of the history logs in the specified directory using the following command:
$ bin/hadoop job -history output-dir
This command will print job details, and failed and killed tip details.
More details about the job, such as successful tasks and the task attempts made for each task, can be viewed using the following command:
$ bin/hadoop job -history all output-dir

Users can use OutputLogFilter to filter log files from the output directory listing.

Normally the user creates the application, describes various facets of the job via JobConf, and then uses the JobClient to submit the job and monitor its progress.

6.4.1. Job Control

Users may need to chain Map/Reduce jobs to accomplish complex tasks which cannot be done via a single Map/Reduce job. This is fairly easy, since the output of a job typically goes to the distributed file-system, and that output, in turn, can be used as the input for the next job.

However, this also means that the onus of ensuring that jobs are complete (success/failure) lies squarely on the clients. In such cases, the various job-control options are:
• runJob(JobConf) : Submits the job and returns only after the job has completed.
• submitJob(JobConf) : Only submits the job; then poll the returned handle to the RunningJob to query status and make scheduling decisions.
• JobConf.setJobEndNotificationURI(String) : Sets up a notification upon job-completion, thus avoiding polling.
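A minimal sketch of chaining two jobs via submitJob and polling, as described above; conf1 and conf2 are assumed to be fully configured JobConf instances:

  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RunningJob;

  public class JobChainSketch {
    public static void runChain(JobConf conf1, JobConf conf2) throws Exception {
      JobClient client = new JobClient(conf1);
      RunningJob first = client.submitJob(conf1);   // returns immediately
      while (!first.isComplete()) {                 // poll for completion
        Thread.sleep(5000);
      }
      if (first.isSuccessful()) {
        JobClient.runJob(conf2);                    // blocks until the second job finishes
      }
    }
  }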
6.5. Job Input

InputFormat describes the input-specification for a Map/Reduce job.

The Map/Reduce framework relies on the InputFormat of the job to:
1. Validate the input-specification of the job.
2. Split up the input file(s) into logical InputSplit instances, each of which is then assigned to an individual Mapper.
3. Provide the RecordReader implementation used to glean input records from the logical InputSplit for processing by the Mapper.

The default behavior of file-based InputFormat implementations, typically sub-classes of FileInputFormat, is to split the input into logical InputSplit instances based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapred.min.split.size.

Clearly, logical splits based on input-size are insufficient for many applications, since record boundaries must be respected. In such cases, the application should implement a RecordReader, which is responsible for respecting record-boundaries and presenting a record-oriented view of the logical InputSplit to the individual task.

TextInputFormat is the default InputFormat.

If TextInputFormat is the InputFormat for a given job, the framework detects input-files with the .gz extension and automatically decompresses them using the appropriate CompressionCodec. However, it must be noted that compressed files with the above extension cannot be split, and each compressed file is processed in its entirety by a single mapper.

6.5.1. InputSplit

InputSplit represents the data to be processed by an individual Mapper.

Typically InputSplit presents a byte-oriented view of the input, and it is the responsibility of the RecordReader to process and present a record-oriented view.

FileSplit is the default InputSplit. It sets map.input.file to the path of the input file for the logical split.

6.5.2. RecordReader

RecordReader reads <key, value> pairs from an InputSplit.

Typically the RecordReader converts the byte-oriented view of the input, provided by the InputSplit, and presents a record-oriented view to the Mapper implementations for processing. The RecordReader thus assumes the responsibility of processing record boundaries and presents the tasks with keys and values.
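As a small sketch of customizing input handling (the class name is hypothetical): an InputFormat that refuses to split its files, so that formats whose record boundaries cannot be found mid-file are each handled by a single mapper:

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.TextInputFormat;

  // Hypothetical: one mapper per file, regardless of file size.
  public class NonSplittableTextInputFormat extends TextInputFormat {
    protected boolean isSplitable(FileSystem fs, Path file) {
      return false;
    }
  }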
6.6. Job Output

OutputFormat describes the output-specification for a Map/Reduce job.

The Map/Reduce framework relies on the OutputFormat of the job to:
1. Validate the output-specification of the job; for example, check that the output directory doesn't already exist.
2. Provide the RecordWriter implementation used to write the output files of the job. Output files are stored in a FileSystem.

TextOutputFormat is the default OutputFormat.

6.6.1. OutputCommitter

OutputCommitter describes the commit of task output for a Map/Reduce job.

The Map/Reduce framework relies on the OutputCommitter of the job to:
1. Set up the job during initialization; for example, create the temporary output directory for the job. Job setup is done by a separate task when the job is in the PREP state and after initializing tasks. Once the setup task completes, the job is moved to the RUNNING state.
2. Clean up the job after job completion; for example, remove the temporary output directory. Job cleanup is done by a separate task at the end of the job. The job is declared SUCCEEDED/FAILED/KILLED after the cleanup task completes.
3. Set up the task temporary output. Task setup is done as part of the same task, during task initialization.
4. Check whether a task needs a commit. This avoids the commit procedure for tasks that do not need it.
5. Commit the task output. Once the task is done, it commits its output if required.
6. Discard the task commit. If the task has failed or been killed, the output is cleaned up. If the task could not clean up (in its exception block), a separate task is launched with the same attempt-id to do the cleanup.

FileOutputCommitter is the default OutputCommitter.

Job setup/cleanup tasks occupy map or reduce slots, whichever is free on the TaskTracker. The JobCleanup task, TaskCleanup tasks and JobSetup task have the highest priority, in that order.

6.6.2. Task Side-Effect Files

In some applications, component tasks need to create and/or write to side-files, which differ from the actual job-output files.

In such cases there could be issues with two instances of the same Mapper or Reducer running simultaneously (for example, speculative tasks) trying to open and/or write to the same file (path) on the FileSystem. Hence the application-writer will have to pick unique names per task-attempt (using the attempt-id, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues the Map/Reduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special ${mapred.output.dir}/_temporary/${taskid} sub-directory, accessible via ${mapred.work.output.dir}, for each task-attempt on the FileSystem where the output of the task-attempt is stored. On successful completion of the task-attempt, the files in ${mapred.output.dir}/_temporary/${taskid} (only) are promoted to ${mapred.output.dir}. Of course, the framework discards the sub-directory of unsuccessful task-attempts. This process is completely transparent to the application.

The application-writer can take advantage of this feature by creating any side-files required in ${mapred.work.output.dir} during execution of a task via FileOutputFormat.getWorkOutputPath(), and the framework will promote them similarly for successful task-attempts, thus eliminating the need to pick unique paths per task-attempt.

Note: The value of ${mapred.work.output.dir} during execution of a particular task-attempt is actually ${mapred.output.dir}/_temporary/${taskid}, and this value is set by the Map/Reduce framework. So, just create any side-files in the path returned by FileOutputFormat.getWorkOutputPath() from the map/reduce task to take advantage of this feature.

The entire discussion holds true for maps of jobs with reducer=NONE (i.e. 0 reduces), since the output of the map, in that case, goes directly to HDFS.

6.6.3. RecordWriter

RecordWriter writes the output <key, value> pairs to an output file.

RecordWriter implementations write the job outputs to the FileSystem.
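A minimal sketch of writing such a side-file from inside a task; the file name is an assumption:

  import java.io.IOException;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobConf;

  public class SideFileSketch {
    // Call from within a map/reduce task; "side-file.txt" is an assumed name.
    public static void writeSideFile(JobConf conf) throws IOException {
      Path workDir = FileOutputFormat.getWorkOutputPath(conf); // ${mapred.work.output.dir}
      FileSystem fs = workDir.getFileSystem(conf);
      FSDataOutputStream out = fs.create(new Path(workDir, "side-file.txt"));
      out.writeBytes("extra output, promoted only if this attempt succeeds\n");
      out.close();
    }
  }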
6.7. Other Useful Features

6.7.1. Submitting Jobs to Queues

Users submit jobs to queues. Queues, as collections of jobs, allow the system to provide specific functionality. For example, queues use ACLs to control which users can submit jobs to them. Queues are expected to be used primarily by Hadoop schedulers.

Hadoop comes configured with a single mandatory queue, called 'default'. Queue names are defined in the mapred.queue.names property of the Hadoop site configuration. Some job schedulers, such as the Capacity Scheduler, support multiple queues.

A job defines the queue it needs to be submitted to through the mapred.job.queue.name property, or through the setQueueName(String) API. Setting the queue name is optional. If a job is submitted without an associated queue name, it is submitted to the 'default' queue.

6.7.2. Counters

Counters represent global counters, defined either by the Map/Reduce framework or by applications. Each Counter can be of any Enum type. Counters of a particular Enum are bunched into groups of type Counters.Group.

Applications can define arbitrary Counters (of type Enum) and update them via Reporter.incrCounter(Enum, long) or Reporter.incrCounter(String, String, long) in the map and/or reduce methods. These counters are then globally aggregated by the framework.

6.7.3. DistributedCache

DistributedCache distributes application-specific, large, read-only files efficiently.

DistributedCache is a facility provided by the Map/Reduce framework to cache files (text, archives, jars and so on) needed by applications.

Applications specify the files to be cached via urls (hdfs://) in the JobConf. The DistributedCache assumes that the files specified via hdfs:// urls are already present on the FileSystem.

The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are copied only once per job, and from the ability to cache archives which are un-archived on the slaves.

DistributedCache tracks the modification timestamps of the cached files. Clearly the cache files should not be modified by the application or externally while the job is executing.

DistributedCache can be used to distribute simple, read-only data/text files and more complex types such as archives and jars. Archives (zip, tar, tgz and tar.gz files) are un-archived at the slave nodes. Files have execution permissions set.

The files/archives can be distributed by setting the property mapred.cache.{files|archives}. If more than one file/archive has to be distributed, they can be added as comma-separated paths. The properties can also be set by the APIs DistributedCache.addCacheFile(URI, conf)/DistributedCache.addCacheArchive(URI, conf) and DistributedCache.setCacheFiles(URIs, conf)/DistributedCache.setCacheArchives(URIs, conf), where URI is of the form hdfs://host:port/absolute-path#link-name. In Streaming, the files can be distributed through the command line options -cacheFile/-cacheArchive.

Optionally, users can also direct the DistributedCache to symlink the cached file(s) into the current working directory of the task via the DistributedCache.createSymlink(Configuration) api, or by setting the configuration property mapred.create.symlink to yes. The DistributedCache will use the fragment of the URI as the name of the symlink. For example, the URI hdfs://namenode:port/lib.so.1#lib.so will have the symlink name lib.so in the task's cwd for the file lib.so.1 in the distributed cache.

The DistributedCache can also be used as a rudimentary software distribution mechanism for use in the map and/or reduce tasks. It can be used to distribute both jars and native libraries. The DistributedCache.addArchiveToClassPath(Path, Configuration) or DistributedCache.addFileToClassPath(Path, Configuration) api can be used to cache files/jars and also add them to the classpath of the child-jvm. The same can be done by setting the configuration properties mapred.job.classpath.{files|archives}. Similarly, the cached files that are symlinked into the working directory of the task can be used to distribute native libraries and load them.
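A minimal sketch of caching and symlinking one file (the HDFS path and link name are assumptions):

  import java.net.URI;
  import org.apache.hadoop.filecache.DistributedCache;
  import org.apache.hadoop.mapred.JobConf;

  public class CacheSketch {
    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(CacheSketch.class);
      // Cache an HDFS file; the #dict fragment becomes the symlink name.
      DistributedCache.addCacheFile(new URI("hdfs://namenode:9000/data/dict.txt#dict"), conf);
      DistributedCache.createSymlink(conf);   // expose "dict" in each task's cwd
    }
  }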
6.7.4. Tool

The Tool interface supports the handling of generic Hadoop command-line options. Tool is the standard for any Map/Reduce tool or application. The application should delegate the handling of standard command-line options to GenericOptionsParser via ToolRunner.run(Tool, String[]) and only handle its custom arguments.

The generic Hadoop command-line options are:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>

6.7.5. IsolationRunner

IsolationRunner is a utility to help debug Map/Reduce programs.

To use the IsolationRunner, first set keep.failed.tasks.files to true (also see keep.tasks.files.pattern).

Next, go to the node on which the failed task ran, go to the TaskTracker's local directory and run the IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

IsolationRunner will run the failed task in a single jvm, which can be in the debugger, over precisely the same input.

6.7.6. Profiling

Profiling is a utility to get a representative (2 or 3) sample of built-in java profiler output for a sample of maps and reduces.

The user can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property mapred.task.profile. The value can be set using the api JobConf.setProfileEnabled(boolean). If the value is set to true, task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.

Once the user configures that profiling is needed, she/he can use the configuration property mapred.task.profile.{maps|reduces} to set the ranges of map/reduce tasks to profile. The value can be set using the api JobConf.setProfileTaskRange(boolean, String). By default, the specified range is 0-2.

The user can also specify the profiler configuration arguments by setting the configuration property mapred.task.profile.params. The value can be specified using the api JobConf.setProfileParams(String). If the string contains a %s, it will be replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default value for the profiling parameters is -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s

6.7.7. Debugging

The Map/Reduce framework provides a facility to run user-provided scripts for debugging. When a map/reduce task fails, the user can run a debug script, to process task logs for example. The script is given access to the task's stdout and stderr outputs, syslog and jobconf. The output from the debug script's stdout and stderr is displayed on the console diagnostics and also as part of the job UI.

In the following sections we discuss how to submit a debug script with a job. The script file needs to be distributed to and submitted to the framework.

6.7.7.1. How to distribute the script file:

The user needs to use DistributedCache to distribute and symlink the script file.

6.7.7.2. How to submit the script:

A quick way to submit the debug script is to set values for the properties mapred.map.task.debug.script and mapred.reduce.task.debug.script, for debugging map and reduce tasks respectively. These properties can also be set by using the APIs JobConf.setMapDebugScript(String) and JobConf.setReduceDebugScript(String). In streaming mode, a debug script can be submitted with the command-line options -mapdebug and -reducedebug, for debugging map and reduce tasks respectively.

The arguments to the script are the task's stdout, stderr, syslog and jobconf files.
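A minimal sketch of enabling profiling for the first two map tasks of a job, using the JobConf apis named above (the range value is an illustrative choice):

  import org.apache.hadoop.mapred.JobConf;

  public class ProfilingSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf(ProfilingSketch.class);
      conf.setProfileEnabled(true);             // sets mapred.task.profile
      conf.setProfileTaskRange(true, "0-1");    // true => map tasks; profile tasks 0 and 1
    }
  }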
The debug command, run on the node where the map/reduce task failed, is:
$script $stdout $stderr $syslog $jobconf

Pipes programs have the c++ program name as a fifth argument for the command. Thus for pipes programs the command is:
$script $stdout $stderr $syslog $jobconf $program

6.7.7.3. Default Behavior:

For pipes, a default script is run which processes core dumps under gdb, prints the stack trace and gives info about running threads.

6.7.8. JobControl

JobControl is a utility which encapsulates a set of Map/Reduce jobs and their dependencies.

6.7.9. Data Compression

Hadoop Map/Reduce provides facilities for the application-writer to specify compression for both intermediate map-outputs and the job-outputs, i.e. the output of the reduces. It also comes bundled with a CompressionCodec implementation for the zlib compression algorithm. The gzip file format is also supported.

Hadoop also provides native implementations of the above compression codecs, for reasons of both performance (zlib) and the non-availability of Java libraries. More details on their usage and availability are available here.

6.7.9.1. Intermediate Outputs

Applications can control compression of intermediate map-outputs via the JobConf.setCompressMapOutput(boolean) api, and the CompressionCodec to be used via the JobConf.setMapOutputCompressorClass(Class) api.

6.7.9.2. Job Outputs

Applications can control compression of job-outputs via the FileOutputFormat.setCompressOutput(JobConf, boolean) api, and the CompressionCodec to be used can be specified via the FileOutputFormat.setOutputCompressorClass(JobConf, Class) api.

If the job outputs are to be stored in the SequenceFileOutputFormat, the required SequenceFile.CompressionType (i.e. RECORD / BLOCK - defaults to RECORD) can be specified via the SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType) api.
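A minimal sketch of compressing both the intermediate and final outputs of a job with gzip (the codec choice is an assumption; any bundled CompressionCodec works):

  import org.apache.hadoop.io.compress.GzipCodec;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobConf;

  public class CompressionSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf(CompressionSketch.class);
      conf.setCompressMapOutput(true);                        // compress intermediate map outputs
      conf.setMapOutputCompressorClass(GzipCodec.class);
      FileOutputFormat.setCompressOutput(conf, true);         // compress the job output
      FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
    }
  }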
6.7.10. Skipping Bad Records

Hadoop provides an option where a certain set of bad input records can be skipped when processing map inputs. Applications can control this feature through the SkipBadRecords class.

This feature can be used when map tasks crash deterministically on certain input. This usually happens due to bugs in the map function. Usually, the user would have to fix these bugs. That is, however, not always possible; the bug may be in third-party libraries, for example, for which the source code is not available. In such cases, the task never completes successfully even after multiple attempts, and the job fails. With this feature, only a small portion of data surrounding the bad records is lost, which may be acceptable for some applications (those performing statistical analysis on very large data, for example).

By default this feature is disabled. For enabling it, refer to SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).

With this feature enabled, the framework gets into 'skipping mode' after a certain number of map failures. For more details, see SkipBadRecords.setAttemptsToStartSkipping(Configuration, int). In 'skipping mode', map tasks maintain the range of records being processed. To do this, the framework relies on the processed record counter. See SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS and SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS. This counter enables the framework to know how many records have been processed successfully, and hence what record range caused a task to crash. On further attempts, this range of records is skipped.

The number of records skipped depends on how frequently the processed record counter is incremented by the application. It is recommended that this counter be incremented after every record is processed. This may not be possible in some applications that typically batch their processing. In such cases, the framework may skip additional records surrounding the bad record. Users can control the number of skipped records through SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long). The framework tries to narrow the range of skipped records using a binary-search-like approach: the skipped range is divided into two halves and only one half gets executed. On subsequent failures, the framework figures out which half contains the bad records. A task will be re-executed until the acceptable skipped value is met or all task attempts are exhausted. To increase the number of task attempts, use JobConf.setMaxMapAttempts(int) and JobConf.setMaxReduceAttempts(int).

Skipped records are written to HDFS in the sequence file format, for later analysis. The location can be changed through SkipBadRecords.setSkipOutputPath(JobConf, Path).
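A minimal sketch of turning the feature on for a job; the thresholds are illustrative assumptions:

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SkipBadRecords;

  public class SkipSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf(SkipSketch.class);
      SkipBadRecords.setMapperMaxSkipRecords(conf, 100);   // tolerate losing up to 100 records
      SkipBadRecords.setAttemptsToStartSkipping(conf, 2);  // enter skipping mode after 2 failures
      conf.setMaxMapAttempts(8);                           // leave room for the binary search
    }
  }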
7. Example: WordCount v2.0

Here is a more complete WordCount which uses many of the features provided by the Map/Reduce framework we discussed so far. It needs HDFS to be up and running, especially for the DistributedCache-related features, and hence only works with a pseudo-distributed or fully-distributed Hadoop installation.

7.1. Source Code

WordCount.java

1.    package org.myorg;
2.
3.    import java.io.*;
4.    import java.util.*;
5.
6.    import org.apache.hadoop.fs.Path;
7.    import org.apache.hadoop.filecache.DistributedCache;
8.    import org.apache.hadoop.conf.*;
9.    import org.apache.hadoop.io.*;
10.   import org.apache.hadoop.mapred.*;
11.   import org.apache.hadoop.util.*;
12.
13.   public class WordCount extends Configured implements Tool {
14.
15.     public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
16.
17.       static enum Counters { INPUT_WORDS }
18.
19.       private final static IntWritable one = new IntWritable(1);
20.       private Text word = new Text();
21.
22.       private boolean caseSensitive = true;
23.       private Set<String> patternsToSkip = new HashSet<String>();
24.
25.       private long numRecords = 0;
26.       private String inputFile;
27.
28.       public void configure(JobConf job) {
29.         caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
30.         inputFile = job.get("map.input.file");
31.
32.         if (job.getBoolean("wordcount.skip.patterns", false)) {
33.           Path[] patternsFiles = new Path[0];
34.           try {
35.             patternsFiles = DistributedCache.getLocalCacheFiles(job);
36.           } catch (IOException ioe) {
37.             System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
38.           }
39.           for (Path patternsFile : patternsFiles) {
40.             parseSkipFile(patternsFile);
41.           }
42.         }
43.       }
44.
45.       private void parseSkipFile(Path patternsFile) {
46.         try {
47.           BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
48.           String pattern = null;
49.           while ((pattern = fis.readLine()) != null) {
50.             patternsToSkip.add(pattern);
51.           }
52.         } catch (IOException ioe) {
53.           System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
54.         }
55.       }
56.
57.       public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
58.         String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
59.
60.         for (String pattern : patternsToSkip) {
61.           line = line.replaceAll(pattern, "");
62.         }
63.
64.         StringTokenizer tokenizer = new StringTokenizer(line);
65.         while (tokenizer.hasMoreTokens()) {
66.           word.set(tokenizer.nextToken());
67.           output.collect(word, one);
68.           reporter.incrCounter(Counters.INPUT_WORDS, 1);
69.         }
70.
71.         if ((++numRecords % 100) == 0) {
72.           reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
73.         }
74.       }
75.     }
76.
77.     public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
78.       public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
79.         int sum = 0;
80.         while (values.hasNext()) {
81.           sum += values.next().get();
82.         }
83.         output.collect(key, new IntWritable(sum));
84.       }
85.     }
86.
87.     public int run(String[] args) throws Exception {
88.       JobConf conf = new JobConf(getConf(), WordCount.class);
89.       conf.setJobName("wordcount");
90.
91.       conf.setOutputKeyClass(Text.class);
92.       conf.setOutputValueClass(IntWritable.class);
93.
94.       conf.setMapperClass(Map.class);
95.       conf.setCombinerClass(Reduce.class);
96.       conf.setReducerClass(Reduce.class);
97.
98.       conf.setInputFormat(TextInputFormat.class);
99.       conf.setOutputFormat(TextOutputFormat.class);
100.
101.      List<String> other_args = new ArrayList<String>();
102.      for (int i=0; i < args.length; ++i) {
103.        if ("-skip".equals(args[i])) {
104.          DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
105.          conf.setBoolean("wordcount.skip.patterns", true);
106.        } else {
107.          other_args.add(args[i]);
108.        }
109.      }
110.
111.      FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
112.      FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
113.
114.      JobClient.runJob(conf);
115.      return 0;
116.    }
117.
118.    public static void main(String[] args) throws Exception {
119.      int res = ToolRunner.run(new Configuration(), new WordCount(), args);
120.      System.exit(res);
121.    }
122.  }
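A note on the skip patterns before the sample runs (my observation, not the tutorial's): lines 60-62 apply each pattern with String.replaceAll, so every line of the patterns file is interpreted as a Java regular expression, which is why the sample patterns file in the next section escapes its punctuation. A tiny self-contained demo:

  // Hypothetical demo, not part of the tutorial: the skip patterns are
  // Java regexes because the mapper applies them with String.replaceAll.
  public class SkipPatternDemo {
    public static void main(String[] args) {
      String line = "Hello Hadoop, Goodbye to hadoop.";
      for (String pattern : new String[] {"\\.", "\\,", "\\!", "to"}) {
        line = line.replaceAll(pattern, "");
      }
      System.out.println(line); // prints: Hello Hadoop Goodbye  hadoop
    }
  }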
7.2. Sample Runs

Sample text-files as input:

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

Run the application:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

Output:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

Notice that the inputs differ from the first version we looked at, and how they affect the outputs.

Now, let's plug in a pattern file which lists the word patterns to be ignored, via the DistributedCache:

$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

Run it again, this time with more options:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

As expected, the output:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

Run it once more, this time switching off case-sensitivity:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

Sure enough, the output:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

7.3. Highlights

The second version of WordCount improves upon the previous one by using some features offered by the Map/Reduce framework:

• Demonstrates how applications can access configuration parameters in the configure method of the Mapper (and Reducer) implementations (lines 28-43).
• Demonstrates how the DistributedCache can be used to distribute read-only data needed by the jobs. Here it allows the user to specify word patterns to skip while counting (line 104).
• Demonstrates the utility of the Tool interface and the GenericOptionsParser to handle generic Hadoop command-line options (lines 87-116 and 119).
• Demonstrates how applications can use Counters (line 68) and how they can set application-specific status information via the Reporter instance passed to the map (and reduce) method (line 72).
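One thing the highlights do not show is how the driver can read the custom counter back after the job completes. A minimal sketch, assuming the same 0.18-era mapred API (this call sequence is my addition, not part of the tutorial):

  // Inside run(): JobClient.runJob returns a RunningJob handle whose
  // counters aggregate the per-task increments made in the mapper.
  RunningJob running = JobClient.runJob(conf);
  long inputWords = running.getCounters().getCounter(Map.Counters.INPUT_WORDS);
  System.out.println("INPUT_WORDS = " + inputWords);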


MapReduce: Simplified Data Processing on Large Clusters

Jeffrey Dean and Sanjay Ghemawat
jeff@google.com, sanjay@google.com
Google, Inc.

Abstract

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the paper.

Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.

Our implementation of MapReduce runs on a large cluster of commodity machines and is highly scalable: a typical MapReduce computation processes many terabytes of data on thousands of machines. Programmers find the system easy to use: hundreds of MapReduce programs have been implemented and upwards of one thousand MapReduce jobs are executed on Google's clusters every day.

1 Introduction

Over the past five years, the authors and many others at Google have implemented hundreds of special-purpose computations that process large amounts of raw data, such as crawled documents, web request logs, etc., to compute various kinds of derived data, such as inverted indices, various representations of the graph structure of web documents, summaries of the number of pages crawled per host, the set of most frequent queries in a given day, etc. Most such computations are conceptually straightforward. However, the input data is usually large and the computations have to be distributed across hundreds or thousands of machines in order to finish in a reasonable amount of time. The issues of how to parallelize the computation, distribute the data, and handle failures conspire to obscure the original simple computation with large amounts of complex code to deal with these issues.

As a reaction to this complexity, we designed a new abstraction that allows us to express the simple computations we were trying to perform but hides the messy details of parallelization, fault-tolerance, data distribution and load balancing in a library. Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages. We realized that most of our computations involved applying a map operation to each logical "record" in our input in order to compute a set of intermediate key/value pairs, and then applying a reduce operation to all the values that shared the same key, in order to combine the derived data appropriately. Our use of a functional model with user-specified map and reduce operations allows us to parallelize large computations easily and to use re-execution as the primary mechanism for fault tolerance.

The major contributions of this work are a simple and powerful interface that enables automatic parallelization and distribution of large-scale computations, combined with an implementation of this interface that achieves high performance on large clusters of commodity PCs.
Section 2 describes the basic programming model and gives several examples. Section 3 describes an implementation of the MapReduce interface tailored towards our cluster-based computing environment. Section 4 describes several refinements of the programming model that we have found useful. Section 5 has performance measurements of our implementation for a variety of tasks. Section 6 explores the use of MapReduce within Google including our experiences in using it as the basis for a rewrite of our production indexing system. Section 7 discusses related and future work.

2 Programming Model

The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The user of the MapReduce library expresses the computation as two functions: Map and Reduce.

Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.

The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's reduce function via an iterator. This allows us to handle lists of values that are too large to fit in memory.

2.1 Example

Consider the problem of counting the number of occurrences of each word in a large collection of documents. The user would write code similar to the following pseudo-code:

  map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
      EmitIntermediate(w, "1");

  reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
      result += ParseInt(v);
    Emit(AsString(result));

The map function emits each word plus an associated count of occurrences (just '1' in this simple example). The reduce function sums together all counts emitted for a particular word.

In addition, the user writes code to fill in a mapreduce specification object with the names of the input and output files, and optional tuning parameters. The user then invokes the MapReduce function, passing it the specification object. The user's code is linked together with the MapReduce library (implemented in C++). Appendix A contains the full program text for this example.

2.2 Types

Even though the previous pseudo-code is written in terms of string inputs and outputs, conceptually the map and reduce functions supplied by the user have associated types:

  map    (k1,v1)       → list(k2,v2)
  reduce (k2,list(v2)) → list(v2)

I.e., the input keys and values are drawn from a different domain than the output keys and values. Furthermore, the intermediate keys and values are from the same domain as the output keys and values.

Our C++ implementation passes strings to and from the user-defined functions and leaves it to the user code to convert between strings and appropriate types.

2.3 More Examples

Here are a few simple examples of interesting programs that can be easily expressed as MapReduce computations.

Distributed Grep: The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.

Count of URL Access Frequency: The map function processes logs of web page requests and outputs ⟨URL, 1⟩. The reduce function adds together all values for the same URL and emits a ⟨URL, total count⟩ pair.

Reverse Web-Link Graph: The map function outputs ⟨target, source⟩ pairs for each link to a target URL found in a page named source. The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair ⟨target, list(source)⟩.

Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of ⟨word, frequency⟩ pairs. The map function emits a ⟨hostname, term vector⟩ pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final ⟨hostname, term vector⟩ pair.

Inverted Index: The map function parses each document, and emits a sequence of ⟨word, document ID⟩ pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a ⟨word, list(document ID)⟩ pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions.

Distributed Sort: The map function extracts the key from each record, and emits a ⟨key, record⟩ pair. The reduce function emits all pairs unchanged. This computation depends on the partitioning facilities described in Section 4.1 and the ordering properties described in Section 4.2.
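To make the first of these concrete, here is a sketch of Distributed Grep written against the Hadoop mapred API used in the tutorial earlier on this page (my rendering, not the paper's; the paper's implementation is Google's internal C++ library, and the configuration key "grep.pattern" is a made-up name):

  import java.io.IOException;
  import java.util.regex.Pattern;
  import org.apache.hadoop.io.*;
  import org.apache.hadoop.mapred.*;

  public class GrepMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, NullWritable> {
    private Pattern pattern;

    public void configure(JobConf job) {
      pattern = Pattern.compile(job.get("grep.pattern"));
    }

    // The map function emits a line if it matches the supplied pattern.
    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, NullWritable> output,
                    Reporter reporter) throws IOException {
      if (pattern.matcher(line.toString()).find()) {
        output.collect(line, NullWritable.get());
      }
    }
    // The reduce side is the identity (e.g. Hadoop's IdentityReducer),
    // which just copies the supplied intermediate data to the output.
  }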
3 Implementation

Many different implementations of the MapReduce interface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines.

This section describes an implementation targeted to the computing environment in wide use at Google: large clusters of commodity PCs connected together with switched Ethernet [4]. In our environment:

(1) Machines are typically dual-processor x86 processors running Linux, with 2-4 GB of memory per machine.
(2) Commodity networking hardware is used – typically either 100 megabits/second or 1 gigabit/second at the machine level, but averaging considerably less in overall bisection bandwidth.
(3) A cluster consists of hundreds or thousands of machines, and therefore machine failures are common.
(4) Storage is provided by inexpensive IDE disks attached directly to individual machines. A distributed file system [8] developed in-house is used to manage the data stored on these disks. The file system uses replication to provide availability and reliability on top of unreliable hardware.
(5) Users submit jobs to a scheduling system. Each job consists of a set of tasks, and is mapped by the scheduler to a set of available machines within a cluster.

3.1 Execution Overview

The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.

[Figure 1: Execution overview]

Figure 1 shows the overall flow of a MapReduce operation in our implementation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in Figure 1 correspond to the numbers in the list below):

1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.
2. One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.
3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.
4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.
5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.
6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.
7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.
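Step 4's partitioning function is easy to picture in code. A minimal sketch (mine; it mirrors Hadoop's default HashPartitioner rather than Google's internal implementation):

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.Partitioner;

  public class HashPartitioner<K, V> implements Partitioner<K, V> {
    public void configure(JobConf job) {}

    // hash(key) mod R: mask the sign bit so the partition index is
    // non-negative, then reduce modulo the number of reduce tasks.
    public int getPartition(K key, V value, int numReduceTasks) {
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
  }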

After successful completion, the output of the mapreduce execution is available in the R output files (one per reduce task, with file names as specified by the user). Typically, users do not need to combine these R output files into one file – they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.

3.2 Master Data Structures

The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks).

The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task. Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.

3.3 Fault Tolerance

Since the MapReduce library is designed to help process very large amounts of data using hundreds or thousands of machines, the library must tolerate machine failures gracefully.

Worker Failure

The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.

Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system.

When a map task is executed first by worker A and then later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B.

MapReduce is resilient to large-scale worker failures. For example, during one MapReduce operation, network maintenance on a running cluster was causing groups of 80 machines at a time to become unreachable for several minutes. The MapReduce master simply re-executed the work done by the unreachable worker machines, and continued to make forward progress, eventually completing the MapReduce operation.

Master Failure

It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, given that there is only a single master, its failure is unlikely; therefore our current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.
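Section 3.2's bookkeeping can be pictured as a small data structure (a hypothetical sketch; the paper gives no code and all names here are mine):

  // Per-task state kept by the master (Section 3.2).
  enum TaskState { IDLE, IN_PROGRESS, COMPLETED }

  class MapTaskInfo {
    TaskState state = TaskState.IDLE; // reset to IDLE if its worker fails (Section 3.3)
    String workerId;                  // identity of the worker machine (non-idle tasks only)
    // For a completed map task: location and size of each of the R
    // intermediate file regions it produced, pushed incrementally to
    // workers with in-progress reduce tasks.
    String[] regionLocations;         // length R
    long[] regionSizes;               // length R
  }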
Semantics in the Presence of Failures

When the user-supplied map and reduce operators are deterministic functions of their input values, our distributed implementation produces the same output as would have been produced by a non-faulting sequential execution of the entire program.

We rely on atomic commits of map and reduce task outputs to achieve this property. Each in-progress task writes its output to private temporary files. A reduce task produces one such file, and a map task produces R such files (one per reduce task). When a map task completes, the worker sends a message to the master and includes the names of the R temporary files in the message. If the master receives a completion message for an already completed map task, it ignores the message. Otherwise, it records the names of R files in a master data structure.

When a reduce task completes, the reduce worker atomically renames its temporary output file to the final output file. If the same reduce task is executed on multiple machines, multiple rename calls will be executed for the same final output file. We rely on the atomic rename operation provided by the underlying file system to guarantee that the final file system state contains just the data produced by one execution of the reduce task.

The vast majority of our map and reduce operators are deterministic, and the fact that our semantics are equivalent to a sequential execution in this case makes it very easy for programmers to reason about their program's behavior. When the map and/or reduce operators are non-deterministic, we provide weaker but still reasonable semantics. In the presence of non-deterministic operators, the output of a particular reduce task R1 is equivalent to the output for R1 produced by a sequential execution of the non-deterministic program. However, the output for a different reduce task R2 may correspond to the output for R2 produced by a different sequential execution of the non-deterministic program.

Consider map task M and reduce tasks R1 and R2. Let e(Ri) be the execution of Ri that committed (there is exactly one such execution). The weaker semantics arise because e(R1) may have read the output produced by one execution of M and e(R2) may have read the output produced by a different execution of M.

3.4 Locality

Network bandwidth is a relatively scarce resource in our computing environment. We conserve network bandwidth by taking advantage of the fact that the input data (managed by GFS [8]) is stored on the local disks of the machines that make up our cluster. GFS divides each file into 64 MB blocks, and stores several copies of each block (typically 3 copies) on different machines. The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. Failing that, it attempts to schedule a map task near a replica of that task's input data (e.g., on a worker machine that is on the same network switch as the machine containing the data). When running large MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.
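Returning to the atomic-rename commit from "Semantics in the Presence of Failures" above: on most file systems it is a one-liner. A hedged sketch using java.nio (the paper's implementation relies on GFS rather than a local file system):

  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.nio.file.StandardCopyOption;

  class ReduceCommit {
    // Each in-progress reduce task writes to a private temporary file
    // and atomically renames it on completion; if duplicate executions
    // of the same task race, exactly one output survives as the final file.
    static void commit(Path tmpOutput, Path finalOutput) throws IOException {
      Files.move(tmpOutput, finalOutput, StandardCopyOption.ATOMIC_MOVE);
    }
  }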
3.5 Task Granularity

We subdivide the map phase into M pieces and the reduce phase into R pieces, as described above. Ideally, M and R should be much larger than the number of worker machines. Having each worker perform many different tasks improves dynamic load balancing, and also speeds up recovery when a worker fails: the many map tasks it has completed can be spread out across all the other worker machines.

There are practical bounds on how large M and R can be in our implementation, since the master must make O(M + R) scheduling decisions and keeps O(M * R) state in memory as described above. (The constant factors for memory usage are small however: the O(M * R) piece of the state consists of approximately one byte of data per map task/reduce task pair.)

Furthermore, R is often constrained by users because the output of each reduce task ends up in a separate output file. In practice, we tend to choose M so that each individual task is roughly 16 MB to 64 MB of input data (so that the locality optimization described above is most effective), and we make R a small multiple of the number of worker machines we expect to use. We often perform MapReduce computations with M = 200,000 and R = 5,000, using 2,000 worker machines.

3.6 Backup Tasks

One of the common causes that lengthens the total time taken for a MapReduce operation is a "straggler": a machine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation. Stragglers can arise for a whole host of reasons. For example, a machine with a bad disk may experience frequent correctable errors that slow its read performance from 30 MB/s to 1 MB/s. The cluster scheduling system may have scheduled other tasks on the machine, causing it to execute the MapReduce code more slowly due to competition for CPU, memory, local disk, or network bandwidth. A recent problem we experienced was a bug in machine initialization code that caused processor caches to be disabled: computations on affected machines slowed down by over a factor of one hundred.

We have a general mechanism to alleviate the problem of stragglers. When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup execution completes. We have tuned this mechanism so that it typically increases the computational resources used by the operation by no more than a few percent. We have found that this significantly reduces the time to complete large MapReduce operations. As an example, the sort program described in Section 5.3 takes 44% longer to complete when the backup task mechanism is disabled.

4 Refinements

Although the basic functionality provided by simply writing Map and Reduce functions is sufficient for most needs, we have found a few extensions useful. These are described in this section.

4.1 Partitioning Function

The users of MapReduce specify the number of reduce tasks/output files that they desire (R). Data gets partitioned across these tasks using a partitioning function on the intermediate key. A default partitioning function is provided that uses hashing (e.g. "hash(key) mod R"). This tends to result in fairly well-balanced partitions. In some cases, however, it is useful to partition data by some other function of the key. For example, sometimes the output keys are URLs, and we want all entries for a single host to end up in the same output file. To support situations like this, the user of the MapReduce library can provide a special partitioning function. For example, using "hash(Hostname(urlkey)) mod R" as the partitioning function causes all URLs from the same host to end up in the same output file (a Hadoop-flavored sketch of such a partitioner appears at the end of this section).

4.2 Ordering Guarantees

We guarantee that within a given partition, the intermediate key/value pairs are processed in increasing key order. This ordering guarantee makes it easy to generate a sorted output file per partition, which is useful when the output file format needs to support efficient random access lookups by key, or users of the output find it convenient to have the data sorted.

4.3 Combiner Function

In some cases, there is significant repetition in the intermediate keys produced by each map task, and the user-specified Reduce function is commutative and associative. A good example of this is the word counting example in Section 2.1. Since word frequencies tend to follow a Zipf distribution, each map task will produce hundreds or thousands of records of the form ⟨the, 1⟩. All of these counts will be sent over the network to a single reduce task and then added together by the Reduce function to produce one number. We allow the user to specify an optional Combiner function that does partial merging of this data before it is sent over the network.

The Combiner function is executed on each machine that performs a map task. Typically the same code is used to implement both the combiner and the reduce functions. The only difference between a reduce function and a combiner function is how the MapReduce library handles the output of the function. The output of a reduce function is written to the final output file. The output of a combiner function is written to an intermediate file that will be sent to a reduce task.

Partial combining significantly speeds up certain classes of MapReduce operations. Appendix A contains an example that uses a combiner.

4.4 Input and Output Types

The MapReduce library provides support for reading input data in several different formats. For example, "text" mode input treats each line as a key/value pair: the key is the offset in the file and the value is the contents of the line. Another common supported format stores a sequence of key/value pairs sorted by key. Each input type implementation knows how to split itself into meaningful ranges for processing as separate map tasks (e.g. text mode's range splitting ensures that range splits occur only at line boundaries). Users can add support for a new input type by providing an implementation of a simple reader interface, though most users just use one of a small number of predefined input types.

A reader does not necessarily need to provide data read from a file. For example, it is easy to define a reader that reads records from a database, or from data structures mapped in memory.

In a similar fashion, we support a set of output types for producing data in different formats and it is easy for user code to add support for new output types.

4.5 Side-effects

In some cases, users of MapReduce have found it convenient to produce auxiliary files as additional outputs from their map and/or reduce operators. We rely on the application writer to make such side-effects atomic and idempotent. Typically the application writes to a temporary file and atomically renames this file once it has been fully generated.

We do not provide support for atomic two-phase commits of multiple output files produced by a single task. Therefore, tasks that produce multiple output files with cross-file consistency requirements should be deterministic. This restriction has never been an issue in practice.

4.6 Skipping Bad Records

Sometimes there are bugs in user code that cause the Map or Reduce functions to crash deterministically on certain records. Such bugs prevent a MapReduce operation from completing. The usual course of action is to fix the bug, but sometimes this is not feasible; perhaps the bug is in a third-party library for which source code is unavailable. Also, sometimes it is acceptable to ignore a few records, for example when doing statistical analysis on a large data set. We provide an optional mode of execution where the MapReduce library detects which records cause deterministic crashes and skips these records in order to make forward progress.

Each worker process installs a signal handler that catches segmentation violations and bus errors. Before invoking a user Map or Reduce operation, the MapReduce library stores the sequence number of the argument in a global variable. If the user code generates a signal, the signal handler sends a "last gasp" UDP packet that contains the sequence number to the MapReduce master. When the master has seen more than one failure on a particular record, it indicates that the record should be skipped when it issues the next re-execution of the corresponding Map or Reduce task.

4.7 Local Execution

Debugging problems in Map or Reduce functions can be tricky, since the actual computation happens in a distributed system, often on several thousand machines, with work assignment decisions made dynamically by the master. To help facilitate debugging, profiling, and small-scale testing, we have developed an alternative implementation of the MapReduce library that sequentially executes all of the work for a MapReduce operation on the local machine. Controls are provided to the user so that the computation can be limited to particular map tasks. Users invoke their program with a special flag and can then easily use any debugging or testing tools they find useful (e.g. gdb).

4.8 Status Information

The master runs an internal HTTP server and exports a set of status pages for human consumption. The status pages show the progress of the computation, such as how many tasks have been completed, how many are in progress, bytes of input, bytes of intermediate data, bytes of output, processing rates, etc. The pages also contain links to the standard error and standard output files generated by each task. The user can use this data to predict how long the computation will take, and whether or not more resources should be added to the computation. These pages can also be used to figure out when the computation is much slower than expected.

In addition, the top-level status page shows which workers have failed, and which map and reduce tasks they were processing when they failed. This information is useful when attempting to diagnose bugs in the user code.
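As anticipated in Section 4.1, here is a sketch of a host-based partitioner, "hash(Hostname(urlkey)) mod R", written against the Hadoop mapred API used in the tutorial above (my rendering; the paper's version is internal C++):

  import java.net.URI;
  import java.net.URISyntaxException;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.Partitioner;

  public class HostPartitioner<V> implements Partitioner<Text, V> {
    public void configure(JobConf job) {}

    // All URLs from the same host are sent to the same reduce task,
    // and therefore end up in the same output file.
    public int getPartition(Text urlKey, V value, int numReduceTasks) {
      String host;
      try {
        host = new URI(urlKey.toString()).getHost();
      } catch (URISyntaxException e) {
        host = null;
      }
      if (host == null) host = urlKey.toString(); // fall back to the raw key
      return (host.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
  }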
4.9 Counters

The MapReduce library provides a counter facility to count occurrences of various events. For example, user code may want to count total number of words processed or the number of German documents indexed, etc.

To use this facility, user code creates a named counter object and then increments the counter appropriately in the Map and/or Reduce function. For example:

  Counter* uppercase;
  uppercase = GetCounter("uppercase");

  map(String name, String contents):
    for each word w in contents:
      if (IsCapitalized(w)):
        uppercase->Increment();
      EmitIntermediate(w, "1");

The counter values from individual worker machines are periodically propagated to the master (piggybacked on the ping response). The master aggregates the counter values from successful map and reduce tasks and returns them to the user code when the MapReduce operation is completed. The current counter values are also displayed on the master status page so that a human can watch the progress of the live computation. When aggregating counter values, the master eliminates the effects of duplicate executions of the same map or reduce task to avoid double counting. (Duplicate executions can arise from our use of backup tasks and from re-execution of tasks due to failures.)

Some counter values are automatically maintained by the MapReduce library, such as the number of input key/value pairs processed and the number of output key/value pairs produced.

Users have found the counter facility useful for sanity checking the behavior of MapReduce operations. For example, in some MapReduce operations, the user code may want to ensure that the number of output pairs produced exactly equals the number of input pairs processed, or that the fraction of German documents processed is within some tolerable fraction of the total number of documents processed.

5 Performance

In this section we measure the performance of MapReduce on two computations running on a large cluster of machines. One computation searches through approximately one terabyte of data looking for a particular pattern. The other computation sorts approximately one terabyte of data.

These two programs are representative of a large subset of the real programs written by users of MapReduce – one class of programs shuffles data from one representation to another, and another class extracts a small amount of interesting data from a large data set.

5.1 Cluster Configuration

All of the programs were executed on a cluster that consisted of approximately 1800 machines. Each machine had two 2GHz Intel Xeon processors with Hyper-Threading enabled, 4GB of memory, two 160GB IDE disks, and a gigabit Ethernet link. The machines were arranged in a two-level tree-shaped switched network with approximately 100-200 Gbps of aggregate bandwidth available at the root. All of the machines were in the same hosting facility and therefore the round-trip time between any pair of machines was less than a millisecond.

Out of the 4GB of memory, approximately 1-1.5GB was reserved by other tasks running on the cluster. The programs were executed on a weekend afternoon, when the CPUs, disks, and network were mostly idle.

5.2 Grep

The grep program scans through 10^10 100-byte records, searching for a relatively rare three-character pattern (the pattern occurs in 92,337 records). The input is split into approximately 64MB pieces (M = 15000), and the entire output is placed in one file (R = 1).

[Figure 2: Data transfer rate over time (input MB/s vs. seconds)]

Figure 2 shows the progress of the computation over time. The Y-axis shows the rate at which the input data is scanned. The rate gradually picks up as more machines are assigned to this MapReduce computation, and peaks at over 30 GB/s when 1764 workers have been assigned. As the map tasks finish, the rate starts dropping and hits zero about 80 seconds into the computation. The entire computation takes approximately 150 seconds from start to finish. This includes about a minute of startup overhead. The overhead is due to the propagation of the program to all worker machines, and delays interacting with GFS to open the set of 1000 input files and to get the information needed for the locality optimization.
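A quick sanity check on the grep numbers (my arithmetic, not the paper's): 10^10 records × 100 bytes = 10^12 bytes ≈ 1 TB of input, and 10^12 bytes / 15,000 splits ≈ 67 MB per split, consistent with the "approximately 64MB pieces" above.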
5.3 Sort

The sort program sorts 10^10 100-byte records (approximately 1 terabyte of data). This program is modeled after the TeraSort benchmark [10].

The sorting program consists of less than 50 lines of user code. A three-line Map function extracts a 10-byte sorting key from a text line and emits the key and the original text line as the intermediate key/value pair. We used a built-in Identity function as the Reduce operator. This function passes the intermediate key/value pair unchanged as the output key/value pair. The final sorted output is written to a set of 2-way replicated GFS files (i.e., 2 terabytes are written as the output of the program).

As before, the input data is split into 64MB pieces (M = 15000). We partition the sorted output into 4000 files (R = 4000). The partitioning function uses the initial bytes of the key to segregate it into one of R pieces.

Our partitioning function for this benchmark has built-in knowledge of the distribution of keys. In a general sorting program, we would add a pre-pass MapReduce operation that would collect a sample of the keys and use the distribution of the sampled keys to compute split-points for the final sorting pass.

[Figure 3: Data transfer rates over time for different executions of the sort program: (a) normal execution, (b) no backup tasks, (c) 200 tasks killed. Rows show input, shuffle, and output rates (MB/s) vs. seconds.]

Figure 3 (a) shows the progress of a normal execution of the sort program. The top-left graph shows the rate at which input is read. The rate peaks at about 13 GB/s and dies off fairly quickly since all map tasks finish before 200 seconds have elapsed. Note that the input rate is less than for grep. This is because the sort map tasks spend about half their time and I/O bandwidth writing intermediate output to their local disks. The corresponding intermediate output for grep had negligible size.

The middle-left graph shows the rate at which data is sent over the network from the map tasks to the reduce tasks. This shuffling starts as soon as the first map task completes. The first hump in the graph is for the first batch of approximately 1700 reduce tasks (the entire MapReduce was assigned about 1700 machines, and each machine executes at most one reduce task at a time). Roughly 300 seconds into the computation, some of these first batch of reduce tasks finish and we start shuffling data for the remaining reduce tasks. All of the shuffling is done about 600 seconds into the computation.

The bottom-left graph shows the rate at which sorted data is written to the final output files by the reduce tasks. There is a delay between the end of the first shuffling period and the start of the writing period because the machines are busy sorting the intermediate data. The writes continue at a rate of about 2-4 GB/s for a while. All of the writes finish about 850 seconds into the computation. Including startup overhead, the entire computation takes 891 seconds. This is similar to the current best reported result of 1057 seconds for the TeraSort benchmark [18].

A few things to note: the input rate is higher than the shuffle rate and the output rate because of our locality optimization – most data is read from a local disk and bypasses our relatively bandwidth constrained network. The shuffle rate is higher than the output rate because the output phase writes two copies of the sorted data (we make two replicas of the output for reliability and availability reasons). We write two replicas because that is the mechanism for reliability and availability provided by our underlying file system. Network bandwidth requirements for writing data would be reduced if the underlying file system used erasure coding [14] rather than replication.

5.4 Effect of Backup Tasks

In Figure 3 (b), we show an execution of the sort program with backup tasks disabled. The execution flow is similar to that shown in Figure 3 (a), except that there is a very long tail where hardly any write activity occurs. After 960 seconds, all except 5 of the reduce tasks are completed. However these last few stragglers don't finish until 300 seconds later. The entire computation takes 1283 seconds, an increase of 44% in elapsed time.

5.5 Machine Failures

In Figure 3 (c), we show an execution of the sort program where we intentionally killed 200 out of 1746 worker processes several minutes into the computation. The underlying cluster scheduler immediately restarted new worker processes on these machines (since only the processes were killed, the machines were still functioning properly).

The worker deaths show up as a negative input rate since some previously completed map work disappears (since the corresponding map workers were killed) and needs to be redone. The re-execution of this map work happens relatively quickly. The entire computation finishes in 933 seconds including startup overhead (just an increase of 5% over the normal execution time).
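Section 5.3's "three-line Map function" is straightforward to picture. A hedged Java rendering (mine; the paper's is C++ against Google's library):

  import java.io.IOException;
  import org.apache.hadoop.io.*;
  import org.apache.hadoop.mapred.*;

  public class SortMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {
    // Extract a 10-byte sorting key and emit (key, original line);
    // the reduce side is the identity, so the framework's sort and
    // partitioning do all the real work.
    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      output.collect(new Text(line.toString().substring(0, 10)), line);
    }
  }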
6 Experience

We wrote the first version of the MapReduce library in February of 2003, and made significant enhancements to it in August of 2003, including the locality optimization, dynamic load balancing of task execution across worker machines, etc. Since that time, we have been pleasantly surprised at how broadly applicable the MapReduce library has been for the kinds of problems we work on. It has been used across a wide range of domains within Google, including:

• large-scale machine learning problems,
• clustering problems for the Google News and Froogle products,
• extraction of data used to produce reports of popular queries (e.g. Google Zeitgeist),
• extraction of properties of web pages for new experiments and products (e.g. extraction of geographical locations from a large corpus of web pages for localized search), and
• large-scale graph computations.

[Figure 4: MapReduce instances over time (number of instances in the source tree, 2003/03 through 2004/09)]

Figure 4 shows the significant growth in the number of separate MapReduce programs checked into our primary source code management system over time, from 0 in early 2003 to almost 900 separate instances as of late September 2004. MapReduce has been so successful because it makes it possible to write a simple program and run it efficiently on a thousand machines in the course of half an hour, greatly speeding up the development and prototyping cycle. Furthermore, it allows programmers who have no experience with distributed and/or parallel systems to exploit large amounts of resources easily.

At the end of each job, the MapReduce library logs statistics about the computational resources used by the job. In Table 1, we show some statistics for a subset of MapReduce jobs run at Google in August 2004.

Table 1: MapReduce jobs run in August 2004

  Number of jobs                       29,423
  Average job completion time        634 secs
  Machine days used               79,186 days
  Input data read                    3,288 TB
  Intermediate data produced           758 TB
  Output data written                  193 TB
  Average worker machines per job         157
  Average worker deaths per job           1.2
  Average map tasks per job             3,351
  Average reduce tasks per job             55
  Unique map implementations              395
  Unique reduce implementations           269
  Unique map/reduce combinations          426

6.1 Large-Scale Indexing

One of our most significant uses of MapReduce to date has been a complete rewrite of the production indexing system that produces the data structures used for the Google web search service. The indexing system takes as input a large set of documents that have been retrieved by our crawling system, stored as a set of GFS files. The raw contents for these documents are more than 20 terabytes of data. The indexing process runs as a sequence of five to ten MapReduce operations. Using MapReduce (instead of the ad-hoc distributed passes in the prior version of the indexing system) has provided several benefits:

• The indexing code is simpler, smaller, and easier to understand, because the code that deals with fault tolerance, distribution and parallelization is hidden within the MapReduce library. For example, the size of one phase of the computation dropped from approximately 3800 lines of C++ code to approximately 700 lines when expressed using MapReduce.
• The performance of the MapReduce library is good enough that we can keep conceptually unrelated computations separate, instead of mixing them together to avoid extra passes over the data. This makes it easy to change the indexing process. For example, one change that took a few months to make in our old indexing system took only a few days to implement in the new system.
• The indexing process has become much easier to operate, because most of the problems caused by machine failures, slow machines, and networking hiccups are dealt with automatically by the MapReduce library without operator intervention. Furthermore, it is easy to improve the performance of the indexing process by adding new machines to the indexing cluster.
7 Related Work

Many systems have provided restricted programming models and used the restrictions to parallelize the computation automatically. For example, an associative function can be computed over all prefixes of an N element array in log N time on N processors using parallel prefix computations [6, 9, 13]. MapReduce can be considered a simplification and distillation of some of these models based on our experience with large real-world computations. More significantly, we provide a fault-tolerant implementation that scales to thousands of processors. In contrast, most of the parallel processing systems have only been implemented on smaller scales and leave the details of handling machine failures to the programmer.

Bulk Synchronous Programming [17] and some MPI primitives [11] provide higher-level abstractions that make it easier for programmers to write parallel programs. A key difference between these systems and MapReduce is that MapReduce exploits a restricted programming model to parallelize the user program automatically and to provide transparent fault-tolerance.

Our locality optimization draws its inspiration from techniques such as active disks [12, 15], where computation is pushed into processing elements that are close to local disks, to reduce the amount of data sent across I/O subsystems or the network. We run on commodity processors to which a small number of disks are directly connected instead of running directly on disk controller processors, but the general approach is similar.

Our backup task mechanism is similar to the eager scheduling mechanism employed in the Charlotte System [3]. One of the shortcomings of simple eager scheduling is that if a given task causes repeated failures, the entire computation fails to complete. We fix some instances of this problem with our mechanism for skipping bad records.

The MapReduce implementation relies on an in-house cluster management system that is responsible for distributing and running user tasks on a large collection of shared machines. Though not the focus of this paper, the cluster management system is similar in spirit to other systems such as Condor [16].

The sorting facility that is a part of the MapReduce library is similar in operation to NOW-Sort [1]. Source machines (map workers) partition the data to be sorted and send it to one of R reduce workers. Each reduce worker sorts its data locally (in memory if possible). Of course NOW-Sort does not have the user-definable Map and Reduce functions that make our library widely applicable.

River [2] provides a programming model where processes communicate with each other by sending data over distributed queues. Like MapReduce, the River system tries to provide good average case performance even in the presence of non-uniformities introduced by heterogeneous hardware or system perturbations. River achieves this by careful scheduling of disk and network transfers to achieve balanced completion times. MapReduce has a different approach. By restricting the programming model, the MapReduce framework is able to partition the problem into a large number of fine-grained tasks. These tasks are dynamically scheduled on available workers so that faster workers process more tasks. The restricted programming model also allows us to schedule redundant executions of tasks near the end of the job which greatly reduces completion time in the presence of non-uniformities (such as slow or stuck workers).

BAD-FS [5] has a very different programming model from MapReduce, and unlike MapReduce, is targeted to the execution of jobs across a wide-area network. However, there are two fundamental similarities. (1) Both systems use redundant execution to recover from data loss caused by failures. (2) Both use locality-aware scheduling to reduce the amount of data sent across congested network links.

TACC [7] is a system designed to simplify construction of highly-available networked services.
Like MapReduce, it relies on re-execution as a mechanism for implementing fault-tolerance.

8 Conclusions

The MapReduce programming model has been successfully used at Google for many different purposes. We attribute this success to several reasons. First, the model is easy to use, even for programmers without experience with parallel and distributed systems, since it hides the details of parallelization, fault-tolerance, locality optimization, and load balancing. Second, a large variety of problems are easily expressible as MapReduce computations. For example, MapReduce is used for the generation of data for Google's production web search service, for sorting, for data mining, for machine learning, and many other systems. Third, we have developed an implementation of MapReduce that scales to large clusters of machines comprising thousands of machines. The implementation makes efficient use of these machine resources and therefore is suitable for use on many of the large computational problems encountered at Google.

We have learned several things from this work. First, restricting the programming model makes it easy to parallelize and distribute computations and to make such computations fault-tolerant. Second, network bandwidth is a scarce resource. A number of optimizations in our system are therefore targeted at reducing the amount of data sent across the network: the locality optimization allows us to read data from local disks, and writing a single copy of the intermediate data to local disk saves network bandwidth. Third, redundant execution can be used to reduce the impact of slow machines, and to handle machine failures and data loss.

Acknowledgements

Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people's suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System [8]. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper. The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google's engineering organization for providing helpful feedback, suggestions, and bug reports.

References

[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS '99), pages 10-22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs Hölzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22-28, April 2003.
[5] John Bent, Douglas Thain, Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation (NSDI), March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78-91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29-43, Lake George, New York, 2003.
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par'96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401-408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies (FAST) Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831-838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335-348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68-74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103-111, 1990.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.

A Word Frequency

This section contains a program that counts the number of occurrences of each unique word in a set of input files specified on the command line.

  #include "mapreduce/mapreduce.h"

  // User's map function
  class WordCounter : public Mapper {
   public:
    virtual void Map(const MapInput& input) {
      const string& text = input.value();
      const int n = text.size();
      for (int i = 0; i < n; ) {
        // Skip past leading whitespace
        while ((i < n) && isspace(text[i]))
          i++;

        // Find word end
        int start = i;
        while ((i < n) && !isspace(text[i]))
          i++;
        if (start < i)
          Emit(text.substr(start,i-start),"1");
      }
    }
  };
  REGISTER_MAPPER(WordCounter);

  // User's reduce function
  class Adder : public Reducer {
    virtual void Reduce(ReduceInput* input) {
      // Iterate over all entries with the
      // same key and add the values
      int64 value = 0;
      while (!input->done()) {
        value += StringToInt(input->value());
        input->NextValue();
      }
      // Emit sum for input->key()
      Emit(IntToString(value));
    }
  };
  REGISTER_REDUCER(Adder);

  int main(int argc, char** argv) {
    ParseCommandLineFlags(argc, argv);

    MapReduceSpecification spec;

    // Store list of input files into "spec"
    for (int i = 1; i < argc; i++) {
      MapReduceInput* input = spec.add_input();
      input->set_format("text");
      input->set_filepattern(argv[i]);
      input->set_mapper_class("WordCounter");
    }

    // Specify the output files:
    //   /gfs/test/freq-00000-of-00100
    //   /gfs/test/freq-00001-of-00100
    //   ...
    MapReduceOutput* out = spec.output();
    out->set_filebase("/gfs/test/freq");
    out->set_num_tasks(100);
    out->set_format("text");
    out->set_reducer_class("Adder");

    // Optional: do partial sums within map
    // tasks to save network bandwidth
    out->set_combiner_class("Adder");

    // Tuning parameters: use at most 2000
    // machines and 100 MB of memory per task
    spec.set_machines(2000);
    spec.set_map_megabytes(100);
    spec.set_reduce_megabytes(100);

    // Now run it
    MapReduceResult result;
    if (!MapReduce(spec, &result)) abort();

    // Done: 'result' structure contains info
    // about counters, time taken, number of
    // machines used, etc.
    return 0;
  }