"""
Live Football Commentary Translator
====================================
Audio in (live commentator) -> Translate -> Audio out (target language).

Two modes:
  1. Single clip: record/upload, click translate, hear result.
  2. Continuous live: start recording, speak naturally, translations queue up
     and play sequentially. Energy-based VAD chunks speech at ~0.8s pauses.

Engines:
  - Qwen-Omni (qwen3.5-omni-plus) handles audio-in -> translated-speech-out
    in ONE call for languages it covers (English, German, Spanish, Arabic,
    Scottish-accented English).
  - For African target languages (Swahili, Amharic, Afrikaans), Qwen-Omni
    does audio -> translated text, then YourVoic does text -> speech.

Deploy as a Hugging Face Space (SDK: Gradio). Add these secrets:
  - DASHSCOPE_API_KEY (required, for Qwen-Omni)
  - YOURVOIC_API_KEY (required for Swahili/Amharic/Afrikaans targets)
"""
|
|
import base64
import json
import os
import queue
import struct
import subprocess
import tempfile
import threading
import time
import uuid
import wave
from dataclasses import dataclass, field
from typing import Optional

import gradio as gr
import numpy as np
import requests as http_requests
from openai import OpenAI
|
|
| |
| |
| |
# --- Qwen-Omni, reached through DashScope's OpenAI-compatible endpoint ---
OMNI_MODEL = "qwen3.5-omni-plus"
DASHSCOPE_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"

# --- YourVoic text-to-speech endpoint (POSTed to by yourvoic_speak) ---
YOURVOIC_TTS_URL = "https://yourvoic.com/api/v1/tts/generate"

# --- Voice-activity detection for continuous mode (see session_process_chunk) ---
VAD_SILENCE_SEC = 0.8         # silence since the last voiced chunk that ends an utterance
VAD_MIN_UTTERANCE_SEC = 1.2   # an utterance is not flushed on silence before this age
VAD_MAX_UTTERANCE_SEC = 12.0  # an utterance is flushed at this age even without a pause
VAD_RMS_THRESHOLD = 0.015     # chunk RMS above this counts as voice

# Interval, in seconds, of the gr.Timer that drives live_drain.
OUTPUT_POLL_SEC = 0.3

# Source languages offered in the UI. "omni_hint" is the language name that is
# interpolated into the Qwen-Omni prompts. "code" is not referenced by the code
# visible in this file.
SOURCE_LANGUAGES = {
    "English": {"code": "en", "omni_hint": "English"},
    "Scottish English": {"code": "en-scot", "omni_hint": "Scottish-accented English"},
    "German": {"code": "de", "omni_hint": "German"},
    "Spanish": {"code": "es", "omni_hint": "Spanish"},
    "Arabic": {"code": "ar", "omni_hint": "Arabic"},
}

# Target languages offered in the UI. "engine" selects the pipeline in
# translate_audio_file: "qwen" = audio -> speech in one Qwen-Omni call,
# "yourvoic" = Qwen-Omni audio -> text followed by YourVoic text -> speech
# (which sends "yourvoic_lang" as the request's "language").
TARGET_LANGUAGES = {
    "English": {"engine": "qwen", "omni_hint": "English"},
    "Scottish English": {"engine": "qwen", "omni_hint": "Scottish-accented English"},
    "German": {"engine": "qwen", "omni_hint": "German"},
    "Spanish": {"engine": "qwen", "omni_hint": "Spanish"},
    "Arabic": {"engine": "qwen", "omni_hint": "Arabic"},
    "Swahili": {"engine": "yourvoic", "omni_hint": "Swahili", "yourvoic_lang": "sw-KE"},
    "Amharic": {"engine": "yourvoic", "omni_hint": "Amharic", "yourvoic_lang": "am-ET"},
    "Afrikaans": {"engine": "yourvoic", "omni_hint": "Afrikaans", "yourvoic_lang": "af-ZA"},
}

# Dropdown labels in the form "Name -- description"; voice_name() extracts the
# part before "--", which is what gets sent to Qwen-Omni as the voice.
QWEN_VOICES = [
    "Ethan -- Warm, energetic (good default)",
    "Ryan -- Dramatic, rhythmic (good for live action)",
    "Cherry -- Sunny, friendly",
    "Jennifer -- Cinematic narrator",
    "Vincent -- Rich, theatrical",
    "Bellona -- Strong, commanding",
]

# Candidate YourVoic voices per target language, tried in order by yourvoic_speak.
YOURVOIC_VOICE_MAP = {
    "Swahili": ["Peter"],
    "Amharic": ["Peter"],
    "Afrikaans": ["Peter"],
}

# Sent as the "model" field of every YourVoic TTS request.
YOURVOIC_MODEL = "aura-prime"
|
|
| |
| |
| |
def voice_name(label: str) -> str:
    """Return the text before the first "--" in *label*, stripped of whitespace.

    Used to turn a "Name -- description" dropdown label into the bare name.
    A label without "--" is returned whole (stripped).
    """
    name, _separator, _description = label.partition("--")
    return name.strip()
