import os
import subprocess
import sys

import spaces

# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"

# Install xformers for memory-efficient attention
subprocess.run(
    [sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"],
    check=False,
)

# Install video preprocessing dependencies
subprocess.run(
    [sys.executable, "-m", "pip", "install", "dwpose", "onnxruntime-gpu",
     "imageio[ffmpeg]", "scikit-image", "opencv-python-headless"],
    check=False,
)

# Reinstall torchaudio to match the torch CUDA version on this space.
# controlnet_aux or other deps can pull in a CPU-only torchaudio that conflicts
# with the pre-installed CUDA torch, causing "undefined symbol" errors.
_tv = subprocess.run(
    [sys.executable, "-c", "import torch; print(torch.__version__)"],
    capture_output=True,
    text=True,
)
if _tv.returncode == 0:
    _full_ver = _tv.stdout.strip()
    # Extract CUDA suffix if present (e.g. "2.7.0+cu124" -> "cu124")
    _cuda_suffix = _full_ver.split("+")[-1] if "+" in _full_ver else "cu124"
    _base_ver = _full_ver.split("+")[0]
    print(f"Detected torch {_full_ver}, reinstalling matching torchaudio...")
    subprocess.run([
        sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps",
        f"torchaudio=={_base_ver}",
        "--index-url", f"https://download.pytorch.org/whl/{_cuda_suffix}",
    ], check=False)

# Clone LTX-2 repo and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
if not os.path.exists(LTX_REPO_DIR):
    print(f"Cloning {LTX_REPO_URL}...")
    subprocess.run(["git", "clone", "--depth", "1", LTX_REPO_URL, LTX_REPO_DIR], check=True)
print("Installing ltx-core and ltx-pipelines from cloned repo...")
subprocess.run(
    [sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps",
     "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
     "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")],
    check=True,
)
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))

import logging
import random
import tempfile
from pathlib import Path

import torch

torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True

import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from safetensors import safe_open

from ltx_core.components.diffusion_steps import EulerDiffusionStep
from ltx_core.components.noisers import GaussianNoiser
from ltx_core.conditioning import (
    ConditioningItem,
    ConditioningItemAttentionStrengthWrapper,
    VideoConditionByReferenceLatent,
)
from ltx_core.loader import LoraPathStrengthAndSDOps, LTXV_LORA_COMFY_RENAMING_MAP
from ltx_core.model.audio_vae import decode_audio as vae_decode_audio
from ltx_core.model.audio_vae import encode_audio as vae_encode_audio
from ltx_core.model.upsampler import upsample_video
from ltx_core.model.video_vae import TilingConfig, VideoEncoder, get_video_chunks_number
from ltx_core.model.video_vae import decode_video as vae_decode_video
from ltx_core.quantization import QuantizationPolicy
from ltx_core.types import Audio, AudioLatentShape, LatentState, VideoLatentShape, VideoPixelShape
from ltx_pipelines.utils import ModelLedger, euler_denoising_loop
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES
from ltx_pipelines.utils.helpers import (
    assert_resolution,
    cleanup_memory,
    combined_image_conditionings,
    denoise_audio_video,
    denoise_video_only,
    encode_prompts,
    get_device,
    simple_denoising_func,
)
from ltx_pipelines.utils.media_io import (
    decode_audio_from_file,
    encode_video,
    load_video_conditioning,
)
from ltx_pipelines.utils.types import PipelineComponents

# Force-patch xformers attention into the LTX attention module.
from ltx_core.model.transformer import attention as _attn_mod

print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
try:
    from xformers.ops import memory_efficient_attention as _mea

    _attn_mod.memory_efficient_attention = _mea
    print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
except Exception as e:
    print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}")

logging.getLogger().setLevel(logging.INFO)

# ─────────────────────────────────────────────────────────────────────────────
# Video Preprocessing: Strip appearance, keep structure
# ─────────────────────────────────────────────────────────────────────────────
import imageio
import cv2
from PIL import Image
from dwpose import DwposeDetector

# Lazily-initialized singletons (loading DWPose is slow; do it once on demand).
_pose_processor = None
_depth_processor = None


def _tmp_path(suffix: str) -> str:
    """Return a unique writable file path with the given suffix.

    Uses a fresh ``mkdtemp`` directory instead of the deprecated, race-prone
    ``tempfile.mktemp``: the path is guaranteed unique and no placeholder file
    is pre-created, so downstream writers (PIL, imageio, ffmpeg, encode_video)
    never have to overwrite an existing empty file.
    """
    return os.path.join(tempfile.mkdtemp(prefix="ltx_"), f"out{suffix}")


def _get_pose_processor():
    """Return the shared DWPose detector, loading it on first use."""
    global _pose_processor
    if _pose_processor is None:
        _pose_processor = DwposeDetector.from_pretrained_default()
        print("[Preprocess] DWPose processor loaded")
    return _pose_processor


