当前位置: 首页 > news >正文

百度收录网站与手机版宠物美容师宠物美容培训学校

百度收录网站与手机版,宠物美容师宠物美容培训学校,我的世界做墙纸网站,网页制作公司找哪家数据倾斜:主要就是在处理MR任务的时候,某个reduce的数据处理量比另外一些的reduce的数据量要大得多,其他reduce几乎不处理,这样的现象就是数据倾斜。 官方解释:数据倾斜指的是在数据处理过程中,由于某些键…

数据倾斜:主要就是在处理MR任务的时候,某个reduce的数据处理量比另外一些的reduce的数据量要大得多,其他reduce几乎不处理,这样的现象就是数据倾斜。

官方解释:数据倾斜指的是在数据处理过程中,由于某些键的分布极度不均匀,导致某些节点处理的数据量显著多于其他节点。‌这种情况会引发性能瓶颈,阻碍任务的并行执行,增加作业的整体执行时间。在Hadoop的MapReduce作业中,数据倾斜尤为明显,因为它会导致某些Reduce任务处理的数据量远大于其他任务,从而造成集群整体处理效率低下的问题。

这里比如有一个文本数据,里面内容全是:hadoop, hadoop, hadoop,hadoop ....,假设有800万条数据,这样更容易显示数据倾斜的效果,里面都是同样的单词,默认的hash取余分区的方法,明显不太适合,所以我们要自定义分区,重写分区方法。以及设置多个reduce,这里我设置为3,主要就是对数据倾斜的key进行一个增加后缀的方法,以及在Map阶段就增加后缀,实现过程是将每个hadoop都进行增加后缀,刚开始会全部默认存放到第一个分区里(0分区),然后写到分区后,自定义分区方法SkewPartitioner就会对里面的数据进行分析,如果后缀是1就分到1区里面,一共就0、1、2三个分区,以此来解决数据倾斜的问题。

注意:在Job端进行自定义分区器的设置:job,setPartitionerClass(SkewPartitioner.class)

具体代码如下:

package com.shujia.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Demo05SkewDataMR {public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {String line = value.toString();// 将每一行数据按照逗号/空格进行切分for (String word : line.split("[,\\s]")) {// 使用context.write将数据发送到下游// 将每个单词变成 单词,1 形式// 对数据倾斜的Key加上随机后缀if ("hadoop".equals(word)) {// 随机生成 0 1 2int prefix = (int) (Math.random() * 3);context.write(new Text(word + "_" + prefix), new IntWritable(1));} else {context.write(new Text(word), new IntWritable(1));}}}}public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {// 统计每个单词的数量int cnt = 0;for (IntWritable value : values) {cnt = cnt + value.get();}context.write(key, new IntWritable(cnt));}}// Driver端:组装(调度)及配置任务// 可以通过args接收参数// 本任务接收两个参数:输入路径、输出路径public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();// 创建JobJob job = Job.getInstance(conf);// 配置任务job.setJobName("Demo05SkewDataMR");job.setJarByClass(Demo05SkewDataMR.class);// 设置自定义分区器job.setPartitionerClass(SkewPartitioner.class);// 手动设置Reduce的数量// 最终输出到HDFS的文件数量等于Reduce的数量job.setNumReduceTasks(3);// 配置Map端job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 配置Reduce端job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 验证args的长度if (args.length != 2) {System.out.println("请传入输入输出目录!");return;}String input = args[0];String output = args[1];// 配置输入输出的路径FileInputFormat.addInputPath(job, new Path(input));Path ouputPath = new Path(output);// 通过FileSystem来实现覆盖写入FileSystem fs = FileSystem.get(conf);if (fs.exists(ouputPath)) {fs.delete(ouputPath, true);}// 该目录不能存在,会自动创建,如果已存在则会直接报错FileOutputFormat.setOutputPath(job, ouputPath);// 启动任务// 等待任务的完成job.waitForCompletion(true);}
}// 自定义分区:在Map阶段给key加上随机后缀,基于后缀返回不同的分区编号
class SkewPartitioner extends Partitioner<Text, IntWritable> {@Overridepublic int getPartition(Text text, IntWritable intWritable, int numPartitions) {String key = text.toString();int partitions = 0;// 只对数据倾斜的key做特殊处理if ("hadoop".equals(key.split("_")[0])) {switch (key) {
//                case "hadoop_0":
//                    partitions = 0;
//                    break;case "hadoop_1":partitions = 1;break;case "hadoop_2":partitions = 2;break;}} else {// 正常的key还是按照默认的Hash取余进行分区partitions = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;}return partitions;}
}

http://www.qdjiajiao.com/news/2253.html

相关文章:

  • 全国加盟网站建设手机网站建设公司
  • 网站建设代理费用北京网络网站推广
  • 邯郸营销网站建设傻瓜式自助建站系统
  • 哪个网站做自考题目免费外贸谷歌推广怎么样
  • 政府网站设计方案农产品品牌推广方案
  • 住房和城乡建设部的网站首页百度云登陆首页
  • 网站设计哪家公司好广告投放方式
  • 自己如何建企业网站免费平台
  • 电脑网站建设规划百度移动首页
  • php网站开发指导教材 文献新闻最新头条10条
  • 日韩男女直接做的视频网站什么是白帽seo
  • 哪些网站做电商比较好外贸网络营销
  • 电子商务网站备案百度联盟官网登录入口
  • 佛山网站建设找方维网络免费网页在线客服系统
  • 广州模板网站建设app运营推广是干什么
  • 做旅游网站选什么空间semir是什么牌子衣服
  • 网站怎么加链接广告软文范例大全100
  • 淄博网站制作高端网络广州网站制作公司
  • 网站建设文献英文竞价推广平台
  • 做百度网站怎么创建一个网址
  • 博罗网站建设公司人民网舆情数据中心官网
  • 淘宝联盟怎么做网站推广在哪里查关键词排名
  • 创立网站网络推广公司收费标准
  • 做网络写手最好进那个网站seo优化或网站编辑
  • 上海市网站建设公叿上海百度竞价
  • 做淘宝返利网站能挣钱百度seo教程网
  • 上线一个网站需要多少钱六六seo基础运营第三讲
  • 网站设计的公司如何选电子商务主要学什么就业方向
  • 网站后台生成文章很慢郑州seo外包顾问热狗
  • 做网站的公司哪家好一点郑州高端网站建设