香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

python使用threading.Condition交替打印两个字符


时间:2022-04-02 10:33 作者:admin610456


python/' target='_blank'>python中使用threading.Condition交替打印两个字符的程序。

这个程序涉及到两个线程的的协调问题,两个线程为了能够相互协调运行,必须持有一个共同的状态,通过这个状态来维护两个线程的执行,通过使用threading.Condition对象就能够完成两个线程之间的这种协调工作。

threading.Condition默认情况下会通过持有一个ReentrantLock来协调线程之间的工作,所谓可重入锁,是只一个可以由一个线程递归获取的锁,此锁对象会维护当前锁的所有者(线程)和当前所有者递归获取锁的次数(本文在逻辑上和可重入锁没有任何关系,完全可以用一个普通锁替代)。

Python文档中给出的描述是:它是一个与某个锁相联系的变量。同时它实现了上下文管理协议。其对象中除了acquire和release方法之外,其它方法的调用的前提是,当前线程必须是这个锁的所有者。

通过代码和其中的注释,能够非常明白地弄清楚Condition的原理是怎样的:

import threadingimport timeimport functools  def worker(cond, name): """worker running in different thread""" with cond: # 通过__enter__方法,获取cond对象中的锁,默认是一个ReentrantLock对象  print('...{}-{}-{}'.format(name, threading.current_thread().getName(), cond._is_owned()))  cond.wait() # 创建一个新的锁NEWLOCK,调用acquire将NEWLOCK获取,然后将NEWLOCK放入等待列表中,\  # 释放cond._lock锁(_release_save),最后再次调用acquire让NEWLOCK阻塞 print('wait returned in {}'.format(name))  if __name__ == '__main__': condition = threading.Condition() t1 = threading.Thread(target=functools.partial(worker, condition, 't1')) t2 = threading.Thread(target=functools.partial(worker, condition, 't2'))  t2.start() # 启动线程2 t1.start() # 启动线程1  time.sleep(2) with condition:  condition.notify(1) # 按照FIFO顺序(wait调用顺序),释放一个锁,并将其从等待列表中删除  time.sleep(2)  with condition:  condition.notify(1) # 按照FIFO顺序(wait调用顺序),释放另一个锁,并将其从等待队列中删除  t1.join() # 主线程等待子线程结束 t2.join() # 主线程等待子线程结束  print('All done')

其输出为:

...t2-Thread-2-True...t1-Thread-1-Truewait returned in t2wait returned in t1All done

其中wait方法要求获取到threading.Condition对象中的锁(如果没有提供,默认使用一个可重入锁),然后自己创建一个新的普通锁(NEWLOCK),并获取这个NEWLOCK;之后调用_release_save方法释放threading.Condition对象中的锁,让其它线程能够获取到;最后再次调用NEWLOCK上的acquire方法,由于在创建时已经acquire过,所以此线程会阻塞在此。而wait想要继续执行,必须等待其它线程将产生阻塞的这个NEWLOCK给release掉,当然,这就是notify方法的责任了。

notify方法接收一个数字n,从等待列表中取出相应数量的等待对象(让wait方法阻塞的锁对象),调用其release方法,让对应的wait方法能够返回。而notify_all方法仅仅就是将n设置为等待列表的总长度而已。

在理解了threading.Condition对象中wait和notify的工作原理之后,我们就可以利用它们来实现两个线程交替打印字符的功能了:

import threadingimport functoolsimport time  def print_a(state): while True:  if state.closed:   print('Close a')   return  print('A')  time.sleep(2)  state.set_current_is_a(True)  state.wait_for_b()  def print_b(state): while True:  if state.closed:   print('Close b')   return  state.wait_for_a()  print('B')  time.sleep(2)  state.set_current_is_a(False)  if __name__ == '__main__': class State(object):  """state used to coordinate multiple(two here) threads"""  def __init__(self):   self.condition = threading.Condition()   self.current_is_a = False   self.closed = False   def wait_for_a(self):   with self.condition:    while not self.current_is_a:     self.condition.wait()   def wait_for_b(self):   with self.condition:    while self.current_is_a:     self.condition.wait()   def set_current_is_a(self, flag):   self.current_is_a = flag   with self.condition:    self.condition.notify_all()   state = State() t1 = threading.Thread(target=functools.partial(print_a, state)) t2 = threading.Thread(target=functools.partial(print_b, state))  try:  t1.start()  t2.start()   t1.join()  t2.join() except KeyboardInterrupt:  state.closed = True  print('Closed')

可以看到有两种类型的任务,一个用于打印字符A,一个用于打印字符B,我们的实现种让A先于B打印,所以在print_a中,先打印A,再设置当前字符状态并释放等待列表中的所有锁(set_current_is_a),如果没有这一步,current_is_a将一直是False,wait_for_b能够返回,而wait_for_a却永远不会返回,最终效果就是每隔两秒就打印一个字符A,而B永远不会打印。另一个副作用是如果wait_for_a永远不会返回,那print_b所在线程的关闭逻辑也就无法执行,最终会成为僵尸线程(这里的关闭逻辑只用作示例,生产环境需要更加完善的关闭机制)。

考虑另一种情况,print_a种将set_current_is_a和wait_for_b交换一下位置会怎么样。从观察到的输出我们看到,程序首先输出了一个字符A,以后,每隔2秒钟,就会同时输出A和B,而不是交替输出。原因在于,由于current_is_a还是False,我们先调用的wait_for_b其会立即返回,之后调用set_current_is_a,将current_is_a设置为True,并释放所有的阻塞wait的锁(notify_all),这个过程中没有阻塞,print_a紧接着进入了下一个打印循环;与此同时,print_b中的wait_for_a也返回了,进入到B的打印循环,故最终我们看到A和B总是一起打印。

