[原创] 如何在Apache Pig中判断一个bag中是否包含特定的元素

查看更多Apache Pig的教程请点击这里

In Pig Latin, how to check if an element is present in a bag?

假设一个bag是由 int 元素组成的(可以理解为一个list),那么,如何判断这个bag中是否包含指定的元素(例如 5)呢?
如果你看过Pig的doc,就知道它并没有自带这样一个函数,可以输入一个bag,以及另一个值作为参数,然后输出1或0来表示bag是否包含这个元素。
所以,我们该如何实现这个功能?

现在我就以一个实际的例子来说明这个问题。
假设我们有数据文件 1.txt:

[codelast@ ~]$ cat 1.txt 
a    {(1),(2),(3),(5),(6)}
b    {(1)}
c    {(1),(2)}
d    {(1),(3),(5)}
3    {(1),(2),(5),(6)}

一共有两列,用 \t 分隔。其中,第一列是一个字符串,第二列样子很怪,它之所以写成那样,是为了可以在Pig读入的时候直接加载为一个bag,在这里,你可以把第二列理解为一个list,以第一行为例,这个list包含的元素就是1、2、3、5、6。
如果我想判断每一行数据的第二列中,是否包含 5 这个元素,代码该怎么写?
文章来源:http://www.codelast.com/

  • 方法一

我们不妨直接来看看正确的实现方法:

A = LOAD '1.txt' AS (name: chararray, aList:bag{(item:int)});
B = FOREACH A {
  FILTERED_LIST = FILTER aList BY ($0 == 5);
  GENERATE
  (IsEmpty(FILTERED_LIST) ? 'not-contain' : 'contain') AS flag,
  name;
}
DUMP B;

这段Pig代码的输出是:

(contain,a)
(not-contain,b)
(not-contain,c)
(contain,d)
(contain,e)

可以看到,第一行(a)是contain(包含5),第二行(b)是not-contain(不包含5),第三行(c)是not-contain(不包含5),等等。
从输入数据上我们可以很容易地判断出来,这个输出结果是完全正确的。所以,上面的Pig代码是怎么做到的呢?
且听我一行行分析下来。
文章来源:http://www.codelast.com/
第1行是加载数据, aList:bag{(item:int)} 这个形式有点怪,但它是由我们的输入数据 1.txt 里的格式决定的,这样做我们就能把第二列加载成一个bag,这个bag里有N个tuple,每个tuple里有一个int元素。
第2到第7行是用了一个嵌套的FOREACH来实现“判断一个bag中是否包含指定元素”的功能,这一句:

FILTERED_LIST = FILTER aList BY ($0 == 5);

会先filter得到包含 5 这个元素的bag,然后下面的这一句:

(IsEmpty(FILTERED_LIST) ? 'not-contain' : 'contain') AS flag,

会根根据filter得到的bag是否为空,来输出一个字符串,表示当前数据行是“不包含”还是“包含”指定的元素5。
但我觉得有很多人一定有疑问:为什么 FILTER 那一句可以得到包含5的bag? $0 == 5 是个什么鬼?其实,这就是查找“包含5的bag”,千万不要认为 $0 只表示bag的第一个元素!大家可以看看这个Pig文档,然而它里面也没有对 $0 做具体的解释,大家就强行理解一下吧。
所有又有人会问,那我能不能不用嵌套的FOREACH,直接像下面这样:

B = FILTER A BY (aList.$0 == 5);

来得到那些包含5的记录呢?答案是不行,这样写语法都是错的,一试便知。
关于方法一,大家也可以参考这个stackoverflow的讨论(但它里面其实是有一些错误的)。
文章来源:http://www.codelast.com/

  • 方法二

方法一会让第一次接触的同学有些费解。那么方法二就非常直接明了了。思路是:把bag FLATTEN(展开)出来,每一行数据展开成N行,然后第二列就变成了一个标量(int),就可以不用嵌套的FOREACH,也可以FILTER出来包含5的记录:

A = LOAD '1.txt' AS (name: chararray, aList:bag{(item:int)});
B = FOREACH A GENERATE name, FLATTEN(aList) AS item;
C = FILTER B BY (item == 5);
DUMP C;

输出结果:

(a,5)
(d,5)
(e,5)

可见结果是正确的。这里输出的只是bag中包含5的那些记录,如果要找到不包含5的那些记录,可以拿这个输出结果与原始数据做OUTER JOIN,但这显然比方法一麻烦,而且事实上它也不如方法一高效。所以,可以这么说,这算是“看上去很容易理解”的代价吧。
文章来源:http://www.codelast.com/

  • 方法三

所以有没有一种方法,它既写起来简单,看起来又容易理解呢?那就是用UDF啦。但UDF终归还是要自己写的,这个工作量我们没有把它计算在内,所以,这算是方法三的劣势。

假设我们要编写的UDF名为BagContains,它接受两个参数,第一个参数是bag,第二个参数是我们要在bag中查找的值(例如5),当bag中包含指定元素时返回1,否则返回0。
根据这个定义,我们来看看UDF的用法:

REGISTER '/home/codelast/my-pig-lib.jar';

DEFINE BagContains com.codelast.BagContains();

A = LOAD '1.txt' AS (name: chararray, aList:bag{(item:int)});
B = FOREACH A GENERATE name, ((BagContains(aList, 5) == 1) ? 'contain' : 'not-contain') AS flag;
DUMP B;

其中,REGISTER 那行引入的jar包,是我编写的UDF编译生成的jar包的路径。
这段代码的输出结果:

(a,contain)
(b,not-contain)
(c,not-contain)
(d,contain)
(e,contain)
这个结果与方法一实际上是一样的。
使用了UDF的代码是如此清晰易懂,但是话说回来,这个UDF该怎么写呢?
文章来源:http://www.codelast.com/

一言不合直接上UDF的代码:

package com.codelast;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

import java.io.IOException;

/**
 * Check whether a list contains a specified item.
 * Usage: suppose aList is a bag contains int items, then
 * ContainsItem(aList, 5)
 * will return 1(the list contains 5) or 0(the list doesn't contains 5)
 *
 * @author Darran Zhang @ codelast.com
 */
public class BagContains extends EvalFunc<Integer{
  @Override
  public Integer exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0 || input.get(0) == null) {
      return null;
    }
    DataBag inputBag = DataType.toBag(input.get(0));
    int item2Find = DataType.toInteger(input.get(1));
    for (Tuple entry : inputBag) {
      int item = DataType.toInteger(entry.get(0));
      if (item == item2Find) {
        return 1;
      }
    }
    return 0;
  }
}

在这里,我就不解释每一行代码了,Pig UDF(JAVA)的编写有一个比较类似的套路,很多简单的UDF都可以用上面的格式稍微改改得到。

文章来源:https://www.codelast.com/
➤➤ 版权声明 ➤➤ 
转载需注明出处:codelast.com 
感谢关注我的微信公众号(微信扫一扫):

wechat qrcode of codelast
以及我的微信视频号:

发表评论