[原创]Hadoop开发基础知识记录

 

 当你把一个文件加入distribution cache的时候,要注意:如果你是以addCacheFile()的方式添加的,而你在mapper中取出来的时候,却是以archive的方式取出来——getLocalCacheArchives(),那么,你将得不到cache文件的路径,因为放进去和取出来的方式要一致。

 在mapper中获取当前正在处理的HDFS文件名/HDFS目录名

有时候,Hadoop是按行来对数据进行处理的,由于对每一行数据,map()函数会被调用一次,我们有时可以根据文件名/目录名来获取一些信息,从而把它们输出,例如,目录名中包含了日期,则我们可以取出来并输出到Reducer。在map()函数中,我们可以这样取文件名:

InputSplit inputSplit = context.getInputSplit();
String fileName = ((FileSplit) inputSplit).getName();

假设当前正在处理的HDFS文件路径为:/user/hadoop/abc/myFile.txt,则上面的 fileName 取到的是“myFile.txt”这样的字符串。但如果要获取其目录名“abc”,则可以这样做:

InputSplit inputSplit = context.getInputSplit();
String dirName = ((FileSplit) inputSplit).getPath().getParent().getName();

文章来源:http://www.codelast.com/
再来一个问题:如何获取当前正在处理的HDFS文件的路径(例如 /user/hadoop/abc.txt)?方法如下:

InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toString();

上面的filePath得到的是形如“hdfs://xxx:43540/user/hadoop/abc.txt”的路径,如果你只想获取 /user 开始到最后的那一段路径(例如 /user/hadoop/abc.txt),需要这样做:

InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath();

 从HDFS上下载同一目录下的一堆文件
如果是从HDFS上下载一个文件到本地文件系统,这样做:

hadoop fs -get /your/hdfs/file /your/local/fs/file

但如果是要下载一个目录下的N个M-R输出文件(到一个文件),则应这样:

hadoop fs -getmerge /your/hdfs/directory /your/local/fs/file

或者你干脆把HDFS上的文件内容打印出来,重定向到一个文件:

hadoop fs -cat /your/hdfs/directory/part* > /your/local/fs/file

 关于InputFormat
具体可看这个链接。这里摘抄一段下来:

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats. These types are described in more detail in Module 4.
More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.
即:InputFormat定义了如何从文件中将数据读取到Mapper的实例里。Hadoop已经自带了一些InputFormat的实现了,其中有一些用于处理文本文件,它们描述了如何解释文本文件的多个不同方法;其他的实现——例如SequenceFileInputFormat——是为读取特殊二进制文件格式而生的。
更加强大的是,你可以定义你自己的InputFormat实现来格式化输入到你程序的数据——无论你想要什么样的输入。例如,默认的TextInputFormat读取文本文件的一行行的数据。它为每条记录emit的key是正在读取的行的偏移字节(以LongWritable的形式体现),而value则是该行的内容直到结束的 \n 字符(以Text对象的形式体现)。如果你有多行记录,这些记录是以 $ 字符来分隔的,那么你可以写一个自己的InputFormat用于根据这个字符来分割解析文件。
文章来源:http://www.codelast.com/
 为什么要启用LZO压缩,现在有什么可用的Hadoop LZO实现
这篇文章很好地解释了Twitter的Hadoop LZO实践,看完它,你就明白为什么要用LZO了。
这个项目,就是Twitter的Hadoop LZO实现,非常有用。
一句话总结就是:gzip不能将数据分块压缩,虽然减小了存储的数据量(同时也就减小了IO),但却无法利用Map-Reduce进行并行处理;bzip可以将数据分块压缩,虽然减小了存储的数据量(同时也就减小了IO),但是却在解压的时候很慢,耗费掉太多的CPU资源,从而导致CPU处理速度跟不上读取压缩文件的速度;LZO在这二者之间达到了一个平衡,虽然其压缩比没有gzip那么高,却可以分块压缩(从而可以利用Map-Reduce进行并行处理),并且其解压速度非常快,整体上达到的效果就是:减小了数据存储量,减小了IO,虽然CPU资源比原来占用多了一些,但是Hadoop集群整体上的计算能力提升了很多。

 启动Haoop进程时的错误及解决方法:localhost: ssh: connect to host localhost port 22: Connection refused
