您现在的位置是:首页 > 编程 > 

【自用】Python爬虫学习(六):通过mu8文件下载ts文件并合并为.mp4文件

2025-07-27 19:21:54
Python爬虫学习(六) 下载视频(简单版)的步骤介绍第一步:在网页上到.mu8文件第二步:通过.mu8文件下载对应的.ts视频文件第三步:依据.mu8文件合并.ts文件为一个.mp4文件 下载视

Python爬虫学习(六)

  • 下载视频(简单版)的步骤介绍
    • 第一步:在网页上到.mu8文件
    • 第二步:通过.mu8文件下载对应的.ts视频文件
    • 第三步:依据.mu8文件合并.ts文件为一个.mp4文件
  • 下载视频(复杂版)
  • 下载ts文件并合并为mp4
    • 使用单线程
    • 使用异步协程


下载视频(简单版)的步骤介绍


<video src=	视频.mp4></video:>
一般的视频网站是怎么做的?
用户上传->转码(把视频做处理,2K,1080,标清)->切片处理(把单个的文件进行拆分,形成众多的.ts文件)

需要一个文件记录:1.视频播放顺序,2.视频存放的路径,这个文件就是mu
mu以utf-8编码存储就是mu8文件,本质就是一个文本文件。
MU8 txt json =>文本

想要抓取一个视频:
1.到 mu8(各种手段)
2.通过 mu8下载到 ts文件(这里先不管.ts是否被加密)
.通过各种手段(不仅是编程手段)把ts文件合并为一个mp4文件

第一步:在网页上到.mu8文件

(这里假设网页对应的.mu8文件没有进行jia mi、隐藏等处理,真实的.mu8文件下载链接直接就在页面源码中)

import requests
import re

# 第一步,下载mu8文件
headers = {
    User-Agent: Mozilla/5.0 (Windows T 10.0; Win64; x64) AppleWebKit/57.6 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/57.6 Edg/127.0.0.0
}

# 测试网址,已经不能正常打开
url = https://www.91kanju/vod-play/54812-1-1.html  
# 用来提取到mu8的url地址的预加载正则表达式,需要根据具体网页情况编写合适的表达式
obj = re.compile(rurl: 	(?P<url>.*?)	,, re.S)  

resp = requests.get(url)
mu8_url = obj.search(resp.text).group(url)  # 拿到mu8的地址

# print(mu8_url)
resp.close()

# 下载mu8文件
resp2 = requests.get(mu8_url, headers=headers)
with open(, mode=wb) as f:
    f.write(resp2.content)

resp2.close()
print(下载完毕)
第二步:通过.mu8文件下载对应的.ts视频文件

上一步的网址不能打开,得不到对应的.mu8文件,可以直接用下面的.mu8文件进行测试。
测试链接:https://upyun.luckly-mjw/Assets/media-source/example/media/
打开下载好的.mu8文件如下所示,不带#的行就是.ts视频文件的下载地址,只需要对其进行发送请求就能下载得到对应的.ts视频文件。
注意:这里演示的.mu8比较特殊,直接就是完整的下载链接,大部分只有部分文件名,需要根据网页通过一些手段到对应的网站域名或者网址前缀。

