Python服务端多进程压测工具


本文描述一个Python实现的多进程压测工具,这个压测工具的特点如下:

  • 多进程

在大多数情况下,压测一般适用于IO密集型场景(如访问接口并等待返回),在这种场景下多线程多进程的区分并不明显(详情请参见GIL相关)。不过一旦出现词表参数加密、返回内容校验等事情的话,多进程对发送效率的提升还是很明显的。

  • 可以指定发送QPS

可以指定发压的QPS,根据并行度和请求相应时间,可以估算出可发送QPS峰值。例如并行度是10,响应时间是100ms,那么QPS峰值应该是(1s/100ms * 10)=100,此工具可以将QPS稳定的维持在小于峰值的一个量上。

  • 便于扩展

为什么要DIY压测工具了?一般的服务端压测工具,例如http_load和jmeter,不是http协议的,就是需要通过代码进行扩展。例如在压测thrift接口的时候,即使通过jmeter扩展java程序也很麻烦。但是当涉及到场景化压测,或者是奇怪的SDK,例如本文要压测的接口是通过java代码自动生成的python消息类SDK,并且涉及到场景化的压测,很难通过一般的服务端压测工具搞定。

1、发压代码

解耦

下面是压测代码的实现,可以看到,我这里使用abc包,做了一个抽象类。

业务测试代码,例如自动化case,只要继承了这个抽象类,就获得压测的能力,做到压测和自动化测试的解耦。

这里有两个抽象方法

vocab() - 构造词表

press() - 发压逻辑

是被@abc.abstractmethod装饰器装饰,在子类中,是一定要被实现的。

run()方法是压测执行的方法,实现子类的词表方法和发压逻辑之后,直接调用run()方法就可以压测了。

固定QPS

固定QPS是通过管理进程实现的。可以看到有两种进程:

一种是worker_process进程,调用了press()发压逻辑函数,并且这个进程可以指定并发度concurrent,是实际的发压进程,值得注意的是在worker_process中使用了time.sleep(),是为了控制发送速度。

另一种是manager_process进程,这个进程每隔一段时间计算实际的qps,并和设置的qps比较,然后调整worker_process中的sleep时间,例如实际qps小于设定qps,那么就少睡一会儿。

这里不得不提到的是,多进程如何共享变量?

这里使用的是multiprocessing中的Manager包,这个包提供了多进程共享变量的能力,我这里用到的是Namespace数据结构来存储多进程的计数。在使用过程中我怀疑Manager Namespace是通过读写文件的形式进行进程间共享变量的,这个我没有深入的研究。

# -*- coding:utf-8 -*-
import abc
import time
from multiprocessing import Lock, Process, Manager


class Press(object):

    __metaclass__ = abc.ABCMeta

    def __init__(self, qps=100, concurrent=10):
        self.qps = qps
        self.concurrent = concurrent
        self.mutex = Lock()
        self.local = Manager().Namespace()
        self.local.count = 0
        self.local.sleep = 0.1
        self.manager_gap = 0.5
        self.precision = 0.1
        self.vocab_list = list()
        self.vocab()

    def manager_process(self):
        while True:
            with self.mutex:
                current_qps = self.local.count / self.manager_gap
                self.local.count = 0
                print self.local.sleep, current_qps

            if current_qps < self.qps:
                self.local.sleep = self.local.sleep * (1.0 - self.precision)
            else:
                self.local.sleep = self.local.sleep * (1.0 + self.precision)
            time.sleep(self.manager_gap)

    def worker_process(self):
        while True:
            with self.mutex:
                self.local.count += 1
            time.sleep(self.local.sleep)
            self.press()

    @abc.abstractmethod
    def vocab(self):
        return

    @abc.abstractmethod
    def press(self):
        return

    def run(self):
        processes = [Process(target=self.worker_process) for index in range(self.concurrent)]
        processes.append(Process(target=self.manager_process))
        for process in processes:
            process.start()
        for process in processes:
            process.join()

2、实际压测

给出一个发压的例子。分三步~

QueryVmPress继承了Press类,获得了发压能力。

然后实现了vocab方法,构造了词表。

实现了press方法,这里是发压逻辑,可以看到QueryVmScenario.press_vm(vocab),QueryVmScenario放的是自动化case。发压只是调用了其中的一个接口。这个接口的编写很复杂,也是为什么要自己做一个压测工具的原因。

