香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

Python装饰器限制函数运行时间超时则退出执行


时间:2022-04-02 10:28 作者:admin610456


实际项目中会涉及到需要对有些函数的响应时间做一些限制,如果超时就退出函数的执行,停止等待。

可以利用python/' target='_blank'>python中的装饰器实现对函数执行时间的控制。

Python装饰器简单来说可以在不改变某个函数内部实现和原来调用方式的前提下对该函数增加一些附件的功能,提供了对该函数功能的扩展。

方法一. 使用 signal

# coding=utf-8import signalimport timedef set_timeout(num, callback):  def wrap(func):    def handle(signum, frame): # 收到信号 SIGALRM 后的回调函数,第一个参数是信号的数字,第二个参数是the interrupted stack frame.      raise RuntimeError    def to_do(*args, **kwargs):      try:        signal.signal(signal.SIGALRM, handle) # 设置信号和回调函数        signal.alarm(num) # 设置 num 秒的闹钟        print('start alarm signal.')        r = func(*args, **kwargs)        print('close alarm signal.')        signal.alarm(0) # 关闭闹钟        return r      except RuntimeError as e:        callback()    return to_do  return wrapdef after_timeout(): # 超时后的处理函数  print("Time out!")@set_timeout(2, after_timeout) # 限时 2 秒超时def connect(): # 要执行的函数  time.sleep(3) # 函数执行时间,写大于2的值,可测试超时  print('Finished without timeout.')if __name__ == '__main__':  connect()

方法一中使用的signal有所限制,需要在linux系统上,并且需要在主线程中使用。方法二使用线程计时,不受此限制。

方法二. 使用Thread

# -*- coding: utf-8 -*-from threading import Threadimport timeclass TimeoutException(Exception):  passThreadStop = Thread._Thread__stopdef timelimited(timeout):  def decorator(function):    def decorator2(*args,**kwargs):      class TimeLimited(Thread):        def __init__(self,_error= None,):          Thread.__init__(self)          self._error = _error        def run(self):          try:            self.result = function(*args,**kwargs)          except Exception,e:            self._error = str(e)        def _stop(self):          if self.isAlive():            ThreadStop(self)      t = TimeLimited()      t.start()      t.join(timeout)      if isinstance(t._error,TimeoutException):        t._stop()        raise TimeoutException('timeout for %s' % (repr(function)))      if t.isAlive():        t._stop()        raise TimeoutException('timeout for %s' % (repr(function)))      if t._error is None:        return t.result    return decorator2  return decorator@timelimited(2) # 设置运行超时时间2Sdef fn_1(secs):  time.sleep(secs)  return 'Finished without timeout'def do_something_after_timeout():  print('Time out!')if __name__ == "__main__":  try:    print(fn_1(3)) # 设置函数执行3S  except TimeoutException as e:    print(str(e))    do_something_after_timeout()

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。如果你想了解更多相关内容请查看下面相关链接

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)