|
|
|
|
def write_wav(samples: np.ndarray, sample_rate: int, output_path: str) -> None:
    """Write an audio array to *output_path* as a mono 16-bit PCM WAV file.

    Args:
        samples: Audio samples. float32/float64 input is clipped to
            [-1.0, 1.0] and scaled by 32767; any other non-int16 dtype is
            cast to int16 as-is. Arrays with more than one dimension are
            averaged over axis 1 to produce a single channel.
        sample_rate: Sample rate in Hz, stored in the WAV header.
        output_path: Destination file path (overwritten if it exists).
    """
    if samples.dtype == np.float32 or samples.dtype == np.float64:
        samples = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
    elif samples.dtype != np.int16:
        samples = samples.astype(np.int16)
    if samples.ndim > 1:
        # Down-mix by averaging the channels (axis 1).
        samples = samples.mean(axis=1).astype(np.int16)

    # The stdlib writer emits the canonical 44-byte PCM header and takes
    # native-endian frames, so the file is correct on any host byte order.
    with wave.open(output_path, "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)  # 16-bit samples
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(samples.tobytes())
|
|
|
|
def base64_to_wav(b64_data: str, output_path: str) -> None:
    """Decode base64 audio bytes and save them inside a WAV container.

    The decoded bytes are written unchanged after a 44-byte header that
    declares mono, 16-bit PCM at 24 kHz (the format the caller expects from
    Qwen-Omni).
    """
    pcm = base64.b64decode(b64_data)
    sample_rate, channels, bits_per_sample = 24000, 1, 16
    bytes_per_second = sample_rate * channels * bits_per_sample // 8
    block_align = channels * bits_per_sample // 8
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF", 36 + len(pcm), b"WAVE",
        b"fmt ", 16, 1, channels, sample_rate,
        bytes_per_second, block_align, bits_per_sample,
        b"data", len(pcm),
    )
    with open(output_path, "wb") as wav_file:
        wav_file.write(header)
        wav_file.write(pcm)
|
|
|
|
def wav_duration_seconds(wav_path: str) -> float:
    """Ask ffprobe for the duration of *wav_path* in seconds.

    Returns 0.0 when ffprobe cannot be started, takes longer than 10 seconds,
    or prints something that does not parse as a float.
    """
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", wav_path,
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10)
        duration = float(probe.stdout.strip())
    except (subprocess.TimeoutExpired, ValueError, OSError):
        duration = 0.0
    return duration
|
|
|
|
def normalize_audio_file(input_path: str, out_dir: str) -> str:
    """Transcode *input_path* with ffmpeg to a 16 kHz mono 16-bit PCM WAV.

    The result is written to a new, randomly named ``in_<hex>.wav`` file in
    *out_dir*, and that path is returned.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
    """
    wav_name = f"in_{uuid.uuid4().hex[:8]}.wav"
    out_path = os.path.join(out_dir, wav_name)
    ffmpeg_cmd = ["ffmpeg", "-y", "-i", input_path]
    ffmpeg_cmd += ["-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", out_path]
    subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
    return out_path
|
|
|
|
def audio_file_to_data_uri(path: str) -> str:
    """Return the contents of *path* as a ``data:audio/wav;base64,...`` URI.

    The file is read in binary mode and is not inspected; the ``audio/wav``
    media type is always used, so callers are expected to pass WAV files.

    Raises:
        OSError: the file cannot be opened or read.
    """
    # Context manager so the handle is closed promptly instead of being left
    # to the garbage collector.
    with open(path, "rb") as audio_file:
        encoded = base64.b64encode(audio_file.read()).decode("ascii")
    return f"data:audio/wav;base64,{encoded}"