def _get_depth_processor():
    """Placeholder — uses simple Laplacian edge-based depth approximation via OpenCV."""
    global _depth_processor
    if _depth_processor is None:
        _depth_processor = "cv2"  # sentinel — we use cv2 directly
        print("[Preprocess] CV2-based depth processor loaded")
    return _depth_processor


def load_video_frames(video_path: str) -> list[np.ndarray]:
    """Load video frames as list of HWC uint8 numpy arrays."""
    frames = []
    with imageio.get_reader(video_path) as reader:
        for frame in reader:
            frames.append(frame)
    return frames


def write_video_mp4(frames_float_01: list[np.ndarray], fps: float, out_path: str) -> str:
    """Write float [0,1] frames to mp4.

    Values are clipped to [0, 1] before the uint8 cast: without clipping, any
    out-of-range float would wrap modulo 256 and corrupt the frame.
    """
    frames_uint8 = [(np.clip(f, 0.0, 1.0) * 255).astype(np.uint8) for f in frames_float_01]
    with imageio.get_writer(out_path, fps=fps, macro_block_size=1) as writer:
        for fr in frames_uint8:
            writer.append_data(fr)
    return out_path


def extract_first_frame(video_path: str) -> str:
    """Extract first frame as a temp PNG file, return path."""
    frames = load_video_frames(video_path)
    if not frames:
        raise ValueError("No frames in video")
    out_path = _tmp_path(".png")
    Image.fromarray(frames[0]).save(out_path)
    return out_path


def preprocess_video_pose(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]:
    """Extract DWPose skeletons from each frame. Returns float [0,1] frames."""
    processor = _get_pose_processor()
    result = []
    for frame in frames:
        pil = Image.fromarray(frame.astype(np.uint8)).convert("RGB")
        pose_img = processor(pil, include_body=True, include_hand=True, include_face=True)
        if not isinstance(pose_img, Image.Image):
            pose_img = Image.fromarray(np.array(pose_img).astype(np.uint8))
        pose_img = pose_img.convert("RGB").resize((width, height), Image.BILINEAR)
        result.append(np.array(pose_img).astype(np.float32) / 255.0)
    return result


def preprocess_video_canny(
    frames: list[np.ndarray],
    width: int,
    height: int,
    low_threshold: int = 50,
    high_threshold: int = 100,
) -> list[np.ndarray]:
    """Extract Canny edges from each frame. Returns float [0,1] frames."""
    result = []
    for frame in frames:
        # Resize first
        resized = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
        gray = cv2.cvtColor(resized, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, low_threshold, high_threshold)
        # Convert single-channel to 3-channel
        edges_3ch = np.stack([edges, edges, edges], axis=-1)
        result.append(edges_3ch.astype(np.float32) / 255.0)
    return result


def preprocess_video_depth(frames: list[np.ndarray], width: int, height: int) -> list[np.ndarray]:
    """Estimate depth-like maps from each frame using Laplacian gradient magnitude.

    This is a fast approximation — for true depth, use MiDaS externally.
    """
    result = []
    for frame in frames:
        resized = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
        gray = cv2.cvtColor(resized, cv2.COLOR_RGB2GRAY).astype(np.float32)
        # Laplacian gives edge/gradient info that approximates depth discontinuities
        lap = np.abs(cv2.Laplacian(gray, cv2.CV_32F, ksize=5))
        # Normalize to [0, 1]
        lap = lap / (lap.max() + 1e-8)
        depth_3ch = np.stack([lap, lap, lap], axis=-1)
        result.append(depth_3ch)
    return result


def preprocess_conditioning_video(
    video_path: str,
    mode: str,
    width: int,
    height: int,
    num_frames: int,
    fps: float,
) -> tuple[str, str]:
    """
    Preprocess a video for conditioning. Strips appearance, keeps structure.

    Returns:
        (conditioning_mp4_path, first_frame_png_path)
    """
    frames = load_video_frames(video_path)
    if not frames:
        raise ValueError("No frames decoded from video")

    # Trim to num_frames
    frames = frames[:num_frames]

    # Save first frame (original appearance) for image conditioning
    first_png = _tmp_path(".png")
    Image.fromarray(frames[0]).save(first_png)

    # Process based on mode
    if mode == "Pose (DWPose)":
        processed = preprocess_video_pose(frames, width, height)
    elif mode == "Canny Edge":
        processed = preprocess_video_canny(frames, width, height)
    elif mode == "Depth (Laplacian)":
        processed = preprocess_video_depth(frames, width, height)
    else:
        # "Raw" mode — no preprocessing
        processed = [f.astype(np.float32) / 255.0 for f in frames]

    cond_mp4 = _tmp_path(".mp4")
    write_video_mp4(processed, fps=fps, out_path=cond_mp4)
    return cond_mp4, first_png


