ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

JAVA HDFS API编程二

2021-12-23 18:04:08  阅读:95  来源: 互联网

标签:HDFS JAVA System API context import println new public


java里面的设计模式:模板模式

把骨架(这个骨架就用通用算法进行抽象出来)定义好,具体实现交给子类去实现。
意思是说在模板里面只要把过程给定义好,具体怎么实现,这个模板方法是不关注的,具体的实现是又子类来完成的,可以有多个子类,每个子类实现的功能可以都不一样。

定义一个模板类:

package com.ruozedata.pattern.template;

public abstract class Mapper {

    //setUp mapper clearUp三个方法都是抽象方法
    //后面会用具体的子类来实现它

    /**
     * 初始化的操作 : 打开冰箱门
     */
    abstract void setUp();

    /**
     * 具体的业务逻辑 : 把大象、狗、猪等放进去
     */
    abstract void mapper();

    /**
     * 资源释放的操作 : 关上冰箱门
     */
    abstract void clearUp();


    /**
     * 定义我们的模板方法的执行流程
     * 这个run方法,会调用前面的方法,定义执行顺序:初始化、执行、结束
     */
    public void run(){
        setUp();

        mapper();

        clearUp();
    }
}

定义一个子类,来实现模板抽象类里的抽象方法:

package com.ruozedata.pattern.template;

public class SubMapper extends Mapper{
    void setUp() {
        System.out.println("SubMapper.setUp");
    }

    void mapper() {
        System.out.println("SubMapper.mapper");
    }

    void clearUp() {
        System.out.println("SubMapper.clearUp");
    }
}

再定义一个子类,来实现模板抽象类里的抽象方法,和上面的子类类似,但是可以实现不同的功能:

package com.ruozedata.pattern.template;

public class SubMapper2 extends Mapper{
    void setUp() {
        System.out.println("SubMapper2.setUp");
    }

    void mapper() {
        System.out.println("SubMapper2.mapper");
    }

    void clearUp() {
        System.out.println("SubMapper2.clearUp");
    }
}

再定义一个类,去调用上面的已经实现好的子类:

package com.ruozedata.pattern.template;

public class Client {
    public static void main(String[] args) {
        SubMapper subMapper = new SubMapper();
        subMapper.run();

        SubMapper2 subMapper2 = new SubMapper2();
        subMapper2.run();
    }
}

运行结果:

SubMapper.setUp
SubMapper.mapper
SubMapper.clearUp
SubMapper2.setUp
SubMapper2.mapper
SubMapper2.clearUp

使用HDFS API来完成词频(wordcount wc)统计

功能拆解

词频统计就是给你一个或者一批文件,让你统计每个单词出现的次数。
当拿到一个功能的时候,千万不要想着代码怎么写,而是要进行功能、需求的分析:用中文描述清楚这个事情是做什么的 1 2 3 4等步骤,把步骤写清楚。然后再是开发:把1234翻译成代码而已。上层的架构,包括上面每个步骤用什么技术框架去实现,这才是重要的。所以思路一定要理清楚。
现在分析这个:使用HDFS API来完成词频(wordcount wc)统计
另外大数据里的东西就是三段论:
1.输入
2.处理
3.输出
所有的都是上面的流程。

对上面进行功能拆解:
第一步:输入:要去使用HDFS 的 API去读取文件;

第二步:处理:词频
1.读文件进来的内容是一行一行的,按照某个指定的分隔符进行行内容的拆分,就变成一堆单词了
2.给每个单词赋上出现的次数为1,如下:
比如这一行单词: wc,word,hello,word ,按照逗号分割,每个单词赋上出现的次数为1
(wc,1)
(word,1)
(hello,1)
(word,1)
上面每个单词出现的次数都是1,我们要的是每个单词出现的总次数,那么怎么给它们累加起来?
3.把上面的分割后的单词放到一个缓存中,如放到map中,map<单词,次数>,当word出现一次的时候是map<word,1>,当出现两次的时候就是map<word,2>,这个map就是缓存。
4.把这个map的缓存中的内容遍历处理 这个就是 词频。