# -*- coding:utf-8 -*-
import random
from query.query_vm_scenario import QueryVmScenario
from db.vm_dao import Dao as vm_dao
from db.account_dao import Dao as account_dao
from press import Press
from lib import common
from vocab import Vocab

class QueryVmVocab(Vocab):

    def __init__(self):
        Vocab.__init__(self)


class QueryVmPress(Press):

    def __init__(self, qps=100, concurrent=10):
        Press.__init__(self, qps, concurrent)

    def vocab(self):
        for account in account_dao.query_all_account(limit=10):
            account_name = account[1]
            account_password = account[2]
            res = common.login_by_account(account_name, account_password)
            for item in vm_dao.query_vm_by_account(account_name, limit=100):
                vm_uuid = item[1]
                vocab = QueryVmVocab()
                vocab.add('session_uuid', res.inventory.uuid)
                vocab.add('vm_uuid', vm_uuid)
                self.vocab_list.append(vocab)
        return self.vocab_list

    def press(self):
        vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)]
        QueryVmScenario.press_vm(vocab)


if __name__ == '__main__':
    QueryVmPress(qps=100, concurrent=10).run()

QueryVmPress(qps=100, concurrent=10).run(),就按照100QPS进行压测了。

0.1 20.0
0.09 40.0
0.081 60.0
0.0729 80.0
0.06561 60.0
0.059049 80.0
0.0531441 60.0
0.04782969 80.0
0.043046721 80.0
0.0387420489 80.0
0.03486784401 80.0
0.031381059609 100.0
0.0345191655699 80.0
0.0310672490129 88.0
0.0279605241116 92.0
0.0251644717005 100.0
0.0276809188705 80.0
0.0249128269835 100.0
0.0274041096818 100.0
0.03014452065 80.0
0.027130068585 100.0
0.0298430754435 80.0
0.0268587678991 100.0
0.029544644689 92.0

第一列是sleep时间,第二列是实际QPS,可以看到,qps会被动态的稳定在设置的值上。

3、混压

当要做多个接口混压的时候,可以这样做。

先写好单压的python类,在单压的代码里,可以看到我实现了QueryVmVocab类,表名了词表的类型,这个类集成自Vocab,Vocab就是一个字典的封装。

混压的时候,先将词表汇总,并且shuffle,然后弹出词表的时候,使用isinstance判断词表的类型,调用不同的发压函数进行压测。

vocab的实现

# -*- coding:utf-8 -*-
import abc

class Vocab(object):

    __metaclass__ = abc.ABCMeta

    def __init__(self):
        self.vocab = dict()

    def add(self, key, value):
        self.vocab[key] = value

    def get(self, key):
        return self.vocab.get(key)

    def remove(self, key):
        del self.vocab[key]

混压的实现

# -*- coding:utf-8 -*-
import random

from press import Press
from query_eip_press import QueryEipPress, QueryEipVocab
from query_image_press import QueryImagePress, QueryImageVocab
from query_snapshot_press import QuerySnapshotPress, QuerySnapshotVocab
from query_vm_press import QueryVmPress, QueryVmVocab

from query.query_eip_scenario import QueryEipScenario
from query.query_image_scenario import QueryImageScenario
from query.query_snapshot_scenario import QuerySnapshotScenario
from query.query_vm_scenario import QueryVmScenario


class MixedPress(Press):

    def __init__(self, qps=100, concurrent=10):
        Press.__init__(self, qps, concurrent)

    def vocab(self):
        self.vocab_list.extend(QueryEipPress().vocab())
        self.vocab_list.extend(QueryImagePress().vocab())
        self.vocab_list.extend(QuerySnapshotPress().vocab())
        self.vocab_list.extend(QueryVmPress().vocab())

    def press(self):
        vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)]
        if isinstance(vocab, QueryEipVocab):
            QueryEipScenario.press_eip(vocab)
        elif isinstance(vocab, QueryImageVocab):
            QueryImageScenario.press_image(vocab)
        elif isinstance(vocab, QuerySnapshotVocab):
            QuerySnapshotScenario.press_snapshot(vocab)
        elif isinstance(vocab, QueryVmVocab):
            QueryVmScenario.press_vm(vocab)


if __name__ == '__main__':
    MixedPress(200, 50).run()

后记

这只是一个很小的功能实现,提供给大家参考。如果有不对的地方,希望得到大家指正。

相关内容