# ─────────────────────────────────────────────────────────────────────────────
# Helper: read reference downscale factor from IC-LoRA metadata
# ─────────────────────────────────────────────────────────────────────────────
def _read_lora_reference_downscale_factor(lora_path: str) -> int:
    """Read `reference_downscale_factor` from a safetensors LoRA's metadata.

    Returns 1 (no downscale) when the key is missing or the file is unreadable.
    """
    try:
        with safe_open(lora_path, framework="pt") as f:
            metadata = f.metadata() or {}
            return int(metadata.get("reference_downscale_factor", 1))
    except Exception as e:
        logging.warning(f"Failed to read metadata from LoRA file '{lora_path}': {e}")
        return 1


# ─────────────────────────────────────────────────────────────────────────────
# Unified Pipeline: Distilled + Audio + IC-LoRA Video-to-Video
# ─────────────────────────────────────────────────────────────────────────────
class LTX23UnifiedPipeline:
    """
    Unified LTX-2.3 pipeline supporting all generation modes:
      • Text-to-Video
      • Image-to-Video (first-frame conditioning)
      • Audio-to-Video (lip-sync / BGM conditioning with external audio)
      • Video-to-Video (IC-LoRA reference video conditioning)
      • Any combination of the above

    Architecture:
      - stage_1_model_ledger: transformer WITH IC-LoRA fused (used for Stage 1)
      - stage_2_model_ledger: transformer WITHOUT IC-LoRA (used for Stage 2 upsampling)
      - When no IC-LoRA is provided, both stages use the same base model.
    """

    def __init__(
        self,
        distilled_checkpoint_path: str,
        spatial_upsampler_path: str,
        gemma_root: str,
        ic_loras: list[LoraPathStrengthAndSDOps] | None = None,
        device: torch.device | None = None,
        quantization: QuantizationPolicy | None = None,
        reference_downscale_factor: int | None = None,
    ):
        self.device = device or get_device()
        self.dtype = torch.bfloat16
        ic_loras = ic_loras or []
        self.has_ic_lora = len(ic_loras) > 0

        # Stage 1: transformer with IC-LoRA (if provided)
        self.stage_1_model_ledger = ModelLedger(
            dtype=self.dtype,
            device=self.device,
            checkpoint_path=distilled_checkpoint_path,
            spatial_upsampler_path=spatial_upsampler_path,
            gemma_root_path=gemma_root,
            loras=ic_loras,
            quantization=quantization,
        )
        if self.has_ic_lora:
            # Stage 2 needs a separate transformer WITHOUT IC-LoRA
            self.stage_2_model_ledger = ModelLedger(
                dtype=self.dtype,
                device=self.device,
                checkpoint_path=distilled_checkpoint_path,
                spatial_upsampler_path=spatial_upsampler_path,
                gemma_root_path=gemma_root,
                loras=[],
                quantization=quantization,
            )
        else:
            # No IC-LoRA: share a single ledger for both stages (saves ~half VRAM)
            self.stage_2_model_ledger = self.stage_1_model_ledger

        self.pipeline_components = PipelineComponents(
            dtype=self.dtype,
            device=self.device,
        )

        # Reference downscale factor: explicit value takes priority,
        # otherwise read from IC-LoRA metadata, otherwise default to 1.
        if reference_downscale_factor is not None:
            self.reference_downscale_factor = reference_downscale_factor
        else:
            self.reference_downscale_factor = 1
            for lora in ic_loras:
                scale = _read_lora_reference_downscale_factor(lora.path)
                if scale != 1:
                    if self.reference_downscale_factor not in (1, scale):
                        raise ValueError(
                            f"Conflicting reference_downscale_factor: "
                            f"already {self.reference_downscale_factor}, got {scale}"
                        )
                    self.reference_downscale_factor = scale
        logging.info(f"[Pipeline] reference_downscale_factor={self.reference_downscale_factor}")

    # ── Video reference conditioning (from ICLoraPipeline) ───────────────
    def _create_ic_conditionings(
        self,
        video_conditioning: list[tuple[str, float]],
        height: int,
        width: int,
        num_frames: int,
        video_encoder: VideoEncoder,
        conditioning_strength: float = 1.0,
    ) -> list[ConditioningItem]:
        """Create IC-LoRA video reference conditioning items."""
        conditionings: list[ConditioningItem] = []
        scale = self.reference_downscale_factor
        ref_height = height // scale
        ref_width = width // scale
        for video_path, strength in video_conditioning:
            video = load_video_conditioning(
                video_path=video_path,
                height=ref_height,
                width=ref_width,
                frame_cap=num_frames,
                dtype=self.dtype,
                device=self.device,
            )
            encoded_video = video_encoder(video)
            cond = VideoConditionByReferenceLatent(
                latent=encoded_video,
                downscale_factor=scale,
                strength=strength,
            )
            if conditioning_strength < 1.0:
                cond = ConditioningItemAttentionStrengthWrapper(
                    cond, attention_mask=conditioning_strength
                )
            conditionings.append(cond)
        if conditionings:
            logging.info(f"[IC-LoRA] Added {len(conditionings)} video conditioning(s)")
        return conditionings

    # ── Main generation entry point ──────────────────────────────────────
    def __call__(
        self,
        prompt: str,
        seed: int,
        height: int,
        width: int,
        num_frames: int,
        frame_rate: float,
        images: list[ImageConditioningInput],
        audio_path: str | None = None,
        video_conditioning: list[tuple[str, float]] | None = None,
        tiling_config: TilingConfig | None = None,
        enhance_prompt: bool = False,
        conditioning_strength: float = 1.0,
    ):
        """
        Generate video with any combination of conditioning.

        Args:
            audio_path: Path to external audio file for lipsync/BGM conditioning.
            video_conditioning: List of (path, strength) tuples for IC-LoRA V2V.
            conditioning_strength: Scale for IC-LoRA attention influence [0, 1].

        Returns:
            Tuple of (decoded_video_iterator, Audio).
        """
        assert_resolution(height=height, width=width, is_two_stage=True)
        has_audio = audio_path is not None
        has_video_cond = bool(video_conditioning)

        generator = torch.Generator(device=self.device).manual_seed(seed)
        noiser = GaussianNoiser(generator=generator)
        stepper = EulerDiffusionStep()
        dtype = torch.bfloat16

        # ── Encode text prompt ───────────────────────────────────────────
        # Use stage_1 ledger for prompt encoding (has text encoder)
        (ctx_p,) = encode_prompts(
            [prompt],
            self.stage_1_model_ledger,
            enhance_first_prompt=enhance_prompt,
            enhance_prompt_image=images[0].path if len(images) > 0 else None,
        )
        video_context, audio_context = ctx_p.video_encoding, ctx_p.audio_encoding

        # ── Encode external audio (if provided) ─────────────────────────
        encoded_audio_latent = None
        decoded_audio_for_output = None
        if has_audio:
            video_duration = num_frames / frame_rate
            decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration)
            if decoded_audio is None:
                raise ValueError(f"Could not extract audio stream from {audio_path}")
            encoded_audio_latent = vae_encode_audio(
                decoded_audio, self.stage_1_model_ledger.audio_encoder()
            )
            audio_shape = AudioLatentShape.from_duration(
                batch=1, duration=video_duration, channels=8, mel_bins=16
            )
            # Trim or zero-pad the encoded latent along the frame axis so it
            # exactly matches the expected latent length for this duration.
            expected_frames = audio_shape.frames
            actual_frames = encoded_audio_latent.shape[2]
            if actual_frames > expected_frames:
                encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :]
            elif actual_frames < expected_frames:
                pad = torch.zeros(
                    encoded_audio_latent.shape[0],
                    encoded_audio_latent.shape[1],
                    expected_frames - actual_frames,
                    encoded_audio_latent.shape[3],
                    device=encoded_audio_latent.device,
                    dtype=encoded_audio_latent.dtype,
                )
                encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2)
            decoded_audio_for_output = Audio(
                waveform=decoded_audio.waveform.squeeze(0),
                sampling_rate=decoded_audio.sampling_rate,
            )

        # ── Build conditionings for Stage 1 ──────────────────────────────
        # Use stage_1 video encoder (has IC-LoRA context)
        video_encoder = self.stage_1_model_ledger.video_encoder()
        stage_1_output_shape = VideoPixelShape(
            batch=1,
            frames=num_frames,
            width=width // 2,
            height=height // 2,
            fps=frame_rate,
        )

        # Image conditionings
        stage_1_conditionings = combined_image_conditionings(
            images=images,
            height=stage_1_output_shape.height,
            width=stage_1_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )

        # IC-LoRA video reference conditionings
        if has_video_cond:
            ic_conds = self._create_ic_conditionings(
                video_conditioning=video_conditioning,
                height=stage_1_output_shape.height,
                width=stage_1_output_shape.width,
                num_frames=num_frames,
                video_encoder=video_encoder,
                conditioning_strength=conditioning_strength,
            )
            stage_1_conditionings.extend(ic_conds)

        # ── Stage 1: Low-res generation ──────────────────────────────────
        transformer = self.stage_1_model_ledger.transformer()
        stage_1_sigmas = torch.Tensor(DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop(sigmas, video_state, audio_state, stepper):
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer,
                ),
            )

        if has_audio:
            # Audio mode: denoise video only, use external audio latent
            video_state = denoise_video_only(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None  # we'll use the original audio for output
        else:
            # Standard / IC-only mode: denoise both audio and video
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
            )

        torch.cuda.synchronize()
        cleanup_memory()

        # ── Stage 2: Upsample + Refine ──────────────────────────────────
        upscaled_video_latent = upsample_video(
            latent=video_state.latent[:1],
            video_encoder=video_encoder,
            upsampler=self.stage_2_model_ledger.spatial_upsampler(),
        )
        torch.cuda.synchronize()
        cleanup_memory()

        # Stage 2 uses the transformer WITHOUT IC-LoRA
        transformer_s2 = self.stage_2_model_ledger.transformer()
        stage_2_sigmas = torch.Tensor(STAGE_2_DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop_s2(sigmas, video_state, audio_state, stepper):
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer_s2,
                ),
            )

        stage_2_output_shape = VideoPixelShape(
            batch=1,
            frames=num_frames,
            width=width,
            height=height,
            fps=frame_rate,
        )
        stage_2_conditionings = combined_image_conditionings(
            images=images,
            height=stage_2_output_shape.height,
            width=stage_2_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )

        if has_audio:
            video_state = denoise_video_only(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None
        else:
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                initial_audio_latent=audio_state.latent,
            )

        torch.cuda.synchronize()
        del transformer, transformer_s2, video_encoder
        cleanup_memory()

        # ── Decode ───────────────────────────────────────────────────────
        decoded_video = vae_decode_video(
            video_state.latent,
            self.stage_2_model_ledger.video_decoder(),
            tiling_config,
            generator,
        )
        if has_audio:
            output_audio = decoded_audio_for_output
        else:
            output_audio = vae_decode_audio(
                audio_state.latent,
                self.stage_2_model_ledger.audio_decoder(),
                self.stage_2_model_ledger.vocoder(),
            )
        return decoded_video, output_audio


# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
MAX_SEED = np.iinfo(np.int32).max
DEFAULT_PROMPT = (
    "An astronaut hatches from a fragile egg on the surface of the Moon, "
    "the shell cracking and peeling apart in gentle low-gravity motion."
)
DEFAULT_FRAME_RATE = 24.0
RESOLUTIONS = {
    "high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)},
    "low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)},
}

# Available IC-LoRA models
IC_LORA_OPTIONS = {
    "Union Control (Depth + Edge)": {
        "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Union-Control",
        "filename": "ltx-2.3-22b-ic-lora-union-control-ref0.5.safetensors",
    },
    "Motion Track Control": {
        "repo": "Lightricks/LTX-2.3-22b-IC-LoRA-Motion-Track-Control",
        "filename": "ltx-2.3-22b-ic-lora-motion-track-control-ref0.5.safetensors",
    },
}
DEFAULT_IC_LORA = "Union Control (Depth + Edge)"

# ─────────────────────────────────────────────────────────────────────────────
# Download Models
# ─────────────────────────────────────────────────────────────────────────────
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
CHECKPOINT_PATH = "linoyts/ltx-2.3-22b-fused-union-control"  # ltx 2.3 with fused union control lora because it breaks on quantization otherwise
GEMMA_REPO = "google/gemma-3-12b-it-qat-q4_0-unquantized"

