[原创] Zookeeper注册节点的掉线自动重新注册及测试方法

在一套分布式的online services系统中,各service通常不会放在一台服务器上,而是通过Zookeeper这样的东西,将自己的service信息注册到上面,service的使用者通过Zookeeper来发现各service的信息,从而可以将request发送到不同的service上去处理。

service register on Zookeeper

如上图所示,两个Service Provider 1和2分别在192.168.1.5和192.168.1.6这两台服务器的2688端口上提供服务,服务的地址和端口注册到了Zookeeper中。Service User通过查询Zookeeper,可得知这些服务的信息。通常,Service User与Service Provider之间的通信,是通过connection pool实现的,因为Service User不可能假定在第一次查询到所有Service Provider的信息之后,它们就是一直存活的,假如某个Service Provider因为程序问题死掉了,向它发送request只会造成大量的失败结果,因此通常会实现一个connection pool来保证实时更新节点的信息,当有一个Service Provider从Zookeeper上消失之后,从connection pool中取出的connection总是可用的(即:总能通过它把request发送到一个有效的Service Provider那里)。
文章来源:http://www.codelast.com/
这里,我们保证了当一个Service Provider从Zookeeper上退出后,我们一定不会再用它,但是,我们如何保证一个Service Provider还存活的情况下,都能处于可服务的状态呢?例如,Service Provider 1的程序工作一直很稳定,但是某天由于ISP的原因,它和Zookeeper之间的网络中断了5分钟,于是Zookeeper会无法向它发送心跳包,最终Zookeeper会认为session expired了,从而会把它注册的临时节点给移除掉(必须是注册临时节点啊,要不然当Service Provider挂了之后节点还在,岂不出乱子了)。移除掉之后,问题就来了:5分钟后中断的网络恢复正常了,Service Provider 1的程序也一直没有死掉,它又可以serving了,但是由于它在Zookeeper中注册的节点没了,所以Service User通过connection pool是永远也无法向Service Provider 1发送request的,于是所有的request还是都发到了Service Provider 2那里,造成它的负载一直很大,系统的处理能力减弱。
因此,为了避免这种问题,我们需要在Service Provider中提供Zookeeper掉线自动重新注册的功能。
【1】用Java怎么注册Zookeeper
Curator库是一个绝好的选择。它是Netflix开发的一套开源软件。
什么?没听说过Netflix?全美三分之一的带宽都是被这家公司占用的你不知道?
好吧,那么火爆得一塌糊涂的美剧《纸牌屋》你总听说过吧?就是这家公司花钱制作的。如果这也没听说过的话,那么你只能去Google啦。
题外话:科技和生活娱乐息息相关啊!
如果要问为什么不直接用Zookeeper官方的API,而是使用包装过的Curator,那只能用一句话来解释:Curator很好很强大很方便。
文章来源:http://www.codelast.com/
【2】代码
『A』使用Curator注册Zookeeper的代码

CuratorFramework curator = CuratorFrameworkFactory.newClient("zookeeper.codelast.com:2181", 
    50003000new RetryNTimes(51000));
curator.start();
String regContent = "192.168.1.5:2688";
String zkRegPathPrefix = "/codelast/service-provider-";
//TODO:
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
      .forPath(zkRegPathPrefix, regContent.getBytes("UTF-8"));

解释一下:
zookeeper.codelast.com:2181 是你的Zookeeper server的host:port。
192.168.1.5:2688 是注册的Zookeeper节点的内容,表示Service Provider 1在1912.168.1.5的2688端口提供服务。当你用Zookeeper客户端zkCli.sh的get命令时,获取的返回值的第一行就是这个内容。
CreateMode.EPHEMERAL_SEQUENTIAL 表示注册的节点是临时的,并且其命名是顺序增加的。
/codelast/service-provider- 是把service注册到Zookeeper中的哪个路径下。上面代码的效果就是,注册的节点的完整路径会类似于 /codelast/service-provider-0000000001
文章来源:http://www.codelast.com/
『B』上面的代码在session expired的情况下,是无法自动重新在Zookeeper中注册的。要实现这种功能,需要添加一个实现了ConnectionStateListener接口的类,并应用到CuratorFramework对象上。我们需要在上面代码中的“TODO”处添加如下代码:

