Spaces:
Sleeping
Sleeping
| """ | |
| PlotWeaver — Live Commentary Translation Platform (Single File) | |
| ================================================================ | |
| Two engines: Qwen Omni | YourVoic API (with NLLB MT) | |
| """ | |
| import os, io, re, time, base64, struct, shutil, subprocess, tempfile, logging | |
| import torch, numpy as np, requests, soundfile as sf, gradio as gr | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================= | |
| # LANGUAGES | |
| # ============================================================================= | |
| # Qwen Omni voices (work across all Qwen-supported languages) | |
| QWEN_VOICES = [ | |
| "Cherry", "Serena", "Ethan", "Chelsie", "Momo", "Vivian", "Moon", "Maia", | |
| "Kai", "Nofish", "Bella", "Jennifer", "Ryan", "Katerina", "Aiden", | |
| "Eldric Sage", "Mia", "Mochi", "Bellona", "Vincent", "Bunny", "Neil", | |
| "Elias", "Arthur", "Seren", "Bodega", "Sonrisa", "Alek", "Dolce", | |
| "Sohee", "Ono Anna", "Lenn", "Emilien", "Andre", | |
| ] | |
| # Each language entry: | |
| # "Display Name": { | |
| # "nllb": NLLB-200 language code (for local/yourvoic pipeline translation), | |
| # "yourvoic_lang": YourVoic language code (or None), | |
| # "yourvoic_voices": list of YourVoic voice names, | |
| # "tts_engine": "qwen" | "yourvoic" | "local", | |
| # "qwen_code": short language code for Qwen prompts (or None), | |
| # "qwen_name": full language name for Qwen system prompt (or None), | |
| # } | |
| LANGUAGES = { | |
| # ---- Qwen Omni Languages (end-to-end speech-to-speech, 11 languages) ---- | |
| "English": { | |
| "nllb": "eng_Latn", "yourvoic_lang": "en-US", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "en", "qwen_name": "English", | |
| }, | |
| "Chinese (Mandarin)": { | |
| "nllb": "zho_Hans", "yourvoic_lang": "zh-CN", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "zh", "qwen_name": "Mandarin Chinese", | |
| }, | |
| "Japanese": { | |
| "nllb": "jpn_Jpan", "yourvoic_lang": "ja-JP", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "ja", "qwen_name": "Japanese", | |
| }, | |
| "Korean": { | |
| "nllb": "kor_Hang", "yourvoic_lang": "ko-KR", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "ko", "qwen_name": "Korean", | |
| }, | |
| "German": { | |
| "nllb": "deu_Latn", "yourvoic_lang": "de-DE", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "de", "qwen_name": "German", | |
| }, | |
| "French": { | |
| "nllb": "fra_Latn", "yourvoic_lang": "fr-FR", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "fr", "qwen_name": "French", | |
| }, | |
| "Russian": { | |
| "nllb": "rus_Cyrl", "yourvoic_lang": "ru-RU", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "ru", "qwen_name": "Russian", | |
| }, | |
| "Portuguese": { | |
| "nllb": "por_Latn", "yourvoic_lang": "pt-BR", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "pt", "qwen_name": "Portuguese", | |
| }, | |
| "Spanish": { | |
| "nllb": "spa_Latn", "yourvoic_lang": "es-ES", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "es", "qwen_name": "Spanish", | |
| }, | |
| "Italian": { | |
| "nllb": "ita_Latn", "yourvoic_lang": "it-IT", | |
| "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", | |
| "qwen_code": "it", "qwen_name": "Italian", | |
| }, | |
| "Arabic": { | |
| "nllb": "arb_Arab", "yourvoic_lang": "ar-SA", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "qwen", | |
| "qwen_code": "ar", "qwen_name": "Modern Standard Arabic", | |
| }, | |
| # ---- African Languages (YourVoic API) ---- | |
| "Swahili": { | |
| "nllb": "swh_Latn", "yourvoic_lang": "sw-KE", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Amharic": { | |
| "nllb": "amh_Ethi", "yourvoic_lang": "am-ET", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Afrikaans": { | |
| "nllb": "afr_Latn", "yourvoic_lang": "af-ZA", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| # ---- South Asian (YourVoic TTS + NLLB MT) ---- | |
| "Hindi": { | |
| "nllb": "hin_Deva", "yourvoic_lang": "hi-IN", | |
| "yourvoic_voices": ["Rahul", "Deepika", "Aditya"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Bengali": { | |
| "nllb": "ben_Beng", "yourvoic_lang": "bn-IN", | |
| "yourvoic_voices": ["Sneha", "Aryan"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Tamil": { | |
| "nllb": "tam_Taml", "yourvoic_lang": "ta-IN", | |
| "yourvoic_voices": ["Priya", "Kumar"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Telugu": { | |
| "nllb": "tel_Telu", "yourvoic_lang": "te-IN", | |
| "yourvoic_voices": ["Arjun", "Lakshmi"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Marathi": { | |
| "nllb": "mar_Deva", "yourvoic_lang": "mr-IN", | |
| "yourvoic_voices": ["Anjali", "Rohan"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Urdu": { | |
| "nllb": "urd_Arab", "yourvoic_lang": "ur-PK", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Nepali": { | |
| "nllb": "npi_Deva", "yourvoic_lang": "ne-NP", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| # ---- Southeast Asian (YourVoic) ---- | |
| "Indonesian": { | |
| "nllb": "ind_Latn", "yourvoic_lang": "id-ID", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Vietnamese": { | |
| "nllb": "vie_Latn", "yourvoic_lang": "vi-VN", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Thai": { | |
| "nllb": "tha_Thai", "yourvoic_lang": "th-TH", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Malay": { | |
| "nllb": "zsm_Latn", "yourvoic_lang": "ms-MY", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Filipino": { | |
| "nllb": "tgl_Latn", "yourvoic_lang": "fil-PH", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| # ---- European (YourVoic) ---- | |
| "Dutch": { | |
| "nllb": "nld_Latn", "yourvoic_lang": "nl-NL", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Polish": { | |
| "nllb": "pol_Latn", "yourvoic_lang": "pl-PL", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Turkish": { | |
| "nllb": "tur_Latn", "yourvoic_lang": "tr-TR", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Swedish": { | |
| "nllb": "swe_Latn", "yourvoic_lang": "sv-SE", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Romanian": { | |
| "nllb": "ron_Latn", "yourvoic_lang": "ro-RO", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Greek": { | |
| "nllb": "ell_Grek", "yourvoic_lang": "el-GR", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Ukrainian": { | |
| "nllb": "ukr_Cyrl", "yourvoic_lang": "uk-UA", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Finnish": { | |
| "nllb": "fin_Latn", "yourvoic_lang": "fi-FI", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Danish": { | |
| "nllb": "dan_Latn", "yourvoic_lang": "da-DK", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Norwegian": { | |
| "nllb": "nob_Latn", "yourvoic_lang": "nb-NO", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| # ---- Middle Eastern (YourVoic) ---- | |
| "Persian": { | |
| "nllb": "pes_Arab", "yourvoic_lang": "fa-IR", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| "Hebrew": { | |
| "nllb": "heb_Hebr", "yourvoic_lang": "he-IL", | |
| "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", | |
| "qwen_code": None, "qwen_name": None, | |
| }, | |
| } | |
| # Group languages by category for the UI | |
| LANGUAGE_GROUPS = { | |
| "Global Languages": [ | |
| "Spanish", "French", "German", "Mandarin", "Italian", | |
| "Japanese", "Portuguese", "Hindi", "Arabic", "Korean", "Russian", | |
| ], | |
| "African Languages": [ | |
| "Swahili", "Amharic", "Afrikaans", | |
| ], | |
| "South Asian": [ | |
| "Bengali", "Tamil", "Telugu", "Marathi", "Urdu", "Nepali", | |
| ], | |
| "Southeast Asian": [ | |
| "Indonesian", "Vietnamese", "Thai", "Malay", "Filipino", | |
| ], | |
| "European": [ | |
| "Dutch", "Polish", "Turkish", "Swedish", "Romanian", | |
| "Greek", "Ukrainian", "Finnish", "Danish", "Norwegian", | |
| ], | |
| "Middle Eastern": [ | |
| "Persian", "Hebrew", | |
| ], | |
| } | |
| # All language display names (for dropdowns) | |
| ALL_LANGUAGE_NAMES = sorted(LANGUAGES.keys()) | |
| # Languages that use YourVoic API | |
| YOURVOIC_LANGUAGES = [k for k, v in LANGUAGES.items() if v["tts_engine"] == "yourvoic"] | |
| # Languages that use YourVoic API | |
| YOURVOIC_LANGUAGES = [k for k, v in LANGUAGES.items() if v["tts_engine"] == "yourvoic"] | |
| # ============================================================================= | |
| # PIPELINE: ASR + MT + Video helpers | |
| # ============================================================================= | |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| TORCH_DTYPE = torch.float16 if torch.cuda.is_available() else torch.float32 | |
| # Models (loaded once at startup) | |
| asr_pipe = None | |
| mt_tokenizer = None | |
| mt_model = None | |
| def load_models(): | |
| """Load all models at startup.""" | |
| global asr_pipe, mt_tokenizer, mt_model | |
| from transformers import ( | |
| pipeline as hf_pipeline, | |
| AutoTokenizer, | |
| AutoModelForSeq2SeqLM, | |
| ) | |
| print(f"Device: {DEVICE} | Dtype: {TORCH_DTYPE}") | |
| print("Loading models...") | |
| # ASR | |
| ASR_MODEL_ID = "PlotweaverAI/whisper-small-de-en" | |
| print(f" Loading ASR: {ASR_MODEL_ID}") | |
| asr_pipe = hf_pipeline( | |
| "automatic-speech-recognition", | |
| model=ASR_MODEL_ID, | |
| device=DEVICE, | |
| torch_dtype=TORCH_DTYPE, | |
| ) | |
| print(" ASR loaded") | |
| # MT | |
| MT_MODEL_ID = "PlotweaverAI/nllb-200-distilled-600M-african-6lang" | |
| print(f" Loading MT: {MT_MODEL_ID}") | |
| mt_tokenizer = AutoTokenizer.from_pretrained(MT_MODEL_ID) | |
| mt_model = AutoModelForSeq2SeqLM.from_pretrained( | |
| MT_MODEL_ID, torch_dtype=TORCH_DTYPE | |
| ).to(DEVICE) | |
| mt_tokenizer.src_lang = "eng_Latn" | |
| print(" MT loaded") | |
| # Diagnostics | |
| print(f"\n=== Device diagnostics ===") | |
| print(f"CUDA available: {torch.cuda.is_available()}") | |
| if torch.cuda.is_available(): | |
| print(f"CUDA device: {torch.cuda.get_device_name(0)}") | |
| print(f"ASR on: {next(asr_pipe.model.parameters()).device}") | |
| print(f"MT on: {next(mt_model.parameters()).device}") | |
| print(f"YourVoic API key: {'set' if os.environ.get('YOURVOIC_API_KEY') else 'NOT SET'}") | |
| print(f"Dashscope key: {'set' if os.environ.get('DASHSCOPE_API_KEY') else 'NOT SET'}") | |
| print(f"==========================\n") | |
| print("All models loaded!") | |
| # ---- Text Processing ---- | |
| def split_into_sentences(text): | |
| """Split raw ASR text into individual sentences.""" | |
| text = text.strip() | |
| if not text: | |
| return [] | |
| text = '. '.join(s.strip().capitalize() for s in text.split('. ') if s.strip()) | |
| if re.search(r'[.!?]', text): | |
| sentences = re.split(r'(?<=[.!?])\s+', text) | |
| return [s.strip() for s in sentences if s.strip()] | |
| words = text.split() | |
| MAX_WORDS = 12 | |
| sentences = [] | |
| for i in range(0, len(words), MAX_WORDS): | |
| chunk = ' '.join(words[i:i + MAX_WORDS]) | |
| if not chunk.endswith(('.', '!', '?')): | |
| chunk += '.' | |
| chunk = chunk[0].upper() + chunk[1:] if len(chunk) > 1 else chunk.upper() | |
| sentences.append(chunk) | |
| return sentences | |
| # ---- ASR ---- | |
| def transcribe(audio_array, sample_rate=16000): | |
| """ASR: English audio to text. Handles both short and long audio.""" | |
| if len(audio_array) < 1600: | |
| return "" | |
| duration_s = len(audio_array) / sample_rate | |
| if sample_rate != 16000: | |
| import torchaudio.functional as F_audio | |
| audio_tensor = torch.from_numpy(audio_array).float() | |
| audio_tensor = F_audio.resample(audio_tensor, sample_rate, 16000) | |
| audio_array = audio_tensor.numpy() | |
| sample_rate = 16000 | |
| if duration_s <= 28: | |
| result = asr_pipe( | |
| {"raw": audio_array, "sampling_rate": sample_rate}, | |
| return_timestamps=False, | |
| ) | |
| return result["text"].strip() | |
| # Long-form: native Whisper generate | |
| model = asr_pipe.model | |
| processor = asr_pipe.feature_extractor | |
| tokenizer = asr_pipe.tokenizer | |
| inputs = processor( | |
| audio_array, sampling_rate=16000, return_tensors="pt", | |
| truncation=False, padding="longest", return_attention_mask=True, | |
| ) | |
| input_features = inputs.input_features.to(DEVICE, dtype=TORCH_DTYPE) | |
| attention_mask = inputs.attention_mask.to(DEVICE) if "attention_mask" in inputs else None | |
| generate_kwargs = {"return_timestamps": True, "language": "en", "task": "transcribe"} | |
| if attention_mask is not None: | |
| generate_kwargs["attention_mask"] = attention_mask | |
| with torch.no_grad(): | |
| predicted_ids = model.generate(input_features, **generate_kwargs) | |
| transcription = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)[0] | |
| return transcription.strip() | |
| # ---- MT ---- | |
| def translate_sentence(text, target_nllb_code, fast=True, max_length=256): | |
| """Translate a single sentence from English to target language.""" | |
| inputs = mt_tokenizer(text, return_tensors="pt", truncation=True).to(DEVICE) | |
| tgt_lang_id = mt_tokenizer.convert_tokens_to_ids(target_nllb_code) | |
| generate_kwargs = { | |
| "forced_bos_token_id": tgt_lang_id, | |
| "repetition_penalty": 1.5, | |
| "no_repeat_ngram_size": 3, | |
| } | |
| if fast: | |
| generate_kwargs.update({"max_length": 128, "num_beams": 1, "do_sample": False}) | |
| else: | |
| generate_kwargs.update({"max_length": max_length, "num_beams": 4, "early_stopping": True}) | |
| with torch.no_grad(): | |
| output_ids = mt_model.generate(**inputs, **generate_kwargs) | |
| return mt_tokenizer.decode(output_ids[0], skip_special_tokens=True) | |
| def translate_text(text, target_nllb_code, fast=True): | |
| """Split and translate full text sentence-by-sentence.""" | |
| sentences = split_into_sentences(text) | |
| if not sentences: | |
| return "", [], [] | |
| translations = [] | |
| for s in sentences: | |
| yo = translate_sentence(s, target_nllb_code, fast=fast) | |
| translations.append(yo) | |
| return ' '.join(translations), sentences, translations | |
| # ---- Video Processing ---- | |
| def extract_audio_from_video(video_path, output_path, target_sr=16000): | |
| """Extract audio track from video as 16kHz mono WAV.""" | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", video_path, | |
| "-vn", "-acodec", "pcm_s16le", "-ar", str(target_sr), "-ac", "1", | |
| output_path, | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"ffmpeg extraction failed: {result.stderr[:200]}") | |
| return output_path | |
| def get_media_duration(path): | |
| """Get duration in seconds.""" | |
| cmd = [ | |
| "ffprobe", "-v", "error", | |
| "-show_entries", "format=duration", | |
| "-of", "default=noprint_wrappers=1:nokey=1", path, | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"ffprobe failed: {result.stderr[:200]}") | |
| return float(result.stdout.strip()) | |
| def stretch_audio_to_duration(input_path, output_path, target_duration_s): | |
| """Stretch/compress audio to match target duration.""" | |
| current_duration = get_media_duration(input_path) | |
| if current_duration <= 0: | |
| raise RuntimeError("Invalid audio duration") | |
| ratio = current_duration / target_duration_s | |
| filters = [] | |
| remaining = ratio | |
| while remaining > 2.0: | |
| filters.append("atempo=2.0") | |
| remaining /= 2.0 | |
| while remaining < 0.5: | |
| filters.append("atempo=0.5") | |
| remaining /= 0.5 | |
| filters.append(f"atempo={remaining:.4f}") | |
| cmd = ["ffmpeg", "-y", "-i", input_path, "-filter:a", ",".join(filters), output_path] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"ffmpeg tempo failed: {result.stderr[:200]}") | |
| return output_path | |
| def mux_video_audio(video_path, audio_path, output_path, extend_video=False, target_duration=None): | |
| """Combine video with new audio. Optionally extend video by freezing last frame.""" | |
| if extend_video and target_duration: | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", video_path, "-i", audio_path, | |
| "-filter_complex", f"[0:v]tpad=stop_mode=clone:stop_duration={target_duration}[v]", | |
| "-map", "[v]", "-map", "1:a:0", | |
| "-c:v", "libx264", "-preset", "fast", "-c:a", "aac", | |
| "-t", str(target_duration), output_path, | |
| ] | |
| else: | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", video_path, "-i", audio_path, | |
| "-c:v", "copy", "-c:a", "aac", | |
| "-map", "0:v:0", "-map", "1:a:0", "-shortest", output_path, | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"ffmpeg mux failed: {result.stderr[:200]}") | |
| return output_path | |
| # ============================================================================= | |
| # TTS ENGINE: YourVoic API | |
| # ============================================================================= | |
| YOURVOIC_API_KEY = os.environ.get("YOURVOIC_API_KEY", "") | |
| YOURVOIC_STREAM_URL = "https://yourvoic.com/api/v1/tts/stream" | |
| def synthesize_yourvoic(text, language_code, voice="Peter", speed=1.0): | |
| """Synthesize text using YourVoic API.""" | |
| if not YOURVOIC_API_KEY: | |
| raise RuntimeError("YOURVOIC_API_KEY not set.") | |
| headers = {"X-API-Key": YOURVOIC_API_KEY, "Content-Type": "application/json"} | |
| payload = {"text": text, "voice": voice, "language": language_code, "model": "aura-prime", "speed": speed} | |
| t0 = time.time() | |
| response = requests.post(YOURVOIC_STREAM_URL, headers=headers, json=payload, stream=True, timeout=60) | |
| if response.status_code != 200: | |
| raise RuntimeError(f"YourVoic error {response.status_code}: {response.text[:200]}") | |
| # Detect format from content-type header | |
| ct = response.headers.get("content-type", "").lower() | |
| logger.info(f"YourVoic content-type: {ct}") | |
| # Collect audio bytes | |
| audio_data = b"" | |
| for chunk in response.iter_content(chunk_size=8192): | |
| audio_data += chunk | |
| elapsed = time.time() - t0 | |
| logger.info(f"YourVoic TTS: {len(text)} chars, {elapsed:.2f}s, {len(audio_data)} bytes") | |
| # Log first bytes for format detection | |
| magic = audio_data[:16] if len(audio_data) > 16 else audio_data | |
| logger.info(f"YourVoic first bytes: {magic[:8]}") | |
| # Determine file extension from content-type or magic bytes | |
| if b"RIFF" in audio_data[:4]: | |
| ext = ".wav" | |
| elif b"\xff\xfb" in audio_data[:3] or b"\xff\xf3" in audio_data[:3] or b"ID3" in audio_data[:3]: | |
| ext = ".mp3" | |
| elif b"OggS" in audio_data[:4]: | |
| ext = ".ogg" | |
| elif b"fLaC" in audio_data[:4]: | |
| ext = ".flac" | |
| elif "mp3" in ct or "mpeg" in ct: | |
| ext = ".mp3" | |
| elif "ogg" in ct: | |
| ext = ".ogg" | |
| elif "wav" in ct: | |
| ext = ".wav" | |
| elif "flac" in ct: | |
| ext = ".flac" | |
| elif "linear16" in ct or "pcm" in ct or "l16" in ct: | |
| ext = ".raw" | |
| else: | |
| ext = ".mp3" # Most common API default | |
| logger.warning(f"Unknown YourVoic format (ct={ct}), guessing mp3") | |
| # Save with correct extension | |
| tmp_path = tempfile.NamedTemporaryFile(suffix=ext, delete=False).name | |
| with open(tmp_path, "wb") as f: | |
| f.write(audio_data) | |
| # Try reading directly with soundfile | |
| try: | |
| audio_array, sample_rate = sf.read(tmp_path, dtype="float32") | |
| os.unlink(tmp_path) | |
| return audio_array, sample_rate | |
| except Exception as e: | |
| logger.warning(f"soundfile can't read {ext}: {e}") | |
| # Handle raw PCM (linear16): wrap in WAV header | |
| if ext == ".raw": | |
| try: | |
| sr = 24000 | |
| raw_data = audio_data | |
| wav_path = tmp_path + ".wav" | |
| with open(wav_path, "wb") as f: | |
| f.write(b"RIFF") | |
| f.write(struct.pack("<I", 36 + len(raw_data))) | |
| f.write(b"WAVE") | |
| f.write(b"fmt ") | |
| f.write(struct.pack("<IHHIIHH", 16, 1, 1, sr, sr * 2, 2, 16)) | |
| f.write(b"data") | |
| f.write(struct.pack("<I", len(raw_data))) | |
| f.write(raw_data) | |
| audio_array, sample_rate = sf.read(wav_path, dtype="float32") | |
| os.unlink(tmp_path) | |
| os.unlink(wav_path) | |
| return audio_array, sample_rate | |
| except Exception as e: | |
| logger.warning(f"Raw PCM wrap failed: {e}") | |
| # Fallback: convert with ffmpeg | |
| try: | |
| wav_path = tmp_path + ".wav" | |
| result = subprocess.run( | |
| ["ffmpeg", "-y", "-i", tmp_path, "-acodec", "pcm_s16le", "-ar", "24000", "-ac", "1", wav_path], | |
| capture_output=True, text=True, | |
| ) | |
| os.unlink(tmp_path) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"ffmpeg failed: {result.stderr[-300:]}") | |
| audio_array, sample_rate = sf.read(wav_path, dtype="float32") | |
| os.unlink(wav_path) | |
| return audio_array, sample_rate | |
| except Exception as e2: | |
| for f in [tmp_path, tmp_path + ".wav"]: | |
| if os.path.exists(f): os.unlink(f) | |
| raise RuntimeError(f"YourVoic decode failed: {e2}") | |
| def synthesize_yourvoic_to_file(text, language_code, output_path, voice="Peter", speed=1.0): | |
| """Synthesize via YourVoic and save to file.""" | |
| audio, sr = synthesize_yourvoic(text, language_code, voice, speed) | |
| sf.write(output_path, audio, sr) | |
| return output_path, sr | |
| def synthesize_chunked(text, language_config, sentences_per_chunk=2): | |
| """ | |
| Synthesize long text by chunking into sentence groups via YourVoic API. | |
| Args: | |
| text: Full text to synthesize | |
| language_config: Dict from LANGUAGES (has yourvoic_lang, yourvoic_voices, etc.) | |
| sentences_per_chunk: How many sentences to synthesize per API call | |
| Returns: | |
| (audio_array, sample_rate) | |
| """ | |
| sentences = re.split(r'(?<=[.!?])\s+', text) | |
| sentences = [s.strip() for s in sentences if s.strip()] | |
| if not sentences: | |
| return np.zeros(int(0.5 * 16000), dtype=np.float32), 16000 | |
| audio_segments = [] | |
| output_sr = None | |
| for i in range(0, len(sentences), sentences_per_chunk): | |
| chunk_text = ' '.join(sentences[i:i + sentences_per_chunk]) | |
| if not chunk_text: | |
| continue | |
| try: | |
| voice = language_config["yourvoic_voices"][0] if language_config.get("yourvoic_voices") else "Peter" | |
| lang_code = language_config["yourvoic_lang"] | |
| audio_seg, seg_sr = synthesize_yourvoic(chunk_text, lang_code, voice) | |
| if output_sr is None: | |
| output_sr = seg_sr | |
| if len(audio_seg) > 0: | |
| audio_segments.append(audio_seg) | |
| silence = np.zeros(int(0.15 * seg_sr), dtype=np.float32) | |
| audio_segments.append(silence) | |
| except Exception as e: | |
| logger.error(f"TTS chunk failed: {e}") | |
| continue | |
| if not audio_segments: | |
| fallback_sr = output_sr or 16000 | |
| logger.warning("All TTS chunks failed — returning silence") | |
| return np.zeros(int(0.5 * fallback_sr), dtype=np.float32), fallback_sr | |
| return np.concatenate(audio_segments), output_sr | |
| # ============================================================================= | |
| # QWEN OMNI ENGINE | |
| # ============================================================================= | |
| QWEN_MODEL = "qwen3.5-omni-plus" | |
| QWEN_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1" | |
| def _get_client(): | |
| """Create OpenAI-compatible client for Qwen Dashscope API.""" | |
| from openai import OpenAI | |
| api_key = os.environ.get("DASHSCOPE_API_KEY", "") | |
| if not api_key: | |
| raise RuntimeError( | |
| "DASHSCOPE_API_KEY not set. Add it as a Space secret." | |
| ) | |
| return OpenAI(api_key=api_key, base_url=QWEN_BASE_URL) | |
| def _wav_to_base64(wav_path): | |
| """Read WAV file and return base64 string.""" | |
| with open(wav_path, "rb") as f: | |
| return base64.b64encode(f.read()).decode("utf-8") | |
| def _base64_to_wav(b64_data, output_path): | |
| """Convert raw PCM base64 audio to WAV file (24kHz, mono, 16-bit).""" | |
| audio_bytes = base64.b64decode(b64_data) | |
| sample_rate = 24000 | |
| num_channels = 1 | |
| bits_per_sample = 16 | |
| byte_rate = sample_rate * num_channels * bits_per_sample // 8 | |
| block_align = num_channels * bits_per_sample // 8 | |
| data_size = len(audio_bytes) | |
| with open(output_path, "wb") as f: | |
| f.write(b"RIFF") | |
| f.write(struct.pack("<I", 36 + data_size)) | |
| f.write(b"WAVE") | |
| f.write(b"fmt ") | |
| f.write(struct.pack("<I", 16)) | |
| f.write(struct.pack("<H", 1)) | |
| f.write(struct.pack("<H", num_channels)) | |
| f.write(struct.pack("<I", sample_rate)) | |
| f.write(struct.pack("<I", byte_rate)) | |
| f.write(struct.pack("<H", block_align)) | |
| f.write(struct.pack("<H", bits_per_sample)) | |
| f.write(b"data") | |
| f.write(struct.pack("<I", data_size)) | |
| f.write(audio_bytes) | |
| def _extract_audio_chunk(video_path, output_wav, start_sec, duration_sec): | |
| """Extract a chunk of audio from video as 16kHz mono WAV.""" | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-ss", str(start_sec), "-t", str(duration_sec), | |
| "-i", video_path, "-vn", "-acodec", "pcm_s16le", | |
| "-ar", "16000", "-ac", "1", output_wav], | |
| capture_output=True, check=True, | |
| ) | |
| def _get_duration(filepath): | |
| """Get media file duration in seconds.""" | |
| result = subprocess.run( | |
| ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", | |
| "-of", "default=noprint_wrappers=1:nokey=1", filepath], | |
| capture_output=True, text=True, | |
| ) | |
| return float(result.stdout.strip()) | |
| def _concatenate_wavs(wav_files, output_path): | |
| """Concatenate WAV files using ffmpeg.""" | |
| if len(wav_files) == 1: | |
| shutil.copy2(wav_files[0], output_path) | |
| return | |
| list_file = output_path + ".txt" | |
| with open(list_file, "w") as f: | |
| for wav in wav_files: | |
| f.write(f"file '{wav}'\n") | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-f", "concat", "-safe", "0", | |
| "-i", list_file, "-c", "copy", output_path], | |
| capture_output=True, check=True, | |
| ) | |
| os.remove(list_file) | |
| def _build_system_prompt(language_name): | |
| """Build Qwen system prompt for a target language.""" | |
| return ( | |
| f"You are a professional video dubbing translator. You will receive audio in English.\n" | |
| f"Your task:\n" | |
| f"1. Listen carefully to the English speech.\n" | |
| f"2. Translate it into natural, fluent {language_name}.\n" | |
| f"3. Respond ONLY with the {language_name} translation spoken aloud — no English, no commentary,\n" | |
| f" no meta-text, no transliteration. Speak entirely in {language_name}.\n" | |
| f"4. Match the tone, emotion, and pacing of the original speaker as closely as possible.\n" | |
| f"5. If there are pauses or silence in the original audio, maintain similar pacing.\n" | |
| f"6. Translate idioms and cultural references into their {language_name} equivalents.\n" | |
| f"7. Use clear, professional pronunciation suitable for a broad audience." | |
| ) | |
| def translate_chunk_qwen(wav_path, voice, language_name, chunk_index=0): | |
| """ | |
| Translate a single audio chunk using Qwen Omni. | |
| Args: | |
| wav_path: Path to input WAV file (English audio) | |
| voice: Qwen voice name (e.g. "Ethan", "Cherry") | |
| language_name: Full language name for the system prompt | |
| chunk_index: For logging | |
| Returns: | |
| (output_wav_path, transcript) or (None, transcript) if no audio | |
| """ | |
| client = _get_client() | |
| audio_b64 = _wav_to_base64(wav_path) | |
| output_wav = wav_path.replace(".wav", f"_qwen_{chunk_index}.wav") | |
| system_prompt = _build_system_prompt(language_name) | |
| user_prompt = f"Translate this English speech into {language_name}. Respond only with the spoken {language_name} translation." | |
| t0 = time.time() | |
| completion = client.chat.completions.create( | |
| model=QWEN_MODEL, | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| { | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "input_audio", | |
| "input_audio": { | |
| "data": f"data:audio/wav;base64,{audio_b64}", | |
| "format": "wav", | |
| }, | |
| }, | |
| {"type": "text", "text": user_prompt}, | |
| ], | |
| }, | |
| ], | |
| modalities=["text", "audio"], | |
| audio={"voice": voice, "format": "wav"}, | |
| stream=True, | |
| stream_options={"include_usage": True}, | |
| ) | |
| audio_chunks = [] | |
| transcript_parts = [] | |
| for event in completion: | |
| if not event.choices: | |
| continue | |
| delta = event.choices[0].delta | |
| if hasattr(delta, "content") and delta.content: | |
| transcript_parts.append(delta.content) | |
| if hasattr(delta, "audio") and delta.audio: | |
| if isinstance(delta.audio, dict): | |
| if "data" in delta.audio: | |
| audio_chunks.append(delta.audio["data"]) | |
| elif hasattr(delta.audio, "data") and delta.audio.data: | |
| audio_chunks.append(delta.audio.data) | |
| transcript = "".join(transcript_parts) | |
| elapsed = time.time() - t0 | |
| logger.info(f"Qwen chunk {chunk_index}: {elapsed:.1f}s, transcript={transcript[:60]}") | |
| if audio_chunks: | |
| full_audio_b64 = "".join(audio_chunks) | |
| _base64_to_wav(full_audio_b64, output_wav) | |
| return output_wav, transcript | |
| return None, transcript | |
| def dub_video_qwen(video_path, language_name, voice="Ethan", chunk_seconds=120, progress_fn=None): | |
| """ | |
| Full video dubbing pipeline using Qwen Omni. | |
| Splits video into chunks, translates each chunk via Qwen API, | |
| concatenates results, and muxes back onto video. | |
| Args: | |
| video_path: Path to input video | |
| language_name: Full language name (e.g. "French", "Arabic") | |
| voice: Qwen voice name | |
| chunk_seconds: Audio chunk duration for API calls | |
| progress_fn: Optional gradio progress callback | |
| Returns: | |
| (output_video_path, log_text) | |
| """ | |
| tmp_dir = tempfile.mkdtemp(prefix=f"qwen_dub_") | |
| log = [] | |
| try: | |
| # Duration | |
| if progress_fn: | |
| progress_fn(0.05, desc="Analyzing video...") | |
| total_duration = _get_duration(video_path) | |
| log.append(f"**Video:** {total_duration:.1f}s") | |
| log.append(f"**Engine:** Qwen 3.5 Omni") | |
| log.append(f"**Voice:** {voice}") | |
| log.append(f"**Language:** {language_name}") | |
| if total_duration > 3600: | |
| return None, "Video longer than 1 hour — please use a shorter clip." | |
| # Split into chunks | |
| if progress_fn: | |
| progress_fn(0.1, desc="Extracting audio chunks...") | |
| num_chunks = max(1, int(total_duration // chunk_seconds) + (1 if total_duration % chunk_seconds > 0 else 0)) | |
| log.append(f"**Chunks:** {num_chunks} ({chunk_seconds}s each)") | |
| input_chunks = [] | |
| for i in range(num_chunks): | |
| start = i * chunk_seconds | |
| duration = min(chunk_seconds, total_duration - start) | |
| chunk_path = os.path.join(tmp_dir, f"chunk_{i:03d}.wav") | |
| _extract_audio_chunk(video_path, chunk_path, start, duration) | |
| input_chunks.append(chunk_path) | |
| # Translate each chunk | |
| output_chunks = [] | |
| all_transcripts = [] | |
| for i, chunk_path in enumerate(input_chunks): | |
| if progress_fn: | |
| frac = 0.15 + 0.7 * (i / num_chunks) | |
| progress_fn(frac, desc=f"Translating chunk {i+1}/{num_chunks}...") | |
| result_path, transcript = translate_chunk_qwen( | |
| chunk_path, voice, language_name, i | |
| ) | |
| if transcript: | |
| all_transcripts.append(f"**[{i+1}]** {transcript}") | |
| if result_path: | |
| output_chunks.append(result_path) | |
| else: | |
| # Silence fallback | |
| duration = _get_duration(chunk_path) | |
| silence_path = os.path.join(tmp_dir, f"silence_{i:03d}.wav") | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-f", "lavfi", | |
| "-i", "anullsrc=r=24000:cl=mono", | |
| "-t", str(duration), "-acodec", "pcm_s16le", silence_path], | |
| capture_output=True, check=True, | |
| ) | |
| output_chunks.append(silence_path) | |
| # Concatenate | |
| if progress_fn: | |
| progress_fn(0.88, desc="Assembling audio...") | |
| full_audio = os.path.join(tmp_dir, "full_dubbed.wav") | |
| _concatenate_wavs(output_chunks, full_audio) | |
| # Mux onto video | |
| if progress_fn: | |
| progress_fn(0.93, desc="Combining audio and video...") | |
| output_video = os.path.join(tmp_dir, "dubbed_output.mp4") | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-i", video_path, "-i", full_audio, | |
| "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0", | |
| "-shortest", output_video], | |
| capture_output=True, check=True, | |
| ) | |
| if progress_fn: | |
| progress_fn(1.0, desc="Done!") | |
| log.append(f"\n**Transcript:**") | |
| log.extend(all_transcripts) | |
| return output_video, "\n".join(log) | |
| except Exception as e: | |
| logger.exception("Qwen dubbing failed") | |
| shutil.rmtree(tmp_dir, ignore_errors=True) | |
| return None, f"Error: {str(e)}" | |
| # ============================================================================= | |
| # GRADIO APP | |
| # ============================================================================= | |
| # Load models at startup | |
| load_models() | |
| # ============================================================================= | |
| # Helper functions | |
| # ============================================================================= | |
| def get_voices_for_language(lang_name): | |
| """Get available voices for a language based on its engine.""" | |
| config = LANGUAGES.get(lang_name, {}) | |
| engine = config.get("tts_engine", "local") | |
| if engine == "qwen": | |
| return QWEN_VOICES | |
| elif engine == "yourvoic" and config.get("yourvoic_voices"): | |
| return config["yourvoic_voices"] | |
| elif engine == "local": | |
| return ["Peter"] | |
| return ["Peter"] | |
| def full_pipeline_audio(audio_input, target_language): | |
| """Full pipeline: English audio → target language audio.""" | |
| if audio_input is None: | |
| return None, "Please upload or record audio." | |
| lang_config = LANGUAGES.get(target_language) | |
| if not lang_config: | |
| return None, f"Language '{target_language}' not configured." | |
| sample_rate, audio_array = audio_input | |
| audio_array = audio_array.astype(np.float32) | |
| if audio_array.ndim > 1: | |
| audio_array = audio_array.mean(axis=1) | |
| if audio_array.max() > 1.0 or audio_array.min() < -1.0: | |
| max_val = max(abs(audio_array.max()), abs(audio_array.min())) | |
| if max_val > 0: | |
| audio_array = audio_array / max_val | |
| log = [] | |
| total_start = time.time() | |
| # ASR | |
| t0 = time.time() | |
| english = transcribe(audio_array, sample_rate) | |
| log.append(f"**ASR** ({time.time()-t0:.2f}s)\n{english}") | |
| if not english: | |
| return None, "ASR returned empty text." | |
| # MT | |
| t0 = time.time() | |
| nllb_code = lang_config["nllb"] | |
| translated, en_sents, tgt_sents = translate_text(english, nllb_code, fast=False) | |
| log.append(f"\n**Translation** ({time.time()-t0:.2f}s)") | |
| for e, t in zip(en_sents, tgt_sents): | |
| log.append(f" EN: {e}\n {target_language.upper()}: {t}") | |
| if not translated: | |
| return None, "Translation returned empty." | |
| # TTS | |
| t0 = time.time() | |
| audio_out, sr_out = synthesize_chunked( | |
| translated, lang_config | |
| ) | |
| log.append(f"\n**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio") | |
| total = time.time() - total_start | |
| log.append(f"\n**Total: {total:.2f}s**") | |
| return (sr_out, audio_out), "\n".join(log) | |
| def full_pipeline_text(english_text, target_language, voice_name): | |
| """Text-only pipeline: English text → target language audio.""" | |
| if not english_text or not english_text.strip(): | |
| return None, "Please enter English text." | |
| lang_config = LANGUAGES.get(target_language) | |
| if not lang_config: | |
| return None, f"Language '{target_language}' not configured." | |
| log = [] | |
| total_start = time.time() | |
| # MT | |
| t0 = time.time() | |
| nllb_code = lang_config["nllb"] | |
| translated, en_sents, tgt_sents = translate_text(english_text.strip(), nllb_code, fast=False) | |
| log.append(f"**Translation** ({time.time()-t0:.2f}s)") | |
| for e, t in zip(en_sents, tgt_sents): | |
| log.append(f" EN: {e}\n {target_language.upper()}: {t}") | |
| if not translated: | |
| return None, "Translation returned empty." | |
| # TTS | |
| t0 = time.time() | |
| audio_out, sr_out = synthesize_chunked( | |
| translated, lang_config | |
| ) | |
| log.append(f"\n**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio") | |
| total = time.time() - total_start | |
| log.append(f"\n**Total: {total:.2f}s**") | |
| return (sr_out, audio_out), "\n".join(log) | |
| def dub_video(video_path, target_languages, dub_voice, chunk_seconds, progress=gr.Progress()): | |
| """ | |
| Dub a video into one or more target languages. | |
| Routes to Qwen Omni for global languages, YourVoic for others. | |
| """ | |
| if video_path is None: | |
| return None, "Please upload a video." | |
| if not target_languages: | |
| return None, "Please select at least one target language." | |
| results_log = [] | |
| output_videos = [] | |
| for lang_name in target_languages: | |
| lang_config = LANGUAGES.get(lang_name) | |
| if not lang_config: | |
| results_log.append(f"**{lang_name}**: not configured, skipped") | |
| continue | |
| engine = lang_config.get("tts_engine", "local") | |
| results_log.append(f"\n{'='*50}") | |
| results_log.append(f"**Dubbing: {lang_name}** (engine: {engine})") | |
| results_log.append(f"{'='*50}") | |
| try: | |
| if engine == "qwen": | |
| # Qwen Omni: end-to-end speech-to-speech (best for global languages) | |
| qwen_lang_name = lang_config.get("qwen_name", lang_name) | |
| voice = dub_voice if dub_voice in QWEN_VOICES else "Ethan" | |
| out_video, log_text = dub_video_qwen( | |
| video_path, qwen_lang_name, voice=voice, | |
| chunk_seconds=chunk_seconds, progress_fn=progress, | |
| ) | |
| results_log.append(log_text) | |
| if out_video: | |
| output_videos.append(out_video) | |
| else: | |
| # Local/YourVoic pipeline: ASR → NLLB → TTS | |
| work_dir = tempfile.mkdtemp(prefix=f"dub_{lang_name}_") | |
| extracted_audio = os.path.join(work_dir, "audio.wav") | |
| tgt_audio_raw = os.path.join(work_dir, "tgt_raw.wav") | |
| tgt_audio_aligned = os.path.join(work_dir, "tgt_aligned.wav") | |
| output_video = os.path.join(work_dir, f"dubbed_{lang_name}.mp4") | |
| progress(0.05, desc=f"{lang_name}: extracting audio...") | |
| extract_audio_from_video(video_path, extracted_audio) | |
| video_duration = get_media_duration(video_path) | |
| results_log.append(f"Video: {video_duration:.1f}s") | |
| audio_array, sr = sf.read(extracted_audio, dtype="float32") | |
| if audio_array.ndim > 1: | |
| audio_array = audio_array.mean(axis=1) | |
| progress(0.15, desc=f"{lang_name}: transcribing...") | |
| t0 = time.time() | |
| english = transcribe(audio_array, sr) | |
| results_log.append(f"ASR: {time.time()-t0:.1f}s") | |
| if not english: | |
| results_log.append("ASR empty — skipped") | |
| continue | |
| progress(0.4, desc=f"{lang_name}: translating...") | |
| t0 = time.time() | |
| nllb_code = lang_config["nllb"] | |
| translated, _, _ = translate_text(english, nllb_code, fast=True) | |
| results_log.append(f"MT: {time.time()-t0:.1f}s") | |
| if not translated: | |
| results_log.append("Translation empty — skipped") | |
| continue | |
| progress(0.65, desc=f"{lang_name}: synthesizing...") | |
| t0 = time.time() | |
| tgt_audio, tgt_sr = synthesize_chunked( | |
| translated, lang_config | |
| ) | |
| sf.write(tgt_audio_raw, tgt_audio, tgt_sr) | |
| tgt_duration = len(tgt_audio) / tgt_sr | |
| results_log.append(f"TTS: {time.time()-t0:.1f}s ({tgt_duration:.1f}s audio)") | |
| progress(0.85, desc=f"{lang_name}: aligning...") | |
| MAX_STRETCH = 1.2 | |
| stretch_ratio = tgt_duration / video_duration | |
| if stretch_ratio <= MAX_STRETCH: | |
| if abs(stretch_ratio - 1.0) > 0.02: | |
| stretch_audio_to_duration(tgt_audio_raw, tgt_audio_aligned, video_duration) | |
| else: | |
| import shutil | |
| shutil.copy(tgt_audio_raw, tgt_audio_aligned) | |
| extend_video = False | |
| final_duration = video_duration | |
| else: | |
| shutil.copy(tgt_audio_raw, tgt_audio_aligned) | |
| extend_video = True | |
| final_duration = tgt_duration | |
| results_log.append(f"Audio longer ({stretch_ratio:.1f}x) — extending video") | |
| progress(0.95, desc=f"{lang_name}: combining...") | |
| mux_video_audio( | |
| video_path, tgt_audio_aligned, output_video, | |
| extend_video=extend_video, target_duration=final_duration | |
| ) | |
| output_videos.append(output_video) | |
| except Exception as e: | |
| logger.exception(f"Dubbing {lang_name} failed") | |
| results_log.append(f"Error: {str(e)}") | |
| progress(1.0, desc="Done!") | |
| final_video = output_videos[0] if output_videos else None | |
| return final_video, "\n".join(results_log) | |
| def update_voices(language): | |
| """Update voice dropdown when language changes.""" | |
| voices = get_voices_for_language(language) | |
| return gr.update(choices=voices, value=voices[0]) | |
| # ============================================================================= | |
| # Gradio UI | |
| # ============================================================================= | |
| EXAMPLES = [ | |
| "And it's a brilliant goal from the striker!", | |
| "The referee has shown a yellow card. Corner kick for the home team.", | |
| "What a save by the goalkeeper! The match is heading into injury time.", | |
| "He dribbles past two defenders and shoots! The ball hits the back of the net!", | |
| ] | |
| CSS = """ | |
| .main-header { text-align: center; margin-bottom: 0.5rem; } | |
| .main-header h1 { font-size: 1.8rem; font-weight: 700; margin: 0; } | |
| .main-header p { color: #666; font-size: 0.95rem; } | |
| .lang-group-label { font-weight: 600; font-size: 0.85rem; color: #888; text-transform: uppercase; letter-spacing: 0.05em; margin-top: 0.5rem; } | |
| """ | |
| with gr.Blocks( | |
| title="PlotWeaver — Live Commentary Translation", | |
| theme=gr.themes.Soft(), | |
| css=CSS, | |
| ) as demo: | |
| gr.HTML(""" | |
| <div class="main-header"> | |
| <h1>PlotWeaver</h1> | |
| <p>Live commentary translation platform — English to 40+ languages</p> | |
| <p style="font-size:0.8rem; color:#999">Qwen Omni (11 languages) + YourVoic API + NLLB-200 (27 languages)</p> | |
| </div> | |
| """) | |
| with gr.Tabs(): | |
| # ====== TAB 1: EVENT MANAGEMENT ====== | |
| with gr.TabItem("Event Management"): | |
| gr.Markdown("### Create new event") | |
| gr.Markdown("Configure your live broadcast event with target languages and input source.") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| event_name = gr.Textbox( | |
| label="Event name", | |
| placeholder="e.g. Premier League: Arsenal vs. Chelsea", | |
| ) | |
| with gr.Row(): | |
| start_time = gr.Textbox(label="Start time", placeholder="08:30 PM") | |
| end_time = gr.Textbox(label="End time", placeholder="10:30 PM") | |
| event_date = gr.Textbox(label="Date", placeholder="2026-06-06") | |
| gr.Markdown("#### Input source") | |
| input_method = gr.Radio( | |
| choices=["RTMP Stream", "WebRTC (Browser)", "Direct Audio Feed"], | |
| value="RTMP Stream", | |
| label="Input method", | |
| ) | |
| gr.Markdown("#### Target languages") | |
| gr.Markdown("Select languages for simultaneous broadcast. Additional languages consume more stream minutes.") | |
| # Language checkboxes grouped by category | |
| target_langs = gr.CheckboxGroup( | |
| choices=ALL_LANGUAGE_NAMES, | |
| label="Languages", | |
| value=["Spanish"], | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("#### Estimate summary") | |
| estimate_display = gr.Markdown( | |
| value="**Event:** Not configured\n\n**Languages:** 1 selected\n\n**Estimated duration:** --\n\n**Total estimate:** --" | |
| ) | |
| create_event_btn = gr.Button("Create Event", variant="primary", size="lg") | |
| event_status = gr.Markdown("") | |
| def update_estimate(name, langs, start, end): | |
| n_langs = len(langs) if langs else 0 | |
| lang_list = ", ".join(langs) if langs else "None" | |
| return ( | |
| f"**Event:** {name or 'Not set'}\n\n" | |
| f"**Languages:** {n_langs} selected\n\n" | |
| f"{lang_list}\n\n" | |
| f"**Input:** Configured\n\n" | |
| f"**Rate:** 1x (Standard)" | |
| ) | |
| for inp in [event_name, target_langs, start_time, end_time]: | |
| inp.change( | |
| fn=update_estimate, | |
| inputs=[event_name, target_langs, start_time, end_time], | |
| outputs=[estimate_display], | |
| ) | |
| def create_event(name, langs): | |
| if not name: | |
| return "Please enter an event name." | |
| if not langs: | |
| return "Please select at least one language." | |
| return f"Event **{name}** created with {len(langs)} languages: {', '.join(langs)}" | |
| create_event_btn.click( | |
| fn=create_event, | |
| inputs=[event_name, target_langs], | |
| outputs=[event_status], | |
| ) | |
| # ====== TAB 2: LIVE STUDIO ====== | |
| with gr.TabItem("Live Studio"): | |
| gr.Markdown("### Live streaming translation") | |
| gr.Markdown("Record or stream English commentary and hear it translated in real-time.") | |
| with gr.Row(): | |
| studio_language = gr.Dropdown( | |
| choices=ALL_LANGUAGE_NAMES, | |
| value="Spanish", | |
| label="Target language", | |
| ) | |
| studio_voice = gr.Dropdown( | |
| choices=get_voices_for_language("Spanish"), | |
| value=get_voices_for_language("Spanish")[0], | |
| label="Voice", | |
| ) | |
| studio_language.change( | |
| fn=update_voices, | |
| inputs=[studio_language], | |
| outputs=[studio_voice], | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| studio_audio_in = gr.Audio( | |
| label="English commentary (upload or record)", | |
| type="numpy", | |
| sources=["upload", "microphone"], | |
| ) | |
| studio_translate_btn = gr.Button("Translate", variant="primary", size="lg") | |
| with gr.Column(): | |
| studio_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True) | |
| studio_log = gr.Markdown(label="Pipeline log") | |
| studio_translate_btn.click( | |
| fn=full_pipeline_audio, | |
| inputs=[studio_audio_in, studio_language], | |
| outputs=[studio_audio_out, studio_log], | |
| ) | |
| # ====== TAB 3: VIDEO DUBBING ====== | |
| with gr.TabItem("Video Dubbing"): | |
| gr.Markdown("### Video dubbing (English → multi-language)") | |
| gr.Markdown( | |
| "Upload a video with English commentary and get back a dubbed version. " | |
| "**Global languages** (Arabic, French, Spanish, etc.) use Qwen Omni for best quality. " | |
| "**African/regional languages** use YourVoic API with NLLB translation." | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| dub_video_in = gr.Video(label="Upload English video", sources=["upload"]) | |
| dub_languages = gr.CheckboxGroup( | |
| choices=ALL_LANGUAGE_NAMES, | |
| label="Target languages", | |
| value=["Spanish"], | |
| ) | |
| with gr.Row(): | |
| dub_voice = gr.Dropdown( | |
| choices=QWEN_VOICES, | |
| value="Ethan", | |
| label="Voice (for Qwen languages)", | |
| info="Applies to Arabic, French, Spanish, etc. Local languages use default voice.", | |
| ) | |
| dub_chunk_slider = gr.Slider( | |
| minimum=30, maximum=300, value=120, step=10, | |
| label="Chunk duration (seconds)", | |
| info="Shorter = more API calls but less timeout risk.", | |
| ) | |
| dub_btn = gr.Button("Dub Video", variant="primary", size="lg") | |
| with gr.Column(): | |
| dub_video_out = gr.Video(label="Dubbed video (download from player)") | |
| dub_log = gr.Markdown( | |
| label="Processing log", | |
| value="Upload a video and select languages to start." | |
| ) | |
| dub_btn.click( | |
| fn=dub_video, | |
| inputs=[dub_video_in, dub_languages, dub_voice, dub_chunk_slider], | |
| outputs=[dub_video_out, dub_log], | |
| ) | |
| # ====== TAB 4: TEXT TRANSLATION ====== | |
| with gr.TabItem("Text \u2192 Audio"): | |
| gr.Markdown("### Text to translated speech") | |
| gr.Markdown("Type English text, choose a language, and hear the translated audio.") | |
| with gr.Row(): | |
| text_language = gr.Dropdown( | |
| choices=ALL_LANGUAGE_NAMES, | |
| value="Spanish", | |
| label="Target language", | |
| ) | |
| text_voice = gr.Dropdown( | |
| choices=get_voices_for_language("Spanish"), | |
| value=get_voices_for_language("Spanish")[0], | |
| label="Voice", | |
| ) | |
| text_language.change( | |
| fn=update_voices, | |
| inputs=[text_language], | |
| outputs=[text_voice], | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| text_input = gr.Textbox( | |
| label="English text", | |
| placeholder="Type English football commentary here...", | |
| lines=4, | |
| ) | |
| text_btn = gr.Button("Translate to speech", variant="primary", size="lg") | |
| gr.Examples( | |
| examples=[[e] for e in EXAMPLES], | |
| inputs=[text_input], | |
| label="Example commentary", | |
| ) | |
| with gr.Column(): | |
| text_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True) | |
| text_log = gr.Markdown(label="Pipeline log") | |
| text_btn.click( | |
| fn=full_pipeline_text, | |
| inputs=[text_input, text_language, text_voice], | |
| outputs=[text_audio_out, text_log], | |
| ) | |
| # ====== TAB 5: RECORDINGS ====== | |
| with gr.TabItem("Recordings & Clips"): | |
| gr.Markdown("### Recordings management") | |
| gr.Markdown( | |
| "Past dubbed recordings will appear here. " | |
| "This feature is coming soon — for now, use Video Dubbing to create new recordings " | |
| "and download them from the player." | |
| ) | |
| # ====== TAB 6: VOICE MODELS ====== | |
| with gr.TabItem("Voice Models"): | |
| gr.Markdown("### Voice model library") | |
| gr.Markdown("Browse available voices for each language.") | |
| voice_lang_select = gr.Dropdown( | |
| choices=ALL_LANGUAGE_NAMES, | |
| value="Spanish", | |
| label="Select language", | |
| ) | |
| voice_info = gr.Markdown() | |
| def show_voice_info(lang): | |
| config = LANGUAGES.get(lang, {}) | |
| engine = config.get("tts_engine", "unknown") | |
| voices = config.get("yourvoic_voices", []) | |
| info = f"### {lang}\n\n" | |
| if engine == "qwen": | |
| info += f"**Engine:** Qwen 3.5 Omni (end-to-end speech-to-speech)\n\n" | |
| info += f"This is the highest quality option. Qwen handles ASR + translation + TTS in a single API call, " | |
| info += f"preserving tone, emotion, and pacing from the original speaker.\n\n" | |
| info += f"**Available voices ({len(QWEN_VOICES)}):** {', '.join(QWEN_VOICES[:10])}... and {len(QWEN_VOICES)-10} more\n\n" | |
| info += f"All voices support all Qwen languages." | |
| elif engine == "yourvoic": | |
| info += f"**Engine:** YourVoic API (TTS) + NLLB-200 (translation)\n\n" | |
| info += f"**YourVoic language:** `{config.get('yourvoic_lang', 'N/A')}`\n\n" | |
| info += f"**Available voices:** {', '.join(voices) if voices else 'Peter (default)'}" | |
| else: | |
| info += f"**Engine:** Not available\n\n" | |
| info += f"**NLLB code:** `{config.get('nllb', 'N/A')}`\n\n" | |
| info += "Uses locally fine-tuned models on GPU. Voice selection not available." | |
| return info | |
| voice_lang_select.change(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info]) | |
| demo.load(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info]) | |
| gr.Markdown(""" | |
| --- | |
| **PlotWeaver** by PlotweaverAI | Models: | |
| [ASR](https://huggingface.co/PlotweaverAI/whisper-small-de-en) | | |
| [MT](https://huggingface.co/PlotweaverAI/nllb-200-distilled-600M-african-6lang) | | |
| [TTS](https://yourvoic.com) | | |
| [Qwen Omni](https://www.alibabacloud.com/help/en/model-studio/qwen-omni) | |
| """) | |
| if __name__ == "__main__": | |
| demo.launch() | |