1.多路径输入
1)FileInputFormat.addInputPath 多次调用加载不同路径
FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"));
FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path2"));
2)FileInputFormat.addInputPaths 一次调用加载多个路径,多路径字符串用逗号隔开
FileInputFormat.addInputPaths(job, "hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2");
2.多种输入
MultipleInputs可以加载不同路径的输入文件,并且每个路径可用不同的mapper
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);
例子:
package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 多类型文件输入 — demonstrates {@link MultipleInputs}: two input paths are
 * read through two different mapper classes, and their intermediate
 * (Text, Text) pairs are merged by a single identity reducer.
 *
 * @author lijl
 */
public class MultiTypeFileInputMR {

    /**
     * Mapper for path1: splits each line on the '|' character and emits
     * (first field, second field). Lines with fewer than two fields are
     * skipped rather than crashing the task.
     */
    static class MultiTypeFileInput1Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) {
            try {
                String[] fields = value.toString().split("\\|");
                if (fields.length >= 2) {
                    context.write(new Text(fields[0]), new Text(fields[1]));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the framework can see it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Mapper for path3: splits each line with an empty-string delimiter,
     * i.e. into individual characters, and emits (first char, second char).
     * NOTE(review): split("") looks like a delimiter lost in copy-paste —
     * confirm against the original source; behavior is preserved as-is.
     */
    static class MultiTypeFileInput3Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) {
            try {
                String[] fields = value.toString().split("");
                if (fields.length >= 2) {
                    context.write(new Text(fields[0]), new Text(fields[1]));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Identity reducer: writes every value under its key unchanged. */
    static class MultiTypeFileInputReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) {
            try {
                for (Text value : values) {
                    context.write(key, value);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Job driver: configures output key/value classes, a comma key/value
     * separator for text output, one reducer, and binds each input path
     * to its own mapper via MultipleInputs.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Use ',' instead of the default tab between output key and value.
        conf.set("mapred.textoutputformat.separator", ",");
        Job job = new Job(conf, "MultiPathFileInput");
        job.setJarByClass(MultiTypeFileInputMR.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://RS5-112:9000/cs/path6"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(MultiTypeFileInputReducer.class);
        job.setNumReduceTasks(1);
        // Each input path gets its own mapper; both share TextInputFormat.
        MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"),
                TextInputFormat.class, MultiTypeFileInput1Mapper.class);
        MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"),
                TextInputFormat.class, MultiTypeFileInput3Mapper.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}