Curator源码解析(五)连接和重试机制分析,curator源码


转载请注明出处: jiq•钦's technical Blog

本文将主要关注Curator是如何处理连接丢失和会话终止这两个关键问题的。

1.   连接丢失的处理

Curator中利用类ConnectionState来管理客户端到ZooKeeper集群的连接状态,其中用到原子布尔型变量来标识当前连接是否已经建立:

private final AtomicBoolean isConnected=new AtomicBoolean(false);
在事件处理函数中(ConnectionState实现了Watcher接口)修改isConnected的值:
@Override
   public void process(WatchedEvent event)
   {
       //逐个调用parentWatchers容器中的Watcher的process函数
        for ( Watcher parentWatcher :parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }
 
        //记录旧连接状态
        boolean wasConnected =isConnected.get();
        boolean newIsConnected = wasConnected;
        if ( event.getType() ==Watcher.Event.EventType.None )
        {
            //获取新连接状态
            newIsConnected =checkState(event.getState(), wasConnected);
        }
 
        //若状态发生变化,则修改
        if ( newIsConnected != wasConnected )
        {
           isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
        }
   }

其中checkState函数获取当前连接状态是否为已连接:

private boolean checkState(Event.KeeperState state, boolean wasConnected)
   {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString =true;
        switch ( state )
        {
        default:
        //连接丢失
        case Disconnected:
        {
            isConnected = false;
            break;
        }
 
        //连接建立
        case SyncConnected:
        case ConnectedReadOnly:
        {
            isConnected = true;
            break;
        }
 
        //验证失败
        case AuthFailed:
        {
            isConnected = false;
            log.error("Authentication failed");
            break;
        }
 
        //会话终止
        case Expired:
        {
            isConnected = false;
            checkNewConnectionString = false;
            handleExpiredSession();
            break;
        }
 
        case SaslAuthenticated:
        {
            // NOP
            break;
        }
        }
 
        if ( checkNewConnectionString &&zooKeeper.hasNewConnectionString())
        {
            handleNewConnectionString();
        }
 
        return isConnected;
}

若平时发生连接丢失isConnected(标识当前连接状态)被置为false,ZooKeeper自动重连回来之后isConnected被置为true,所以在平时连接与否无关紧要,但是当发起ZooKeeper操作(like getChildren,get/setData, create, delete)时,若发生连接丢失的情况,则会抛出ConnectionLossexception,那么Curator这个时候是如何处理的呢?

 

下面以SetData操作来看,下面是Curator执行SetData操作的代码:

Stat resultStat =RetryLoop.callWithRetry
        (
            client.getZookeeperClient(),
            new Callable<Stat>()
            {
                @Override
                public Stat call()throws Exception
                {
                    returnclient.getZooKeeper().setData(path, data, version);
                }
            }
        );

可以看到真正的setData操作被包装到了callWithRetry函数中:

public static<T>T callWithRetry(CuratorZookeeperClient client, Callable<T> proc)throws Exception
   {
        T result = null;
        RetryLoop retryLoop =client.newRetryLoop();
        while ( retryLoop.shouldContinue() )
        {
            try
           {
                //检测当前连接状态,若未连接则等待一定时间直到连接完成
               client.internalBlockUntilConnectedOrTimedOut();
               
                //调用带返回值的Callable方法
                result = proc.call();
                retryLoop.markComplete();
            }
            catch ( Exception e )
            {
                retryLoop.takeException(e);
            }
        }
        return result;
}

这个函数其实很简单,步骤如下:

(1)     检测当前是否已连接,若已连接则执行下一句代码,否则等待一定时间;

(2)     执行真正的ZooKeeper操作;

(3)     执行成功,标记为执行完成。

 

若执行ZooKeeper操作的时候发生任何异常,将会执行takeException函数:

public void takeException(Exception exception) throws Exception
   {
        boolean rethrow =true;
        if ( isRetryException(exception) )
        {
            //是否允许继续重试
            if (retryPolicy.allowRetry(retryCount++,System.currentTimeMillis() -startTimeMs, sleeper))
            {
                rethrow = false;
            }          
        }
 
        if ( rethrow )
        {
            throw exception;
        }
}
---如果isRetryException函数判断抛出的异常是否是“连接丢失异常/会话终止异常”,如果是则判断是否允许重试(其实传递进来的重试策略就是简单地进行三次重试),允许重试的话就不抛出异常,返回,继续下一轮循环。