print("=" * 80)
print("Downloading LTX-2.3 distilled model + Gemma + IC-LoRA...")
print("=" * 80)

checkpoint_path = hf_hub_download(
    # repo_id=LTX_MODEL_REPO, filename="ltx-2.3-22b-distilled.safetensors"
    repo_id=CHECKPOINT_PATH, filename="ltx-2.3-22b-fused-union-control.safetensors"
)
spatial_upsampler_path = hf_hub_download(
    repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors"
)
gemma_root = snapshot_download(repo_id=GEMMA_REPO)

# Download default IC-LoRA
default_lora_info = IC_LORA_OPTIONS[DEFAULT_IC_LORA]
default_ic_lora_path = hf_hub_download(
    repo_id=default_lora_info["repo"], filename=default_lora_info["filename"]
)

print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"Gemma root: {gemma_root}")
print(f"IC-LoRA: {default_ic_lora_path}")

# ─────────────────────────────────────────────────────────────────────────────
# Initialize Pipeline
# ─────────────────────────────────────────────────────────────────────────────
ic_loras = [
    LoraPathStrengthAndSDOps(default_ic_lora_path, 1.0, LTXV_LORA_COMFY_RENAMING_MAP)
]

pipeline = LTX23UnifiedPipeline(
    distilled_checkpoint_path=checkpoint_path,
    spatial_upsampler_path=spatial_upsampler_path,
    gemma_root=gemma_root,
    # ic_loras=ic_loras,  # LoRA already fused into checkpoint
    quantization=QuantizationPolicy.fp8_cast(),
    # Union Control IC-LoRA was trained with reference videos at half resolution.
    # Set explicitly so it works both with separate LoRA and fused checkpoints.
    reference_downscale_factor=2,
)