可见对于threading.Condition的使用需要多加小心,要注意逻辑上的严谨性。

附一个队列版本:

import threadingimport functoolsimport timefrom queue import Queue  def print_a(q_a, q_b): while True:  char_a = q_a.get()  if char_a == 'closed':   return  print(char_a)  time.sleep(2)  q_b.put('B')  def print_b(q_a, q_b): while True:  char_b = q_b.get()  if char_b == 'closed':   return  print(char_b)  time.sleep(2)  q_a.put('A')  if __name__ == '__main__': q_a = Queue() q_b = Queue()  t1 = threading.Thread(target=functools.partial(print_a, q_a, q_b)) t2 = threading.Thread(target=functools.partial(print_b, q_a, q_b))  try:  t1.start()  t2.start()   q_a.put('A')   t1.join()  t2.join() except KeyboardInterrupt:  q_a.put('closed')  q_b.put('closed')  print('Done')

队列版本逻辑更清晰,更不容易出错,实际应用中应该选用队列。

附一个协程版本(Python 3.5+):

import timeimport asyncio  async def print_a(): while True:  print('a')  time.sleep(2) # simulate the CPU block time  await asyncio.sleep(0) # release control to event loop  async def print_b(): while True:  print('b')  time.sleep(2) # simulate the CPU block time  await asyncio.sleep(0) # release control to event loop  async def main(): await asyncio.wait([print_a(), print_b()])  if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())

协程的运行需要依附于一个事件循环(select/poll/epoll/kqueue),通过async def将一个函数定义为协程,通过await主动让渡控制权,通过相互让渡控制权完成交替打印字符。整个程序运行于一个线程中,这样就没有线程间协调的工作,仅仅是控制权的让渡逻辑。对于IO密集型操作,而没有明显的CPU阻塞(计算复杂,以致出现明显的延时,比如复杂加解密算法)的情况下非常合适。

附一个Java版本:

PrintMain类,用于管理和协调打印A和打印B的两个线程:

package com.cuttyfox.tests.self.version1; import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit; public class PrintMain { private boolean currentIsA = false;  public synchronized void waitingForPrintingA() throws InterruptedException {  while (this.currentIsA == false) {   wait();  } }  public synchronized void waitingForPrintingB() throws InterruptedException {  while (this.currentIsA == true) {   wait();  } }  public synchronized void setCurrentIsA(boolean flag) {  this.currentIsA = flag;  notifyAll(); }  public static void main(String[] args) throws Exception {  PrintMain state = new PrintMain();  ExecutorService executorService = Executors.newCachedThreadPool();  executorService.execute(new PrintB(state));  executorService.execute(new PrintA(state));  executorService.shutdown();  executorService.awaitTermination(10, TimeUnit.SECONDS);  System.out.println("Done");  System.exit(0); }}

打印A的线程(首先打印A):

package com.cuttyfox.tests.self.version1; import java.util.concurrent.TimeUnit; public class PrintA implements Runnable{ private PrintMain state;  public PrintA(PrintMain state) {  this.state = state; }  public void run() {  try {   while (!Thread.interrupted()){    System.out.println("Print A");    TimeUnit.SECONDS.sleep(1);    this.state.setCurrentIsA(true);    this.state.waitingForPrintingB();   }  } catch (InterruptedException e) {   System.out.println("Exit through Interrupting.");  }  }}

打印B的线程:

package com.cuttyfox.tests.self.version1; import java.util.concurrent.TimeUnit; public class PrintB implements Runnable{ private PrintMain state;  public PrintB(PrintMain state) {  this.state = state; }  public void run() {  try{   while (!Thread.interrupted()) {    this.state.waitingForPrintingA();    System.out.println("Print B");    TimeUnit.SECONDS.sleep(1);    this.state.setCurrentIsA(false);   }  } catch (InterruptedException e) {   System.out.println("Exit through Interrupting.");  }  }}

Java对象本身有对象锁,故这里没有像Python中那样需要显式通过创建一个Condition对象来得到一把锁。

使用Python实现交替打印abcdef的过程:

import threadingimport timeimport functoolsfrom collections import deque  LETTERS = [chr(code) for code in range(97, 97+6)] LENGTH = len(LETTERS)   class State(object):  def __init__(self):   self.condition = threading.Condition()   self.index_value = 0   def set_next_index(self, index):   with self.condition:    self.index_value = index    self.condition.notify_all()   def wait_for(self, index_value):   with self.condition:    while not self.index_value == index_value:     self.condition.wait()   def print_letter(state: State, wait_ident: int):  print('Got: {}!'.format(wait_ident))  while True:   state.wait_for(wait_ident)   time.sleep(2)   print(LETTERS[state.index_value])   print('PRINT: {} AND SET NEXT: {}'.format(state.index_value,              (state.index_value + 1) % LENGTH              ))   state.set_next_index((state.index_value + 1) % LENGTH)   state = State() d = deque() d.extend(range(LENGTH)) d.rotate(1) print(d)  threads = [] for wait_ident in d:  t = threading.Thread(target=functools.partial(print_letter, state, wait_ident))  threads.append(t)  for thread in threads:  thread.start()  for thread in threads:  thread.join()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)