读书人

复建第一次写的线程池

发布时间: 2012-10-08 19:54:56 作者: rapoo

重构第一次写的线程池
最近没有什么学习欲望,修改之前的线程池的计划一直搁置,这几天比较闲,还是做了一次重构,由之前的2个类拆分为现在的4个类.

1、首先是工作线程类:TaskThread,此类为一个工作线程,用于完成一个工作任务,提供等待(wait),继续(proceed),绑定任务(bindTask)等方法

#!/usr/bin/env python# -*- coding:utf8 -*-import threadingclass TaskThread(threading.Thread):    def __init__(self):        super(TaskThread,self).__init__()        self._e = threading.Event()        self.setDaemon(True)        self.isReady(True)        self.isActive(True)           def run(self):        while self.isActive():            try:                self.task()            except:                pass            finally:                self.wait()    def wait(self):          self.isReady(True)        self._e.clear()        self._e.wait()    def proceed(self):        self.isReady(False)        self._e.set()                if not self.isAlive() and self.isActive():            self.start()    def isReady(self,flag = None):        if isinstance(flag,bool):            self.is_ready = flag            return self.is_ready    def isActive(self,flag = None):         if isinstance(flag,bool):            self.is_active = flag            return self.is_active         def bindTask(self,task):        self.task = task


2、线程池类(ThreadPool),此类是一个单例,用于模拟一个池,并提一个同步方法getThread用于获取池中线程,若池已经空了,返回None
#!/usr/bin/env python# -*- coding:utf8 -*-import threadingfrom TaskThread import TaskThreadclass ThreadPool(object):    __instance = None    __lock = threading.Lock()    def __init__():        pass    @classmethod    def getInstance(self):        self.__lock.acquire()        if not self.__instance:            self.__instance = super(ThreadPool,self).__new__(self)        self.__lock.release()                return self.__instance    def initPool(self,pool_min_size = 5,pool_max_size = 10):        self.pool_min_size = pool_min_size        self.pool_max_size = pool_max_size        self.pool = []        for i in range(self.pool_min_size):            self.pool.append(TaskThread())    def getThread(self):        th = None                self.__lock.acquire()        for t in self.pool:            if not t._e.isSet() and t.isReady():                t.isReady(False)                th = t                break        self.__lock.release()        if th is None and len(self.pool) < self.pool_max_size:            th = TaskThread()            self.pool.append(th)                return th


3、任务队列类(TaskQueue),此类是一个单例,对Queue进行了简单的封装
#!/usr/bin/env python# -*- coding:utf8 -*-import threadingfrom Queue import Queueclass TaskQueue(object):    __instance = None    __lock = threading.Lock()        def __init__():        pass    @classmethod    def getInstance(self):        self.__lock.acquire()        if not self.__instance:            self.__instance = super(TaskQueue,self).__new__(self)        self.__lock.release()                return self.__instance    def initQueue(self,task_queue_size = 100):        self.tasks = Queue(task_queue_size)    def getTask(self):        try:            return self.tasks.get(0)        except:            raise Exception,'This queue is empty.'    def addTask(self,task):        try:            self.tasks.put(task,0)        except:            raise Exception,'This queue is full.'


4、线程池管理类(ThreadPoolManager),此类负责从任务队列中获取任务,然后绑定到线程池中获取空闲线程中执行.
#!/usr/bin/env python# -*- coding:utf8 -*-import threadingclass ThreadPoolManager(threading.Thread):    def __init__(self,pool,tasks):        super(ThreadPoolManager,self).__init__()        self.setDaemon(True)        self.pool = pool        self.tasks = tasks            def run(self):        while True:            t = self.pool.getThread()                        if t is not None:                try:                    task = self.tasks.getTask()                except:                    t.isReady(True)                else:                    t.bindTask(task)                    t.proceed()



第一次重构算是完成了,目前就差一个调节负载的轮询线程,不过暂时没有想到一个好点的调节策略,只有搁置...

读书人网 >编程

热点推荐