一、Watcher---变更的通知------)客户端注册


Zookeeper 提供的了分布式数据的发布/订阅功能,通过 Watch 机制来实现这种分布式的通知功能。
Zookeeper 允许客户端向服务器注册一个Watch监听,当服务端的一些指定的事件触发了这个Watch ,就会向指定的客户端发送一个事件通知来实现分布式通知。 整个Watch的注册和通知过程如图: \ Zookeeper的 Watcher 机制主要包括客户端现成、客户端 WatchManager、Zookeeper服务器三部分,具体流程是:client 向Zookeeper注册的同时,把Watcher对象存储在客户端的WatchManager中,当Zookeeper服务器端触发Watcher事件后,会向客户端发送通知,客户端线程从WatchManager中取出对应的Watcher对象执行回掉逻辑。
客户端注册Watcher 可用在创建一个客户端实例时,向构造方法中传入一个默认的Watcher对象,也可以在调用 getData() , getChildren() , exist() 三个接口向 Zookeeper 服务器注册Watcher,无论使用哪种方式注册,原理都是一致的。 这里已 getData接口说明:
public byte[] getData(final String path, Watcher watcher , Stat stat)
在向 getData 接口注册 Watcher 后,客户端首先会对当前客户端请求 request 进行标记,将其置为“使用 Watcher监听”,同时会封装一个 Watcher 的注册对象 WatcherRegistration 对象,用于暂时保存数据节点路径和Watcher的对应关系,具体逻辑代码:
public Stat getData(final String path, Watcher watcher , Stat stat){ .......... WatcherRegistration wrt = null ; if(watcher !=null){ wrt = new DataWatchRegistration(watcher,path); } ...... request.setWatche(watcher!=null) ;//------注意此处后续会用到 ReplyHeader r = cnxn.submitRequest(h,request,response,wrt);
在Zookeeper中,Packet 可以被看做是一个最小的通信协议单元,用于客户端和服务器端之间进行网络传输,任何一个需要传输的对象都要被包装成 Packet 对象进行传输。因为 ClientCnxn 中的 WatcherRegistration 又会被封装到 Packet 中去,然后放入发送队列中等待客户端发送。
Packet queuePacket(RequestHeader h , ReplyHeader r , Record reqest , Record response, AsyncCallback cb ,String clientPath , String serverPath , Object ctx , WatcherRegistration wrt){ ....... synchronized(outgoingqueue){ packet = new Packet(h,r,request,response,wrt) ; ......... outgoingqueue.add(packet); } .... }
随后,Zookeeper 客户端会向服务端发送这个请求,同时等待服务端的返回。完成请求发送后,会由客户端 SendThread 线程的 readResponse 方法接受来自服务端的响应,finishPacket 方法会从 Packet 中取出对应的 Watcher 并注册到ZkManager中去,代码逻辑: private void finishPacket(Packet p){ if(p.watcherRegistration != null){ p.watcherRegistration.register(p.replyHeader.getErr()); } ....... }
现在把 Watcher 对象从 WatchRegistration 对象中取出来: protected Map> getWatchers(int rc){ return watchManager.dataWatches ;
}
public void register(int rc){ if(shouldAddWatch(rc)){ Map> watches = getWatchers(rc) ;
synchronized(watches){ Set watchers = watches.get(clientPath) ; if(watchers ==null){ watchers =new HashSet(); watches.put(clientPath,watchers); }
watchers.add(watcher); ......... } 在上面的register 方法中会吧暂时存放的 watcher 对象转交给 ZKWatcherManager , 并最终保存到dataWatchers 中 ,dataWatchers , ZKWatcherManager 都是 Map> 类型的数据结构,用于把数据节点的路径和Watcher对象一一映射后保存起来。 \
在上面的流程中,WatcherRegistration 在底层的网络序列化到底层的字节数组中。



相关内容