启动Hadoop进程时可用Hadoop安装目录下的 bin/start-all.sh 脚本,如果执行该脚本提示错误:

localhost: ssh: connect to host localhost port 22: Connection refused
那么你应该先检查你是否安装了sshd,然后再检查防火墙是否阻止了连接本机的22端口。依据不同的Linux发行版,这些检测方法会有不同。以Ubuntu为例,执行sshd命令,如果提示你sshd没有安装,那么你可以使用以下命令安装之:

sudo apt-get install openssh-server

检查防火墙状态:

sudo ufw status

如果防火墙是打开的,那么还要确保22端口是允许连接的。
可以在开机启动时就启动Hadoop进程:编辑 /etc/rc.local 文件,添加一行即可:

/usr/local/hadoop/bin/start-all.sh

当然,需视情况改成你的Hadoop安装路径。

 在 Hadoop Map/Reduce Administration 的web页面中看不到运行中的job(Running Jobs)的可能原因
可能是 TaskTracker 没有启动,导致无法在页面中看到任何Running Jobs。这时你可以先查看一下其是否启动了:

ps -ef | grep java

里面应该有 org.apache.hadoop.mapred.TaskTracker 这一项。
如果没有,则可以重启Hadoop进程试试看。

 向HDFS中put文件时揭示“Name node is in safe mode”的原因及解决办法
向HDFS中put文件时,如果揭示:

put: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot create file /XXX. Name node is in safe mode.

原因很显然了,name node处于安全模式,解决办法也很简单:kill掉name node的进程,然后重启之:

ps -ef | grep java

用该命令查看name node进程的PID,然后kill掉,然后再启动之:

start-dfs.sh

再确定一下name node进程是否启动了,如果成功启动了,就OK了。
文章来源:http://www.codelast.com/
这里有一个批量kill进程的技巧,其实就是几句shell语句:

PIDS=`ps -ef | grep -v grep | grep java | awk '{print $2}'`; for PID in $PIDS; do kill $PID; done

其中,“grep java”表示查找含有“java”关键字的进程名,“grep -v grep”表示过滤掉grep自己的这个进程名,awk 是用于打印出第2列的内容,即PID(进程号),而后面的 for 循环则是批量kill掉找到的进程。

 在shell中判断一个HDFS目录/文件是否存在
直接看shell代码:

hadoop fs -test -e /hdfs_dir
if [ $? -ne 0 ]; then
    echo "Directory not exists!"
fi

hadoop fs -test -e 用于判断HDFS目录/文件是否存在,下一步检测该命令的返回值,以确定其判断结果。

