大数据开发之用CombineFileInputFormat优化Hadoop小文件

2018-02-27 10:49:24 1531浏览

HDFS设计是用来存储海量数据的,特别适合存储TB、PB量级别的数据。但是随着时间的推移,HDFS上可能会存在大量的小文件,这里说的小文件指的是文件大小远远小于一个HDFS块(128MB)的大小;HDFS上存在大量的小文件至少会产生以下影响:



消耗NameNode大量的内存

延长MapReduce作业的总运行时间

本文将介绍如何在MapReduce作业层面上将大量的小文件合并,以此减少运行作业的MapTask的数量;关于如何在HDFS上合并这些小文件,请参见《Hadoop小文件优化》。

Hadoop内置提供了一个CombineFileInputFormat类来专门处理小文件,其核心思想是:根据一定的规则,将HDFS上多个小文件合并到一个InputSplit中,然后会启用一个Map来处理这里面的文件,以此减少MR整体作业的运行时间。

CombineFileInputFormat类继承自FileInputFormat,主要重写了ListgetSplits(JobContextjob)方法;这个方法会根据数据的分布,mapreduce.input.fileinputformat.split.minsize.per.node、mapreduce.input.fileinputformat.split.minsize.per.rack以及mapreduce.input.fileinputformat.split.maxsize参数的设置来合并小文件,并生成List。其中mapreduce.input.fileinputformat.split.maxsize参数至关重要:

如果用户没有设置这个参数(默认就是没设置),那么同一个机架上的所有小文件将组成一个InputSplit,最终由一个MapTask来处理;

如果用户设置了这个参数,那么同一个节点(node)上的文件将会组成一个InputSplit。

同一个InputSplit包含了多个HDFS块文件,这些信息存储在CombineFileSplit类中,它主要包含以下信息:

从上面的定义可以看出,CombineFileSplit类包含了每个块文件的路径、起始偏移量、相对于原始偏移量的大小以及这个文件的存储节点,因为一个CombineFileSplit包含了多个小文件,所以需要使用数组来存储这些信息。

CombineFileInputFormat是抽象类,如果我们要使用它,需要实现createRecordReader方法,告诉MR程序如何读取组合的InputSplit。内置实现了两种用于解析组合InputSplit的类:org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat和org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat,我们可以把这两个类理解是TextInputFormat和SequenceFileInputFormat。为了简便,这里主要来介绍CombineTextInputFormat。

在CombineTextInputFormat中创建了org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader,具体如何解析CombineFileSplit中的文件主要在CombineFileRecordReader中实现。

CombineFileRecordReader类中其实封装了TextInputFormat的RecordReader,并对CombineFileSplit中的多个文件循环遍历并读取其中的内容,初始化每个文件的RecordReader主要在initNextRecordReader里面实现;每次初始化新文件的RecordReader都会设置mapreduce.map.input.file、mapreduce.map.input.length以及mapreduce.map.input.start参数,这样我们可以在Map程序里面获取到当前正在处理哪个文件。

现在我们就来看看如何使用CombineTextInputFormat类,如下:

packagecom.iteblog.hadoop.examples;

importorg.apache.commons.logging.Log;

importorg.apache.commons.logging.LogFactory;

importorg.apache.hadoop.conf.Configuration;

importorg.apache.hadoop.conf.Configured;

importorg.apache.hadoop.fs.Path;

importorg.apache.hadoop.io.LongWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.InputSplit;

importorg.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.MRJobConfig;

importorg.apache.hadoop.mapreduce.Mapper;

importorg.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

importorg.apache.hadoop.util.Tool;

importorg.apache.hadoop.util.ToolRunner;

importjava.io.IOException;

importjava.util.List;

/////////////////////////////////////////////////////////////////////

User:过往记忆

Date:2017-04-25

Time:22:59

bolg:https://www.iteblog.com

本文地址:https://www.iteblog.com/archives/2139

过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

过往记忆博客微信公共帐号:iteblog_hadoop

/////////////////////////////////////////////////////////////////////

publicclassHadoopTestextendsConfiguredimplementsTool{

privatestaticfinalLogLOG=LogFactory.getLog(HadoopTest.class);

privatestaticfinallongONE_MB=1024*1024L;

staticclassTextFileMapperextendsMapper{

@Override

protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)

throwsIOException,InterruptedException{

Configurationconfiguration=context.getConfiguration();

LOG.warn("#######################"+configuration.get(MRJobConfig.MAP_INPUT_FILE));

TextfilenameKey=newText(configuration.get(MRJobConfig.MAP_INPUT_FILE));

context.write(filenameKey,value);

}

}

publicstaticvoidmain(String[]args)throwsException{

intexitCode=ToolRunner.run(newHadoopTest(),args);

System.exit(exitCode);

}

@Override

publicintrun(String[]args)throwsException{

Configurationconf=newConfiguration(getConf());

conf.set("mapreduce.input.fileinputformat.split.maxsize",ONE_MB*32);

Jobjob=Job.getInstance(conf);

FileInputFormat.setInputPaths(job,args[0]);

FileOutputFormat.setOutputPath(job,newPath(args[1]));

job.setJarByClass(HadoopTest.class);

job.setInputFormatClass(CombineTextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

job.setMapperClass(TextFileMapper.class);

returnjob.waitForCompletion(true)?0:1;

}

}

上面的程序很简单,其实就是将HDFS上多个小文件合并到大文件中,并再每行存储了这行数据的文件路径。程序运行的结果如下:

可以看到最终结果将三个文件里面的内容合并到一个文件中。注意体会mapreduce.input.fileinputformat.split.maxsize参数的设置,大家可以不设置这个参数并且和设置这个参数运行情况对比,观察MapTask的个数变化。

最后想要了解更多关于大数据发展前景趋势,请关注扣丁学堂官网、微信等平台,扣丁学堂IT职业在线学习教育平台为您提供权威的大数据培训视频教程系统,通过千锋旗下金牌讲师在线录制的大数据视频教程系统,让你快速掌握大数据从入门到精通大数据开发实战技能。扣丁学堂大数据学习群:209080834。



扣丁学堂微信公众号



关注微信公众号获取更多学习资料



查看更多关于“大数据培训资讯的相关文章>>

标签: 大数据分析 大数据培训 大数据视频教程 Hadoop视频教程 大数据开发工程师 大数据在线视频

热门专区

暂无热门资讯

课程推荐

微信
微博
15311698296

全国免费咨询热线

邮箱:codingke@1000phone.com

官方群:148715490

北京千锋互联科技有限公司版权所有   北京市海淀区宝盛北里西区28号中关村智诚科创大厦4层
京ICP备12003911号-6   Copyright © 2013 - 2019

京公网安备 11010802030908号