# Preload all models for ZeroGPU tensor packing.
print("Preloading all models (including Gemma, Audio encoders)...")

# Shared ledger: preload once. Separate ledgers (IC-LoRA): preload both.
_ledger_1 = pipeline.stage_1_model_ledger
_ledger_2 = pipeline.stage_2_model_ledger
_shared = _ledger_1 is _ledger_2

# Stage 1 models (with IC-LoRA if loaded). The ledger accessors are replaced
# with closures returning the already-loaded objects so later calls are free.
_s1_transformer = _ledger_1.transformer()
_s1_video_encoder = _ledger_1.video_encoder()
_s1_text_encoder = _ledger_1.text_encoder()
_s1_embeddings = _ledger_1.gemma_embeddings_processor()
_s1_audio_encoder = _ledger_1.audio_encoder()
_ledger_1.transformer = lambda: _s1_transformer
_ledger_1.video_encoder = lambda: _s1_video_encoder
_ledger_1.text_encoder = lambda: _s1_text_encoder
_ledger_1.gemma_embeddings_processor = lambda: _s1_embeddings
_ledger_1.audio_encoder = lambda: _s1_audio_encoder

if _shared:
    # Single ledger — also preload decoder/upsampler/vocoder on the same object
    _video_decoder = _ledger_1.video_decoder()
    _audio_decoder = _ledger_1.audio_decoder()
    _vocoder = _ledger_1.vocoder()
    _spatial_upsampler = _ledger_1.spatial_upsampler()
    _ledger_1.video_decoder = lambda: _video_decoder
    _ledger_1.audio_decoder = lambda: _audio_decoder
    _ledger_1.vocoder = lambda: _vocoder
    _ledger_1.spatial_upsampler = lambda: _spatial_upsampler
    print("  (single shared ledger — no IC-LoRA)")
else:
    # Stage 2 models (separate transformer without IC-LoRA)
    _s2_transformer = _ledger_2.transformer()
    _s2_video_encoder = _ledger_2.video_encoder()
    _s2_video_decoder = _ledger_2.video_decoder()
    _s2_audio_decoder = _ledger_2.audio_decoder()
    _s2_vocoder = _ledger_2.vocoder()
    _s2_spatial_upsampler = _ledger_2.spatial_upsampler()
    _s2_text_encoder = _ledger_2.text_encoder()
    _s2_embeddings = _ledger_2.gemma_embeddings_processor()
    _s2_audio_encoder = _ledger_2.audio_encoder()
    _ledger_2.transformer = lambda: _s2_transformer
    _ledger_2.video_encoder = lambda: _s2_video_encoder
    _ledger_2.video_decoder = lambda: _s2_video_decoder
    _ledger_2.audio_decoder = lambda: _s2_audio_decoder
    _ledger_2.vocoder = lambda: _s2_vocoder
    _ledger_2.spatial_upsampler = lambda: _s2_spatial_upsampler
    _ledger_2.text_encoder = lambda: _s2_text_encoder
    _ledger_2.gemma_embeddings_processor = lambda: _s2_embeddings
    _ledger_2.audio_encoder = lambda: _s2_audio_encoder
    print("  (two separate ledgers — IC-LoRA active)")

print("All models preloaded!")
print("=" * 80)


# ─────────────────────────────────────────────────────────────────────────────
# UI Helpers
# ─────────────────────────────────────────────────────────────────────────────
def detect_aspect_ratio(media_path) -> str:
    """Detect the closest aspect ratio from an image or video."""
    if media_path is None:
        return "16:9"
    ext = str(media_path).lower().rsplit(".", 1)[-1] if "." in str(media_path) else ""
    # Try as image first
    if ext in ("jpg", "jpeg", "png", "bmp", "webp", "gif", "tiff"):
        import PIL.Image
        try:
            with PIL.Image.open(media_path) as img:
                w, h = img.size
        except Exception:
            return "16:9"
    else:
        # Try as video
        try:
            import av
            with av.open(str(media_path)) as container:
                stream = container.streams.video[0]
                w, h = stream.codec_context.width, stream.codec_context.height
        except Exception:
            # Fallback: try as image anyway
            import PIL.Image
            try:
                with PIL.Image.open(media_path) as img:
                    w, h = img.size
            except Exception:
                return "16:9"
    ratio = w / h
    candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
    return min(candidates, key=lambda k: abs(ratio - candidates[k]))


