[原创]使用C++(通过Thrift)访问/操作/读写Hbase

无奈,网上关于C++访问Hbase的文章实在太少,所以只好自己折腾一下,然后写出来了。

要使用C++访问Hbase,可以走的途径少之又少,据说当前最好的方法就是通过Thrift来实现:http://thrift.apache.org/

所以本文分成几部分:(1)安装Thrift;(2)用Thrift 生成访问Hbase所需的C++文件;(3)在程序中通过Thrift来访问Hbase。

另外,本文只包含读写Hbase数据的例子,不包含配置Hbase的方法,如需这些内容,请自行搜索。

首先声明一下,本文基于以下环境:

操作系统:RHEL 5.3,64位

Thrift 版本:0.7.0

要访问的 Hbase 版本:0.20.6

我使用0.90.4的 Hbase 安装包来生成C++所需的Hbase.h等文件(用新版的应该能兼容旧版的)

文章来源:http://www.codelast.com/

下面开始,一步步来。

(1)安装Thrift

不是一件很轻松的事。如果你的系统比较干净,可能很顺利地就搞定了,如果有依赖库缺失问题或者库冲突问题,那么就只能根据具体情况,一个个问题去fix了,谁知道会有多少麻烦。

我运气比较好,在一个干净的系统上,很快就完成了。

Thrift 至少会依赖于两个系统中一般不会带的库:libevent,boost。

libevent 到这里下载:http://monkey.org/~provos/libevent/  我使用的版本是:2.0.12-stable

boost 到这里下载:http://www.boost.org/  我使用的版本是:1.47.0

文章来源:http://www.codelast.com/

安装libevent:

./configure --prefix=/usr/local/libevent
make
make install

安装boost(boost不像一般的Linux源码安装包一样,它的安装不是configure,make,make install,有点怪):

./bootstrap.sh --prefix=/usr/local/boost

不出错的话接着执行以下命令开始编译(也可以通过编辑project-config.jam文件调整编译参数):

./b2
./b2 install

安装Thrift:

chmod +x configure
./configure --with-boost=/usr/local --prefix=/usr/local/thrift
make
make install

至此,安装Thrift 的工作就完成了。

文章来源:http://www.codelast.com/

(2)用Thrift 生成访问Hbase所需的C++文件

访问Hbase需要在你的程序中使用若干.h,.cpp文件,这些文件是用 Thrift 生成的。

解压Hbase源码安装包:

tar zxf hbase-0.90.4.tar.gz
cd hbase-0.90.4

在解压出来的文件中, 你可以找到一个名为 Hbase.thrift 的文件,这个文件定义了如何通过 Thrift 接口来访问Hbase。用这个Thrift文件,可以生成访问Hbase所需的C++文件:

/usr/local/thrift/bin/thrift --gen cpp ./src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift

会发现生成了gen-cpp目录:

ls gen-cpp/

输出:

Hbase_constants.cpp  Hbase_constants.h  Hbase.cpp  Hbase.h  Hbase_server.skeleton.cpp  Hbase_types.cpp  Hbase_types.h

除了Hbase_server.skeleton.cpp之外,其余文件都是在我们的程序里要用到的,将它们拷贝到我们的工程目录下。

文章来源:http://www.codelast.com/

(3)在程序中使用Thrift来访问Hbase

要能通过 Thrift 访问Hbase,你必须首先要打开HBase的 Thrift 服务,请参考其他文档确保这一点是可用的。

下一步,我们在程序中如何读取Hbase的数据?

我们先看看hbase源码安装包中自带的例子:在解压出来的安装包中的 examples/thrift/ 目录下的 DemoClient.cpp 文件,有如下代码:

  boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
  boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
  boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
  HbaseClient client(protocol);
  try {
    transport->open();

    // do something

    transport->close();
  } catch (TException &tx) {
    printf("ERROR: %s\n", tx.what());
  }

我们就仿照这个例子来做。从DemoClient.cpp可见,我们要先创建三个指针socket,transport和protocol,后两个分别依赖于前两个,最后再创建一个client对象,我们操作Hbase就是使用这个client对象。在操作Hbase前,需要先打开到Hbase Thrift service的连接,即 transport->open(),在操作完 Hbase之后,需要关闭连接,即 transport->close(),这下就比较清楚了:我们可以写一个自己的类CHbaseOperate,它应该有一个connect函数和一个disconnect函数,分别用于打开、关闭连接,还应该有读写Hbase的基本功能。读写Hbase的方法,请参考Hbase.h中的函数,例子还是看DemoClient.cpp。
文章来源:http://www.codelast.com/

