一键下载 M3U8 并转换为 MP4升级版

 之前的下载 M3U8程序,有很多问题, 为此做了一些升级,分享给大家。

  • 增加了存在播放列表的情况处理
  • 播放列表路径和ts路径错误问题
  • 多线程问题
  • 对于电视剧多文件下载的处理

这里从网上找了一部的链接,可以参考这个网站https://www.zuida001.com/


import os
import urllib3
import requests
import subprocess
import m3u8
from urllib import parse
from tenacity import retry, wait_random, stop_after_attempt
import gevent
from gevent.threadpool import ThreadPool

urllib3.disable_warnings()

class M3u8Downloader:
    def __init__(self, pool_size=10):
        self.pool = ThreadPool(pool_size)


    @retry(stop=stop_after_attempt(3), wait=wait_random(2, 5))
    def request(self, url):
        """发送请求"""
        try:
            res = requests.get(url, verify=False, timeout=5)
            return res
        except Exception as e:
            print(url, e)
            raise e


    def download_segment(self, url, file):
        """下载ts文件"""
        if os.path.exists(file):
            return 

        res = self.request(url)
        with open(file, "wb")as f:
            f.write(res.content)


    def convert_mp4(self, path, output, key):
        if not os.path.exists(output):
            # 使用FFmpeg将所有.ts文件合并为一个MP4文件    ffmpeg -allowed_extensions ALL -i index.m3u8 -c copy xxx.mp4
            if key:
                subprocess.call(['ffmpeg', '-allowed_extensions', 'ALL', '-i', 'local.m3u8', '-c', 'copy', output], cwd=path)
            else:
                subprocess.call(['ffmpeg', '-i', 'local.m3u8', '-c', 'copy', output], cwd=path)

    def download_m3u8(self, url, path):
        """下载M3U8文件,有些存在播放列表,默认选择第一个"""
        m3u8_file_name = os.path.join(path, "index.m3u8")
        res = self.request(url)
        with open(m3u8_file_name, "w", encoding="utf-8")as f:
            f.write(res.text)

        # 解析M3U8文件
        m3u8_obj = m3u8.loads(res.text)

        # 如果存在清晰度列表,请求解析清晰度列表
        m3u8_playlist = []
        for playlist in m3u8_obj.playlists:
            uri = parse.urljoin(url, playlist.uri)
            bandwidth = playlist.stream_info.bandwidth
            resolution= playlist.stream_info.resolution

            m3u8_file_name = os.path.join(path, f"{'x'.join([str(i) for i in resolution])}_{bandwidth}.m3u8")
            res = self.request(uri)
            with open(m3u8_file_name, "w", encoding="utf-8")as f:
                f.write(res.text)
            cur_m3u8 = m3u8.loads(res.text)
            cur_m3u8.uri = uri
            m3u8_playlist.append(cur_m3u8)

        if m3u8_playlist:
            # 播放列表默认选择第一个
            return m3u8_playlist[0]
        else:
            return m3u8_obj


    def download(self, url, path, output):
        """
        下载单个m3u8主程序
        url: m3u8链接url
        path: 单个m3u8目录
        output: 转换输出文件路径名
        """
        # 创建目录
        segment_dir = os.path.join(path, "index")
        if not os.path.exists(segment_dir):
            os.makedirs(segment_dir)
        output_dir = os.path.dirname(output)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        # 解析M3U8文件
        m3u8_obj = self.download_m3u8(url, path)

        # 下载key文件 
        for item in m3u8_obj.keys:
            if item:
                key_url = parse.urljoin(url, item.uri)
                key_file_name = os.path.join(path, key_url.split("/")[-1])
                res = self.request(key_url)
                with open(key_file_name, "w", encoding="utf-8")as f:
                    f.write(res.text)

        segments = []
        # 生成新的本地M3U8文件内容
        for index, segment in enumerate(m3u8_obj.segments):
            uri = parse.urljoin(m3u8_obj.uri, segment.uri)
            segments.append(uri)
            # 有些ts文件名过长,对其以序号重新命名
            segment.uri = f"index/{index}.{uri.split('.')[-1]}"

        # 保存M3U8文件
        local_file_name = os.path.join(path, "local.m3u8")
        with open(local_file_name, 'w') as f:
            f.write(m3u8_obj.dumps())

        # 下载ts文件
        for index, url in enumerate(segments):
            file = os.path.join(segment_dir, f"{index}.{url.split('.')[-1]}")
            self.pool.spawn(self.download_segment, url, file)
        gevent.wait()

        # ts文件下载完成, 转换成mp4文件
        if len(segments) == len(os.listdir(segment_dir)):
            self.convert_mp4(path, output, key=[item.uri for item in m3u8_obj.keys if item])


if __name__ == "__main__":
    cur_path = os.path.abspath(os.path.dirname(__file__))
    data = [
        {"name": "凶劫601航班第01集", "url": "https://v4.mstopq.com/202404/11/4cSTt8dMgB7/video/index.m3u8"},
        {"name": "凶劫601航班第02集", "url": "https://v4.mstopq.com/202404/11/JtNttP8HfS7/video/index.m3u8"},
        {"name": "凶劫601航班第03集", "url": "https://v4.mstopq.com/202404/11/b10TUD8C4T7/video/index.m3u8"},
        {"name": "凶劫601航班第04集", "url": "https://v4.mstopq.com/202404/11/eWLuKWqFBL7/video/index.m3u8"},
        {"name": "凶劫601航班第05集", "url": "https://v4.mstopq.com/202404/11/Sj3pwtX0hN7/video/index.m3u8"},
        {"name": "凶劫601航班第06集", "url": "https://v4.mstopq.com/202404/11/i62BN8wMfc7/video/index.m3u8"},
    ]

    for item in data:
        url = item["url"]
        name = item["name"]
        path = os.path.join(cur_path, "凶劫601航班_tmp", f"{name}") 
        output = os.path.join(cur_path, "凶劫601航班", f"{name}.mp4")

        # 因为是多线程下载,可能存在某个线程下载失败的情况, 如果发现下载不完整,没有输出文件,可以尝试重新运行,已经下载过的不会再次下载。
        download = M3u8Downloader(pool_size=20)
        download.download(url, path, output)

 

🌐本文链接:https://wizops.net/archives/202404/324.html(转载时请注明本文出处及文章链接)
⚠️本站部分资源文章出自互联网收集整理,本站不参与制作,如果侵犯了您的合法权益,请联系本站我们会及时删除。
⚠️本站资源仅供研究、学习交流之用,若使用商业用途,请购买正版授权,否则产生的一切后果将由下载用户自行承担。
💌联系方式: [email protected]
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