-test -[ezd] <path>: If file { exists, has zero length, is a directory
then return 0, else return 1.
e,z,d参数必用其一,不可缺少。

 一次添加多个输入目录/文件到Map-Reduce job中
使用 FileInputFormat.addInputPaths(Job job, String commaSeparatedPaths) 方法,可以一次将多个目录/文件添加到M-R job中,其中,第二个参数是一个逗号分隔的路径列表,例如“/user/root/2012-01-01,/user/root/2012-01-02,/user/root/2012-01-03”。

 HBase中的TTL的单位
在hbase shell中,describe '表名'可以查看一个HBase表的结构和基本参数,例如:

hbase(main):005:0> describe 'TableName'
DESCRIPTION                                                             ENABLED                               
 {NAME => 'TableName', FAMILIES => [{NAME => 'fam', COMPRESSION = > 'NONE', VERSIONS => '2', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}

里面的TTL的单位是秒,不做特别设置的时候,就是这个默认值(约为69年),超过此时间的记录会被删除。

 HBase中的VERSIONS(版本)的含义
如上例所示,你已经看到了VERSIONS这个参数,假设其值为2,那么它表示:row key、column family、qualifier 都相同的记录最多可以有2条,这2条记录的timestamp不同。例如用hbase shell查到的下面两条记录:

abc     column=fam:\x11\x00\x00\x99, timestamp=1325260900000, value=?\x80\x00\x00
abc     column=fam:\x11\x00\x00\x99, timestamp=1326828800000, value=?\x80\x00\x00

其中,“abc”是row key,“fam”是column family,“\x11\x00\x00\x99”是qualifier,这三者均相同,而这两条记录的timestamp不同,也就是VERSIONS为2。

 context.progress()的作用
假设在map()方法中,你有一个从数据库读取大量数据的操作,是用一个循环来完成的,并且,在读完全部的数据之前,你不会有任何的数据输出(纯读),那么,读完全部数据所需的时间可能很长,一直没有输出的话,这个task就会因为超时被杀掉,为了避免这个问题,可在以读取数据的循环中使用context.progress()方法来报告进度,那么该task就会被认为还活着,从而解决超时问题。

 Map-Reduce的单元测试,用MRUnit来做
我们不可能把每个M-R job都放到到实际的环境中去运行,靠打印log来调试其中的问题,单元测试是必须的,M-R的单元测试用MRUnit来做。
①MRUnit有两个ReduceDriver,一个是 org.apache.hadoop.mrunit.mapreduce.ReduceDriver,另一个是 org.apache.hadoop.mrunit.ReduceDriver,其中,前者是为更新的Hadoop API准备的,如果你发现IDE在ReduceDriver这里提示错误,但是又不知道哪里写错了的时候,可以查看一下是否是这个问题。
②在测试一个mapper类中的时候,MRUnit的setUp()函数比mapper类的setup(Context context)函数要先执行。

 调用一个Java Map-Reduce程序时,在命令行传入参数“-D mapred.queue.name=XXX”的作用
Hadoop集群中的job被分在不同的队列中,如果不设置mapred.queue.name参数,则job被放置在默认队列中,否则就被放在指定的队列中。各队列之间是有优先级之分的,同一个队列中的各job也有优先级之分,所以,我们需要的话,可以既设置队列,又设置job的优先级:

-D mapred.queue.name=XXX
-D mapred.job.priority=HIGH

这表示设置优先级为HIGH。
文章来源:http://www.codelast.com/
 继承自org.apache.hadoop.hbase.mapreduce.TableMapper这个抽象类的一个mapper类,当它的map()方法每被调用一次时,就有HBase的一行(row)被读入处理,由于是处理一行,所以对一个map()方法来说,row key是唯一的,column family可能有N个,每个column family下又可能有M个qualifier,每一个qualifier还可能会对应X个timestamp的记录(取决于你HBase的VERSIONS设置),你可以在map()方法中,一级级地遍历得到所有记录。

 在大多数情况下,一个split里的数据(由一个mapper所处理)是来自于同一个文件的;少数情况下,一个split里的数据是来自多个文件的。

 org.apache.hadoop.mapreduce.lib.output 和 org.apache.hadoop.mapreduce.output 这两个package都有 TextOutputFormat 类,其中,前者比后者版本新,使用的时候注意。

 执行Map-Reduce Java程序时,传入 -D hadoop.job.ugi=hadoop,hadoop 参数可以使得该job以hadoop用户来执行,例如,你是以Linux root用户来执行一个脚本,脚本中执行了一个M-R Java程序,那么该程序就无法将输出结果写入到HDFS上的 /user/hadoop/ 目录下,如果按上面的方法传入一个参数,就解决了这个问题:

hadoop com.codelast.DoSomething -D hadoop.job.ugi=hadoop,hadoop

其中,com.codelast.DoSomething是你的M-R Java程序。

 用MRUnit怎么测试含有FileSplit.getPath()的方法
如果mapper中的一个方法myMethod(Context context)含有如下代码片段:

String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();

这句话是用来取当前mapper正在处理的文件名。那么,方法myMethod()就不能用MRUnit来测,因为无法使用MRUnit来设置mapper中当前正在处理的文件。为了测这个方法,你需要把上面的代码段抽取出来,单独放在一个方法中,我们假设其为:

public int getName(Context context) {
    return ((FileSplit) context.getInputSplit()).getPath().getName();
}

然后,在单元测试文件中,你的tester类里重写这个方法,自己指定一个返回值:

@Test
public void test_1() throws IOException {
	mapper = new MyMapper() {
	  @Override
	  public int getName(Context context) {
		return "part-r-00000";
	  }
	};

	Configuration configuration = new Configuration();
	mapDriver.withConfiguration(configuration);
	mapDriver.withMapper(mapper);
	
	mapDriver.withInput(new LongWritable(1), new Text("XXXXXX"));
	//TODO:
}

其中,MyMapper是你的mapper类名,在这里我们强制指定了getName方法返回一个字段串“part-r-00000”,从而在下面的“//TODO:”测试代码中,就可以在调用待测的myMethod方法时(间接地会调用getName方法),自然会得到“part-r-00000”这个字符串。

 HBase中的Pair类
如果你只要保存一对对象,那么Map可能不好用,你可以用 org.apache.hadoop.hbase.util 包中的 Pair<T1, T2> 类:

Pair<String, String> aPair = new Pair<String, String>("abc", "def");
String firstStr = aPair.getFirst();
String secondStr = aPair.getSecond();

显然,getFirst()方法用于取第一个值,getScond()方法用于取第二个值。

 用MRUnit测试mapper时,如何避开从 DistributedCache 加载文件
可以在unit test里set一个值到 Configuration 对象中,在mapper里判断这个变量是否set了,set了就从用于测试的local file读数据,没有set就从DistributedCache读文件。
当然,也可以让DistributedCache加载一个本地文件:

Configuration conf = new Configuration();

String myFile = "localFile.txt";
DistributedCache.setLocalFiles(conf, myFile);

文章来源:http://www.codelast.com/
 只有map的job,如何在一定程度上控制map的数量
如果一个job只有map,那么,map的数量就是输出文件的数量,为了能减少输出文件的数量,可以采用减少map的数量的方法,那么,如何减少呢?其中一个办法是设置最小的input split size。例如以下代码:

FileInputFormat.setMinInputSplitSize(job, 2L * 1024 * 1024 * 1024);

将使得小于 2G 的输入文件不会被分割处理。如果你的输入文件中有很多都是小于2G的,并且你的Hadoop集群配置了一个split的大小是默认的64M,那么就会导致一个1点几G的文件就会被很多个map处理,从而导致输出文件数量很多。使用上面的方法设置了min input split size之后,减小输出文件数量的效果很明显。

 如何使用elephant-bird的 LzoTextOutputFormat 对纯文本数据进行LZO压缩
假设你有一堆纯文本数据,要将它们用LZO来压缩,那么,可以用elephant-bird的 LzoTextOutputFormat 来实现。
一个只有map的job就可以完成这个工作,你需要做的,首先是设置输出格式:

job.setMapperClass(MyMapper.class);
job.setOutputFormatClass(LzoTextOutputFormat.class);

其次,你需要这样的一个mapper类:

public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    context.write(null, value);
  }
}

其余代码此处省略。

 如何使MapReduce job执行开始时不检查某目录是否已经存在
如果M-R job的HDFS输出目录已经存在,那么job执行时会报错。为了让它不检查,或者改变默认的检查办法(例如,我们会在HDFS输出目录下生成几个子目录,在里面输出最终数据,只要确保这几个子目录不存在即可),那么就需要override checkOutputSpecs 这个方法:

  @Override
  public void checkOutputSpecs(JobContext job) throws IOException {
    //TODO:
  }

在这里面,你只要把exception吃掉即可使得输出目录存在时不会报错。

 使用HBase的程序报错“java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.<init>”的一个原因
如果你的程序使用了HBase,并且有HDFS操作(即使用了hadoop的jar包),那么出现上面所说的错误提示时,请检查Hadoop的安装路径下的lib目录下,HBase的jar包版本是否与你的程序路径下的HBase jar包版本相同,如果不同,那么就有可能导致这个问题。

 实现了接口 org.apache.hadoop.util.Tool 的类,必须要实现 setConf() 和 getConf() 方法吗
当然要实现。通常是下面这样的两个方法:

  @Override
  public void setConf(Configuration entries) {
    this.configuration = entries;
  }

  @Override
  public Configuration getConf() {
    return this.configuration;
  }

如果不想实现这两个方法,那么只需要把你的类继承自 org.apache.hadoop.conf.Configured 即可,例如:

public class A extends Configured implements Tool {
    //TODO:
}

这样可以少写两个方法,代码更简洁一些。

 如果mapper的输出value为一个 ThriftWritable 类型的对象,在设置Hadoop job属性时,setMapOutputValueClass() 应该怎样写
假设你的mapper类是这样定义的:

public class A extends Mapper<LongWritable, Text, Text, ThriftWritable<MyType>> {
	//TODO:
}

其中,MyType是一个实现了 org.apache.thrift.TBase 接口的类。那么,在设置Hadoop job的属性时,我们可以这样写:

Job job = new Job(configuration, "My example job.");
job.setMapOutputValueClass(ThriftWritable.class);

这样写要注意,在reducer中取出同一个key的各value值时,需要用 setConverter() 方法来指定Thrift对象类型:

protected void reduce(Text key, Iterable<ThriftWritable<MyType>> values, Context context)
	  throws IOException, InterruptedException {
  for (ThriftWritable<MyType> value : values) {
	  value.setConverter(MyType.class);    // must set the class
	  MyType obj = value.get();
	  //TODO:
  }
}

如果你不 setConverter() 的话,将抛出一个java.lang.IllegalStateException异常,提示你无法识别类型。
文章来源:http://www.codelast.com/
 要注意在reducer中对同一个key的多个value循环取值的方法
假设在一个reduce()方法中对同一个key的多个value循环,做一些处理后取出想要的那个value,并输出:

protected void reduce(Text key, Iterable<Text> values, Context context)
	  throws IOException, InterruptedException {
	Text outputValue = null;
	for (Text value : values) {
	  //TODO: some logic processing 
	  outputValue = value;
	}

	if (outputValue != null) {
	  context.write(key, outputValue);
	}
}

那么,输出的outputValue将永远是所有value里面,最后一个循环到的value,这是因为程序会复用value这个对象,当使用 outputValue = value 这种赋值方式时,outputValue得到的是value的引用,而value又被复用了,所以outputValue最后将被赋予最后一个value的值。这可能会导致你在“TODO”那里做的处理失效(例如取了一个含有最大数字值的value),所以,为了保持逻辑正确,可以把 outputValue = value 换成:

outputValue = new Text(value);

创建一个新的对象,这样就不会出现上面所说的问题了。

 Zookeeper client的基本使用方法
如果你的一个程序要注册到Zookeeper中,你如何验证结果是正确的?当然是利用Zookeeper client来查看Zookeeper中的数据啦。
Zookeeper client程序就是Zookeeper安装目录下的 bin/zkCli.sh 脚本,执行后会进入交互命令行:

[zk: localhost:2181(CONNECTED) 0] 

查看你的程序注册的路径是否存在(假设注册到了 /abc/def ):

[zk: localhost:2181(CONNECTED) 0] ls /abc/def

查看你的程序注册的内容是否正确:

[zk: localhost:2181(CONNECTED) 0] get /abc/def

会输出一堆内容,其中,第一行就是你写入的内容,如果正确的话就OK了。

 获取HDFS文件的checksum
命令行我不知道怎么取HDFS的checksum(不把HDFS文件download到local fs的情况下),用Hadoop API获取checksum的Java程序片段如下:

Path path = new Path("/user/abc.txt");
try {
    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
    FileChecksum fileChecksum = fs.getFileChecksum(path);
    System.out.print(fileChecksum);
    System.exit(0);
} catch (IOException e) {
    System.exit(1);
}

需要import的package是:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

特别需要注意的是:就算是同一个文件,在不同的Hadoop集群上,得到的checksum也可能是不同的!这是个非常大的陷阱,例如,我拿一个小文件在两个Hadoop集群上做了试验,得到的checksum是相同的;而对一个较大的文件(其大小超过了其中一个Hadoop集群的block size),取到的checksum是不相同的。这跟Hadoop集群的block size有关,具体可以看Hadoop的源码。
文章来源:http://www.codelast.com/
 用Hadoop的文件API来操作本地/HDFS文件的基本示例

Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);