第三步:输出 可以按照你想输出的地方进行输出
1.打印到本地
2.写到本地文件系统
3.写到HDFS文件系统
4.写到MySQL数据库…

上面的骨架已经定义好,下面来进行实现。

代码实现

1.首先定义一个抽象类或者接口Mapper,只是定义了功能,但并不关注具体怎么实现。

package com.ruozedata.hadoop.hdfs;

public interface Mapper {
    /**
     * map 一一操作 对每个元素进行操作
     * 现在读进来是一行数据,对读进来的每行数据进行操作
     */
    public void map(String line,Context context);
}

这个接口,只是定义了一个map方法,它的功能是传进来一行数据line,中间处理过程数据以及结果数据会放在context缓存中,所以可以理解line是一行数据,context是一个缓存,临时存放数据的一个东西。

2.定义这个缓存Context
这个缓存Context ,它有一个cacheMap 对象,这个对象是一个HashMap实例,有两个参数,第一个是key,第二个是value,可以理解为可以存放<key,value>的数据,就是缓存。
代码如下:

package com.ruozedata.hadoop.hdfs;

import java.util.HashMap;
import java.util.Map;

public class Context {
    private Map<Object,Object> cacheMap = new HashMap<Object, Object>();

    //get方法
    public Map<Object, Object> getCacheMap() {
        return cacheMap;
    }

    /**
     * set方法
     * 把数据写入到缓存中
     * @param key  单词
     * @param value 次数
     */
    public void write(Object key, Object value) {
        cacheMap.put(key, value);
    }
    /**
     * 从缓存中获取单词对应的次数
     * @param key 单词
     * @return  次数
     */
    public Object get(Object key) {
        return cacheMap.get(key);
    }
}

3.定义一个类WordCountMapper ,来实现上面的接口Mapper,具体怎么实现是由WordCountMapper来完成。
传进去一行数据和一个缓存,把这行数据按照空格进行分割,分割后是一个数组,然后对这个数组进行遍历,然后根据key的值去缓存中看看有没有对应的value,如果没有,则把这个单词,也就是key放进缓存中,并且value值给1。如果有这个单词key,那么把对应的value取出来再加1,然后再放进缓存中去。

package com.ruozedata.hadoop.hdfs;

public class WordCountMapper implements Mapper {

    public void map(String line, Context context) {
        String[] splits = line.split(" ");
        for (String word : splits) {
            Object value = context.get(word);
            if (null == value){ //单词不存在的情况
                context.write(word,1);
            } else { //单词存在的情况  先把读取出来的值加1,去然后再写进去
                context.write(word,Integer.parseInt(value.toString()) + 1);
            }
        }
    }

}

4.定义一个类HDFSWCAPI01,读取文件,处理数据,然后输出。
定义一个configuration,然后给它配置相关的hdfs地址等,然后定义一个fileSystem,有了configuration,就有了fileSystem入口。

package com.ruozedata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import javax.swing.plaf.synth.ColorType;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;


public class HDFSWCAPI01 {
    public static void main(String[] args) throws Exception{

        //Configuration和FileSystem
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
        configuration.set("dfs.replication","1");
        System.setProperty("HADOOP_USER_NAME","ruoze");
        FileSystem fileSystem = FileSystem.get(configuration);

        //读取数据 input
        Path input = new Path("/hdfsapi/test3/");

        WordCountMapper mapper = new WordCountMapper();
        Context context = new Context();

        //远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件
        RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true);

