OpenStack_Swift源码分析——Object-auditor源码分析(1)


1 Object-auditor 的启动

Object-auditor的启动和object-replicator的启动过程是一样的,首先是执行启动脚本

swift-init object-auditor start

启动脚本会运行swift源码bin目录下的swift-ojbect-auditor

if __name__ == '__main__':
    parser = OptionParser("%prog CONFIG [options]")
    parser.add_option('-z', '--zero_byte_fps',
                      help='Audit only zero byte files at specified files/sec')
    parser.add_option('-d', '--devices',
                      help='Audit only given devices. Comma-separated list')
    conf_file, options = parse_options(parser=parser, once=True)
    run_daemon(ObjectAuditor, conf_file, **options)
这里是传入的ObjectAuditor类,而且ObjectorAuditor同样继承Daemon类,其的过程和 Replicator类似,下面看Auditor类中runforever方法,其代码为:

def run_forever(self, *args, **kwargs):
        """Run the object audit until stopped."""
        # zero byte only command line option
        zbo_fps = kwargs.get('zero_byte_fps', 0)
        parent = False
        if zbo_fps:
            # only start parent
            parent = True
        kwargs = {'mode': 'forever'}

        while True:
            try:
                #循环审计
                self.audit_loop(parent, zbo_fps, **kwargs)
            except (Exception, Timeout) as err:
                self.logger.exception(_('ERROR auditing: %s' % err))
            #每隔30秒执行一次
            self._sleep()

while循环在执行完audit_loop方法后会睡眠30秒,让后继续执行。重点看循环审计方法aduit_loop方法

   def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
        """Audit loop"""
        self.clear_recon_cache('ALL')
        self.clear_recon_cache('ZBF')
        kwargs['device_dirs'] = override_devices    #获得需要重写的设备
        #第一次启动 parent 为false
        if parent:
            kwargs['zero_byte_fps'] = zbo_fps
            #运行 run_audit方法
            self.run_audit(**kwargs)
        else:
            pids = []
            # self.conf_zero_byte_fps = int(conf.get('zero_byte_files_per_second', 50))
            if self.conf_zero_byte_fps:
                zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
                #将子进程加入到pids中
                pids.append(zbf_pid)
            pids.append(self.fork_child(**kwargs))
            #
            while pids:
                pid = os.wait()[0]
                # ZBF scanner must be restarted as soon as it finishes
                if self.conf_zero_byte_fps and pid == zbf_pid and \
                   len(pids) > 1:
                    kwargs['device_dirs'] = override_devices
                    zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
                    pids.append(zbf_pid)
                pids.remove(pid)
在audit_loop中主要是执行fork_child方法,其返还子进程的pid,下面为fork_child的具体实现

    def fork_child(self, zero_byte_fps=False, **kwargs):
        """Child execution"""
        #返回子进程的进程号
        pid = os.fork()
        if pid:
            return pid
        else:
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            if zero_byte_fps:
                kwargs['zero_byte_fps'] = self.conf_zero_byte_fps
            #运行审计
            self.run_audit(**kwargs)
            sys.exit()
首先是获得子进程的pid,并返回,若果没有此进程pid这需要创建子进程,子进程的创建是是执行run_audit方法,其中run_audit方法的实现代码如下:

def run_audit(self, **kwargs):
        """Run the object audit"""
        mode = kwargs.get('mode')
        zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
        device_dirs = kwargs.get('device_dirs')

        worker = AuditorWorker(self.conf, self.logger, self.rcache,
                               self.devices,
                               zero_byte_only_at_fps=zero_byte_only_at_fps)
        #审计所有的对象
        worker.audit_all_objects(mode=mode, device_dirs=device_dirs)

首先是获得mode,此时的mode为 forever然后是获得设备中之前挂载的存储文件的设备。创建AuditorWorker类的实例,并执行audit_all_objects。审计器是不停的扫描当前服务器存储设备下的所有对象,并将损坏的文件放入隔离区,等待replicate用完好的来替代。

下一篇博客中重点介绍具体执行审计的AuditorWorker类的实现以及对于损坏的文件是如何创建隔离区的。

由于本人水平有限,文中难免出现理解错误,敬请指正、交流,谢谢!

相关内容