|
|
|
|
| |
| |
| |
def omni_audio_to_speech(client: OpenAI,
                         audio_path: str,
                         source_hint: str,
                         target_hint: str,
                         voice: str,
                         out_dir: str) -> tuple:
    """Translate spoken audio into spoken audio with one streamed Qwen-Omni call.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        audio_path: WAV file to translate; sent inline as a data URI.
        source_hint: Source language name inserted into the prompts.
        target_hint: Target language name inserted into the prompts.
        voice: Voice name passed in the request's ``audio`` options.
        out_dir: Directory that receives the generated ``out_<hex>.wav``.

    Returns:
        ``(wav_path, transcript, error)``. On success ``error`` is None. If
        the stream carried no audio, ``wav_path`` is None and the transcript
        collected so far is returned with an error message. If the API call
        or the file write raises, the result is ``(None, "", message)``.
    """
    # Reading the input file happens outside the try block, so an unreadable
    # file raises to the caller instead of becoming an error tuple.
    audio_uri = audio_file_to_data_uri(audio_path)

    system_prompt = "".join([
        "You are a live football commentary translator. ",
        f"The user will speak in {source_hint}. ",
        f"Listen carefully and respond by speaking the equivalent commentary in {target_hint}. ",
        "Match the energy and excitement of live football commentary. ",
        "Keep the same meaning. Do NOT add commentary of your own. ",
        f"Respond ONLY with the spoken {target_hint} translation.",
    ])
    user_content = [
        {"type": "input_audio",
         "input_audio": {"data": audio_uri, "format": "wav"}},
        {"type": "text",
         "text": f"Translate this commentary into {target_hint} and speak it."},
    ]

    try:
        stream = client.chat.completions.create(
            model=OMNI_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            modalities=["text", "audio"],
            audio={"voice": voice, "format": "wav"},
            stream=True,
            stream_options={"include_usage": True},
        )

        audio_chunks = []
        text_chunks = []
        for event in stream:
            if not event.choices:
                # Events without choices carry nothing to collect.
                continue
            delta = event.choices[0].delta

            text_piece = getattr(delta, "content", None)
            if text_piece:
                text_chunks.append(text_piece)

            audio_piece = getattr(delta, "audio", None)
            if not audio_piece:
                continue
            # The audio delta is accepted either as a plain dict or as an
            # object exposing a ``data`` attribute.
            if isinstance(audio_piece, dict) and "data" in audio_piece:
                audio_chunks.append(audio_piece["data"])
            elif getattr(audio_piece, "data", None):
                audio_chunks.append(audio_piece.data)

        transcript = "".join(text_chunks).strip()
        if not audio_chunks:
            return None, transcript, "No audio received from Qwen-Omni"

        out_wav = os.path.join(out_dir, f"out_{uuid.uuid4().hex[:8]}.wav")
        # The base64 fragments are concatenated and decoded as one string.
        base64_to_wav("".join(audio_chunks), out_wav)
        return out_wav, transcript, None

    except Exception as e:
        return None, "", f"Qwen-Omni error: {e}"
|
|
|
|
def omni_audio_to_text(client: OpenAI,
                       audio_path: str,
                       source_hint: str,
                       target_hint: str) -> tuple:
    """Translate spoken audio into text with Qwen-Omni.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        audio_path: WAV file to translate; sent inline as a data URI.
        source_hint: Source language name inserted into the prompts.
        target_hint: Target language name inserted into the prompts.

    Returns:
        ``(text, error)``. On success ``error`` is None and ``text`` is the
        stripped translation (possibly empty if the model produced no text).
        If the API call raises, the result is ``("", message)``.
    """
    # Reading the input file happens outside the try block, so an unreadable
    # file raises to the caller instead of becoming an error tuple.
    audio_uri = audio_file_to_data_uri(audio_path)

    sys_prompt = (
        f"You are a translator. The user will speak in {source_hint}. "
        f"Translate what they say into {target_hint}. "
        f"Output ONLY the {target_hint} translation as plain text. No commentary, no quotes."
    )

    try:
        # Qwen-Omni models are documented as streaming-only, so the request
        # is streamed (like omni_audio_to_speech) and the text deltas are
        # stitched together here.
        stream = client.chat.completions.create(
            model=OMNI_MODEL,
            messages=[
                {"role": "system", "content": sys_prompt},
                {"role": "user", "content": [
                    {"type": "input_audio",
                     "input_audio": {"data": audio_uri, "format": "wav"}},
                    {"type": "text",
                     "text": f"Translate into {target_hint}."},
                ]},
            ],
            modalities=["text"],
            stream=True,
            stream_options={"include_usage": True},
        )

        text_parts = []
        for event in stream:
            if not event.choices:
                # Events without choices carry nothing to collect.
                continue
            piece = getattr(event.choices[0].delta, "content", None)
            if piece:
                text_parts.append(piece)
        return "".join(text_parts).strip(), None
    except Exception as e:
        return "", f"Qwen-Omni translation error: {e}"
