Skip to content

ytdlp服务部署指南

本指南介绍如何在本地或云端(如 AWS Lambda)部署基于 yt-dlp 的视频解析服务。

1. 服务源码

以下为推荐的 Python 服务源码,可直接用于本地或云端部署:

python
import json
import traceback
import yt_dlp

def _build_format_entry(f: dict) -> dict:
    return {
        'format_id': f.get('format_id'),
        'url': f.get('url'),
        'ext': f.get('ext'),
        'resolution': f.get('resolution') or f.get('height'),
        'width': f.get('width'),
        'height': f.get('height'),
        'fps': f.get('fps'),
        'vcodec': f.get('vcodec'),
        'acodec': f.get('acodec'),
        'abr': f.get('abr'),
        'vbr': f.get('vbr'),
        'tbr': f.get('tbr'),
        'filesize': f.get('filesize') or f.get('filesize_approx'),
        'protocol': f.get('protocol'),
        'language': f.get('language'),
        'format_note': f.get('format_note'),
    }


def resolve(url: str) -> dict:
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'noplaylist': True,
        'format': 'bestvideo+bestaudio/best',
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsub': True,
        'cookiefile': '/tmp/cookie.txt',
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
            'Sec-Fetch-Mode': 'navigate',
        }
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

        if 'requested_formats' in info:
            best = [_build_format_entry(f) for f in info['requested_formats']]
        else:
            best = [_build_format_entry(info)]

        videos, audios, combined = [], [], []
        for f in info.get('formats') or []:
            entry = _build_format_entry(f)
            vc = f.get('vcodec', 'none')
            ac = f.get('acodec', 'none')
            has_video = vc and vc != 'none'
            has_audio = ac and ac != 'none'
            if has_video and has_audio:
                combined.append(entry)
            elif has_video:
                videos.append(entry)
            elif has_audio:
                audios.append(entry)

        subtitles = {}
        for lang, subs in (info.get('subtitles') or {}).items():
            subtitles[lang] = [
                {'ext': s.get('ext'), 'url': s.get('url'), 'name': s.get('name')}
                for s in subs
            ]

        auto_subtitles = {}
        for lang, subs in (info.get('automatic_captions') or {}).items():
            auto_subtitles[lang] = [
                {'ext': s.get('ext'), 'url': s.get('url'), 'name': s.get('name')}
                for s in subs
            ]

        danmaku_url = None
        for lang, subs in (info.get('subtitles') or {}).items():
            if 'danmaku' in lang.lower():
                danmaku_url = subs[0].get('url') if subs else None
                break
        comments = info.get('comments')  # list[dict] or None

        thumbnails = [
            {'url': t.get('url'), 'width': t.get('width'), 'height': t.get('height'), 'id': t.get('id')}
            for t in (info.get('thumbnails') or [])
        ]

        chapters = [
            {'title': c.get('title'), 'start_time': c.get('start_time'), 'end_time': c.get('end_time')}
            for c in (info.get('chapters') or [])
        ]

        return {
            'title': info.get('title'),
            'description': info.get('description'),
            'thumbnail': info.get('thumbnail'),
            'thumbnails': thumbnails,
            'duration': info.get('duration'),
            'uploader': info.get('uploader'),
            'upload_date': info.get('upload_date'),
            'view_count': info.get('view_count'),
            'like_count': info.get('like_count'),
            'webpage_url': info.get('webpage_url'),
            'best': best,
            'videos': videos,
            'audios': audios,
            'combined': combined,
            'subtitles': subtitles,
            'auto_subtitles': auto_subtitles,
            'danmaku_url': danmaku_url,
            'comments': comments,
            'chapters': chapters,
        }


def handler(event, context):
    params = event.get('queryStringParameters') or {}
    body = event.get('body')
    if body:
        params = json.loads(body)
    url = params.get('url', '').strip()
    print(f"URL: {url}")
    cookies = params.get('cookies', '').strip()
    if cookies:
        with open('/tmp/cookie.txt', 'w') as f:
            import base64
            cookies = base64.b64decode(cookies).decode('utf-8')
            f.write(cookies)

    if not url:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing url parameter'}),
        }

    try:
        result = resolve(url)
        print(f"Result: \n{json.dumps(result, ensure_ascii=False)}")
        return {
            'statusCode': 200,
            'body': json.dumps(result, ensure_ascii=False),
        }
    except Exception as e:
        traceback.print_exc()
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)}),
        }



if __name__ == '__main__':
    import os
    from http.server import BaseHTTPRequestHandler, HTTPServer
    from urllib.parse import urlparse, parse_qs

    class VideoResolverHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            body_length = int(self.headers.get('Content-Length', 0))
            body = self.rfile.read(body_length).decode('utf-8') if body_length > 0 else None
            self.do_GET(body=body)

        def do_GET(self, body=None):
            parsed_url = urlparse(self.path)
            query_params = parse_qs(parsed_url.query)
            
            event = {
                'queryStringParameters': {k: v[0] for k, v in query_params.items()},
                'body': body
            }

            response = handler(event, None)

            self.send_response(response['statusCode'])
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.end_headers()
            self.wfile.write(response['body'].encode('utf-8'))
        
    port = int(os.environ.get('PORT', 8080))
    print(f"Server starting on http://localhost:{port}")
    server = HTTPServer(('0.0.0.0', port), VideoResolverHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.server_close()

2. 本地部署

  1. 安装依赖:
    bash
    pip install 'yt-dlp[default,curl-cffi]'
  2. 运行服务:
    bash
    python ytdlp_service.py
  3. 访问接口: 打开浏览器访问 http://localhost:8080/?url=<视频页面URL>

3. 云端部署(如 AWS Lambda)

  • 可将上述代码作为 Lambda 函数上传,入口为 handler
  • 需在部署包中包含 yt-dlp 依赖。
  • 通过 API Gateway 触发,参数为 ?url=<视频页面URL>

  • 支持 cookie.txt 配置,适用于需要登录的站点。
  • 支持解析视频、音频、字幕、弹幕等多种资源。
  • 也可根据实际需求修改源码。

如有问题欢迎反馈。