下面上代码:
HbaseOperate.h:

#ifndef __HBASE_OPERATE_H
#define __HBASE_OPERATE_H

#include <string>
#include <protocol/TBinaryProtocol.h>
#include <transport/TSocket.h>
#include <transport/TTransportUtils.h>
#include "Hbase.h"

/**
 * Class to operate Hbase.
 *
 * @author Darran Zhang (codelast.com)
 * @version 11-08-24
 * @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.
 * You must not remove this declaration at any time.
 */

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift;

typedef struct hbaseRet {
  std::string rowValue;
  time_t ts;

  hbaseRet() {
    ts = 0;
  }

} hbaseRet;

class CHbaseOperate
{
public:
	CHbaseOperate();
	virtual ~CHbaseOperate();

private:
  boost::shared_ptr<TTransport> socket;
  boost::shared_ptr<TTransport> transport;
  boost::shared_ptr<TProtocol> protocol;

  HbaseClient *client;

  std::string  hbaseServiceHost;
  int     hbaseServicePort;
  bool    isConnected;

public:
  bool  connect();

  bool  connect(std::string host, int port);

  bool  disconnect();

  bool  putRow(const std::string &tableName,
              const std::string &rowKey,
              const std::string &column,
              const std::string &rowValue);

  bool  getRow(hbaseRet &result,
              const std::string &tableName,
              const std::string &rowKey,
              const std::string &columnName);
};

#endif

文章来源:http://www.codelast.com/

HbaseOperate.cpp:

#include "HbaseOperate.h"
#include "log4cxx/log4cxx.h"
#include "log4cxx/propertyconfigurator.h"

static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("HbaseOperate.cpp"));

/**
 * Class to operate Hbase.
 *
 * @author Darran Zhang (codelast.com)
 * @version 11-08-24
 * @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.
 * You must not remove this declaration at any time.
 */

using namespace std;

CHbaseOperate::CHbaseOperate() :
socket((TSocket*)NULL), transport((TBufferedTransport*)NULL), protocol((TBinaryProtocol*)NULL), client(NULL), hbaseServicePort(9090), isConnected(false)
{
}

CHbaseOperate::~CHbaseOperate()
{
  if (isConnected) {
    disconnect();
  }
  if (NULL != client) {
    delete client;
    client = NULL;
  }
}

/**
 * Connect Hbase.
 *
 */
bool CHbaseOperate::connect()
{
  if (isConnected) {
    LOG4CXX_INFO(logger, "Already connected, don't need to connect it again");
    return true;
  }

  try {
    socket.reset(new TSocket(hbaseServiceHost, hbaseServicePort));
    transport.reset(new TBufferedTransport(socket));
    protocol.reset(new TBinaryProtocol(transport));

    client = new HbaseClient(protocol);

    transport->open();
  } catch (const TException &tx) {
    LOG4CXX_ERROR(logger, "Connect Hbase error : " << tx.what());
    return false;
  }

  isConnected = true;
  return isConnected;
}

/**
 * Connect Hbase.
 *
 */
bool CHbaseOperate::connect(std::string host, int port)
{
  hbaseServiceHost = host;
  hbaseServicePort = port;

  return connect();
}

/**
 * Disconnect from Hbase.
 *
 */
bool CHbaseOperate::disconnect()
{
  if (!isConnected) {
    LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't disconnect from it");
    return false;
  }

  if (NULL != transport) {
    try {
      transport->close();
    } catch (const TException &tx) {
      LOG4CXX_ERROR(logger, "Disconnect Hbase error : " << tx.what());
      return false;
    }
  } else {
    return false;
  }

  isConnected = false;
  return true;
}

/**
 * Put a row to Hbase.
 *
 * @param tableName   [IN] The table name.
 * @param rowKey      [IN] The row key.
 * @param column      [IN] The "column family : qualifier".
 * @param rowValue    [IN] The row value.
 * @return True for successfully put the row, false otherwise.
 */