|
|
|
|
def yourvoic_speak(text: str,
                   target_language: str,
                   target_config: dict,
                   api_key: str,
                   out_dir: str) -> tuple:
    """Synthesize *text* with the YourVoic TTS API and return a WAV file path.

    Candidate voices for *target_language* are tried in order, with "Peter"
    always included as the last resort. The response may be the audio itself
    or JSON containing an ``audio_url``/``url`` to download. Either way the
    audio is converted with ffmpeg to 24 kHz mono 16-bit PCM WAV.

    Args:
        text: Text to speak.
        target_language: Key into ``YOURVOIC_VOICE_MAP``.
        target_config: Target language entry; must contain "yourvoic_lang".
        api_key: Value sent in the ``X-API-Key`` header.
        out_dir: Directory that receives the downloaded and converted files.

    Returns:
        ``(wav_path, None)`` on success, otherwise ``(None, error_message)``.
    """
    yourvoic_lang = target_config["yourvoic_lang"]
    voices_to_try = list(YOURVOIC_VOICE_MAP.get(target_language, ["Peter"]))
    if "Peter" not in voices_to_try:
        voices_to_try.append("Peter")

    last_error = None
    for voice in voices_to_try:
        payload = {
            "text": text,
            "voice": voice,
            "language": yourvoic_lang,
            "model": YOURVOIC_MODEL,
            "speed": 1.0,
        }
        try:
            resp = http_requests.post(
                YOURVOIC_TTS_URL,
                json=payload,
                headers={"X-API-Key": api_key, "Content-Type": "application/json"},
                timeout=60,
            )
            if resp.status_code != 200:
                last_error = f"YourVoic {resp.status_code}: {resp.text[:200]}"
                # A 400, or an error body mentioning the voice, is treated as
                # "this voice was rejected": move on to the next candidate.
                if "voice" in resp.text.lower() or resp.status_code == 400:
                    continue
                return None, last_error

            # Media types are case-insensitive, so compare in lower case.
            ctype = resp.headers.get("Content-Type", "").lower()
            # "audio/mpeg" is the registered media type for MP3.
            is_mp3 = "mp3" in ctype or "mpeg" in ctype
            ext = "mp3" if is_mp3 else "wav"
            raw_path = os.path.join(out_dir, f"yv_{uuid.uuid4().hex[:8]}.{ext}")

            if "application/json" in ctype:
                data = resp.json()
                audio_url = data.get("audio_url") or data.get("url")
                if not audio_url:
                    return None, "No audio URL in YourVoic response"
                audio_resp = http_requests.get(audio_url, timeout=60)
                # Fail here with the HTTP status rather than handing an error
                # page to ffmpeg and reporting a confusing conversion failure.
                audio_resp.raise_for_status()
                audio_bytes = audio_resp.content
            else:
                audio_bytes = resp.content

            with open(raw_path, "wb") as f:
                f.write(audio_bytes)

            wav_path = os.path.join(out_dir, f"yv_{uuid.uuid4().hex[:8]}.wav")
            subprocess.run(
                ["ffmpeg", "-y", "-i", raw_path,
                 "-ar", "24000", "-ac", "1", "-acodec", "pcm_s16le", wav_path],
                capture_output=True, check=True,
            )
            return wav_path, None

        except Exception as e:
            # Best effort: remember the failure and try the next voice.
            last_error = f"YourVoic exception: {e}"
            continue

    return None, last_error or "YourVoic failed for all candidate voices"
|
|
|
|
| |
| |
| |
def translate_audio_file(audio_file: str,
                         source_language: str,
                         target_language: str,
                         qwen_voice_label: str,
                         work_dir: str) -> tuple:
    """Run one audio file through the translation pipeline.

    The input is normalized to 16 kHz mono WAV, then handled by the engine
    configured for *target_language*: "qwen" produces speech directly, while
    "yourvoic" asks Qwen-Omni for translated text and YourVoic for speech.

    Args:
        audio_file: Path of the recorded or uploaded audio.
        source_language: Key into ``SOURCE_LANGUAGES``.
        target_language: Key into ``TARGET_LANGUAGES``.
        qwen_voice_label: Voice dropdown label; only used by the qwen engine.
        work_dir: Directory for intermediate and output files.

    Returns:
        ``(wav_path, transcript, error)``. ``error`` is None on success;
        otherwise ``wav_path`` is None and ``error`` describes the failure.

    Raises:
        KeyError: *source_language* or *target_language* is not configured.
    """
    ds_key = os.environ.get("DASHSCOPE_API_KEY", "")
    if not ds_key:
        return None, "", "DASHSCOPE_API_KEY not set"

    src_config = SOURCE_LANGUAGES[source_language]
    tgt_config = TARGET_LANGUAGES[target_language]
    client = OpenAI(api_key=ds_key, base_url=DASHSCOPE_BASE_URL)

    try:
        norm_path = normalize_audio_file(audio_file, work_dir)
    except subprocess.CalledProcessError as e:
        stderr = e.stderr or b""
        if isinstance(stderr, bytes):
            stderr = stderr.decode(errors="replace")
        # ffmpeg prints its banner first and the real failure last, so the
        # tail of stderr is the informative part.
        return None, "", f"ffmpeg normalize failed: {stderr.strip()[-200:]}"
    except OSError as e:
        # Raised when the ffmpeg executable is missing or cannot be started.
        return None, "", f"ffmpeg normalize failed: {e}"

    engine = tgt_config["engine"]

    if engine == "qwen":
        voice = voice_name(qwen_voice_label)
        return omni_audio_to_speech(
            client, norm_path,
            src_config["omni_hint"], tgt_config["omni_hint"],
            voice, work_dir,
        )

    if engine == "yourvoic":
        yv_key = os.environ.get("YOURVOIC_API_KEY", "")
        if not yv_key:
            return None, "", "YOURVOIC_API_KEY not set"
        translated_text, err = omni_audio_to_text(
            client, norm_path,
            src_config["omni_hint"], tgt_config["omni_hint"],
        )
        if err or not translated_text:
            return None, translated_text, err or "empty translation"
        wav, yv_err = yourvoic_speak(
            translated_text, target_language, tgt_config, yv_key, work_dir,
        )
        return wav, translated_text, yv_err

    return None, "", f"Unknown engine '{engine}'"
