ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Hadoop-day01_(java代码模拟hadoop存储数据)

2022-05-21 00:03:16  阅读:198  来源: 互联网

标签:map null java Hadoop hadoop bw new clazz


hadoop文件切分思想

需求:统计文本文件中的各个班级的人数(一共多到数不清的人)

1500100129,容寄南,23,女,文科三班
1500100130,宁怀莲,21,女,理科四班
1500100131,胡昊明,22,男,文科六班
1500100132,曾安寒,22,女,文科五班
1500100133,钱向山,24,女,理科二班
1500100134,计宣朗,22,男,理科四班
1500100135,庾振海,21,男,理科四班
1500100136,黎昆鹏,22,男,文科六班
1500100137,宣向山,22,女,理科四班
1500100138,栾鸿信,22,男,文科二班
1500100139,左代萱,24,女,文科三班
1500100140,郁运发,24,男,文科六班
1500100141,谢昌勋,23,男,理科六班
...
...

用 java 简单模拟hadoop 进行统计

传统方法:利用集合统计每个班级的人数

package day01;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 16:40
 */
public class Student_Count_Demo {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        //创建一个 map 集合,接受总的结果数据
        HashMap<String, Integer> map = new HashMap<>();
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            //读取文件进行分割,
            //创建字符缓冲输入流对
            br = new BufferedReader(new FileReader("src/day01/student"));
            String len = null;
            while((len = br.readLine()) != null){
                //以逗号进行分割,得到班级和人数
                String[] split = len.split("[,]");
                String clazz = split[4];
                //判断 map 集合是否存在对应的班级,不存在的话就添加
                if(!map.containsKey(clazz)){
                    map.put(clazz,1);
                }else{
                    //存在的话,就在原本的值上加 1
                    map.put(clazz,map.get(clazz)+1);
                }
            }
            //将结果集写道最终的文件中
            bw = new BufferedWriter(new FileWriter("src/day01/student_demo"));
            //遍历集合
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String,  Integer> entry : entries) {
                Integer value = entry.getValue();
                String key = entry.getKey();
                bw.write(key+":"+value);
                bw.newLine();
                bw.flush();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //释放资源
            if (bw != null){
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(br!=null){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

问题:

  • 读取效率低,低效率和昂贵的服务器
  • 存在数据安全问题
  • 等等

Hadoop方法

分布式思想:

将数据分成多个block,(这里每一行数据等于 1 mb 大小的数据)

分布式存储(HDFS),在给每个模块进行分配任务(MAP即给每个模块一个线程),

每个模块分别进行计算,然后将各个模块计算的结果聚合(redus

  1. 1. 步骤一:分块:每块大小为128兆,但是进行切分时,每快的大小为128*1.1时(约等于140多兆),才进行切分 128兆 的大小,最后一块,

    如果和倒数第二块的大小没超过 128*1.1 的大小,就只会分配一个map资源,进行计算

package day01;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 16:40
 */
public class Student_Count_Demo {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        //创建一个 map 集合,接受总的结果数据
        HashMap<String, Integer> map = new HashMap<>();
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            //读取文件进行分割,
            //创建字符缓冲输入流对
            br = new BufferedReader(new FileReader("src/day01/student"));
            String len = null;
            while((len = br.readLine()) != null){
                //以逗号进行分割,得到班级和人数
                String[] split = len.split("[,]");
                String clazz = split[4];
                //判断 map 集合是否存在对应的班级,不存在的话就添加
                if(!map.containsKey(clazz)){
                    map.put(clazz,1);
                }else{
                    //存在的话,就在原本的值上加 1
                    map.put(clazz,map.get(clazz)+1);
                }
            }
            //将结果集写道最终的文件中
            bw = new BufferedWriter(new FileWriter("src/day01/student_demo"));
            //遍历集合
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String,  Integer> entry : entries) {
                Integer value = entry.getValue();
                String key = entry.getKey();
                bw.write(key+":"+value);
                bw.newLine();
                bw.flush();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //释放资源
            if (bw != null){
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(br!=null){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

2 .应为分了 8个模块,这里创建8 个线程

package day02;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author WangTao
 * @date 2022/5/20 20:58
 */
/*
    Map (通过线程池的方式,简单来说模拟Hadoop中一个block块生成一个map任务,一个map相当于一个线程
    在切分出来的的block块中,统计每个班级的人数)
 */
public class Map {
    public static void main(String[] args) {
        //创建一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        //定义文件编号,从 0 开始
        int offset = 0;
        File file = new File("src/day02/blocks");
        File[] files = file.listFiles();
        for (File file1 : files) {
            MapTask mapTask = new MapTask(file1, offset);
            executorService.submit(mapTask);
            offset++;
        }
        System.out.println("分布式求取班级人数完成!!!");
        //关闭线程池
        executorService.shutdown();
        


    }
}

3 . 线程池中跑如下 map 任务:计算每个模块的人数

package day02;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 21:07
 */
public class MapTask implements Runnable{
    private File file;
    public int offset;

    public MapTask(File file,int offset) {
        this.file = file;
        this.offset = offset;
    }

    @Override
    public void run() {
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            //字符缓冲输入流
            br = new BufferedReader(new FileReader(file));
            //创建一个HashMap集合,来存储学生对象
            HashMap<String, Integer> map = new HashMap<>();
            String lin = null;
            while((lin = br.readLine()) != null) {
                //用逗号进行分割
                String clazz = lin.split("[,]")[4];
                //如果在map中没有班级作为key,那我们把班级作为key,value 为 1
                if (!map.containsKey(clazz)) {
                    map.put(clazz, 1);
                } else {
                    map.put(clazz, map.get(clazz) + 1);
                }

            }
            //将局部文件 ,集合中的数据写入到文本文件中
            //创建字符缓冲输出流
            bw = new BufferedWriter(new FileWriter("src/day02/block_counts/block---"+offset));
            //遍历集合
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> entry : entries) {
                Integer value = entry.getValue();
                String key = entry.getKey();
                bw.write(key+":"+value);
                bw.newLine();
                bw.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if (bw != null) {
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(br!= null){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

4 . 最后将8个线程跑出来的任务进行聚合统计出最终人数

package day02;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * @author WangTao
 * @date 2022/5/20 21:51
 */
/*
        将每个map任务的结果,在做一次聚合,统计出最终的人数
 */
public class Reduce {
    public static void main(String[] args)throws Exception {
        //将 past 目录封装成 File 对象
        File file = new File("src/day02/block_counts");
        //获取下面所哟文件的对象数组
        File[] files = file.listFiles();
        //创建一个 map 集合,接受总的数据
        HashMap<String, Integer> map = new HashMap<>();
        //遍历每一个block_counts 对象
        for (File file1 : files) {
            //读取文件,进行分割
            //创建字符缓冲输入流对象
            BufferedReader br = new BufferedReader(new FileReader(file1));
            String len = null;
            while ((len = br.readLine()) != null) {
                //读取的数据以 : 进行分割,得到键和值
                String[] split = len.split("[:]");
                String clazz = split[0];
                //将字符串类型转换成 int 类型,方便进行计算
                Integer sum = Integer.valueOf(split[1]);
                //判断 map 中是否存在对应的 key
                if(!map.containsKey(clazz)){
                    map.put(clazz,sum);
                }else{
                    //如果存在,value 值相加
                    map.put(clazz, map.get(clazz)+sum);
                }
            }
            //关闭读取文件通道
            br.close();
        }
        //读取文件已经完成,现在开始写入到最终文件中
        BufferedWriter bw = new BufferedWriter(new FileWriter("src/day02/finally_count/finally_sum"));
        //遍历 map 集合,将数据写入到文件中去
        Set<Map.Entry<String, Integer>> entries = map.entrySet();
        for (Map.Entry<String, Integer> entry : entries) {
            String key = entry.getKey();
            Integer value = entry.getValue();
            bw.write(key+":"+value);
            bw.newLine();
            bw.flush();
        }
        //关闭资源
        bw.close();
    }
}

分布式计算的好处:

1)高可靠性:因为Hadoop假设计算元素和存储会出现故障,因为它维护多个工作数据副本,在出现故障时可以对失败的节点重新分布处理。

2)高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。

3)高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。

4)高容错性:自动保存多份副本数据,并且能够自动将失败的任务重新分配。

5)成本低(Economical):Hadoop通过普通廉价的机器组成服务器集群来分发以及处理数据,以至于成本很低。

标签:map,null,java,Hadoop,hadoop,bw,new,clazz
来源: https://www.cnblogs.com/atao-BigData/p/16294070.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有