bool CHbaseOperate::putRow(const string &tableName, const string &rowKey, const string &column, const string &rowValue)
{
  if (!isConnected) {
    LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't put row");
    return false;
  }

  try {
    std::vector<Mutation> mutations;
    mutations.push_back(Mutation());
    mutations.back().column = column;
    mutations.back().value = rowValue;
    client->mutateRow(tableName, rowKey, mutations);

  } catch (const TException &tx) {
    LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what());
    return false;
  }

  return true;
}

/**
 * Get a Hbase row.
 *
 * @param result      [OUT] The object which contains the returned data.
 * @param tableName   [IN] The Hbase table name, e.g. "MyTable".
 * @param rowKey      [IN] The Hbase row key, e.g. "kdr23790".
 * @param columnName  [IN] The "column family : qualifier".
 * @return True for successfully get the row, false otherwise.
 */
bool CHbaseOperate::getRow(hbaseRet &result, const std::string &tableName, const std::string &rowKey, const std::string &columnName)
{
  if (!isConnected) {
    LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't read data from it");
    return false;
  }

  std::vector<std::string> columnNames;
  columnNames.push_back(columnName);

  std::vector<TRowResult> rowResult;
  try {
    client->getRowWithColumns(rowResult, tableName, rowKey, columnNames);
  } catch (const TException &tx) {
    LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what());
    return false;
  }

  if (0 == rowResult.size()) {
    LOG4CXX_WARN(logger, "Got no record with the key : [" << rowKey << "]");
    return false;
  }

  std::map<std::string, TCell>::const_iterator it = rowResult[rowResult.size() -1].columns.begin();
  result.rowValue = it->second.value;
  result.ts = it->second.timestamp;

  return true;
}

注意我在程序中使用了Apache log4cxx这个记录日志的库来打印/保存程序运行日志,使用方法可参考此链接。如果你不想用,可以自己改为std::cout。
代码有了,使用方法为:可以在你的程序中创建一个全局对象:

CHbaseOperate g_ho;

在需要操作Hbase之前:

g_ho.connect("192.168.55.66", 9090);

其中,“192.168.55.66”和9090分别是你的Hbase Thrift service的服务器地址和端口号,你需要正确地配置好,才能使用。本文开头已经说了,本文不讨论这方面的问题。
在操作完Hbase之后:

g_ho.disconnect();

文章来源:http://www.codelast.com/
现在再来说一下读写操作Hbase的两个函数:putRow()和getRow()。
putRow():

bool  putRow(const std::string &tableName,
              const std::string &rowKey,
              const std::string &column,
              const std::string &rowValue);

这是向Hbase写入一条记录的函数,参数tableName为Hbase表名,即你要将记录写到哪个Hbase表中;参数rowKey为待写入的记录的key;参数column为待写入的记录的“column family:qualifier”组合,参数rowValue为待写入的记录的value。

getRow():

bool  getRow(hbaseRet &result,
              const std::string &tableName,
              const std::string &rowKey,
              const std::string &columnName);

这是从Hbase中读取一条记录的函数,参数tableName为Hbase表名,即你要从哪个Hbase表中读取记录;参数rowKey为你要查询的记录的key;参数columnName为你要查询的记录的“column family:qualifier”组合;参数result为返回的Hbase的数据,它包含记录的value和记录的时间戳:

typedef struct hbaseRet {
  std::string rowValue;
  time_t ts; 

  hbaseRet() {
    ts = 0;
  }

} hbaseRet;

文章来源:http://www.codelast.com/
至于操作的结果对不对,可以在Hbase shell中用get, scan等命令来验证,具体方法请看Hbase shell的help。另外,最好再写一些unit test来测试。
如果你要为CHbaseOperate类添加功能,可以参考Hbase.h文件中的函数定义。如你所见,CHbaseOperate类主要也是调用了里面的函数,只不过这个类可以让一些不太熟悉Hbase概念的人可以更方便地操作Hbase罢了。

文章来源:https://www.codelast.com/
➤➤ 版权声明 ➤➤ 
转载需注明出处:codelast.com 
感谢关注我的微信公众号(微信扫一扫):

wechat qrcode of codelast

《[原创]使用C++(通过Thrift)访问/操作/读写Hbase》有6条评论

发表评论