[原创] Hadoop 2.6.x 下Distributed Cache的用法

仔细记录一下Java的Map-Reduce job使用distributed cache的方法,毕竟以前一直都是copy paste ~

 适用的Hadoop版本
CDH 5.8.0(Hadoop 2.6.0)
别的版本没有测试过,但后面相近的版本应该也能用。

 准备工作:上传本地文件到HDFS
为了在Java代码中把一个文件加入 distributed cache,需要先把它上传到HDFS,之后应使用 HDFS 路径来加入 distributed cache。
假设要加入 distributed cache 的文件为 file.txt:

hadoop fs -put file.txt /your/hdfs/dir/


 job 配置

    String fileName = "file.txt";
    String filePathHdfs = "/your/hdfs/dir/file.txt";

    Job job = Job.getInstance(getConf());
    Configuration conf = job.getConfiguration();
    conf.set("fileName", fileName);
    conf.set("filePathHdfs", filePathHdfs);

    job.addCacheFile(new URI(String.format("%s#%s", filePathHdfs, fileName)));

文章来源:https://www.codelast.com/
这里往 conf 里塞了两个变量,一个是文件名 fileName,另一个是文件的HDFS路径 filePathHdfs,并且在 addCacheFile() 的时候,拼成了 /your/hdfs/dir/file.txt#file.txt 这样奇怪的形式,这种形式使得在 mapper 或 reducer 中读取 distributed cache 文件的时候,直接用文件名就能读出文件,特别方便!

 在 mapper 或 reducer 的 setup() 方法中读取 distributed cache 文件

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();

      File myFile = new File(conf.get("file.txt"));
      FileInputStream fis = new FileInputStream(myFile);
      BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

      String line;
      while ((line = reader.readLine()) != null) {
        //TODO: deal with each line
      }
    }

看到没?上面的 conf.get("file.txt") 就只使用了文件名就能找到 distributed cache 里的文件,可以做这样“神奇”的操作是因为背后有一种叫symbolic link的技术。
下面留空的 TODO 那里,需要你自己填写处理每一行数据的逻辑。
文章来源:https://www.codelast.com/
➤➤ 版权声明 ➤➤ 
转载需注明出处:codelast.com 
感谢关注我的微信公众号(微信扫一扫):

wechat qrcode of codelast

发表评论