FileStatus[] localFiles = local.listStatus("/home/codelast/");    // a local directory 
FSDataOutputStream outputStream = hdfs.create("/user/codelast/abc.txt");    // a HDFS file 

上面的代码只演示了:对本地的一个目录进行文件列表操作,以及试图在HDFS上创建一个文件。

 MultipleOutputs.setCountersEnabled()方法的作用
官方解释:

public static void setCountersEnabled(Job job, boolean enabled)
    Enables or disables counters for the named outputs. The counters group is the MultipleOutputs class name. The names of the counters are the same as the named outputs. These counters count the number records written to each output name. By default these counters are disabled.
 
Parameters:
    job - job to enable counters
    enabled - indicates if the counters will be enabled or not.
简单说来就是:启用counter记录。counter group的名字是MultipleOutputs类名,counter的名字与output相同(我们知道,MultipleOutputs一般会有多个output),counter计的数是写入每一个output的记录数。默认情况下这些counter是被禁用的。

 job.setJarByClass(XXX.class) 的作用(转载)
当在Hadoop集群上运行该job时,需要把代码打包成一个jar包(Hadoop会在集群上分发这个文件),通过job.setJarByClass(XXX.class)设置一个类,Hadoop会根据这个类找到其所在的jar包。

 Hadoop job提交后提示 Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException 错误的原因
