Curator源码解析(三)访问接口分析,curator源码


接着上一篇,将分析测试程序中的访问接口部分。
2调用ZooKeeper访问接口

初始化和启动分析完了,操作接口调用代码如下:

String path = ZKPaths.makePath(PATH, name);
byte[] bytes =args[1].getBytes();
try
{
    client.setData().forPath(path,bytes);
}
catch (KeeperException.NoNodeException e )
{
   client.create().creatingParentsIfNeeded().forPath(path,bytes);
}

 

真正操作ZooKeeper节点调用的是实现了CuratorFramework接口的CuratorFrameworkImpl类的setData方法。定义如下:

@Override
   publicSetDataBuildersetData()
   {
        Preconditions.checkState(getState() ==CuratorFrameworkState.STARTED,"instance must bestarted before calling this method");
 
        return new SetDataBuilderImpl(this);
}

 

除此之外还有其他ZooKeeper操作接口:

@Override
   publicGetDataBuilder getData()
   {
        Preconditions.checkState(getState() ==CuratorFrameworkState.STARTED,"instance must bestarted before calling this method");
 
        return new GetDataBuilderImpl(this);
   }
 
   @Override
   publicGetChildrenBuildergetChildren()
   {
        Preconditions.checkState(getState() ==CuratorFrameworkState.STARTED,"instance must bestarted before calling this method");
 
        returnnew GetChildrenBuilderImpl(this);
   }
 
   @Override
   publicGetACLBuilder getACL()
   {
        Preconditions.checkState(getState() ==CuratorFrameworkState.STARTED,"instance must bestarted before calling this method");
 
        return new GetACLBuilderImpl(this);
}

 

所以CuratorFrameworkImpl类不但封装了CuratorZookeeperClient类,还提供了类似Zookeeper风格的操作接口,下面具体看一下setData方法如何实现。

可以看到每种操作接口都返回一个对应的操作类,如DeleteBuilderImpl,GetDataBuilderImpl,SetDataBuilderImpl等,每一个操作类都实现了一个方法叫做forPath,下面以SetDataBuilderImpl的forPath方法为例说明:

@Override
   publicStat forPath(String path,byte[] data) throwsException
   {
        if ( compress )
        {
            data = client.getCompressionProvider().compress(path,data);
        }
 
        path = client.fixForNamespace(path);
 
        Stat resultStat = null;
       
        //如果调用了inBackground()方法设置为异步操作,执行此分支
        if ( backgrounding.inBackground()  )
        {
            client.processBackgroundOperation(newOperationAndData<PathAndBytes>(this,new PathAndBytes(path, data),backgrounding.getCallback(),null,backgrounding.getContext()), null);
        }
        else
        {
            //否则执行此分支
            resultStat = pathInForeground(path,data);
        }
        returnresultStat;
}

我们这里只关注同步操作,代码如下:

private Stat pathInForeground(final String path,final byte[]data)throwsException
   {
        Stat  resultStat= RetryLoop.callWithRetry
        (
           client.getZookeeperClient(),
            new Callable<Stat>()
            {
                @Override
                public Stat call()throws Exception
                {
                    returnclient.getZooKeeper().setData(path, data, version);
                }
            }
        );
        trace.commit();
        return resultStat;
    }

可以看到最终是通过调用RetryLoop.callWithRetry方法来执行原生ZooKeeper的对应的操作接口。为么要封装到RetryLoop.callWithRetry中执行呢?这里面涉及到Curator封装ZooKeeper最重要的一点,就是内部封装了复杂的客户端到ZooKeeper集群的连接和重试机制,详细见下一节。

相关内容

    暂无相关文章