Skip to content

ytdlp Service Deployment Guide

This guide explains how to deploy a yt-dlp-based video parsing service, either locally or in the cloud (for example, on AWS Lambda).

1. Service Source Code

Below is a recommended Python service implementation, ready for local or cloud deployment:

python
import json
import traceback
import yt_dlp

def _build_format_entry(f: dict) -> dict:
    """Project a yt-dlp format dict onto the fixed set of fields the API exposes.

    Every listed key is always present in the result; keys missing from *f*
    come back as ``None``. Two fields have fallbacks: ``resolution`` falls
    back to ``height`` and ``filesize`` falls back to ``filesize_approx``
    when the primary value is missing or falsy.
    """
    # Order matters: it is the key order of the serialized JSON entry.
    field_names = (
        'format_id', 'url', 'ext', 'resolution', 'width', 'height', 'fps',
        'vcodec', 'acodec', 'abr', 'vbr', 'tbr', 'filesize', 'protocol',
        'language', 'format_note',
    )
    entry = {name: f.get(name) for name in field_names}

    # Re-assigning an existing key keeps its position in the dict.
    if not entry['resolution']:
        entry['resolution'] = f.get('height')
    if not entry['filesize']:
        entry['filesize'] = f.get('filesize_approx')
    return entry


def _simplify_tracks(tracks_by_lang) -> dict:
    """Reduce a ``{lang: [track, ...]}`` mapping to ext/url/name per track."""
    return {
        lang: [
            {'ext': t.get('ext'), 'url': t.get('url'), 'name': t.get('name')}
            for t in tracks or []
        ]
        for lang, tracks in (tracks_by_lang or {}).items()
    }


def _find_danmaku_url(subtitles):
    """Return the first track URL of the first language key containing 'danmaku'.

    Returns ``None`` when no language key matches or the matching language
    has no tracks.
    """
    for lang, tracks in (subtitles or {}).items():
        if 'danmaku' in lang.lower():
            return tracks[0].get('url') if tracks else None
    return None


def resolve(url: str) -> dict:
    """Extract metadata and stream information for *url* using yt-dlp.

    Nothing is downloaded: extraction runs with ``download=False``.

    Args:
        url: Page URL handed to ``YoutubeDL.extract_info``.

    Returns:
        A dict with basic metadata (``title``, ``description``, ``thumbnail``,
        ``thumbnails``, ``duration``, ``uploader``, ``upload_date``,
        ``view_count``, ``like_count``, ``webpage_url``), the selected
        format(s) under ``best``, all formats split into ``videos``
        (video only), ``audios`` (audio only) and ``combined`` (both),
        plus ``subtitles``, ``auto_subtitles``, ``danmaku_url``,
        ``comments`` and ``chapters``.

    Raises:
        Any exception raised by yt-dlp during extraction propagates to the
        caller; nothing is caught here.
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'noplaylist': True,
        'format': 'bestvideo+bestaudio/best',
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsub': True,
        # Same fixed path that handler() writes request-supplied cookies to.
        'cookiefile': '/tmp/cookie.txt',
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
            'Sec-Fetch-Mode': 'navigate',
        }
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

    # Shaping the result does not need the YoutubeDL instance, so it happens
    # after the context manager has closed it.

    # When the selection is a merge of separate streams, each part is listed
    # in 'requested_formats'; otherwise the info dict itself carries the
    # format fields. A key that is present but None is treated as absent.
    requested = info.get('requested_formats')
    if requested is not None:
        best = [_build_format_entry(f) for f in requested]
    else:
        best = [_build_format_entry(info)]

    # Bucket every available format by which kinds of stream it carries.
    # A codec value that is missing, None, empty or 'none' means "no stream".
    videos, audios, combined = [], [], []
    for f in info.get('formats') or []:
        has_video = (f.get('vcodec') or 'none') != 'none'
        has_audio = (f.get('acodec') or 'none') != 'none'
        if has_video and has_audio:
            combined.append(_build_format_entry(f))
        elif has_video:
            videos.append(_build_format_entry(f))
        elif has_audio:
            audios.append(_build_format_entry(f))

    thumbnails = [
        {'url': t.get('url'), 'width': t.get('width'), 'height': t.get('height'), 'id': t.get('id')}
        for t in (info.get('thumbnails') or [])
    ]

    chapters = [
        {'title': c.get('title'), 'start_time': c.get('start_time'), 'end_time': c.get('end_time')}
        for c in (info.get('chapters') or [])
    ]

    return {
        'title': info.get('title'),
        'description': info.get('description'),
        'thumbnail': info.get('thumbnail'),
        'thumbnails': thumbnails,
        'duration': info.get('duration'),
        'uploader': info.get('uploader'),
        'upload_date': info.get('upload_date'),
        'view_count': info.get('view_count'),
        'like_count': info.get('like_count'),
        'webpage_url': info.get('webpage_url'),
        'best': best,
        'videos': videos,
        'audios': audios,
        'combined': combined,
        'subtitles': _simplify_tracks(info.get('subtitles')),
        'auto_subtitles': _simplify_tracks(info.get('automatic_captions')),
        'danmaku_url': _find_danmaku_url(info.get('subtitles')),
        'comments': info.get('comments'),  # list[dict] or None
        'chapters': chapters,
    }


def handler(event, context):
    """Lambda-style entry point: resolve the video URL named in the request.

    Request parameters are taken from ``event['queryStringParameters']``.
    When ``event['body']`` is non-empty it is parsed as a JSON object and
    replaces the query parameters entirely. If ``event['isBase64Encoded']``
    is truthy, the body is Base64-decoded before parsing.

    Recognized parameters:
        url: Page URL to resolve (required).
        cookies: Optional Base64-encoded cookie file text; when present it
            is decoded and written to ``/tmp/cookie.txt``.

    Args:
        event: Mapping with the request data described above.
        context: Unused.

    Returns:
        A dict with ``statusCode`` and a JSON string ``body``: 200 with the
        ``resolve`` result, 400 for a malformed or incomplete request, or
        500 with the error message when resolving fails.
    """
    import base64

    def _error(status, message):
        return {'statusCode': status, 'body': json.dumps({'error': message})}

    params = event.get('queryStringParameters') or {}
    body = event.get('body')
    if body:
        try:
            if event.get('isBase64Encoded'):
                body = base64.b64decode(body).decode('utf-8')
            params = json.loads(body)
        except (TypeError, ValueError) as e:
            # ValueError covers JSONDecodeError, binascii.Error and
            # UnicodeDecodeError; a bad body is the client's fault, not a 500.
            return _error(400, f'Invalid request body: {e}')
        if not isinstance(params, dict):
            return _error(400, 'Request body must be a JSON object')

    # JSON bodies can carry null or non-string values; treat those as absent.
    url = params.get('url')
    url = url.strip() if isinstance(url, str) else ''
    print(f"URL: {url}")

    cookies = params.get('cookies')
    cookies = cookies.strip() if isinstance(cookies, str) else ''
    if cookies:
        # Decode first so an invalid value never truncates the existing file.
        try:
            cookie_text = base64.b64decode(cookies).decode('utf-8')
        except ValueError as e:
            return _error(400, f'Invalid cookies parameter: {e}')
        # NOTE(review): this fixed path is shared by every request served by
        # the same process/runtime, and resolve() passes it to yt-dlp on every
        # call, so cookies from an earlier request stay in effect for later
        # requests that send none. Consider a per-request cookie file.
        try:
            with open('/tmp/cookie.txt', 'w', encoding='utf-8') as f:
                f.write(cookie_text)
        except OSError as e:
            traceback.print_exc()
            return _error(500, str(e))

    if not url:
        return _error(400, 'Missing url parameter')

    try:
        result = resolve(url)
        # Serialize once and reuse it for both the log line and the response.
        payload = json.dumps(result, ensure_ascii=False)
        print(f"Result: \n{payload}")
        return {
            'statusCode': 200,
            'body': payload,
        }
    except Exception as e:
        # Top-level boundary: log the traceback and report the failure as 500.
        traceback.print_exc()
        return _error(500, str(e))



if __name__ == '__main__':
    # Local mode: a small HTTP server that wraps each request in a
    # Lambda-style event and delegates to handler().
    import os
    from http.server import BaseHTTPRequestHandler, HTTPServer
    from urllib.parse import urlparse, parse_qs

    class VideoResolverHandler(BaseHTTPRequestHandler):
        """Translate plain HTTP GET/POST requests into handler() calls."""

        def do_POST(self):
            """Read the request body (if any) and reuse the GET code path."""
            length = int(self.headers.get('Content-Length', 0))
            payload = None
            if length > 0:
                payload = self.rfile.read(length).decode('utf-8')
            self.do_GET(body=payload)

        def do_GET(self, body=None):
            """Build the event from the query string and *body*, then reply."""
            query = parse_qs(urlparse(self.path).query)

            # parse_qs yields a list per key; only the first value is passed on.
            first_values = {}
            for key, values in query.items():
                first_values[key] = values[0]

            result = handler(
                {'queryStringParameters': first_values, 'body': body},
                None,
            )

            self.send_response(result['statusCode'])
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.end_headers()
            self.wfile.write(result['body'].encode('utf-8'))

    # Port comes from the PORT environment variable, defaulting to 8080.
    port = int(os.environ.get('PORT', 8080))
    print(f"Server starting on http://localhost:{port}")
    server = HTTPServer(('0.0.0.0', port), VideoResolverHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C: release the listening socket before exiting.
        server.server_close()

2. Local Deployment

  1. Install dependencies:
    bash
    pip install 'yt-dlp[default,curl-cffi]'
  2. Save the code above as ytdlp_service.py, then run the service:
    bash
    python ytdlp_service.py
  3. Access the API: Open your browser and visit http://localhost:8080/?url=<video_page_URL>

3. Cloud Deployment (e.g. AWS Lambda)

  • Upload the code above as a Lambda function and set the entry point to the handler function (for example, ytdlp_service.handler if the file is named ytdlp_service.py).
  • Make sure to include the yt-dlp dependency in your deployment package.
  • Trigger via API Gateway, with parameter ?url=<video_page_URL>.

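  • Supports cookies for sites requiring login: pass the Base64-encoded contents of a Netscape-format cookies.txt file as the cookies parameter; the service writes it to /tmp/cookie.txt for yt-dlp to use.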
  • Supports parsing video, audio, subtitles, danmaku, and more.
  • You can modify the code as needed for your use case.

For questions or feedback, please contact us.