香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

python多线程抽象编程模型详解


时间:2022-04-02 10:24 作者:admin610456


最近需要完成一个多线程下载的工具,对其中的多线程下载进行了一个抽象,可以对所有需要使用到多线程编程的地方统一使用这个模型来进行编写。

主要结构:

1、基于Queue标准库实现了一个类似线程池的工具,用户指定提交任务线程submitter与工作线程worker数目,所有线程分别设置为后台运行,提供等待线程运行完成的接口。

2、所有需要完成的任务抽象成task,提供单独的无参数调用方式,供worker线程调用;task以生成器的方式作为参数提供,供submitter调用。

3、所有需要进行线程交互的信息放在context类中。

主要实现代码如下:

#Submitter线程类实现,主要是`task_generator`调用class SubmitterThread(threading.Thread):  _DEFAULT_WAIT_TIMEOUT = 2 #seconds  def __init__(self, queue, task_gen, timeout=2):    super(SubmitterThread, self).__init__()    self.queue = queue    if not isinstance(timeout, int):      _logger.error('Thread wait timeout value error: %s, '             'use default instead.' % timeout)      self.timeout = self._DEFAULT_WAIT_TIMEOUT    self.timeout = timeout    self.task_generator = task_gen  def run(self):    while True:      try:        task = self.task_generator.next()        self.queue.put(task, True, self.timeout)      except Queue.Full:        _logger.debug('Task queue is full. %s wait %d second%s timeout' %               (self.name, self.timeout, 's' if (self.timeout > 1) else ''))        break      except (StopIteration, ValueError) as e:        _logger.debug('Task finished')        break
#Worker线程实现,主要就是try块内的func调用class WorkerThread(threading.Thread):  _DEFAULT_WAIT_TIMEOUT = 2 #seconds  def __init__(self, queue, timeout=2):    super(WorkerThread, self).__init__()    self.queue = queue    if not isinstance(timeout, int):      _logger.error('Thread wait timeout value error: %s, '             'use default instead.' % timeout)      self.timeout = self._DEFAULT_WAIT_TIMEOUT    self.timeout = timeout  def run(self):    while True:      try:        func = self.queue.get(True, self.timeout)      except Queue.Empty:        _logger.debug('Task queue is empty. %s wait %d second%s timeout' %               (self.name, self.timeout, 's' if (self.timeout > 1) else ''))        break      if not callable(func):        time.sleep(1)      try:        func()      except Exception as e:        _logger.error('Thread %s running occurs error: %s' %               (self.name, e))        print('Thread running error: %s' % e)
class Executor(object):  """  The really place to execute executor  """  thread_list = []  submitters = 0  workers = 0  queue = None  task_generator = None  timeout = 0  def __init__(self, task_gen, submitters=1, workers=1 , timeout=2):    if len(self.thread_list) != 0:      raise RuntimeError('Executor can only instance once.')    self.queue = Queue.Queue(maxsize=submitters * 2 + workers * 2)    self.submitters = submitters    self.workers = workers    self.task_generator = task_gen    self.timeout = timeout  def start(self):    for i in range(self.submitters):      submitter = SubmitterThread(self.queue, self.task_generator, self.timeout)      self.thread_list.append(submitter)      submitter.setName('Submitter-%d' % i)      submitter.setDaemon(True)      submitter.start()    for i in range(self.workers):      worker = WorkerThread(self.queue, self.timeout)      self.thread_list.append(worker)      worker.setName('Worker-%d' % i)      worker.setDaemon(True)      worker.start()  def is_alive(self):    alive = False    for t in self.thread_list:      if t.isAlive():        alive = True        break    return alive  def wait_to_shutdown(self):    _logger.debug('Start to wait to shutdown')    for t in self.thread_list:      t.join()      _logger.debug('Shutdown thread : %s' % t.name)

Executor类保存了线程池,提供相应接口。有了这个抽象之后,只需要实例化Executor类的对象,然后调用start方法进行多线程任务的运行。并可以用is_alive等接口再主线程内进行其他处理。

后续再使用这个抽象进行实际多线程任务的实现。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)