|
|
|
|
| |
| |
| |
def single_clip_translate(audio_input,
                          source_language: str,
                          target_language: str,
                          qwen_voice_label: str):
    """Translate one clip, yielding UI updates as the work progresses.

    Each yielded item is ``(audio_path, status_markdown, transcript)``:
    first a "translating" status, then either the finished audio with the
    elapsed time or an error status. With no input, a single "no audio"
    status is yielded.
    """
    if audio_input is None:
        yield None, "**Status:** no audio provided.", ""
        return

    started_at = time.time()
    work_dir = tempfile.mkdtemp(prefix="commentary_single_")
    route = f"{source_language} -> {target_language}"
    yield None, f"**Status:** translating {route}...", ""

    wav, transcript, err = translate_audio_file(
        audio_input, source_language, target_language, qwen_voice_label, work_dir,
    )
    shown_transcript = transcript or ""
    if err:
        yield None, f"**Error:** {err}", shown_transcript
    else:
        elapsed = time.time() - started_at
        yield wav, f"**Done in {elapsed:.1f}s** - {route}", shown_transcript
|
|
|
|
| |
| |
| |
@dataclass
class LiveSession:
    """Holds per-session state for continuous-mode streaming.

    One instance is created per Start click and kept in a ``gr.State``. It is
    touched by the streaming callback, by the drain timer, and by the
    background translation threads. No locking is used.
    """
    # Session configuration, fixed at creation.
    work_dir: str            # directory for this session's utterance/output files
    source_language: str     # key into SOURCE_LANGUAGES
    target_language: str     # key into TARGET_LANGUAGES
    qwen_voice_label: str    # voice dropdown label as selected in the UI

    # Voice-activity-detection state, maintained by session_process_chunk.
    buffer: list = field(default_factory=list)  # chunks of the utterance in progress
    sample_rate: int = 16000      # rate incoming chunks are resampled to
    last_voice_ts: float = 0.0    # time.time() of the most recent voiced chunk
    in_utterance: bool = False    # True while an utterance is being collected
    utterance_start_ts: float = 0.0  # time.time() when the current utterance began

    # Results handed from background threads to the drain timer.
    output_queue: "queue.Queue" = field(default_factory=queue.Queue)  # {"wav", "transcript"} dicts
    transcripts: list = field(default_factory=list)  # transcripts already released for playback
    error_msg: str = ""    # last error to show; cleared by live_drain once displayed
    closed: bool = False   # set by live_stop; closed sessions ignore new chunks

    # Playback pacing, maintained by live_drain.
    current_playback_ends_at: float = 0.0  # time.time() before which no new clip is released
    current_playback_path: str = ""        # path of the clip most recently released
    PLAYBACK_GAP_SEC: float = 0.4          # pause added after each clip's duration
|
|
|
|
def make_session(source_language: str,
                 target_language: str,
                 qwen_voice_label: str) -> LiveSession:
    """Create a LiveSession for the given settings with its own temp work dir."""
    session_dir = tempfile.mkdtemp(prefix="commentary_live_")
    session = LiveSession(
        work_dir=session_dir,
        source_language=source_language,
        target_language=target_language,
        qwen_voice_label=qwen_voice_label,
    )
    return session
|
|
|
|
def session_translate_utterance(session: LiveSession, utterance_samples: np.ndarray) -> None:
    """Translate one utterance and enqueue the result (runs on a worker thread).

    The samples are written to a WAV file in the session's work dir and sent
    through ``translate_audio_file``. A successful result is put on
    ``session.output_queue``. A reported error or any exception is stored in
    ``session.error_msg`` instead.
    """
    try:
        utt_name = f"utt_{uuid.uuid4().hex[:8]}.wav"
        utt_path = os.path.join(session.work_dir, utt_name)
        write_wav(utterance_samples, session.sample_rate, utt_path)

        wav, transcript, err = translate_audio_file(
            utt_path,
            session.source_language,
            session.target_language,
            session.qwen_voice_label,
            session.work_dir,
        )
        if err:
            session.error_msg = err
        elif wav:
            result = {"wav": wav, "transcript": transcript or ""}
            session.output_queue.put(result)
    except Exception as e:
        session.error_msg = f"Background translate error: {e}"
