香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

python爬取基于m3u8协议的ts文件并合并


时间:2022-04-02 10:31 作者:admin


前言

简单学习过网络爬虫,只是之前都是照着书上做并发,大概能理解,却还是无法自己用到自己项目中,这里自己研究实现一个网页嗅探HTML5播放控件中基于m3u8协议ts格式视频资源的项目,并未考虑过复杂情况,毕竟只是练练手.

源码

# coding=utf-8import asyncioimport multiprocessingimport osimport reimport timefrom math import floorfrom multiprocessing import Managerimport aiohttpimport requestsfrom lxml import htmlimport threadingfrom src.my_lib import retryfrom src.my_lib import time_statisticsclass M3U8Download: _path = "./resource\\" # 本地文件路径 _url_seed = None # 资源所在链接前缀 _target_url = {} # 资源任务目标字典 _mode = "" _headers = {"User-agent": "Mozilla/5.0"} # 浏览器代理 _target_num = 100 def __init__(self): self._ml = Manager().list() # 进程通信列表 if not os.path.exists(self._path): # 检测本地目录存在否  os.makedirs(self._path) exec_str = r'chcp 65001' os.system(exec_str) # 先切换utf-8输出,防止控制台乱码 def sniffing(self, url): self._url = url print("开始嗅探...") try:  r = requests.get(self._url) # 访问嗅探网址,获取网页信息 except:  print("嗅探失败,网址不正确")  os.system("pause") else:  tree = html.fromstring(r.content)  try:  source_url = tree.xpath('//video//source/@src')[0] # 嗅探资源控制文件链接,这里只针对一个资源控制文件  # self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名  except:  print("嗅探失败,未发现资源")  os.system("pause")  else:  self.analysis(source_url) def analysis(self, source_url): try:  self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名  with requests.get(source_url) as r: # 访问资源控制文件,获得资源信息  src = re.split("\n*#.+\n", r.text) # 解析资源信息  for sub_src in src: # 将资源地址储存到任务字典   if sub_src:   self._target_url[sub_src] = self._url_seed + "/" + sub_src except Exception as e:  print("资源无法成功解析", e)  os.system("pause") else:  self._target_num = len(self._target_url)  print("sniffing success!!!,found", self._target_num, "url.")  self._mode = input(  "1:-> 单进程(Low B)\n2:-> 多进程+多线程(网速开始biubiu飞起!)\n3:-> 多进程+协程(最先进的并发!!!)\n")  if self._mode == "1":  for path, url in self._target_url.items():   self._download(path, url)  elif self._mode == "2" or self._mode == "3":  self._multiprocessing() def _multiprocessing(self, processing_num=4): # 多进程,多线程 target_list = {} # 进程任务字典,储存每个进程分配的任务 pool = multiprocessing.Pool(processes=processing_num) # 开启进程池 i = 0 # 任务分配标识 for path, url in self._target_url.items(): # 分配进程任务  target_list[path] = url  i += 1  if i % 10 == 0 or i == len(self._target_url): # 每个进程分配十个任务  if self._mode == "2":   pool.apply_async(self._sub_multithreading, kwds=target_list) # 使用多线程驱动方法  else:   pool.apply_async(self._sub_coroutine, kwds=target_list) # 使用协程驱动方法  target_list = {} pool.close() # join函数等待所有子进程结束 pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool while True:  if self._judge_over():  self._combine()  break def _sub_multithreading(self, **kwargs): for path, url in kwargs.items(): # 根据进程任务开启线程  t = threading.Thread(target=self._download, args=(path, url,))  t.start() @retry() def _download(self, path, url): # 同步下载方法 with requests.get(url, headers=self._headers) as r:  if r.status_code == 200:  with open(self._path + path, "wb")as file:   file.write(r.content)  self._ml.append(0) # 每成功一个就往进程通信列表增加一个值  percent = '%.2f' % (len(self._ml) / self._target_num * 100)  print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 显示下载进度  else:  print(path, r.status_code, r.reason) def _sub_coroutine(self, **kwargs): tasks = [] for path, url in kwargs.items(): # 根据进程任务创建协程任务列表  tasks.append(asyncio.ensure_future(self._async_download(path, url))) loop = asyncio.get_event_loop() # 创建异步事件循环 loop.run_until_complete(asyncio.wait(tasks)) # 注册任务列表 async def _async_download(self, path, url): # 异步下载方法 async with aiohttp.ClientSession() as session:  async with session.get(url, headers=self._headers) as resp:  try:   assert resp.status == 200, "E" # 断言状态码为200,否则抛异常,触发重试装饰器   with open(self._path + path, "wb")as file:   file.write(await resp.read())  except Exception as e:   print(e)  else:   self._ml.append(0) # 每成功一个就往进程通信列表增加一个值   percent = '%.2f' % (len(self._ml) / self._target_num * 100)   print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 显示下载进度 def _combine(self): # 组合资源方法 try:  print("开始组合资源...")  identification = str(floor(time.time()))  exec_str = r'copy /b "' + self._path + r'*.ts" "' + self._path + 'video' + identification + '.mp4"'  os.system(exec_str) # 使用cmd命令将资源整合  exec_str = r'del "' + self._path + r'*.ts"'  os.system(exec_str) # 删除原来的文件 except:  print("资源组合失败") else:  print("资源组合成功!") def _judge_over(self): # 判断是否全部下载完成 if len(self._ml) == len(self._target_url):  return True return False@time_statisticsdef app(): multiprocessing.freeze_support() url = input("输入嗅探网址:\n") m3u8 = M3U8Download() m3u8.sniffing(url) # m3u8.analysis(url)if __name__ == "__main__": app()

这里是两个装饰器的实现:

import timedef time_statistics(fun): def function_timer(*args, **kwargs): t0 = time.time() result = fun(*args, **kwargs) t1 = time.time() print("Total time running %s: %s seconds" % (fun.__name__, str(t1 - t0))) return result return function_timerdef retry(retries=3): def _retry(fun): def wrapper(*args, **kwargs):  for _ in range(retries):  try:   return fun(*args, **kwargs)  except Exception as e:   print("@", fun.__name__, "->", e) return wrapper return _retry

打包成exe文件

使用PyInstaller -F download.py将程序打包成单个可执行文件.
这里需要注意一下,因为程序含有多进程,需要在执行前加一句multiprocessing.freeze_support(),不然程序会反复执行多进程前的功能.

关于协程

协程在python/' target='_blank'>python3.5进化到了async await版本,用 async 标记异步方法,在异步方法里对耗时操作使用await标记.这里使用了一个进程驱动协程的方法,在进程池创建多个协程任务,使用asyncio.get_event_loop()创建协程事件循环,使用run_until_complete()注册协程任务,asyncio.wait()方法接收一个任务列表进行协程注册.

关于装饰器

装饰器源于闭包原理,这里使用了两种装饰器.

@time_statistics:统计耗时,装饰器自己无参型 @retry():设置重试次数,装饰器自己有参型 按我理解是有参型是将无参型装饰器包含在内部,而调用是加()的,关于(): 不带括号时,调用的是这个函数本身 带括号(此时必须传入需要的参数),调用的是函数的return结果

关于CMD控制台

程序会使用CMD命令来将下载的ts文件合并.
因为CMD默认使用GB2312编码,调用os.system()需要先切换成通用的UTF-8输出,否则系统信息会乱码.
而且使用cmd命令时参数最好加双引号,以避免特殊符号报错.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)