我遇到的情况类似于这个链接的原因,具体如下:
我想输出的文件格式是LZO压缩的纯文本文件,因此我写了下面这句代码:

job.setOutputFormatClass(LzoOutputFormat.class);

错误就在这里——LzoOutputFormat是一个abstract class,所以不能用在这里。要输出LZO压缩的纯文本文件,可以设置:

job.setOutputFormatClass(TextOutputFormat.class);

然后在调用此Java程序的脚本中用参数指定输出LZO压缩的文件:

hadoop com.codelast.MyJob \
    -D mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec \
    -D mapred.output.compress=true \
    ......

当然,程序能接受传入参数的前提是,你的类(此处为MyJob)要实现org.apache.hadoop.util.Tool接口:

public class MyJob extends Configured implements Tool {
  ...
}

文章来源:http://www.codelast.com/
 修改已经在running的YARN application的队列(queue)

yarn application -movetoqueue application_1888888888888_288888 -queue adhoc

其中,application_1888888888888_288888是你的YARN application的id,最后的“adhoc”表示要修改成的队列名称。

 查看一个HDFS路径的 quota(配额) 信息

hadoop fs -count -q /your/hdfs/path

输出:

QUOTA  REMAINING_QUOTA SPACE_QUOTA    REMAINING_SPACE_QUOTA DIR_COUNT  FILE_COUNT CONTENT_SIZE   FILE_NAME
none   inf             54975581388800 5277747062870         3922       418464     16565944775310 /your/hdfs/path

文章来源:https://www.codelast.com/
➤➤ 版权声明 ➤➤ 
转载需注明出处:codelast.com 
感谢关注我的微信公众号(微信扫一扫):

wechat qrcode of codelast

《[原创]Hadoop开发基础知识记录》有3条评论

回复 三江小渡 取消回复