Hadoop Secondary Sort
Published: 2019-06-21


 

My goal: for each year-month, output all of that month's temperatures in ascending order.

Sample input:

2012,01,01,35
2011,12,23,-4
2012,01,01,43
2012,01,01,23
2011,12,23,5
2011,4,1,2
2011,4,1,56

Expected result:

201112 -4,5
20114 2,56
201201 23,35,43
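Note how the key is built: year and month are concatenated without zero-padding, so "2011" + "4" gives "20114" rather than "201104". A minimal sketch of that key construction (the class name `KeyDemo` is just for illustration; the real work happens in the mapper shown later):

```java
public class KeyDemo {

    // Builds the year-month grouping key from one CSV line,
    // the same way the mapper concatenates fields 0 and 1.
    static String yearMonthKey(String line) {
        String[] fields = line.split(",");
        return fields[0] + fields[1];
    }

    public static void main(String[] args) {
        System.out.println(yearMonthKey("2011,4,1,2"));     // -> 20114
        System.out.println(yearMonthKey("2012,01,01,35"));  // -> 201201
    }
}
```

Because the months are not zero-padded, "20114" and "201112" sort as plain strings, which is why "201112" appears before "20114" in the expected result.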



 

Implementation:

 

Code structure:

 

 

The implementation breaks down into the following steps:

(1) Write a composite key class that wraps the fields above.

package com.book.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class DataTemperaturePair implements WritableComparable<DataTemperaturePair> {

    // year-month, e.g. "201112"
    private Text yearMoth = new Text();
    // temperature
    private IntWritable temperature = new IntWritable();
    // day of month
    private Text day = new Text();

    public DataTemperaturePair() {
    }

    public Text getYearMoth() {
        return yearMoth;
    }

    public void setYearMoth(Text yearMoth) {
        this.yearMoth = yearMoth;
    }

    public Text getDay() {
        return day;
    }

    public void setDay(Text day) {
        this.day = day;
    }

    public IntWritable getTemperature() {
        return temperature;
    }

    public void setTemperature(IntWritable temperature) {
        this.temperature = temperature;
    }

    // readFields() and write() must both be implemented, otherwise the
    // reduce side cannot deserialize this composite key. The read order
    // must match the write order exactly.
    public void readFields(DataInput input) throws IOException {
        this.yearMoth = new Text(input.readUTF());
        this.temperature = new IntWritable(input.readInt());
        this.day = new Text(input.readUTF());
    }

    public void write(DataOutput output) throws IOException {
        output.writeUTF(yearMoth.toString());
        output.writeInt(temperature.get());
        output.writeUTF(day.toString());
    }

    public int compareTo(DataTemperaturePair that) {
        int compareValue = this.yearMoth.compareTo(that.yearMoth);
        if (compareValue == 0) {
            compareValue = temperature.compareTo(that.temperature);
        }
        // ascending order
        return compareValue;
    }
}
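The write/readFields contract can be checked without Hadoop by round-tripping the three fields through plain java.io streams, which is exactly what Hadoop's serialization does underneath (the class name `RoundTripDemo` is illustrative only):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class RoundTripDemo {

    // Mirrors DataTemperaturePair.write()/readFields(): the fields are
    // read back in exactly the order they were written.
    static Object[] roundTrip(String yearMoth, int temperature, String day) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeUTF(yearMoth);
            out.writeInt(temperature);
            out.writeUTF(day);

            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            return new Object[]{in.readUTF(), in.readInt(), in.readUTF()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object[] fields = roundTrip("201112", -4, "23");
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]);
    }
}
```

If the read order ever drifts from the write order (say, reading the int before the first UTF string), deserialization fails or silently corrupts the key, which is the failure mode the comment in the class warns about.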

 

(2) Write the partitioner

Why do we need a custom partitioner?

Because the key is a custom object, and we want all records with the same yearMoth to land in the same partition.

package com.book.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner
 * @author Sxq
 */
public class DataTemperaturePartition extends Partitioner<DataTemperaturePair, IntWritable> {

    @Override
    public int getPartition(DataTemperaturePair pair, IntWritable temperature, int numberOfPartitions) {
        // partition by the year-month part of the key only
        return Math.abs(pair.getYearMoth().hashCode() % numberOfPartitions);
    }
}
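The partition arithmetic can be sketched locally. Note two assumptions in this sketch: it uses String.hashCode as a stand-in for Hadoop's Text.hashCode (Text hashes the UTF-8 bytes, so the actual values differ), and the class name `PartitionDemo` is invented for illustration:

```java
public class PartitionDemo {

    // Same shape as the partitioner above: hash the year-month key,
    // take the remainder modulo the number of partitions.
    static int partitionFor(String yearMonth, int numPartitions) {
        return Math.abs(yearMonth.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        for (String ym : new String[]{"201112", "20114", "201201"}) {
            System.out.println(ym + " -> partition " + partitionFor(ym, 3));
        }
    }
}
```

One caveat worth knowing: Math.abs(Integer.MIN_VALUE) is still negative, so if hashCode() % n ever yields Integer.MIN_VALUE the result is out of range. The defensive idiom `(hash % n + n) % n` (or masking with `hashCode() & Integer.MAX_VALUE`) avoids that edge case.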

 

(3) Write the grouping comparator

This determines which reduce group each record falls into: because it compares only yearMoth, all records for the same year-month are delivered to a single reduce() call.

package com.book.test;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DataTemperatureGroupingComparator extends WritableComparator {

    public DataTemperatureGroupingComparator() {
        super(DataTemperaturePair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        DataTemperaturePair v1 = (DataTemperaturePair) a;
        DataTemperaturePair v2 = (DataTemperaturePair) b;
        return v1.getYearMoth().compareTo(v2.getYearMoth());
    }
}

 

(4) Write the driver class

package com.book.test;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Cmain {

    static class mapper1 extends Mapper<LongWritable, Text, DataTemperaturePair, IntWritable> {

        DataTemperaturePair dataTemperaturePair = new DataTemperaturePair();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split(",");
            // build the year-month part of the key, e.g. "2011" + "12" -> "201112"
            String yymm = lines[0] + lines[1];
            dataTemperaturePair.setYearMoth(new Text(yymm));
            IntWritable temperature = new IntWritable(Integer.valueOf(lines[3]));
            dataTemperaturePair.setTemperature(temperature);
            dataTemperaturePair.setDay(new Text(lines[2]));
            context.write(dataTemperaturePair, temperature);
        }
    }

    static class reduce1 extends Reducer<DataTemperaturePair, IntWritable, Text, Text> {

        @Override
        protected void reduce(DataTemperaturePair key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sortedTemperatureList = new StringBuffer();
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                sortedTemperatureList.append(iterator.next());
                sortedTemperatureList.append(",");
            }
            // drop the trailing comma
            if (sortedTemperatureList.length() > 0) {
                sortedTemperatureList.setLength(sortedTemperatureList.length() - 1);
            }
            context.write(key.getYearMoth(), new Text(sortedTemperatureList.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Cmain.class);
        job.setMapperClass(mapper1.class);
        job.setReducerClass(reduce1.class);

        job.setMapOutputKeyClass(DataTemperaturePair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setGroupingComparatorClass(DataTemperatureGroupingComparator.class);
        job.setPartitionerClass(DataTemperaturePartition.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path("/Users/mac/Desktop/temperature.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
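The combined effect of the four classes — sort by (yearMoth, temperature), then group by yearMoth alone — can be simulated locally without a cluster. This is a sketch of the shuffle semantics, not Hadoop itself; the class name `ShuffleSimulation` is invented for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;

public class ShuffleSimulation {

    // Each record is {yearMonth, temperature}. Sorting by both fields mirrors
    // DataTemperaturePair.compareTo(); merging by yearMonth mirrors the
    // grouping comparator handing one group per year-month to reduce().
    static LinkedHashMap<String, String> run(List<String[]> records) {
        records.sort(Comparator
                .comparing((String[] r) -> r[0])
                .thenComparing(r -> Integer.parseInt(r[1])));
        LinkedHashMap<String, String> out = new LinkedHashMap<>();
        for (String[] r : records) {
            out.merge(r[0], r[1], (a, b) -> a + "," + b);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> data = new ArrayList<>(Arrays.asList(
                new String[]{"201201", "35"}, new String[]{"201112", "-4"},
                new String[]{"201201", "43"}, new String[]{"201201", "23"},
                new String[]{"201112", "5"}, new String[]{"20114", "2"},
                new String[]{"20114", "56"}));
        run(data).forEach((k, v) -> System.out.println(k + " " + v));
    }
}
```

Running this prints the same three lines as the expected result: temperatures arrive at each group already sorted, so the reducer only has to concatenate them.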

 

Result:

The job runs successfully, and the output matches the expected result shown above.

 

Reposted from: http://bsvka.baihongyu.com/
