| import os |
| import re |
| import sys |
| import math |
| import torch |
| import parselmouth |
|
|
| import numba as nb |
| import numpy as np |
|
|
| from scipy.signal import medfilt |
| from librosa import yin, pyin, piptrack |
|
|
| sys.path.append(os.getcwd()) |
|
|
| from infer.lib.predictors.CREPE.filter import mean, median |
| from infer.lib.predictors.WORLD.SWIPE import swipe, stonemask |
| from infer.lib.variables import config, configs, logger, translations |
| from infer.lib.utils import autotune_f0, proposal_f0_up_key, circular_write |
|
|
@nb.jit(nopython=True)
def post_process(
    tf0,
    f0,
    f0_up_key,
    manual_x_pad,
    f0_mel_min,
    f0_mel_max,
    manual_f0 = None
):
    # Transpose an f0 contour, optionally overlay a manually drawn curve, and
    # quantize it to 1..255 coarse mel bins (numba nopython-compiled).
    #
    #   tf0:            f0 frames per second (sample_rate // hop)
    #   f0:             f0 contour in Hz (transposed in place below)
    #   f0_up_key:      transposition in semitones
    #   manual_x_pad:   leading pad before the audio; multiplied by tf0, so
    #                   presumably expressed in seconds -- TODO confirm
    #   f0_mel_min/max: mel-scale bounds of the quantization range
    #   manual_f0:      optional rows of (time, hz); time appears to be in
    #                   seconds (scaled by 100 below) -- NOTE(review): confirm
    #
    # Returns (coarse mel bins as int32, transposed f0 in Hz).

    # semitone transposition: one octave per 12 keys
    f0 *= pow(2, f0_up_key / 12)


    if manual_f0 is not None:
        # Resample the manual curve onto the f0 frame grid.
        # NOTE(review): the np.int16 cast caps the resampled length at 32767
        # frames (~5.4 min at tf0 = 100); longer manual curves would overflow.
        replace_f0 = np.interp(
            list(
                range(
                    np.round(
                        (manual_f0[:, 0].max() - manual_f0[:, 0].min()) * tf0 + 1
                    ).astype(np.int16)
                )
            ),
            manual_f0[:, 0] * 100,
            manual_f0[:, 1]
        )


        # Overlay the resampled curve onto f0, clipped to the span actually
        # available after the pad offset (slice is repeated to get its length).
        f0[
            manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)
        ] = replace_f0[
            :f0[
                manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)
            ].shape[0]
        ]


    # Hz -> mel, then map (f0_mel_min, f0_mel_max] onto bins 1..255;
    # unvoiced frames (mel <= 0) collapse into bin 1.
    f0_mel = 1127 * np.log(1 + f0 / 700)
    f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255


    return np.rint(f0_mel).astype(np.int32), f0
|
|
def realtime_post_process(
    f0,
    pitch,
    pitchf,
    f0_up_key = 0,
    f0_mel_min = 50.0,
    f0_mel_max = 1100.0
):
    """Transpose and quantize a torch f0 contour for the realtime path.

    `f0` is shifted in place by `f0_up_key` semitones, converted to the mel
    scale and binned into 1..255. When `pitch`/`pitchf` ring buffers are given,
    the new frames are appended via `circular_write`; otherwise the fresh
    tensors become the buffers. Returns (pitch, pitchf), each with a leading
    batch dimension.

    NOTE(review): per the parameter names, `f0_mel_min`/`f0_mel_max` are
    expected on the mel scale, yet the defaults (50/1100) look like raw Hz --
    confirm against callers.
    """
    # in-place semitone transposition
    f0 *= 2 ** (f0_up_key / 12)

    # Hz -> mel, then map the [min, max] range onto bins 1..255
    span = f0_mel_max - f0_mel_min
    mel = 1127.0 * (1.0 + f0 / 700.0).log()
    mel = torch.clip((mel - f0_mel_min) * 254 / span + 1, 1, 255, out=mel)
    coarse = torch.round(mel, out=mel).long()

    if pitch is None or pitchf is None:
        # no ring buffers yet: start them from this chunk
        pitch, pitchf = coarse, f0
    else:
        circular_write(coarse, pitch)
        circular_write(f0, pitchf)

    return pitch.unsqueeze(0), pitchf.unsqueeze(0)