def on_image_upload(image, video, high_res):
    """Auto-set resolution when image is uploaded."""
    media = image if image is not None else video
    aspect = detect_aspect_ratio(media)
    tier = "high" if high_res else "low"
    w, h = RESOLUTIONS[tier][aspect]
    return gr.update(value=w), gr.update(value=h)


def on_video_upload(video, image, high_res):
    """Auto-set resolution when video is uploaded."""
    media = video if video is not None else image
    aspect = detect_aspect_ratio(media)
    tier = "high" if high_res else "low"
    w, h = RESOLUTIONS[tier][aspect]
    return gr.update(value=w), gr.update(value=h)


def on_highres_toggle(image, video, high_res):
    """Update resolution when high-res toggle changes."""
    media = image if image is not None else video
    aspect = detect_aspect_ratio(media)
    tier = "high" if high_res else "low"
    w, h = RESOLUTIONS[tier][aspect]
    return gr.update(value=w), gr.update(value=h)


# ─────────────────────────────────────────────────────────────────────────────
# Generation
# ─────────────────────────────────────────────────────────────────────────────
def _extract_audio_from_video(video_path: str) -> str | None:
    """Extract audio from video as a temp WAV file. Returns None if no audio."""
    out_path = _tmp_path(".wav")
    try:
        # Check if video has an audio stream
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "a:0",
             "-show_entries", "stream=codec_type", "-of", "default=nw=1:nk=1", video_path],
            capture_output=True,
            text=True,
        )
        if not probe.stdout.strip():
            return None
        # Extract audio
        subprocess.run(
            ["ffmpeg", "-y", "-v", "error", "-i", video_path,
             "-vn", "-ac", "2", "-ar", "48000", "-c:a", "pcm_s16le", out_path],
            check=True,
        )
        return out_path
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None


@spaces.GPU(duration=180)
@torch.inference_mode()
def generate_video(
    input_image,
    input_video,
    input_audio,
    prompt: str,
    duration: float,
    conditioning_strength: float,
    video_preprocess: str,
    enhance_prompt: bool,
    use_video_audio: bool,
    seed: int,
    randomize_seed: bool,
    height: int,
    width: int,
    progress=gr.Progress(track_tqdm=True),
):
    """Gradio entry point: run the unified pipeline and return (mp4 path, seed)."""
    # Resolve the seed BEFORE the try block: the except handler below returns
    # `current_seed`, which would otherwise be unbound (NameError) if a failure
    # occurred before the assignment inside the try.
    current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    try:
        torch.cuda.reset_peak_memory_stats()
        frame_rate = DEFAULT_FRAME_RATE
        # Round the frame count to the 8k+1 grid the model expects.
        num_frames = int(duration * frame_rate) + 1
        num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1

        mode_parts = []
        if input_image is not None:
            mode_parts.append("Image")
        if input_video is not None:
            mode_parts.append(f"Video({video_preprocess})")
        if input_audio is not None:
            mode_parts.append("Audio")
        if not mode_parts:
            mode_parts.append("Text")
        mode_str = " + ".join(mode_parts)

        print(f"[{mode_str}] Generating: {height}x{width}, {num_frames} frames "
              f"({duration}s), seed={current_seed}")

        # Build image conditionings
        images = []
        if input_image is not None:
            images = [ImageConditioningInput(path=str(input_image), frame_idx=0, strength=1.0)]

        # Build video conditionings — preprocess to strip appearance
        video_conditioning = None
        if input_video is not None:
            video_path = str(input_video)
            if video_preprocess != "Raw (no preprocessing)":
                print(f"[Preprocess] Running {video_preprocess} on input video...")
                cond_mp4, first_frame_png = preprocess_conditioning_video(
                    video_path=video_path,
                    mode=video_preprocess,
                    width=int(width) // 2,  # Stage 1 operates at half res
                    height=int(height) // 2,
                    num_frames=num_frames,
                    fps=frame_rate,
                )
                video_conditioning = [(cond_mp4, 1.0)]
                # If no image was provided, use the video's first frame
                # (original appearance) as the image conditioning
                if input_image is None:
                    images = [ImageConditioningInput(
                        path=first_frame_png,
                        frame_idx=0,
                        strength=1.0,
                    )]
                    print(f"[Preprocess] Using video first frame as image conditioning")
            else:
                # Raw mode — pass video as-is
                video_conditioning = [(video_path, 1.0)]
            # If no audio was provided, optionally extract audio from the video
            if input_audio is None and use_video_audio:
                extracted_audio = _extract_audio_from_video(video_path)
                if extracted_audio is not None:
                    input_audio = extracted_audio
                    print(f"[Preprocess] Extracted audio from input video")

        tiling_config = TilingConfig.default()
        video_chunks_number = get_video_chunks_number(num_frames, tiling_config)

        video, audio = pipeline(
            prompt=prompt,
            seed=current_seed,
            height=int(height),
            width=int(width),
            num_frames=num_frames,
            frame_rate=frame_rate,
            images=images,
            audio_path=input_audio,
            video_conditioning=video_conditioning,
            tiling_config=tiling_config,
            enhance_prompt=enhance_prompt,
            conditioning_strength=conditioning_strength,
        )

        output_path = _tmp_path(".mp4")
        encode_video(
            video=video,
            fps=frame_rate,
            audio=audio,
            output_path=output_path,
            video_chunks_number=video_chunks_number,
        )
        return str(output_path), current_seed
    except Exception as e:
        import traceback
        print(f"Error: {str(e)}\n{traceback.format_exc()}")
        return None, current_seed