|
|
|
|
def _to_mono_float32(chunk: np.ndarray) -> np.ndarray:
    """Return *chunk* as mono float32, scaling signed-integer PCM to [-1.0, 1.0)."""
    # Normalize the dtype BEFORE down-mixing: averaging an integer array
    # yields float64, which would otherwise skip the integer scaling and leave
    # multi-channel input at full integer amplitude.
    if np.issubdtype(chunk.dtype, np.signedinteger):
        full_scale = float(np.iinfo(chunk.dtype).max) + 1.0  # 32768.0 for int16
        chunk = chunk.astype(np.float32) / full_scale
    elif chunk.dtype != np.float32:
        chunk = chunk.astype(np.float32)
    if chunk.ndim > 1:
        # Average over axis 1 (assumes chunks are shaped (samples, channels)).
        chunk = chunk.mean(axis=1)
    return chunk


def session_process_chunk(session: "LiveSession",
                          sample_rate: int,
                          chunk: np.ndarray) -> None:
    """Feed one streamed audio chunk into the session's energy-based VAD.

    The chunk is converted to mono float32 at ``session.sample_rate`` and
    classified as voice or silence by its RMS level. Voiced chunks start or
    extend an utterance; silent chunks are kept only while an utterance is in
    progress. The utterance is handed to a background translation thread when
    it reaches ``VAD_MAX_UTTERANCE_SEC``, or when ``VAD_SILENCE_SEC`` of
    silence has passed and it is at least ``VAD_MIN_UTTERANCE_SEC`` old.
    Durations are measured with wall-clock time, not sample counts.

    Args:
        session: Live session whose VAD state is updated in place.
        sample_rate: Sample rate of *chunk* in Hz.
        chunk: Audio samples, 1-D or 2-D (down-mixed over axis 1).
    """
    if session.closed:
        return

    chunk = _to_mono_float32(chunk)

    # Linear-interpolation resample to the session rate when they differ.
    if sample_rate != session.sample_rate:
        ratio = session.sample_rate / sample_rate
        n_out = int(len(chunk) * ratio)
        if n_out > 0:
            chunk = np.interp(
                np.linspace(0, len(chunk) - 1, n_out),
                np.arange(len(chunk)),
                chunk,
            ).astype(np.float32)

    now = time.time()

    rms = float(np.sqrt(np.mean(chunk ** 2))) if len(chunk) > 0 else 0.0
    is_voice = rms > VAD_RMS_THRESHOLD

    if is_voice:
        if not session.in_utterance:
            # First voiced chunk after silence: start a fresh utterance.
            session.in_utterance = True
            session.utterance_start_ts = now
            session.buffer = []
        session.last_voice_ts = now
        session.buffer.append(chunk)
    elif session.in_utterance:
        # Keep pauses that fall inside an utterance.
        session.buffer.append(chunk)

    if not session.in_utterance:
        return

    utt_dur = now - session.utterance_start_ts
    silence_dur = now - session.last_voice_ts

    should_flush = (
        utt_dur >= VAD_MAX_UTTERANCE_SEC or
        (silence_dur >= VAD_SILENCE_SEC and utt_dur >= VAD_MIN_UTTERANCE_SEC)
    )

    if should_flush and session.buffer:
        all_samples = np.concatenate(session.buffer)
        session.buffer = []
        session.in_utterance = False
        # NOTE(review): each utterance gets its own thread, so results reach
        # the output queue in completion order, which may differ from the
        # order in which the utterances were spoken.
        threading.Thread(
            target=session_translate_utterance,
            args=(session, all_samples),
            daemon=True,
        ).start()
|
|
|
|
| |
| |
| |
def live_start(source_language, target_language, qwen_voice_label):
    """Handle the Start click: check API keys, then open a live session.

    Returns an 8-tuple matching the click handler's outputs: session state,
    status markdown, mic update, Stop button update, Start button update,
    drain timer update, transcript log value, audio value. If a required key
    is missing, no session is created and the controls stay in their idle
    configuration.
    """
    def rejected(message):
        # Idle UI: mic and Stop hidden, Start visible, timer off.
        return (
            None,
            message,
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            gr.update(active=False), "", None,
        )

    if not os.environ.get("DASHSCOPE_API_KEY", ""):
        return rejected("**Error:** DASHSCOPE_API_KEY not set in Space secrets.")

    target_engine = TARGET_LANGUAGES.get(target_language, {}).get("engine")
    needs_yourvoic = target_engine == "yourvoic"
    if needs_yourvoic and not os.environ.get("YOURVOIC_API_KEY", ""):
        return rejected(f"**Error:** YOURVOIC_API_KEY required for {target_language}.")

    session = make_session(source_language, target_language, qwen_voice_label)
    status = (
        f"**Live session active** ({source_language} -> {target_language}). "
        "Press the record button on the microphone below to begin speaking."
    )
    # Active UI: mic and Stop shown, Start hidden, timer on, log and audio reset.
    return (
        session,
        status,
        gr.update(visible=True),
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(active=True),
        "",
        None,
    )