|
|
| class Generator: |
| def __init__( |
| self, |
| sample_rate = 16000, |
| hop_length = 160, |
| f0_min = 50, |
| f0_max = 1100, |
| alpha = 0.5, |
| is_half = False, |
| device = "cpu", |
| predictor_onnx = False, |
| delete_predictor_onnx = True |
| ): |
| self.sample_rate = sample_rate |
| self.hop_length = hop_length |
| self.f0_min = f0_min |
| self.f0_max = f0_max |
| self.is_half = is_half |
| self.device = device |
| self.providers = config.providers |
| self.predictor_onnx = predictor_onnx |
| self.delete_predictor_onnx = delete_predictor_onnx |
| self.window = 160 |
| self.batch_size = 512 |
| self.alpha = alpha |
| self.ref_freqs = [ |
| 49.00, |
| 51.91, |
| 55.00, |
| 58.27, |
| 61.74, |
| 65.41, |
| 69.30, |
| 73.42, |
| 77.78, |
| 82.41, |
| 87.31, |
| 92.50, |
| 98.00, |
| 103.83, |
| 110.00, |
| 116.54, |
| 123.47, |
| 130.81, |
| 138.59, |
| 146.83, |
| 155.56, |
| 164.81, |
| 174.61, |
| 185.00, |
| 196.00, |
| 207.65, |
| 220.00, |
| 233.08, |
| 246.94, |
| 261.63, |
| 277.18, |
| 293.66, |
| 311.13, |
| 329.63, |
| 349.23, |
| 369.99, |
| 392.00, |
| 415.30, |
| 440.00, |
| 466.16, |
| 493.88, |
| 523.25, |
| 554.37, |
| 587.33, |
| 622.25, |
| 659.25, |
| 698.46, |
| 739.99, |
| 783.99, |
| 830.61, |
| 880.00, |
| 932.33, |
| 987.77, |
| 1046.50 |
| ] |
|
|
| def calculator( |
| self, |
| x_pad, |
| f0_method, |
| x, |
| f0_up_key = 0, |
| p_len = None, |
| filter_radius = 3, |
| f0_autotune = False, |
| f0_autotune_strength = 1, |
| manual_f0 = None, |
| proposal_pitch = False, |
| proposal_pitch_threshold = 255.0 |
| ): |
| if p_len is None: p_len = x.shape[0] // self.window |
| if "hybrid" in f0_method: logger.debug(translations["hybrid_calc"].format(f0_method=f0_method)) |
|
|
| compute_fn = ( |
| self.get_f0_hybrid if "hybrid" in f0_method else self.compute_f0 |
| ) |
|
|
| f0 = compute_fn( |
| f0_method, |
| x, |
| p_len, |
| filter_radius if filter_radius % 2 != 0 else filter_radius + 1 |
| ) |
| |
| if proposal_pitch: |
| up_key = proposal_f0_up_key( |
| f0, |
| proposal_pitch_threshold, |
| configs["limit_f0"] |
| ) |
|
|
| logger.debug(translations["proposal_f0"].format(up_key=up_key)) |
| f0_up_key += up_key |
|
|
| if f0_autotune: |
| logger.debug(translations["startautotune"]) |
|
|
| f0 = autotune_f0( |
| self.ref_freqs, |
| f0, |
| f0_autotune_strength |
| ) |
|
|
| return post_process( |
| self.sample_rate // self.window, |
| f0, |
| f0_up_key, |
| x_pad, |
| 1127 * math.log(1 + self.f0_min / 700), |
| 1127 * math.log(1 + self.f0_max / 700), |
| manual_f0 |
| ) |
|
|
| def realtime_calculator( |
| self, |
| audio, |
| f0_method, |
| pitch, |
| pitchf, |
| f0_up_key = 0, |
| filter_radius = 3, |
| f0_autotune = False, |
| f0_autotune_strength = 1, |
| proposal_pitch = False, |
| proposal_pitch_threshold = 255.0 |
| ): |
| if torch.is_tensor(audio): audio = audio.cpu().numpy() |
| p_len = audio.shape[0] // self.window |
|
|
| f0 = self.compute_f0( |
| f0_method, |
| audio, |
| p_len, |
| filter_radius if filter_radius % 2 != 0 else filter_radius + 1 |
| ) |
|
|
| if f0_autotune: |
| f0 = autotune_f0( |
| self.ref_freqs, |
| f0, |
| f0_autotune_strength |
| ) |
|
|
| if proposal_pitch: |
| up_key = proposal_f0_up_key( |
| f0, |
| proposal_pitch_threshold, |
| configs["limit_f0"] |
| ) |
|
|
| f0_up_key += up_key |
|
|
| return realtime_post_process( |
| torch.from_numpy(f0).float().to(self.device), |
| pitch, |
| pitchf, |
| f0_up_key, |
| self.f0_min, |
| self.f0_max |
| ) |
|
|
| def _resize_f0(self, x, target_len): |
| if len(x) == target_len: return x |
|
|
| source = np.array(x) |
| source[source < 0.001] = np.nan |
|
|
| return np.nan_to_num( |
| np.interp( |
| np.arange(0, len(source) * target_len, len(source)) / target_len, |
| np.arange(0, len(source)), |
| source |
| ) |
| ) |
| |
| def compute_f0(self, f0_method, x, p_len, filter_radius): |
| if "pm" in f0_method: |
| f0 = self.get_f0_pm( |
| x, |
| p_len, |
| filter_radius=filter_radius, |
| mode=f0_method.split("-")[1] |
| ) |
| elif f0_method.split("-")[0] in ["harvest", "dio"]: |
| f0 = self.get_f0_pyworld( |
| x, |
| p_len, |
| filter_radius, |
| f0_method.split("-")[0], |
| use_stonemask="stonemask" in f0_method |
| ) |
| elif "crepe" in f0_method: |
| split_f0 = f0_method.split("-") |
| f0 = ( |
| self.get_f0_mangio_crepe( |
| x, |
| p_len, |
| split_f0[2] |
| ) |
| ) if split_f0[0] == "mangio" else ( |
| self.get_f0_crepe( |
| x, |
| p_len, |
| split_f0[1], |
| filter_radius=filter_radius |
| ) |
| ) |
| elif "fcpe" in f0_method: |
| f0 = self.get_f0_fcpe( |
| x, |
| p_len, |
| legacy="legacy" in f0_method and "previous" not in f0_method, |
| previous="previous" in f0_method, |
| filter_radius=filter_radius |
| ) |
| elif "rmvpe" in f0_method: |
| f0 = self.get_f0_rmvpe( |
| x, |
| p_len, |
| clipping="clipping" in f0_method, |
| filter_radius=filter_radius, |
| hpa="hpa" in f0_method, |
| previous="previous" in f0_method |
| ) |
| elif f0_method in ["yin", "pyin", "piptrack"]: |
| f0 = self.get_f0_librosa( |
| x, |
| p_len, |
| mode=f0_method, |
| filter_radius=filter_radius |
| ) |
| |
| elif "djcm" in f0_method: |
| f0 = self.get_f0_djcm( |
| x, |
| p_len, |
| clipping="clipping" in f0_method, |
| svs="svs" in f0_method, |
| filter_radius=filter_radius |
| ) |
| |
| else: |
| raise ValueError(translations["option_not_valid"]) |
| |
| if isinstance(f0, tuple): f0 = f0[0] |
| if "medfilt" in f0_method or "svs" in f0_method: f0 = medfilt(f0, kernel_size=5) |
|
|
| return f0 |
| |
| def get_f0_hybrid(self, methods_str, x, p_len, filter_radius): |
| methods_str = re.search(r"hybrid\[(.+)\]", methods_str) |
| if methods_str: |
| methods = [ |
| method.strip() |
| for method in methods_str.group(1).split("+") |
| ] |
|
|
| n = len(methods) |
| f0_stack = [] |
|
|
| for method in methods: |
| f0_stack.append( |
| self._resize_f0( |
| self.compute_f0( |
| method, |
| x, |
| p_len, |
| filter_radius |
| ), |
| p_len |
| ) |
| ) |
| |
| f0_mix = np.zeros(p_len) |
|
|
| if not f0_stack: return f0_mix |
| if len(f0_stack) == 1: return f0_stack[0] |
|
|
| weights = (1 - np.abs(np.arange(n) / (n - 1) - (1 - self.alpha))) ** 2 |
| weights /= weights.sum() |
|
|
| stacked = np.vstack(f0_stack) |
| voiced_mask = np.any(stacked > 0, axis=0) |
|
|
| f0_mix[voiced_mask] = np.exp( |
| np.nansum( |
| np.log(stacked + 1e-6) * weights[:, None], axis=0 |
| )[voiced_mask] |
| ) |
|
|
| return f0_mix |
|
|
| def get_f0_pm(self, x, p_len, filter_radius=3, mode="ac"): |
| time_step = self.window / self.sample_rate * 1000 / 1000 |
|
|
| pm = parselmouth.Sound( |
| x, |
| self.sample_rate |
| ) |
| pm_fn = { |
| "ac": pm.to_pitch_ac, |
| "cc": pm.to_pitch_cc, |
| "shs": pm.to_pitch_shs |
| }.get(mode, pm.to_pitch_ac) |
|
|
| pitch = ( |
| pm_fn( |
| time_step=time_step, |
| voicing_threshold=filter_radius / 10 * 2, |
| pitch_floor=self.f0_min, |
| pitch_ceiling=self.f0_max |
| ) |
| ) if mode != "shs" else ( |
| pm_fn( |
| time_step=time_step, |
| minimum_pitch=self.f0_min, |
| maximum_frequency_component=self.f0_max |
| ) |
| ) |
|
|
| f0 = pitch.selected_array["frequency"] |
| pad_size = (p_len - len(f0) + 1) // 2 |
|
|
| if pad_size > 0 or p_len - len(f0) - pad_size > 0: |
| f0 = np.pad( |
| f0, |
| [[pad_size, p_len - len(f0) - pad_size]], |
| mode="constant" |
| ) |
|
|
| return f0 |
| |
| def get_f0_mangio_crepe(self, x, p_len, model="full"): |
| if not hasattr(self, "mangio_crepe"): |
| from infer.lib.predictors.CREPE.CREPE import CREPE |
|
|
| self.mangio_crepe = CREPE( |
| os.path.join( |
| configs["predictors_path"], |
| f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}" |
| ), |
| model_size=model, |
| hop_length=self.hop_length, |
| batch_size=self.hop_length * 2, |
| f0_min=self.f0_min, |
| f0_max=self.f0_max, |
| device=self.device, |
| sample_rate=self.sample_rate, |
| providers=self.providers, |
| onnx=self.predictor_onnx, |
| return_periodicity=False |
| ) |
|
|
| x = x.astype(np.float32) |
| x /= np.quantile(np.abs(x), 0.999) |
|
|
| audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(dim=0) |
| if audio.ndim == 2 and audio.shape[0] > 1: audio = audio.mean(dim=0, keepdim=True).detach() |
|
|
| f0 = self.mangio_crepe.compute_f0(audio.detach(), pad=True) |
| if self.predictor_onnx and self.delete_predictor_onnx: del self.mangio_crepe.model, self.mangio_crepe |
|
|
| return self._resize_f0(f0.squeeze(0).cpu().float().numpy(), p_len) |
| |
| def get_f0_crepe(self, x, p_len, model="full", filter_radius=3): |
| if not hasattr(self, "crepe"): |
| from infer.lib.predictors.CREPE.CREPE import CREPE |
|
|
| self.crepe = CREPE( |
| os.path.join( |
| configs["predictors_path"], |
| f"crepe_{model}.{'onnx' if self.predictor_onnx else 'pth'}" |
| ), |
| model_size=model, |
| hop_length=self.window, |
| batch_size=self.batch_size, |
| f0_min=self.f0_min, |
| f0_max=self.f0_max, |
| device=self.device, |
| sample_rate=self.sample_rate, |
| providers=self.providers, |
| onnx=self.predictor_onnx, |
| return_periodicity=True |
| ) |
|
|
| f0, pd = self.crepe.compute_f0(torch.tensor(np.copy(x))[None].float(), pad=True) |
| if self.predictor_onnx and self.delete_predictor_onnx: del self.crepe.model, self.crepe |
|
|
| f0, pd = mean(f0, filter_radius), median(pd, filter_radius) |
| f0[pd < 0.1] = 0 |
|
|
| return self._resize_f0(f0[0].cpu().numpy(), p_len) |
| |
| def get_f0_fcpe(self, x, p_len, legacy=False, previous=False, filter_radius=3): |
| if not hasattr(self, "fcpe"): |
| from infer.lib.predictors.FCPE.FCPE import FCPE |
|
|
| self.fcpe = FCPE( |
| configs, |
| os.path.join( |
| configs["predictors_path"], |
| ( |
| "fcpe_legacy" |
| if legacy else |
| ("fcpe" if previous else "ddsp_200k") |
| ) + (".onnx" if self.predictor_onnx else ".pt") |
| ), |
| hop_length=self.hop_length, |
| f0_min=self.f0_min, |
| f0_max=self.f0_max, |
| dtype=torch.float32, |
| device=self.device, |
| sample_rate=self.sample_rate, |
| threshold=( |
| filter_radius / 100 |
| ) if legacy else ( |
| filter_radius / 1000 * 2 |
| ), |
| providers=self.providers, |
| onnx=self.predictor_onnx, |
| legacy=legacy |
| ) |
| |
| f0 = self.fcpe.compute_f0(x, p_len) |
| if self.predictor_onnx and self.delete_predictor_onnx: del self.fcpe.fcpe.model, self.fcpe |
|
|
| return f0 |
| |
| def get_f0_rmvpe(self, x, p_len, clipping=False, filter_radius=3, hpa=False, previous=False): |
| if not hasattr(self, "rmvpe"): |
| from infer.lib.predictors.RMVPE.RMVPE import RMVPE |
|
|
| self.rmvpe = RMVPE( |
| os.path.join( |
| configs["predictors_path"], |
| ( |
| ( |
| "hpa-rmvpe-76000" |
| if previous else |
| "hpa-rmvpe-112000" |
| ) if hpa else "rmvpe" |
| ) + (".onnx" if self.predictor_onnx else ".pt") |
| ), |
| is_half=self.is_half, |
| device=self.device, |
| onnx=self.predictor_onnx, |
| providers=self.providers, |
| hpa=hpa |
| ) |
|
|
| filter_radius = filter_radius / 100 |
|
|
| f0 = ( |
| self.rmvpe.infer_from_audio_with_pitch( |
| x, |
| thred=filter_radius, |
| f0_min=self.f0_min, |
| f0_max=self.f0_max |
| ) |
| ) if clipping else ( |
| self.rmvpe.infer_from_audio( |
| x, |
| thred=filter_radius |
| ) |
| ) |
| |
| if self.predictor_onnx and self.delete_predictor_onnx: del self.rmvpe.model, self.rmvpe |
| return self._resize_f0(f0, p_len) |
| |
| |
| def get_f0_librosa(self, x, p_len, mode="yin", filter_radius=3): |
| if mode != "piptrack": |
| self.if_yin = mode == "yin" |
| self.yin = yin if self.if_yin else pyin |
|
|
| f0 = self.yin( |
| x.astype(np.float32), |
| sr=self.sample_rate, |
| fmin=self.f0_min, |
| fmax=self.f0_max, |
| hop_length=self.hop_length |
| ) |
|
|
| if not self.if_yin: f0 = f0[0] |
| else: |
| pitches, magnitudes = piptrack( |
| y=x.astype(np.float32), |
| sr=self.sample_rate, |
| fmin=self.f0_min, |
| fmax=self.f0_max, |
| hop_length=self.hop_length, |
| threshold=filter_radius / 10 |
| ) |
|
|
| max_indexes = np.argmax(magnitudes, axis=0) |
| f0 = pitches[max_indexes, range(magnitudes.shape[1])] |
|
|
| return self._resize_f0(f0, p_len) |
|
|
| |
| |
| def get_f0_djcm(self, x, p_len, clipping=False, svs=False, filter_radius=3): |
| if not hasattr(self, "djcm"): |
| from main.library.predictors.DJCM.DJCM import DJCM |
| |
| self.djcm = DJCM( |
| os.path.join( |
| configs["predictors_path"], |
| ( |
| "djcm-svs" |
| if svs else |
| "djcm" |
| ) + (".onnx" if self.predictor_onnx else ".pt") |
| ), |
| is_half=self.is_half, |
| device=self.device, |
| onnx=self.predictor_onnx, |
| svs=svs, |
| providers=self.providers |
| ) |
|
|
| filter_radius /= 10 |
|
|
| f0 = ( |
| self.djcm.infer_from_audio_with_pitch( |
| x, |
| thred=filter_radius, |
| f0_min=self.f0_min, |
| f0_max=self.f0_max |
| ) |
| ) if clipping else ( |
| self.djcm.infer_from_audio( |
| x, |
| thred=filter_radius |
| ) |
| ) |
| |
| if self.predictor_onnx and self.delete_predictor_onnx: del self.djcm.model, self.djcm |
| return self._resize_f0(f0, p_len) |
| |
| |
| |