        //如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加
        while (iterator.hasNext()){
            LocatedFileStatus status = iterator.next();
            FSDataInputStream in = fileSystem.open(status.getPath());

            BufferedReader read = new BufferedReader(new InputStreamReader(in));

            String line = "";
            while ((line = read.readLine()) != null){
                System.out.println(line);

                mapper.map(line,context);

            }
            read.close();
            in.close();

            //从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。
            Map<Object, Object> cacheMap = context.getCacheMap();
            for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) {
                System.out.println(entry.getKey() + "\t" + entry.getValue());
            }
        }
    }
}

忽略单词大小写以及多态的使用

如果忽略单词大小写去统计wc,只需要把上面的WordCountMapper的复制一份,CaseIgnoreWordCountMapper只需要加上line.toLowerCase()。

package com.ruozedata.hadoop.hdfs;

public class CaseIgnoreWordCountMapper implements Mapper {

    public void map(String line, Context context) {
        String[] splits = line.toLowerCase().split(" ");
        for (String word : splits) {
            Object value = context.get(word);
            if (null == value){ //单词不存在的情况
                context.write(word,1);
            } else { //单词存在的情况  先把读取出来的值加1,去然后再写进去
                context.write(word,Integer.parseInt(value.toString()) + 1);
            }
        }
    }
}

然后加入了Mapper mapper = new CaseIgnoreWordCountMapper();进行调用,会忽略大小写,这个也是多态的使用。

package com.ruozedata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;


public class HDFSWCAPI01 {
    public static void main(String[] args) throws Exception{

        //Configuration和FileSystem
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
        configuration.set("dfs.replication","1");
        System.setProperty("HADOOP_USER_NAME","ruoze");
        FileSystem fileSystem = FileSystem.get(configuration);

        //读取数据 input
        Path input = new Path("/hdfsapi/test3/");

        //WordCountMapper mapper = new WordCountMapper();  --这个不会忽略大小写
        //下面这个加了Mapper,为多态的使用,如果把Mapper换成CaseIgnoreWordCountMapper,则不是
        //CaseIgnoreWordCountMapper这个会忽略大小写
        Mapper mapper = new CaseIgnoreWordCountMapper();
        Context context = new Context();

        //远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件
        RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true);

        //如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加
        while (iterator.hasNext()){
            LocatedFileStatus status = iterator.next();
            FSDataInputStream in = fileSystem.open(status.getPath());

            BufferedReader read = new BufferedReader(new InputStreamReader(in));

            String line = "";
            while ((line = read.readLine()) != null){
                System.out.println(line);

                mapper.map(line,context);

            }
            read.close();
            in.close();

            System.out.println("\n\n");

            //TODO... 后面可以考虑把结果写入到hdfs的某个文件中去
            //Path result = new Path("/hdfsapi/result/result.txt");

            //从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。
            Map<Object, Object> cacheMap = context.getCacheMap();
            for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) {
                System.out.println(entry.getKey() + "\t" + entry.getValue());
            }

        }
    }
}

代码改造

上面的写法还不够灵活,可以通过配置文件的形式进行改造,把需要的东西放在配置文件中,需要什么读什么,读进来之后通过反射的方式进行处理。
1.在resources中创建一个文件:wc.properties

INPUT_PATH=/hdfsapi/test3/
OUTPUT_PATH=/hdfsapi/result/
HDFS_URI=hdfs://hadoop001:9000
##具体用哪个子类来实现,写这个类,底层要用反射才能实现
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.WordCountMapper

2.创建一个工具类ParamsUtils去读取上面的配置文件

package com.ruozedata.hadoop.hdfs;

import java.io.IOException;
import java.util.Properties;

public class ParamsUtils {
    private static Properties properties = new Properties();