---如果isRetryException函数判断不属于“连接丢失异常/会话终止异常”,比如是其他的一些其他异常(create操作可能引起NodeExists 异常, delete操作可能引起NoNode异常),那么将继续把异常抛出,callWithRetry函数将因为异常而结束返回。

 

这就是Curator处理连接丢失的策略,平时仅仅是通过在watch事件响应函数中记录连接状态isConnected,执行ZooKeeper操作的时候,先等待连接状态isConnected变为true再执行操作,若执行期间若发生异常,仅仅在当异常类型为“连接丢失/会话终止”时进行重试,反复几次。

 

这种机制个人认为已经足够应付所有场景。

 

2.   会话终止的处理

和连接丢失一样,我们需要分别来分析平时和执行ZooKeeper操作时发生“会话终止”异常Curator怎么来处理。

还是看ConnectionState类中watch事件响应函数,其中有这么一段代码:

//会话终止
case Expired:
{
   isConnected = false;
   checkNewConnectionString = false;
   handleExpiredSession();
   break;
}

关键是看handleExpiredSession函数:
private voidhandleExpiredSession()
   {
        try
        {
            reset();
        }
        catch ( Exception e )
        {
            queueBackgroundException(e);
        }
}
就是一个reset函数:
private synchronized voidreset() throws Exception
   {
        log.debug("reset");
 
        instanceIndex.incrementAndGet();
 
        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        zooKeeper.closeAndReset();
        zooKeeper.getZooKeeper();  // initiateconnection
   }

关键是看最后两句代码,先是执行HandleHolder的closeAndReset函数:
void closeAndReset() throws Exception
   {
        internalClose();
 
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle =null;
            private volatile String connectionString =null;
 
            @Override
            public ZooKeeper getZooKeeper()throws Exception
            {
                synchronized(this)
                {
                    if (zooKeeperHandle == null)
                    {
                        connectionString =ensembleProvider.getConnectionString();
                        zooKeeperHandle =zookeeperFactory.newZooKeeper(connectionString,sessionTimeout, watcher,canBeReadOnly);
                    }
 
                    helper = new Helper()
                    {
                        @Override
                        public ZooKeepergetZooKeeper()throwsException
                        {
                            returnzooKeeperHandle;
                        }
 
                        @Override
                        public StringgetConnectionString()
                        {
                            returnconnectionString;
                        }
                    };
 
                   returnzooKeeperHandle;
                }
            }
 
            @Override
            public String getConnectionString()
            {
                returnconnectionString;
            }
        };
   }

如果对这个函数不清楚,可以回过头看看之前文章讲的Curator的初始化和启动的源码分析,HandleHolder是原生ZooKeeper对象的持有者,维护ZooKeeper对象的单例就是通过这个函数。

 

一旦发生会话终止异常,ZooKeeper句柄会被自动关闭,所以之前初始化的helper对象中的zooKeeperHandle变量将会变得不可用,所以需要调用这个closeAndReset函数重新初始化helper对象,然后再调用一次getZooKeeper函数执行zookeeperFactory.newZooKeeper初始化好ZooKeeper句柄。

注意:会话终止后ZooKeeper句柄会被自动关闭,但并不是被置为null了,所以在用原来的helper对象的getZooKeeper方法返回的句柄是不可用的。

 

再看看当执行ZooKeeper操作时发生了会话终止时怎么处理。如果执行ZooKeeper时发生了会话终止,watch事件响应函数中会中心构建ZooKeeper句柄,callWithRetry函数中不但判断当前发生的是连接丢失异常时会进行重试,判断是会话终止异常也会进行重试。

 

所以说Curator处理会话终止的方法,就是在收到Expired事件时候重新构建HandleHolder中维护的ZooKeeper句柄。

 

关于临时节点和watch事件

特别注意,有的应用程序创建的临时节点和注册的watch事件至关重要,无法忍受丢失的情况,若发生会话终止,它们必定会被ZooKeeper服务器端删除掉,并且Curator无法帮助你重新还原回来。

 

这个时候就需要应用程序自己处理,在收到会话终止异常之后,重新注册关键的watch事件,以及重新创建关键的临时节点。

相关内容

    暂无相关文章