|
|
|
|
def live_stop(session: Optional[LiveSession]):
    """Handle the Stop click: mark the session closed and reset the controls.

    Returns a 6-tuple matching the click handler's outputs: cleared session
    state, status markdown, mic update (hidden), Stop button update (hidden),
    Start button update (visible), drain timer update (inactive).
    """
    if session is not None:
        session.closed = True

    status = "**Status:** session stopped. Click Start to begin a new one."
    mic_update = gr.update(visible=False)
    stop_update = gr.update(visible=False)
    start_update = gr.update(visible=True)
    timer_update = gr.update(active=False)
    return None, status, mic_update, stop_update, start_update, timer_update
|
|
|
|
def live_on_stream(audio_chunk, session: Optional[LiveSession]):
    """Streaming-microphone callback: feed one chunk into the session's VAD.

    *audio_chunk* is expected to unpack as ``(sample_rate, samples)``.
    Missing, malformed or empty chunks are ignored. An exception raised while
    processing is recorded in ``session.error_msg``. The session is always
    returned so the ``gr.State`` holding it stays alive.
    """
    if session is None or audio_chunk is None:
        return session

    try:
        sample_rate, samples = audio_chunk
    except (TypeError, ValueError):
        # Not a 2-item sequence; nothing usable in this chunk.
        return session

    if samples is not None:
        samples = np.asarray(samples)
        if samples.size != 0:
            try:
                session_process_chunk(session, sample_rate, samples)
            except Exception as e:
                session.error_msg = f"Stream chunk error: {e}"
    return session
|
|
|
|
def live_drain(session: Optional[LiveSession]):
    """Timer tick: release the next queued translation once the current one is done.

    A new clip is only handed to the audio player after the previous clip's
    duration (plus ``PLAYBACK_GAP_SEC``) has elapsed, so clips play one after
    another without cutting each other off.

    Returns:
        A 3-tuple matching the tick handler's outputs: audio value or update,
        transcript-log value or update, status value or update.
    """
    if session is None or session.closed:
        return None, gr.update(), gr.update()

    # Surface a pending background error once, then clear it.
    if session.error_msg:
        msg = session.error_msg
        session.error_msg = ""
        # Leave the audio output untouched: returning None here would clear
        # the player and cut off a translation that is still playing.
        return gr.update(), gr.update(), f"**Background error:** {msg}"

    now = time.time()

    # Current clip still playing: only refresh the status line.
    if now < session.current_playback_ends_at:
        remaining = session.current_playback_ends_at - now
        qsize = session.output_queue.qsize()
        status = f"**Status:** playing translation ({remaining:.1f}s left)"
        if qsize > 0:
            status += f" -- {qsize} more queued"
        return gr.update(), gr.update(), gr.update(value=status)

    # Nothing playing: take the next finished translation, if there is one.
    try:
        item = session.output_queue.get_nowait()
    except queue.Empty:
        if session.in_utterance:
            status = "**Status:** listening (in utterance)..."
        elif session.transcripts:
            status = "**Status:** waiting for more speech..."
        else:
            status = "**Status:** waiting for speech..."
        return gr.update(), gr.update(), gr.update(value=status)

    # Reserve the playback window for this clip; fall back to 3 seconds when
    # the duration cannot be determined.
    duration = wav_duration_seconds(item["wav"])
    if duration <= 0:
        duration = 3.0
    session.current_playback_ends_at = now + duration + session.PLAYBACK_GAP_SEC
    session.current_playback_path = item["wav"]

    session.transcripts.append(item["transcript"])
    transcript_md = "\n\n---\n\n".join(t for t in session.transcripts if t)
    qsize = session.output_queue.qsize()
    status = f"**Status:** playing translation ({duration:.1f}s)"
    if qsize > 0:
        status += f" -- {qsize} more queued"
    return item["wav"], transcript_md, status
|
|
|
|
| |
| |
| |
# Markdown shown at the top of the page (rendered by gr.Markdown in the UI).
DESCRIPTION = """
# Live Football Commentary Translator

Translate live commentary between languages.

**Sources:** English, Scottish English, German, Spanish, Arabic
**Targets:** all of the above + Swahili, Amharic, Afrikaans

Two modes -- pick a tab below:
- **Single clip:** record or upload one clip, get one translation.
- **Continuous live:** start a session, speak naturally, hear translations queued and played in order.

Latency on free ZeroGPU: roughly 3-8 seconds per utterance.
"""
|
|
|
|
def on_target_change(target_lang_choice):
    """Show the voice dropdown only when the chosen target uses the qwen engine."""
    engine = TARGET_LANGUAGES.get(target_lang_choice, {}).get("engine")
    uses_qwen_voice = engine == "qwen"
    return gr.update(visible=uses_qwen_voice)