    static {
        try {
            properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //get方法
    public static Properties getProperties() {
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(getProperties().getProperty("MAPPER_CLASS"));
        System.out.println(getProperties().getProperty("INPUT_PATH"));
    }
}

上面这个getProperties().getProperty(“MAPPER_CLASS”)都是写死的,可以用一个常量类来封装一下,不封装也可以。

package com.ruozedata.hadoop.hdfs;

public class Constants {
    public static final String INPUT_PATH = "INPUT_PATH";
    public static final String OUTPUT_PATH = "OUTPUT_PATH";
    public static final String HDFS_URI = "HDFS_URI";
    public static final String MAPPER_CLASS = "MAPPER_CLASS";
}

然后再调用常量类:

package com.ruozedata.hadoop.hdfs;

import java.io.IOException;
import java.util.Properties;

public class ParamsUtils {
    private static Properties properties = new Properties();

    static {
        try {
            properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //get方法
    public static Properties getProperties() {
        return properties;
    }

    public static void main(String[] args) {
//        System.out.println(getProperties().getProperty("MAPPER_CLASS"));
//        System.out.println(getProperties().getProperty("INPUT_PATH"));
        System.out.println(getProperties().getProperty(Constants.MAPPER_CLASS));
        System.out.println(getProperties().getProperty(Constants.INPUT_PATH));
        System.out.println(getProperties().getProperty(Constants.HDFS_URI));
        System.out.println(getProperties().getProperty(Constants.OUTPUT_PATH));
    }
}

输出结果:

com.ruozedata.hadoop.hdfs.WordCountMapper
/hdfsapi/test3/
hdfs://hadoop001:9000
/hdfsapi/result/

最后再进行测试,HDFSWCAPI02 再HDFSWCAPI01基础上只是修改了通过properties 拿到input,以及通过反射拿到MAPPER_CLASS类,其他都不需要修改。

package com.ruozedata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Properties;


public class HDFSWCAPI02 {
    public static void main(String[] args) throws Exception{

        //拿到配置
        Properties properties = ParamsUtils.getProperties();
        Path input = new Path(properties.getProperty(Constants.INPUT_PATH));

        //Configuration和FileSystem
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
        configuration.set("dfs.replication","1");
        System.setProperty("HADOOP_USER_NAME","ruoze");
        FileSystem fileSystem = FileSystem.get(configuration);

        //因为上面有了input,所以这里不需要了
//        //读取数据 input
//        Path input = new Path("/hdfsapi/test3/");


        //因为配置文件中有MAPPER_CLASS,在下面会用到,所以在这里需要通过反射把它给拿出来
        Class<?> aClass = Class.forName(properties.getProperty(Constants.MAPPER_CLASS));
        //然后通过aClass反射,去new一个instance:aClass.newInstance()

        //因前面Class<?> 的类型是不明确的,所以在这里
        // 需要在aClass.newInstance()前面加一个强制转换(Mapper)成Mapper类型,就是转成它的父类
        Mapper mapper = (Mapper) aClass.newInstance();
        Context context = new Context();

        //远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件
        RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true);

        //如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加
        while (iterator.hasNext()){
            LocatedFileStatus status = iterator.next();
            FSDataInputStream in = fileSystem.open(status.getPath());

            BufferedReader read = new BufferedReader(new InputStreamReader(in));

            String line = "";
            while ((line = read.readLine()) != null){
                System.out.println(line);

                mapper.map(line,context);

            }
            read.close();
            in.close();

            System.out.println("\n\n");

            //TODO... 后面可以考虑把结果写入到hdfs的某个文件中去
            //Path result = new Path("/hdfsapi/result/result.txt");

            //从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。
            Map<Object, Object> cacheMap = context.getCacheMap();
            for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) {
                System.out.println(entry.getKey() + "\t" + entry.getValue());
            }

        }
    }
}

这样的话所有的输入的东西都在配置文件中完成,包括用哪个类也指定好了,上面统计是区分大小写的,如果要调用区分大小写的类,只需要在wc.properties中把:
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.WordCountMapper
修改成即可:
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.CaseIgnoreWordCountMapper

标签:HDFS,JAVA,System,API,context,import,println,new,public
来源: https://blog.csdn.net/liweihope/article/details/122095627

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有