香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

python使用MQTT给硬件传输图片的实现方法


时间:2022-04-02 10:33 作者:admin


最近因需要用python/' target='_blank'>python写一个微服务来用MQTT给硬件传输图片,其中Python用的是flask框架,大概流程如下:


协议为:

需要将图片数据封装成多个消息进行传输,每个消息传输的数据字节数为1400Byte。
消息(MQTT Payload) 格式:Web服务器-------->BASE:

反馈:BASE---------> Web服务器:

如果Web服务器发送完一个“数据传输消息”后,5S内没有收到MQTT“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。

程序流程图

根据上面的协议,可以得到如下的流程图:


代码如下:

# encoding:utf-8from flask import Flask, jsonifyfrom flask_restful import Api, Resource, reqparsefrom PIL import Imagefrom io import BytesIOimport requestsimport os, logging, timeimport paho.mqtt.client as mqttimport structfrom flask_cors import *# 日志配置信息logging.basicConfig(  level=logging.INFO,  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',)class Mqtt(object):  def __init__(self, img_data, size):    self.MQTTHOST = '*******'    self.MQTTPORT = "******"    # 订阅和发送的主题    self.topic_from_base = 'mqttTestSub'    self.topic_to_base = 'mqttTestPub'    self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))    self.client = mqtt.Client(self.client_id)    # 完成链接后的回掉函数    self.client.on_connect = self.on_connect    # 图片大小    self.size = size    # 用于跳出死循环,结束任务    self.finished = None    # 包的编号    self.index = 0    # 将收到的图片数据按大小分成列表    self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]    # 记录发布后的数据,用于监控时延    self.pub_time = 0    self.header_to_base = 0xffffeeee    self.header_from_base = 0xeeeeffff    # 功能标识    self.function_begin = 0x01    self.function_doing = 0x02    self.function_finished = 0x03    # 包的完整和非完整状态    self.whole_package = 0x01    self.bad_package = 0x00    # 头信息的格式,小端模式    self.format_to_base = "<Lbhh"    self.format_from_base = "<Lbhb"    # 如果重发包时,用于检查是否重发第一个包    self.first = True    # 如果重发包时,用于检查是否重发最后一个包    self.last = False    self.begin_data = 'image.jpg;' + str(self.size)  # 链接mqtt服务器函数  def on_mqtt_connect(self):    self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)    self.client.loop_start()  # 链接完成后的回调函数  def on_connect(self, client, userdata, flags, rc):    logging.info("+++ Connected with result code {} +++".format(str(rc)))    self.client.subscribe(self.topic_from_base)  # 订阅函数  def subscribe(self):    self.client.subscribe(self.topic_from_base, 1)    # 消息到来处理函数    self.client.on_message = self.on_message  # 接收到信息后的回调函数  def on_message(self, client, userdata, msg):    # 如果接受第一个包则不需要重发第一个    self.first = False    # 将接受到的包进行解压,得到一个元组    base_tuple = struct.unpack(self.format_from_base, msg.payload)    logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))    logging.info("+++ package_number is {}, package_status_from_base is {} +++"           .format(base_tuple[2], base_tuple[3]))    # 检查接受到信息的头部是否正确    if base_tuple[0] == self.header_from_base:      logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))      # 是否完成传输,如果完成则退出      if base_tuple[1] == self.function_finished:        logging.info("+++ finish work +++")        self.finished = 1        self.client.disconnect()      else:        # 是否是最后一个包        if self.index == len(self.image_data_list) - 1:          self.publish('finished', self.function_finished)          self.last = True          logging.info("+++ finished_data_to_base is finished+++")        else:          # 如果接收到的包不是 0x03则进行传送数据          if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:            logging.info("+++ package_number is {}, package_status_from_base is {} +++"                   .format(base_tuple[2],base_tuple[3]))            # 如果数据的反馈中,包的状态是1则继续发下一个包            if base_tuple[3] == self.whole_package:              self.publish(self.index, self.function_doing)              logging.info("+++ data_to_base is finished+++")              self.index += 1            # 如果数据的反馈中,包的状态是0则重发数据包            elif base_tuple[3] == self.bad_package:              re_package_number = base_tuple[2]              self.publish(re_package_number-1, self.function_doing)              logging.info("+++ re_data_to_base is finished+++")            else:              logging.info("+++ package_status_from_base is not 0 or 1 +++")              self.client.disconnect()          else:            logging.info("+++ function_identifier is illegal +++")            self.client.disconnect()    else:      logging.info("+++ header_from_base is illegal +++")      self.client.disconnect()  # 数据发送函数  def publish(self, index, fuc):    # 看是否是最后一个包    if index == 'finished':      length = 0      package_number = 0      data = b''    else:      length = len(self.image_data_list[index])      package_number = index      data = self.image_data_list[index]    # 打包数据头信息    buffer = struct.pack(      self.format_to_base,      self.header_to_base,      fuc,      package_number,      length    )    to_base_data = buffer + data    # mqtt发送    self.client.publish(      self.topic_to_base,      to_base_data    )    self.pub_time = time.time()  # 发送第一个包函数  def publish_begin(self):    buffer = struct.pack(      self.format_to_base,      self.header_to_base,      self.function_begin,      0,      len(self.begin_data.encode('utf-8')),    )    begin_data = buffer + self.begin_data.encode('utf-8')    self.client.publish(self.topic_to_base, begin_data)  # 控制函数  def control(self):    self.on_mqtt_connect()    self.publish_begin()    begin_time = time.time()    self.pub_time = time.time()    self.subscribe()    while True:      time.sleep(1)      # 超过5秒重传      date = time.time() - self.pub_time      if date > 5:        # 是否重传第一个包        if self.first == True:          self.publish_begin()          logging.info('+++ this is timeout first_data +++')        # 是否重传最后一个包        elif self.last == True:          self.publish('finished', self.function_finished)          logging.info('+++ this is timeout last_data +++')        else:          self.publish(self.index-1, self.function_doing)          logging.info('+++ this is timeout middle_data +++')      if self.finished == 1:        logging.info('+++ all works is finished+++')        break    print(str(time.time()-begin_time) + 'begin_time - end_time')app = Flask(__name__)api = Api(app)CORS(app, supports_credentials=True)# 接受参数parser = reqparse.RequestParser()parser.add_argument('url', help='mqttImage url', location='args', type=str)class GetImage(Resource):  # 得到参数并从图床下载到本地  def get(self):    args = parser.parse_args()    url = args.get('url')    response = requests.get(url)    # 获取图片    image = Image.open(BytesIO(response.content))    # 存取图片    add = os.path.join(os.path.abspath(''), 'image.jpg')    image.save(add)    # 得到图片大小    size = os.path.getsize(add)    f = open(add, 'rb')    imageData = f.read()    f.close()    # 进行mqtt传输    mqtt = Mqtt(imageData, size)    mqtt.control()    # 删除文件    os.remove(add)    logging.info('*** the result of control is {} ***'.format(1))    return jsonify({      "imageData": 1    })api.add_resource(GetImage, '/image')if __name__ == '__main__':  app.run(debug=True, host='0.0.0.0')

总结

以上所述是小编给大家介绍的python使用MQTT给硬件传输图片的实现方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)