|
|
|
|
# Gradio UI definition.
# NOTE(review): the nesting of the layout contexts below was reconstructed
# from statement order -- confirm against the deployed layout.
with gr.Blocks(title="Live Football Commentary Translator") as demo:
    gr.Markdown(DESCRIPTION)

    # Language and voice selectors shared by both tabs.
    with gr.Row():
        source_lang = gr.Dropdown(
            choices=list(SOURCE_LANGUAGES.keys()),
            value="English",
            label="Source (what the commentator speaks)",
        )
        target_lang = gr.Dropdown(
            choices=list(TARGET_LANGUAGES.keys()),
            value="Swahili",
            label="Target (what you want to hear)",
        )
        # Hidden initially because the default target (Swahili) is not a
        # qwen-engine language; on_target_change toggles it.
        qwen_voice = gr.Dropdown(
            choices=QWEN_VOICES,
            value=QWEN_VOICES[0],
            label="Voice (Qwen targets only)",
            visible=False,
        )

    with gr.Tabs():

        # --- Tab 1: single clip ---
        with gr.Tab("Single clip"):
            with gr.Row():
                with gr.Column():
                    with gr.Tabs():
                        with gr.Tab("Live microphone"):
                            mic_input = gr.Audio(
                                sources=["microphone"], type="filepath",
                                label="Speak your commentary (short bursts, 5-15s each)",
                            )
                            mic_btn = gr.Button("Translate microphone clip", variant="primary")
                        with gr.Tab("Upload file"):
                            file_input = gr.Audio(
                                sources=["upload"], type="filepath",
                                label="Upload an audio clip (.wav, .mp3, .m4a, etc.)",
                            )
                            file_btn = gr.Button("Translate uploaded clip", variant="primary")

                with gr.Column():
                    single_status = gr.Markdown(value="*Waiting for input...*")
                    single_audio = gr.Audio(label="Translated audio", type="filepath", autoplay=True)
                    single_transcript = gr.Textbox(
                        label="Translated text", lines=4, interactive=False,
                    )

            # Both buttons run the same generator; only the input differs.
            mic_btn.click(
                fn=single_clip_translate,
                inputs=[mic_input, source_lang, target_lang, qwen_voice],
                outputs=[single_audio, single_status, single_transcript],
            )
            file_btn.click(
                fn=single_clip_translate,
                inputs=[file_input, source_lang, target_lang, qwen_voice],
                outputs=[single_audio, single_status, single_transcript],
            )

        # --- Tab 2: continuous live ---
        with gr.Tab("Continuous live"):
            gr.Markdown(
                "**How it works:**\n"
                "1. Pick source and target languages above.\n"
                "2. Click **Start Live Translation**.\n"
                "3. Press the record button on the microphone that appears.\n"
                "4. Speak naturally -- translations chunk at pauses and play in order.\n"
                "5. Click **Stop** to end the session.\n"
            )

            with gr.Row():
                with gr.Column():
                    start_btn = gr.Button("Start Live Translation", variant="primary", size="lg")
                    stop_btn = gr.Button("Stop", variant="stop", visible=False)

                    # Revealed by live_start; streams numpy chunks to live_on_stream.
                    live_mic = gr.Audio(
                        sources=["microphone"],
                        streaming=True,
                        type="numpy",
                        label="Live microphone (press record to begin streaming)",
                        visible=False,
                    )

                with gr.Column():
                    live_status = gr.Markdown(value="*Click Start to begin.*")
                    live_audio = gr.Audio(
                        label="Translated audio (auto-plays each chunk in order)",
                        type="filepath",
                        autoplay=True,
                    )
                    live_transcripts = gr.Markdown(value="", label="Translation log")

            # Per-user LiveSession (None while idle) and the timer that polls
            # its output queue; the timer only runs during a session.
            live_state = gr.State(value=None)
            drain_timer = gr.Timer(value=OUTPUT_POLL_SEC, active=False)

            start_btn.click(
                fn=live_start,
                inputs=[source_lang, target_lang, qwen_voice],
                outputs=[
                    live_state, live_status, live_mic, stop_btn, start_btn,
                    drain_timer, live_transcripts, live_audio,
                ],
            )

            stop_btn.click(
                fn=live_stop,
                inputs=[live_state],
                outputs=[
                    live_state, live_status, live_mic, stop_btn, start_btn, drain_timer,
                ],
            )

            live_mic.stream(
                fn=live_on_stream,
                inputs=[live_mic, live_state],
                outputs=[live_state],
                show_progress="hidden",
            )

            drain_timer.tick(
                fn=live_drain,
                inputs=[live_state],
                outputs=[live_audio, live_transcripts, live_status],
                show_progress="hidden",
            )

    # Keep the voice dropdown's visibility in sync with the target language,
    # both when it changes and on initial page load.
    target_lang.change(fn=on_target_change, inputs=target_lang, outputs=qwen_voice)
    demo.load(fn=on_target_change, inputs=target_lang, outputs=qwen_voice)

    gr.Markdown(
        "---\n"
        "**Architecture:** Qwen-Omni (`qwen3.5-omni-plus`) handles audio to speech for "
        "English / Scottish-EN / German / Spanish / Arabic. For Swahili / Amharic / Afrikaans: "
        "Omni translates to text, then YourVoic speaks it."
    )
|
|
|
|
# Start the Gradio app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch(ssr_mode=False, show_api=False)
|
|