# ─────────────────────────────────────────────────────────────────────────────
# Gradio UI
# ─────────────────────────────────────────────────────────────────────────────
# NOTE: `theme` is a gr.Blocks constructor argument — passing it to launch()
# raises TypeError, so it is set here.
with gr.Blocks(title="LTX-2.3 Unified: V2V + I2V + A2V", theme=gr.themes.Citrus()) as demo:
    gr.Markdown("# LTX-2.3 Unified: Video/Image/Audio → Video")
    gr.Markdown(
        "Unified pipeline for **video-to-video** (IC-LoRA), **image-to-video**, "
        "and **audio-conditioned** generation with LTX-2.3 — use any combination of inputs. "
        "[[model]](https://huggingface.co/Lightricks/LTX-2.3) "
        "[[code]](https://github.com/Lightricks/LTX-2)"
    )
    with gr.Row():
        with gr.Column():
            # All three inputs visible at once
            with gr.Row():
                input_image = gr.Image(
                    label="🖼️ Input Image (I2V — first frame)",
                    type="filepath",
                )
                with gr.Column():
                    input_video = gr.Video(
                        label="🎬 Reference Video (V2V)",
                        sources=["upload"],
                    )
                    video_preprocess = gr.Dropdown(
                        label="Video Preprocessing",
                        choices=[
                            "Pose (DWPose)",
                            "Canny Edge",
                            "Depth (Laplacian)",
                            "Raw (no preprocessing)",
                        ],
                        value="Pose (DWPose)",
                        info="Strips appearance from video → style comes from image/prompt instead",
                    )
            input_audio = gr.Audio(
                label="🔊 Input Audio (A2V — lipsync / BGM)",
                type="filepath",
            )
            prompt = gr.Textbox(
                label="Prompt",
                info="Describe the desired output — be as detailed as possible",
                value="Make this come alive with cinematic motion, smooth animation",
                lines=3,
                placeholder="Describe the motion, style, and content you want...",
            )
            with gr.Row():
                duration = gr.Slider(
                    label="Duration (seconds)",
                    minimum=1.0,
                    maximum=10.0,
                    value=3.0,
                    step=0.1,
                )
                conditioning_strength = gr.Slider(
                    label="V2V Conditioning Strength",
                    info="How closely to follow the reference video",
                    minimum=0.0,
                    maximum=1.0,
                    value=1.0,
                    step=0.05,
                )
            with gr.Row():
                enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=True)
                high_res = gr.Checkbox(label="High Resolution", value=True)
                use_video_audio = gr.Checkbox(
                    label="Use Audio from Video",
                    value=True,
                    info="Extract and use the audio track from the reference video",
                )
            generate_btn = gr.Button("Generate Video", variant="primary", size="lg")
            with gr.Accordion("Advanced Settings", open=False):
                seed = gr.Slider(
                    label="Seed",
                    minimum=0,
                    maximum=MAX_SEED,
                    value=42,
                    step=1,
                )
                randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
                with gr.Row():
                    width = gr.Number(label="Width", value=1536, precision=0)
                    height = gr.Number(label="Height", value=1024, precision=0)
        with gr.Column():
            output_video = gr.Video(label="Generated Video", autoplay=True)

    # ── Event handlers ───────────────────────────────────────────────────
    input_image.change(
        fn=on_image_upload,
        inputs=[input_image, input_video, high_res],
        outputs=[width, height],
    )
    input_video.change(
        fn=on_video_upload,
        inputs=[input_video, input_image, high_res],
        outputs=[width, height],
    )
    high_res.change(
        fn=on_highres_toggle,
        inputs=[input_image, input_video, high_res],
        outputs=[width, height],
    )
    generate_btn.click(
        fn=generate_video,
        inputs=[
            input_image,
            input_video,
            input_audio,
            prompt,
            duration,
            conditioning_strength,
            video_preprocess,
            enhance_prompt,
            use_video_audio,
            seed,
            randomize_seed,
            height,
            width,
        ],
        outputs=[output_video, seed],
    )

if __name__ == "__main__":
    demo.launch()