MyConnectionStateListener stateListener = new MyConnectionStateListener(zkRegPathPrefix, regContent);
curator.getConnectionStateListenable().addListener(stateListener);

然后再实现MyConnectionStateListener类:

/**
 * A class to monitor connection state & re-register to Zookeeper when connection lost.
 *
 * @author Darran Zhang @ codelast.com
 */
public class MyConnectionStateListener implements ConnectionStateListener {
  private String zkRegPathPrefix;
  private String regContent;

  public MyConnectionStateListener(String zkRegPathPrefix, String regContent) {
    this.zkRegPathPrefix = zkRegPathPrefix;
    this.regContent = regContent;
  }

  @Override
  public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
    if (connectionState == ConnectionState.LOST) {
      while (true) {
        try {
          if (curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
      .forPath(zkRegPathPrefix, regContent.getBytes("UTF-8"));
            break;
          }
        } catch (InterruptedException e) {
          //TODO: log something 
          break;
        } catch (Exception e) {
          //TODO: log something 
        }
      }
    }
  }
}

文章来源:http://www.codelast.com/
此类负责监听connection的状态,并在检测到LOST状态时(此时client已经从Zookeeper中掉线)重新注册。
注意,在检测到LOST状态后,上面的代码用了一个while (true) 死循环来不断尝试重新连接Zookeeper server,连不上不罢休。

【3】测试
如何测试?当然要创造出一种session expired的情况,让Zookeeper server把它认为已经掉线的节点的注册信息给移除掉。怎么让session expire呢?官方网站上有这么一段话

Is there an easy way to expire a session for testing?
 
Yes, a ZooKeeper handle can take a session id and password. This constructor is used to recover a session after total application failure. For example, an application can connect to ZooKeeper, save the session id and password to a file, terminate, restart, read the session id and password, and reconnect to ZooKeeper without loosing the session and the corresponding ephemeral nodes. It is up to the programmer to ensure that the session id and password isn't passed around to multiple instances of an application, otherwise problems can result.
 
In the case of testing we want to cause a problem, so to explicitly expire a session an application connects to ZooKeeper, saves the session id and password, creates another ZooKeeper handle with that id and password, and then closes the new handle. Since both handles reference the same session, the close on second handle will invalidate the session causing a SESSION_EXPIRED on the first handle.

文章来源:http://www.codelast.com/
但是需要这么麻烦吗?直接通过OS的防火墙就可以做到。例如,在RedHat上,通过如下命令:

iptables -A INPUT -d codelast.com -p tcp --sport 2181 -j DROP

你将可以阻止来自codelast.com这个域名的所有输入(INPUT)的流量。这会导致Zookeeper server和client之间的心跳失效,互相认为对方已经掉线了。对Zookeeper server来说,当它认为client掉线时,就会把client节点从Zookeeper中移除。这个时候,如果client程序没有重新注册的能力,那么当网络恢复后,client程序虽然是能正常运行的,但是也失去了提供service的能力——因为service的使用者已经无法通过Zookeeper发现它了。
我们先启动Service Provider,然后利用执行上面的命令(如果不是RedHat,请自行Google)阻断网络连接一段时间,你最终将会观察到Zookeeper中注册的节点已经没了,也就是说我们让session expired了。
然后,执行如下命令:

iptables -F

文章来源:http://www.codelast.com/
该命令可以恢复防火墙的设置。此时,网络恢复连接,你会看到Curator打印出来很多信息,提示已经重新连接上了Zookeeper server。但是注意,如果你没有像前面的代码一样提供自动重新注册的能力,那么,先前在Zookeeper中注册的节点并不会出现!也就是说,连上了也没用,它已经不能被发现了。
在添加了自动重新注册的功能后,Zookeeper中注册的节点就会自动重新被创建出来了。这就达到了我们要的效果。

文章来源:https://www.codelast.com/
➤➤ 版权声明 ➤➤ 
转载需注明出处:codelast.com 
感谢关注我的微信公众号(微信扫一扫):

wechat qrcode of codelast

《[原创] Zookeeper注册节点的掉线自动重新注册及测试方法》有4条评论

回复 yangfan 取消回复