# 第二步,解析mu8文件
with open(data_file/, mode=r, encoding=utf-8) as f:
    for line in f:
        line = line.strip()  # 先去掉空格,空白,换行符
        if line.startswith(#):  # 如果以#开头,跳过这一行
            continue

        # print(line)
        ts_name = line.split(	/	)[-1]  # 

        resp = requests.get(line)
        f = open(fdata_file/video/{ts_name}, mode=wb)
        f.write(resp.content)
        resp.close()
        print(f{ts_name},下载成功!)
第三步:依据.mu8文件合并.ts文件为一个.mp4文件

注意:
1、.ts文件的名称要与.mu8文件记录的名称一样才可以利用下面的代码进行合并。
2、这个合并代码仅适用于Windows系统,合并二进制文件使用的是copy命令,对于mac系统应该使用cat命令,具体细节请“百度”。

import os
import subprocess


def merge_ts_to_mp4(mu8_file, ts_folder_path, merge_video_name):
    # 检查mu8文件路径是否存在
    if not os.path.isfile(mu8_file):
        print(f错误:mu8文件 	{mu8_file}	 不存在!)
        return

    # 检查ts文件夹路径是否存在
    if not os.path.isdir(ts_folder_path):
        print(f错误:TS文件夹 	{ts_folder_path}	 不存在!)
        return

    lst = []
    try:
        with open(mu8_file, mode=	r	, encoding=	utf-8	) as f:
            for line in f:
                if line.startswith(	#	):
                    continue

                line = line.strip()  # 去掉空格和换行
                ts_name = line.split(	/	)[-1]  # 提取文件名

                ts_path = os.path.join(ts_folder_path, ts_name)  # 构建完整路径
                # 检查每个ts文件是否存在
                if os.path.isfile(ts_path):
                    lst.append(ts_path)
                else:
                    print(f警告:TS文件 	{ts_path}	 不存在,将被跳过。)

        if not lst:
            print(没有有效的TS文件可供合并。)
            return

        temp_output_path = os.path.join(ts_folder_path, 	temp_	)  # 临时文件路径
        total_ts_files = len(lst)

        print(开始合并视频文件...)
        for index, ts_file in enumerate(lst):
            command = f	copy /b {temp_output_path}  {ts_file} {temp_output_path}	 if os.path.exists(
                temp_output_path) else f	copy /b {ts_file} {temp_output_path}	
            result = subprocess.run(command, shell=True, stdout=subprocess.DEVULL, stderr=subprocess.DEVULL)

            if result.returncode == 0:
                percentage = (index  1) * 100 / total_ts_files
                print(f{percentage:.2f}% - 已合并: {index  1}/{total_ts_files})
            else:
                print(f合并失败:{ts_file},跳过该文件。)

        # 最终重命名
        final_output_path = os.path.join(ts_folder_path, merge_video_name)
        os.rename(temp_output_path, final_output_path)  # 重命名临时文件为最终输出文件名

        # 完成合并后提示用户
        print(f所有.ts文件已合并到:\n 	{final_output_path}	。)

    except Exception as e:
        print(f发生异常:{e})


if __name__ == 	__main__	:
    # .mu8文件路径
    mu8_file = r	D:\User_Data\Documents\PycharmProjects\ewFile\data_file\	
    # 从.mu8文件下载的.ts文件的目录路径,该目录下放置下载的众多.ts文件
    ts_folder_path = r	D:\User_Data\Documents\PycharmProjects\ewFile\data_file\video	
    # 最终合并的视频名称,放在与.ts文件相同的目录下
    merge_video_name = 	ts视频合并.mp4	
    merge_ts_to_mp4(mu8_file, ts_folder_path, merge_video_name)

运行结果如下所示:

下载视频(复杂版)

思路:

  1. 拿到主页面的页面源代码,到iframe
  2. 从iframe的页面源代码中拿到mu8文件
  3. 下载第一层mu8文件 -->下载第二层mu8文件(真实的视频存放路径)
  4. 下载视频
  5. 下载mi yao,进行jie mi操作
  6. 合并所有ts文件为一个mp4文件

注意:演示网址的视频没有jia mi,只是yin cang了真实的.mu8下载地址,不需要jie mi,对于需要jie mi的也有代码演示,代码不可直接运行,部分存在问题,实际需要具体情况具体分析。


网页播放地址:
    https://www.555dy16/vodplay/12810-7-2/
iframe对应的网页地址:
    https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDcWE/&dianshiju&next=https://www.555dy16/vodplay/12810-7-/
iframe里面的mu8地址:
    https://vip.kuaikan-cdn4/20240808/s1aDcWE/
抓包里面的mu8地址:
第一个
    https://vip.kuaikan-cdn4/20240808/s1aDcWE/
    预览:
    #EXTMU
    #EXT-X-STREAM-IF:PROGRAM-ID=1,BADWIDTH=9000,RESOLUTIO=1280x720
    /20240808/s1aDcWE/9kb/hls/
第二个
    https://vip.kuaikan-cdn4/20240808/s1aDcWE/9kb/hls/
    预览:
    #EXTMU
    #EXT-X-VERSIO:
    #EXT-X-TARGETDURATIO:2
    #EXT-X-PLAYLIST-TYPE:VOD
    #EXT-X-MEDIA-SEQUECE:0
    #EXTIF:1,
    /20240808/s1aDcWE/9kb/hls/
    #EXTIF:1,
    /20240808/s1aDcWE/9kb/hls/
    #EXTIF:1,
    /20240808/s1aDcWE/9kb/hls/
    ……


import re
import os
import requests
import asyncio
import aiohttp
import aiofiles
import subprocess
from bs4 import BeautifulSoup
from Crypto.Cipher import AES

# 代码想要正常运行需要对一些位置进行适当修改,切勿直接运行!
# 对某网页的视频进行下载

headers = {
    User-Agent: Mozilla/5.0 (Windows T 10.0; Win64; x64) AppleWebKit/57.6 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/57.6 Edg/127.0.0.0
}


def get_iframe_src(url):
    # resp = requests.get(url, headers=headers)
    #  = 	utf-8	
    # print()

    # main_page = BeautifulSoup(, html.parser)
    # iframe_src = main_page.find(	iframe	).get(	src	)

    # 网页原因不能到iframe,先直接给定
    iframe_src = 	https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDcWE/&dianshiju&next=https://www.555dy16/vodplay/12810-7-/	
    print(iframe_src)
    return iframe_src


def get_first_mu8_url(url):
    resp = requests.get(url, headers=headers)
    resp.encoding = 	utf-8	
    # print()
    resp_html = resp.text

    obj = re.compile(r	url: (?P<mu8_url>.*?)	, re.S)

    mu8_url = obj.search(resp_html).group(	mu8_url	)
    print(mu8_url)
    return mu8_url


def download_mu8_file(url, file_name):
    resp = requests.get(url, headers=headers)
    with open(	data_file/	  file_name, mode=	wb	) as f:
        f.write(resp.content)
    print(f	{file_name}	 下载成功!)


async def download_ts(ts_url, ts_name, session):
    async with session.get(ts_url) as resp:
        async with aiofiles.open(f	data_file/video_ts/{ts_name}	, mode=	wb	) as f:
            await f.write(await resp.content.read())  # 下载到的内容写入到文件
    print(f{ts_name} 下载完成!)


async def aio_download(up_url):
    tasks = []
    async with aiohttp.ClientSession() as session:  # 提前准备好session
        async with aiofiles.open(	data_file/	  second_, mode=	r	, encoding=	utf-8	) as f:
            async for line in f:
                if line.startswith(	#	):  # 可能pycharm提示高亮,实际运行没有问题,不必理会
                    continue

                line = line.strip()  # 去掉没用的空格和换行
                # /20240808/s1aDcWE/9kb/hls/
                ts_name = line.split(	/	)[-1]
                # 

                # 拼接得到真正的ts下载路径
                ts_url = up_url  line
                # https://vip.kuaikan-cdn4/20240808/s1aDcWE/9kb/hls/

                task = asyncio.create_task(download_ts(ts_url, ts_name, session))  # 创建任务
                tasks.append(task)

            await asyncio.wait(tasks)  # 等待任务结束


def merge_ts_to_mp4():
    # mac: cat   .ts > xxx mp4
    # windows: copy /b .ts 

    lst = []
    with open(	data_file/	  second_, mode=	r	, encoding=	utf-8	) as f:
        for line in f:
            # line = await line  # 确保获取的是字符串
            if line.startswith(	#	):
                continue

            line = line.strip()  # 去掉没用的空格和换行
            # /20240808/s1aDcWE/9kb/hls/
            ts_name = line.split(	/	)[-1]
            # 

            ts_path = 	data_file/video_ts/	  ts_name  # 构建完整路径
            # data_file/video_ts/

            lst.append(ts_path)

    # Windows系统使用copy命令
    temp_output_path = os.path.join(	data_file/video_ts	, 	temp_	)  # 临时文件路径
    total_ts_files = len(lst)  # 所有的ts文件数量

    print(开始合并.ts视频文件...)
    for index, ts_file in enumerate(lst):
        # 使用加号连接文件名,Windows系统使用copy命令
        command = f	copy /b {temp_output_path}  {ts_file} {temp_output_path}	 if os.path.exists(
            temp_output_path) else f	copy /b {ts_file} {temp_output_path}	

        result = subprocess.run(command, shell=True, stdout=subprocess.DEVULL, stderr=subprocess.DEVULL)

        if result.returncode == 0:
            percentage = (index  1) * 100 / total_ts_files
            print(f{percentage:.2f}% - 已合并: {index  1}/{total_ts_files})
        else:
            print(f合并失败:{ts_file},跳过该文件。)

    # 最终重命名
    final_output_path = os.path.join(	data_file/video_ts	, 		)
    os.rename(temp_output_path, final_output_path)  # 重命名临时文件为最终输出文件名


def main(url):
    # 1.拿到主页面的页面源代码,到iframe对应的url
    iframe_src = get_iframe_src(url)
    # https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDcWE/&dianshiju&next=https://www.555dy16/vodplay/12810-7-/

    # 2.拿到第一层的mu8文件的下载地址,看具体情况对拿到的地址进行拼接处理
    first_mu8_url = get_first_mu8_url(iframe_src)
    # https://vip.kuaikan-cdn4/20240808/s1aDcWE/
    mu8_domain = first_mu8_url.split(		)[0]  		

    # .1.下载第一层mu8文件
    first_txt_filename = first_
    download_mu8_file(first_mu8_url, first_txt_filename)

    # .2.下载第二层mu8文件
    with open(	data_file/	  first_txt_filename, mode=	r	, encoding=	utf-8	) as f:
        for line in f:
            if line.startswith(	#	):
                continue
            else:
                # 去掉空白或者换行符
                line = line.strip()  # /20240808/s1aDcWE/9kb/hls/
                # 拼接第二层mu8的下载地址
                second_mu8_url = mu8_domain  line
                # https://vip.kuaikan-cdn4/20240808/s1aDcWE/9kb/hls/

                print(second_mu8_url)
                # 下载第二层mu8文件
                second_txt_filename = second_
                download_mu8_file(second_mu8_url, second_txt_filename)

    # 4.下载视频
    ts_domain_url = 	https://vip.kuaikan-cdn4	
    # 异步协程
    asyncio.run(aio_download(ts_domain_url))

    # =======================这一部分看网站mu8文件具体情况,是否需要jie mi==========================
    # 关注.mu8文件是否包含这一行:#EXT-X-KEY:METHOD=AES-128,URI=Key.Key,
    # 有代表不能直接对下载的.ts文件进行合并,合并前需要对.ts文件进行jie mi,对jie mi后的.ts文件进行合并
    # 5.1 拿到mi yao  (后面内容仅做示范,代码不可运行,需要具体情况具体分析,为了方便理顺流程这部分函数与代码直接写在一起)
    def get_key(url):
        resp = requests.get(url)
        # print()  # c5878c26baaaac8c,会得到诸如注释类似格式的文本
        return resp.text

    key_url = 	https://vip.kuaikan-cdn4/……/key.key	  # 要从mu8文件里去获取
    key = get_key(key_url)

    # 5.2 jie mi(要对下载的每一个.ts文件进行解密,需要使用异步协程提高效率)
    async def dec_ts(ts_name, key):
        aes = AES.new(key=key, IV=b0000000000000000, mode=AES.MODE_CBC)
        async with aiofiles.open(f	data_file/video_ts/{ts_name}	, mode=rb) as f1, \
                aiofiles.open(f	data_file/video_ts/temp_{ts_name}	, mode=wb) as f2:
            bs = await f1.read()  # 从源文件读取内容
            await f2.write(aes.decrypt(bs))  # 把解密好的内容写入文件
        print(f	temp_{ts_name} 处理完毕!	)

    async def aio_dec(key):
        # jie mi
        tasks = []
        async with aiofiles.open(	data_file/	  second_, mode=	r	, encoding=	utf-8	) as f:
            async for line in f:
                # line = await line  # 确保获取的是字符串
                if line.startswith(	#	):
                    continue

                line = line.strip()  # 去掉没用的空格和换行
                # /20240808/s1aDcWE/9kb/hls/
                ts_name = line.split(	/	)[-1]
                # 

                # 开始创建异步任务
                task = asyncio.create_task(dec_ts(ts_name, key))
                tasks.append(task)

            await asyncio.wait(tasks)  # 等待任务结束

        pass

    asyncio.run(aio_dec(key))

    # ======================================================================================
    # 6.合并ts文件
    merge_ts_to_mp4()  # 合并ts文件为mp4文件


# 主程序
if __name__ == 	__main__	:
    url = 	https://www.555dy16/vodplay/12810-7-2/	
    main(url)
    print(所有文件下载完毕!)

下载ts文件并合并为mp4

简单记录一下如何获取想要的mu8与ts文件的请求地址:


注意

  • 两种方式最主要的区别在于下载ts文件的速度,经测试,对于一个合并为mp4后,大小1.6G左右的文件(ts文件数量超过1800个),仅下载所有ts文件速度而言,使用单线程下载耗时会超过1个小时,而使用异步协程下载耗时仅不到2分钟,差别巨大。
  • 经测试,有部分网址使用异步协程进行下载速度反而更慢甚至会报错,应该是加了fan pa机制,对于这类可以使用单线程进行下载。
  • 两种方式的ts合并为mp4的函数一样,对于ts数量较多的情况耗时较长,有待进一步优化。
使用单线程
import os
import time
import requests
import subprocess
from tqdm import tqdm

headers = {
    User-Agent: Mozilla/5.0 (Windows T 10.0; Win64; x64) AppleWebKit/57.6 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/57.6 Edg/127.0.0.0
}


def download_mu8(mu8_name, mu8_url, files_save_path):
    # 下载mu8文件
    resp = requests.get(mu8_url, headers=headers)
    if resp.ok:
        if not mu8_name.endswith(	.mu8	):
            mu8_name = 	.mu8	
        mu8_file = os.path.join(files_save_path, mu8_name)
        print(	1.开始下载mu8文件...	)
        with open(mu8_file, mode=wb) as f:
            f.write(resp.content)
        resp.close()
        # print(	-------------------->.mu8文件下载完毕!<--------------------	)
        return mu8_file
    else:
        print(f{mu8_name} 文件下载失败!服务器响应码:{resp.status_code})
        return one


def download_ts(mu8_file_path, ts_url_prefix):
    # 获取mu8文件所在的文件夹路径,将下载的ts文件放到与mu8文件相同的目录下
    mu8_folder_path = os.path.dirname(mu8_file_path)

    if ts_url_prefix[-1] != 	/	:
        ts_url_prefix = ts_url_prefix  	/	

    ts_url_lst = []
    with open(mu8_file_path, mode=r, encoding=utf-8) as f:
        for line in f:
            line = line.strip()
            if line.startswith(#):  # 以#开头,跳过这行
                continue
            ts_name = line.split(	/	)[-1]  # 
            ts_url = ts_url_prefix  ts_name
            ts_url_lst.append(ts_url)
    # 所有的.ts文件数量
    total_ts_url = len(ts_url_lst)
    print(	2.开始下载ts文件...	)
    with tqdm(total=total_ts_url, desc=	下载进度	, unit=	个文件	) as pro_bar:
        for index, ts_url in enumerate(ts_url_lst):
            resp_ts = requests.get(ts_url, headers=headers)
            if resp_ts.ok:
                ts_name = ts_url.split(	/	)[-1]
                ts_file = os.path.join(mu8_folder_path, ts_name)

                with open(ts_file, mode=wb) as f:
                    f.write(resp_ts.content)
                resp_ts.close()
                pro_bar.update(1)  # 更新进度条
            else:
                print(f下载失败:{ts_url})
                continue



def merge_ts_to_mp4(mu8_file, ts_folder_path, merge_video_name, delete_ts_flag=False):
    # 检查mu8文件路径是否存在
    if not os.path.isfile(mu8_file):
        print(f错误:mu8文件 	{mu8_file}	 不存在!)
        return

        # 检查ts文件夹路径是否存在
    if not os.path.isdir(ts_folder_path):
        print(f错误:TS文件夹 	{ts_folder_path}	 不存在!)
        return

    lst = []
    try:
        with open(mu8_file, mode=	r	, encoding=	utf-8	) as f:
            for line in f:
                if line.startswith(	#	):
                    continue

                line = line.strip()  # 去掉空格和换行
                ts_name = line.split(	/	)[-1]  # 提取文件名

                ts_path = os.path.join(ts_folder_path, ts_name)  # 构建完整路径
                # 检查每个ts文件是否存在
                if os.path.isfile(ts_path):
                    lst.append(ts_path)
                else:
                    print(f警告:TS文件 	{ts_path}	 不存在,将被跳过。)

        if not lst:
            print(没有有效的TS文件可供合并。)
            return

        temp_output_path = os.path.join(ts_folder_path, 	temp_	)  # 临时文件路径
        total_ts_files = len(lst)

        print(.开始合并视频文件...)
        # 使用 tqdm 显示进度条
        with tqdm(total=total_ts_files, desc=	合并进度	, unit=	个文件	) as progress_bar:
            for index, ts_file in enumerate(lst):
                command = f	copy /b {temp_output_path}  {ts_file} {temp_output_path}	 if os.path.exists(
                    temp_output_path) else f	copy /b {ts_file} {temp_output_path}	
                result = subprocess.run(command, shell=True, stdout=subprocess.DEVULL, stderr=subprocess.DEVULL)

                if result.returncode == 0:
                    progress_bar.update(1)  # 更新进度条
                else:
                    print(f合并失败:{ts_file},跳过该文件。)

        # 最终重命名
        if not merge_video_name.endswith(	.mp4	):
            merge_video_name = 	.mp4	

        final_output_path = os.path.join(ts_folder_path, merge_video_name)
        os.rename(temp_output_path, final_output_path)  # 重命名临时文件为最终输出文件名

        # 完成合并后提示用户
        print(f-------------------->所有.ts文件已合并到:<--------------------\n 	{final_output_path}	。)

        if delete_ts_flag == True:
            # 删除ts文件与mu8文件
            for file in os.listdir(ts_folder_path):
                if file.endswith(.ts) or file.endswith(.mu8):
                    file_path = os.path.join(ts_folder_path, file)
                    os.remove(file_path)
            print(	已删除文件夹下的ts与mu8文件!	)

    except Exception as e:
        print(f发生异常:{e})


def main(mu8_url, files_save_path, video_name):
    # 1.下载mu8文件
    start0 = time.time()
    mu8_file_path = download_mu8(video_name, mu8_url, files_save_path)

    # 2.下载ts文件
    start1 = time.time()
    # 通过抓包工具获取的某个具体的的请求url,结合mu8文件的ts名称可以得出的所有ts文件url下载地址
    ts_url_prefix = mu8_url[:mu8_url.rfind(	/	)  1]
    download_ts(mu8_file_path, ts_url_prefix)
    end1 = time.time()

    minute = int((end1 - start1) / 60)
    second = round((end1 - start1) - minute * 60, 2)
    print(f下载ts耗时⌛:{minute} m {second} s)

    # .合并ts文件
    start2 = time.time()
    ts_folder_path = os.path.dirname(mu8_file_path)
    merge_ts_to_mp4(mu8_file_path, ts_folder_path, video_name, delete_ts_flag=True)
    end2 = time.time()

    minute = int((end2 - start2) / 60)
    second = round((end2 - start2) - minute * 60, 2)
    print(f合并ts耗时⌛:{minute} m {second} s)

    total_minute = int((end2 - start0) / 60)
    total_second = round((end2 - start0) - total_minute * 60, 2)
    print(f累计耗时⏱:{total_minute} m {total_second} s)


if __name__ == 	__main__	:
    # 通过抓包工具获取的mu8_url地址
    mu8_url = 	https:///ts/20240811/kAtnpIai/mp4//	
    # 下载的文件保存路径
    files_save_path = r	D:\User_Data\Videos\Movies_Download\测试	
    # 下载的视频名称,作为下载的.mu8文件名、最终合并的.mp4文件名,不用加任何后缀
    video_name = 	某逆 49	

    # 执行下载视频程序
    main(mu8_url, files_save_path, video_name)

使用异步协程
import os
import time
import asyncio
import aiohttp
import subprocess
from tqdm import tqdm

# 请求头
headers = {
    User-Agent: Mozilla/5.0 (Windows T 10.0; Win64; x64) AppleWebKit/57.6 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/57.6 Edg/127.0.0.0
}


async def download_mu8(mu8_name, mu8_url, files_save_path):
    async with aiohttp.ClientSession() as session:
        async with session.get(mu8_url, headers=headers) as resp:
            if resp.status == 200:
                if not mu8_name.endswith(	.mu8	):
                    mu8_name = 	.mu8	
                mu8_file = os.path.join(files_save_path, mu8_name)
                print(	1.开始下载mu8文件...	)
                with open(mu8_file, mode=wb) as f:
                    f.write(await resp.read())
                print(	-------------------->.mu8文件下载完毕!<--------------------	)
                return mu8_file
            else:
                print(f{mu8_name} 文件下载失败!服务器响应码:{resp.status})
                return one


async def download_ts(session, ts_url, ts_folder_path, retries=):
    for attempt in range(retries):
        try:
            async with session.get(ts_url, headers=headers) as resp_ts:
                resp_ts.raise_for_status()  # 抛出 HTTPError
                ts_name = ts_url.split(	/	)[-1]
                ts_file = os.path.join(ts_folder_path, ts_name)
                with open(ts_file, mode=wb) as f:
                    f.write(await resp_ts.read())
                return True
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt < retries - 1:
                print(f下载失败,正在重试...({attempt  1}) {ts_url})
                await asyncio.sleep(1)  # 等待一秒再重试
            else:
                print(f下载失败,已达到最大重试次数: {ts_url},错误: {e})
                return False


async def download_all_ts(mu8_file_path, ts_url_prefix):
    mu8_folder_path = os.path.dirname(mu8_file_path)

    # 生成所有TS文件的URL
    ts_url_lst = []
    with open(mu8_file_path, mode=r, encoding=utf-8) as f:
        for line in f:
            line = line.strip()
            if line.startswith(#):
                continue
            ts_name = line.split(	/	)[-1]
            ts_url = ts_url_prefix  ts_name
            ts_url_lst.append(ts_url)

    total_ts_url = len(ts_url_lst)
    print(	2.开始下载ts文件...	)

    # 使用 asyncio.Semaphore 控制并发数
    semaphore = asyncio.Semaphore(5)  # 限制同时请求的数量

    async with aiohttp.ClientSession() as session:
        tasks = []
        for ts_url in ts_url_lst:
            task = download_ts(session, ts_url, mu8_folder_path)
            tasks.append(task)

        with tqdm(total=total_ts_url, desc=	下载进度	, unit=	个文件	) as pro_bar:
            for result in await asyncio.gather(*tasks):
                if result:
                    pro_bar.update(1)


def merge_ts_to_mp4(mu8_file, ts_folder_path, merge_video_name, delete_ts_flag=False):
    # 检查mu8文件路径是否存在
    if not os.path.isfile(mu8_file):
        print(f错误:mu8文件 	{mu8_file}	 不存在!)
        return

    # 检查ts文件夹路径是否存在
    if not os.path.isdir(ts_folder_path):
        print(f错误:TS文件夹 	{ts_folder_path}	 不存在!)
        return

    lst = []
    try:
        with open(mu8_file, mode=	r	, encoding=	utf-8	) as f:
            for line in f:
                if line.startswith(	#	):
                    continue

                line = line.strip()  # 去掉空格和换行
                ts_name = line.split(	/	)[-1]  # 提取文件名

                ts_path = os.path.join(ts_folder_path, ts_name)  # 构建完整路径
                # 检查每个ts文件是否存在
                if os.path.isfile(ts_path):
                    lst.append(ts_path)
                else:
                    print(f警告:TS文件 	{ts_path}	 不存在,将被跳过。)

        if not lst:
            print(没有有效的TS文件可供合并。)
            return

        temp_output_path = os.path.join(ts_folder_path, 	temp_	)  # 临时文件路径
        total_ts_files = len(lst)

        print(.开始合并视频文件...)
        # 使用 tqdm 显示进度条
        with tqdm(total=total_ts_files, desc=	合并进度	, unit=	个文件	) as progress_bar:
            for index, ts_file in enumerate(lst):
                command = f	copy /b {temp_output_path}  {ts_file} {temp_output_path}	 if os.path.exists(
                    temp_output_path) else f	copy /b {ts_file} {temp_output_path}	
                result = subprocess.run(command, shell=True, stdout=subprocess.DEVULL, stderr=subprocess.DEVULL)

                if result.returncode == 0:
                    progress_bar.update(1)  # 更新进度条
                else:
                    print(f合并失败:{ts_file},跳过该文件。)

        # 最终重命名
        if not merge_video_name.endswith(	.mp4	):
            merge_video_name = 	.mp4	

        final_output_path = os.path.join(ts_folder_path, merge_video_name)
        os.rename(temp_output_path, final_output_path)  # 重命名临时文件为最终输出文件名

        # 完成合并后提示用户
        print(f-------------------->所有.ts文件已合并到:<--------------------\n 	{final_output_path}	。)

        if delete_ts_flag == True:
            # 删除临时ts文件与mu8文件
            for file in os.listdir(ts_folder_path):
                if file.endswith(.ts) or file.endswith(.mu8):
                    file_path = os.path.join(ts_folder_path, file)
                    os.remove(file_path)
            print(	已删除文件夹下的ts与mu8文件!	)

    except Exception as e:
        print(f发生异常:{e})


def get_non_ad_ts_start_str(mu8_file_path):
    with open(mu8_file_path, mode=r, encoding=utf-8) as f:
        for line in f:
            line = line.strip()
            if line.startswith(#):
                continue
            ts_name = line.split(	/	)[-1]
            ts_name_start_str = ts_name[0:]

            return ts_name_start_str


def remove_ad_ts_files(mu8_file_path, non_ad_ts_start_str):
    ts_folder_path = os.path.dirname(mu8_file_path)
    # 删除广告对应的ts文件
    for file in os.listdir(ts_folder_path):
        if file.endswith(.ts) and not file.startswith(non_ad_ts_start_str):
            ad_ts_file_path = os.path.join(ts_folder_path, file)
            print(ad_ts_file_path)
            os.remove(ad_ts_file_path)
    print(	已删除对应为广告的ts文件!	)


async def main(mu8_url, files_save_path, video_name):
    start_time = time.time()

    # 1. 下载 mu8 文件
    mu8_file_path = await download_mu8(video, mu8_url, files_save_path)
    if not mu8_file_path:
        return

    # 2. 读取 mu8 文件并下载所有 ts 文件
    download_ts_start_time = time.time()
    ts_url_prefix = 	/	.join(mu8_url.split(	/	)[:-1])  	/	
    await download_all_ts(mu8_file_path, ts_url_prefix)
    download_ts_end_time = time.time()
    elapsed_time_ts = download_ts_end_time - download_ts_start_time
    minute_ts = int(elapsed_time_ts / 60)
    second_ts = round(elapsed_time_ts - minute_ts * 60, 2)
    print(f下载ts耗时⌛:{minute_ts} m {second_ts} s)

    # 去除广告(可选),视具体情况而定,去除原理为有用的ts文件名开头字符都一样只有末尾存在差别
    non_ad_ts_start_str = get_non_ad_ts_start_str(mu8_file_path)
    print(f	非广告的ts文件名开头字符:{non_ad_ts_start_str}	)
    remove_ad_ts_files(mu8_file_path, non_ad_ts_start_str)

    # . 合并 ts 文件到 mp4
    merge_ts_to_mp4(mu8_file_path, os.path.dirname(mu8_file_path), video_name, delete_ts_flag=True)
    end_time = time.time()
    elapsed_time = end_time - start_time
    minute = int(elapsed_time / 60)
    second = round(elapsed_time - minute * 60, 2)
    print(f全部过程耗时⏱:{minute} m {second} s)


if __name__ == __main__:
    # 通过抓包工具获取的mu8_url地址
    mu8_url = 	https://vip.ffzy-play6/20240817/28179_cddca21d/2000k/hls/	
    # 下载的文件保存路径
    files_save_path = r	D:\User_Data\Videos\Movies_Download\ceshi	
    # 下载的视频名称,作为下载的.mu8文件名、最终合并的.mp4文件名,不用加任何后缀
    video_name = 	ceshi	

    # 执行下载视频程序
    asyncio.run(main(mu8_url, files_save_path, video_name))

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/biancheng/1207602.html

相关标签:无
上传时间: 2025-07-24 14:23:23
留言与评论(共有 18 条评论)
本站网友 涂博士
9分钟前 发表
# .mu8文件路径 mu8_file = r D
本站网友 抚州房产网
22分钟前 发表
ts_folder_path
本站网友 港口租房
25分钟前 发表
video_name
本站网友 坏死性筋膜炎
10分钟前 发表
ts_folder_path
本站网友 制远志
6分钟前 发表
print(没有有效的TS文件可供合并
本站网友 男生强吻女生的胸
3分钟前 发表
headers=headers) if resp_ts.ok
本站网友 海之元生长激素
20分钟前 发表
temp_ ) # 临时文件路径 total_ts_files = len(lst) print(.开始合并视频文件...) # 使用 tqdm 显示进度条 with tqdm(total=total_ts_files
本站网友 投资收益表
24分钟前 发表
ts_file in enumerate(lst)
本站网友 阴囊湿疹
7分钟前 发表
ts_url_prefix) end1 = time.time() minute = int((end1 - start1) / 60) second = round((end1 - start1) - minute * 60
本站网友 纳斯达克是什么
18分钟前 发表
mode= r
本站网友 360杀毒64位
25分钟前 发表
print(f错误:mu8文件 {mu8_file} 不存在!) return # 检查ts文件夹路径是否存在 if not os.path.isdir(ts_folder_path)
本站网友 宫寒怎么调理
19分钟前 发表
for index
本站网友 陈村租房
23分钟前 发表
mu8_url
本站网友 成都人流医院
24分钟前 发表
files_save_path
本站网友 西安出租房
3分钟前 发表
mode= wb ) as f
本站网友 击鼓声
21分钟前 发表
for index
本站网友 网上占卜
8分钟前 发表
async for line in f