# (Residue of the Hugging Face Spaces page header this file was copied from: "Spaces: Running")
import subprocess, torch, os, traceback, sys, warnings, shutil, numpy as np | |
from mega import Mega | |
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1" | |
import threading | |
from time import sleep | |
from subprocess import Popen | |
import faiss | |
from random import shuffle | |
import json, datetime, requests | |
from gtts import gTTS | |
# Everything is anchored on the directory the app was launched from, which is
# also put on sys.path.
now_dir = os.getcwd()
sys.path.append(now_dir)

# Wipe the scratch directory and the infer_pack copy under runtime/ before
# recreating the working directories.
tmp = os.path.join(now_dir, "TEMP")
for _stale_dir in (tmp, "%s/runtime/Lib/site-packages/infer_pack" % (now_dir)):
    shutil.rmtree(_stale_dir, ignore_errors=True)

for _required_dir in (
    tmp,
    os.path.join(now_dir, "logs"),
    os.path.join(now_dir, "weights"),
):
    os.makedirs(_required_dir, exist_ok=True)

# Point TEMP at the scratch directory for this process and its children.
os.environ["TEMP"] = tmp
warnings.filterwarnings("ignore")
torch.manual_seed(114514)
from i18n import I18nAuto | |
import signal | |
import math | |
from utils import load_audio, CSVutil | |
global DoFormant, Quefrency, Timbre  # no effect at module scope; kept as in the original

# First run: create the settings directory together with its two empty CSV files.
if not os.path.isdir('csvdb/'):
    os.makedirs('csvdb')
    for _csv_path in ("csvdb/formanting.csv", "csvdb/stop.csv"):
        open(_csv_path, 'w').close()

try:
    DoFormant, Quefrency, Timbre = CSVutil('csvdb/formanting.csv', 'r', 'formanting')
    # Map the textual flag "true"/"false" (any case) to a bool; any other
    # value is kept unchanged.
    DoFormant = {'true': True, 'false': False}.get(DoFormant.lower(), DoFormant)
except (ValueError, TypeError, IndexError):
    # Nothing usable was read: fall back to defaults and write them out.
    DoFormant, Quefrency, Timbre = False, 1.0, 1.0
    CSVutil('csvdb/formanting.csv', 'w+', 'formanting', DoFormant, Quefrency, Timbre)
def _download_file(url, destination, label, chunk_size=1 << 20, timeout=(10, 300)):
    """Stream ``url`` to ``destination`` and report the outcome for model ``label``.

    The body is written to ``destination + ".part"`` and renamed into place only
    once the transfer finished, so an interrupted download never leaves a
    truncated file that a later ``os.path.isfile`` check would accept.

    Raises:
        Exception: if the server answers with a status code other than 200.
        requests.RequestException: on connection problems or timeouts.
    """
    partial = destination + ".part"
    # stream=True avoids holding a multi-hundred-megabyte model in memory;
    # the timeout keeps startup from hanging forever on a stalled connection.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            raise Exception(
                "Failed to download %s model file. Status code: " % label
                + str(response.status_code) + ".")
        try:
            with open(partial, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)
            os.replace(partial, destination)
        except BaseException:
            # Do not leave a half-written temporary file behind.
            if os.path.exists(partial):
                os.remove(partial)
            raise
    print("Downloaded %s model file successfully. File saved to %s." % (label, destination))


def download_models():
    """Fetch the hubert base and rmvpe model files into the working directory if missing.

    Raises:
        Exception: if either download answers with a non-200 status code.
    """
    # Download hubert base model if not present
    if not os.path.isfile('./hubert_base.pt'):
        _download_file(
            'https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/main/hubert_base.pt',
            './hubert_base.pt',
            'hubert base',
        )
    # Download rmvpe model if not present
    if not os.path.isfile('./rmvpe.pt'):
        _download_file(
            'https://drive.usercontent.google.com/download?id=1Hkn4kNuVFRCNQwyxQFRtmzmMBGpQxptI&export=download&authuser=0&confirm=t&uuid=0b3a40de-465b-4c65-8c41-135b0b45c3f7&at=APZUnTV3lA3LnyTbeuduura6Dmi2:1693724254058',
            './rmvpe.pt',
            'rmvpe',
        )


download_models()
print("\n-------------------------------\nRVC v2 Easy GUI (Local Edition)\n-------------------------------\n")
def formant_apply(qfrency, tmbre):
    """Store the formant settings with formanting switched on and echo them back.

    Writes ``True, qfrency, tmbre`` to ``csvdb/formanting.csv`` through CSVutil
    and returns two update dicts carrying the same quefrency and timbre values.
    """
    CSVutil('csvdb/formanting.csv', 'w+', 'formanting', True, qfrency, tmbre)
    quefrency_update = {"value": qfrency, "__type__": "update"}
    timbre_update = {"value": tmbre, "__type__": "update"}
    return (quefrency_update, timbre_update)
def get_fshift_presets():
    """Return every ``.txt`` file found under ``./formantshiftcfg/``.

    Paths are returned with forward slashes. When no preset exists (or the
    directory is missing) an empty string is returned instead of a list.
    """
    presets = [
        os.path.join(folder, entry).replace('\\', '/')
        for folder, _, entries in os.walk("./formantshiftcfg/")
        for entry in entries
        if entry.endswith(".txt")
    ]
    return presets if presets else ''
def formant_enabled(cbox, qfrency, tmbre, frmntapply, formantpreset, formant_refresh_button):
    """Persist the formant on/off switch and show or hide the related widgets.

    Args:
        cbox: state of the formant checkbox.
        qfrency, tmbre: current quefrency / timbre values, written back unchanged.
        frmntapply, formantpreset, formant_refresh_button: unused; they mirror
            the widgets whose visibility is toggled.

    Returns:
        A 6-tuple of update dicts: the checkbox value followed by five
        visibility updates. The original returned seven items when the box was
        unchecked and six when checked; six matches the checkbox plus the five
        widgets in the signature, so both branches now agree.
        NOTE(review): confirm the event wiring lists six outputs.
    """
    enabled = bool(cbox)
    CSVutil('csvdb/formanting.csv', 'w+', 'formanting', enabled, qfrency, tmbre)
    # print(f"is checked? - {cbox}\ngot {enabled}")
    visibility_updates = tuple(
        {"visible": enabled, "__type__": "update"} for _ in range(5)
    )
    return ({"value": enabled, "__type__": "update"},) + visibility_updates
def preset_apply(preset, qfer, tmbr):
    """Load quefrency/timbre from a preset file and apply them.

    When ``preset`` is non-empty, its first line (newline removed) becomes the
    quefrency and its second line (taken verbatim) the timbre, and both are
    handed to ``formant_apply``. With an empty preset the incoming values pass
    through untouched. Returns two update dicts with the resulting values.
    """
    preset_path = str(preset)
    if preset_path != '':
        with open(preset_path, 'r') as handle:
            lines = handle.readlines()
        qfer = lines[0].split('\n')[0]
        tmbr = lines[1]
        formant_apply(qfer, tmbr)
    return (
        {"value": qfer, "__type__": "update"},
        {"value": tmbr, "__type__": "update"},
    )
def update_fshift_presets(preset, qfrency, tmbre):
    """Refresh the preset list and re-apply the currently selected preset.

    Returns three update dicts: the refreshed preset choices, then the
    quefrency and timbre values in effect after applying ``preset`` (or the
    incoming values when no preset is selected).

    The original rebound ``qfrency``/``tmbre`` to the update dicts returned by
    ``preset_apply``; with an empty preset those dicts were then wrapped again
    and sent as the widget values. With a preset selected it also read the
    file and wrote the settings a second time. The values are now taken from
    the single ``preset_apply`` call.
    """
    qfrency_update, tmbre_update = preset_apply(preset, qfrency, tmbre)
    return (
        {"choices": get_fshift_presets(), "__type__": "update"},
        {"value": qfrency_update["value"], "__type__": "update"},
        {"value": tmbre_update["value"], "__type__": "update"},
    )
i18n = I18nAuto()
# i18n.print()

# Check whether there is an NVIDIA card usable for training and accelerated inference.
ngpu = torch.cuda.device_count()
gpu_infos = []  # one "<index>\t<name>" entry per accepted device
mem = []  # memory of each accepted device, in GiB (rounded via +0.4 then truncated)
if_gpu_ok = False
_accepted_gpu_ids = []

# A device is accepted when its upper-cased name contains any of these markers.
# Examples from the original comment: A10, A100, V100, A40, P40, M40, K80, A4500.
_GPU_NAME_MARKERS = (
    "10", "16", "20", "30", "40",
    "A2", "A3", "A4", "P4", "A50", "A60",
    "70", "80", "90",
    "M4", "T4", "TITAN",
)

if torch.cuda.is_available() and ngpu != 0:
    for i in range(ngpu):
        gpu_name = torch.cuda.get_device_name(i)
        if any(marker in gpu_name.upper() for marker in _GPU_NAME_MARKERS):
            if_gpu_ok = True  # at least one usable NVIDIA card
            gpu_infos.append("%s\t%s" % (i, gpu_name))
            _accepted_gpu_ids.append(str(i))
            mem.append(
                int(
                    torch.cuda.get_device_properties(i).total_memory
                    / 1024
                    / 1024
                    / 1024
                    + 0.4
                )
            )

if if_gpu_ok and gpu_infos:
    gpu_info = "\n".join(gpu_infos)
    default_batch_size = min(mem) // 2
else:
    gpu_info = i18n("Unfortunately, you do not have a working graphics card to support your training")
    default_batch_size = 1

# Dash-separated device indices. The original took the first character of each
# info string, which truncated indices of 10 and above to a single digit.
gpus = "-".join(_accepted_gpu_ids)
from lib.infer_pack.models import ( | |
SynthesizerTrnMs256NSFsid, | |
SynthesizerTrnMs256NSFsid_nono, | |
SynthesizerTrnMs768NSFsid, | |
SynthesizerTrnMs768NSFsid_nono, | |
) | |
import soundfile as sf | |
from fairseq import checkpoint_utils | |
import gradio as gr | |
import logging | |
from vc_infer_pipeline import VC | |
from config import Config | |
config = Config()
# from trainset_preprocess_pipeline import PreProcess
logging.getLogger("numba").setLevel(logging.WARNING)
hubert_model = None  # set by load_hubert()


def load_hubert():
    """Load ``hubert_base.pt`` and publish it as the global ``hubert_model``.

    The model is moved to ``config.device``, cast to half or full precision
    according to ``config.is_half`` and put in eval mode. The global is
    assigned only after all of that succeeded, so a failure part-way leaves
    ``hubert_model`` as it was and a later call can retry instead of using a
    half-configured model.
    """
    global hubert_model
    models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
        ["hubert_base.pt"],
        suffix="",
    )
    model = models[0].to(config.device)
    model = model.half() if config.is_half else model.float()
    model.eval()
    hubert_model = model
# Directories scanned for voice models and for retrieval indexes.
weight_root = "weights"
index_root = "logs"

# Model checkpoints: every *.pth file directly inside the weights directory.
names = [entry for entry in os.listdir(weight_root) if entry.endswith(".pth")]

# Index files anywhere below the logs directory, skipping those whose name
# contains "trained".
index_paths = [
    "%s/%s" % (root, name)
    for root, _dirs, files in os.walk(index_root, topdown=False)
    for name in files
    if name.endswith(".index") and "trained" not in name
]
def vc_single(
    sid,
    input_audio_path,
    f0_up_key,
    f0_file,
    f0_method,
    file_index,
    # file_index2,
    # file_big_npy,
    index_rate,
    filter_radius,
    resample_sr,
    rms_mix_rate,
    protect,
    crepe_hop_length,
):  # spk_item, input_audio0, vc_transform0,f0_file,f0method0
    """Convert one audio file with the currently loaded voice model.

    Returns:
        ``(message, (sample_rate, audio))`` on success,
        ``(traceback_text, (None, None))`` when the conversion fails, and
        ``("You need to upload an audio", None)`` when no input path is given.

    Uses the globals set up by ``get_vc`` (``tgt_sr``, ``net_g``, ``vc``,
    ``cpt``, ``version``) and loads the hubert model on first use.
    """
    global tgt_sr, net_g, vc, hubert_model, version
    if input_audio_path is None:
        return "You need to upload an audio", None
    f0_up_key = int(f0_up_key)
    try:
        audio = load_audio(input_audio_path, 16000, DoFormant, Quefrency, Timbre)
        # Scale down only when the peak exceeds 0.95.
        audio_max = np.abs(audio).max() / 0.95
        if audio_max > 1:
            audio /= audio_max
        times = [0, 0, 0]
        if hubert_model is None:
            load_hubert()
        if_f0 = cpt.get("f0", 1)
        # Clean up a pasted index path and automatically swap "trained" for
        # "added", in case the user typed the wrong index name.
        file_index = (
            file_index.strip(" ")
            .strip('"')
            .strip("\n")
            .strip('"')
            .strip(" ")
            .replace("trained", "added")
        )
        audio_opt = vc.pipeline(
            hubert_model,
            net_g,
            sid,
            audio,
            input_audio_path,
            times,
            f0_up_key,
            f0_method,
            file_index,
            # file_big_npy,
            index_rate,
            if_f0,
            filter_radius,
            tgt_sr,
            resample_sr,
            rms_mix_rate,
            version,
            protect,
            crepe_hop_length,
            f0_file=f0_file,
        )
        # Report the resampled rate for this result only. The original assigned
        # it to the global tgt_sr, so every later conversion handed the
        # pipeline the wrong model sample rate.
        output_sr = tgt_sr
        if resample_sr >= 16000 and tgt_sr != resample_sr:
            output_sr = resample_sr
        index_info = (
            "Using index:%s." % file_index
            if os.path.exists(file_index)
            else "Index not used."
        )
        return "Success.\n %s\nTime:\n npy:%ss, f0:%ss, infer:%ss" % (
            index_info,
            times[0],
            times[1],
            times[2],
        ), (output_sr, audio_opt)
    except Exception:
        # Report any failure as text instead of raising.
        info = traceback.format_exc()
        print(info)
        return info, (None, None)
def vc_multi(
    sid,
    dir_path,
    opt_root,
    paths,
    f0_up_key,
    f0_method,
    file_index,
    file_index2,
    # file_big_npy,
    index_rate,
    filter_radius,
    resample_sr,
    rms_mix_rate,
    protect,
    format1,
    crepe_hop_length,
):
    """Convert a batch of audio files, yielding the accumulated log after each one.

    Inputs come from ``dir_path`` when it is non-empty, otherwise from the
    ``.name`` of each entry in ``paths``. Results are written to ``opt_root``
    as ``<input name>.<format1>``; formats other than wav/flac are written as
    wav first and then transcoded with ffmpeg. ``file_index2`` is accepted
    but unused.
    """
    try:
        # Strip spaces, quotes and newlines that tend to come along with pasted paths.
        dir_path = (
            dir_path.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
        )
        opt_root = opt_root.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
        os.makedirs(opt_root, exist_ok=True)
        try:
            if dir_path != "":
                paths = [os.path.join(dir_path, name) for name in os.listdir(dir_path)]
            else:
                paths = [path.name for path in paths]
        except Exception:
            traceback.print_exc()
            paths = [path.name for path in paths]
        infos = []
        for path in paths:
            info, opt = vc_single(
                sid,
                path,
                f0_up_key,
                None,
                f0_method,
                file_index,
                # file_big_npy,
                index_rate,
                filter_radius,
                resample_sr,
                rms_mix_rate,
                protect,
                crepe_hop_length,
            )
            if "Success" in info:
                try:
                    tgt_sr, audio_opt = opt
                    if format1 in ["wav", "flac"]:
                        sf.write(
                            "%s/%s.%s" % (opt_root, os.path.basename(path), format1),
                            audio_opt,
                            tgt_sr,
                        )
                    else:
                        path = "%s/%s.wav" % (opt_root, os.path.basename(path))
                        sf.write(
                            path,
                            audio_opt,
                            tgt_sr,
                        )
                        if os.path.exists(path):
                            # Argument list instead of a shell string: paths
                            # containing spaces or shell metacharacters are
                            # passed to ffmpeg verbatim. Same arguments, same
                            # order as the original command line.
                            subprocess.run(
                                [
                                    "ffmpeg",
                                    "-i",
                                    path,
                                    "-vn",
                                    path[:-4] + ".%s" % format1,
                                    "-q:a",
                                    "2",
                                    "-y",
                                ]
                            )
                except Exception:
                    info += traceback.format_exc()
            infos.append("%s->%s" % (os.path.basename(path), info))
            yield "\n".join(infos)
        yield "\n".join(infos)
    except Exception:
        # "except Exception" rather than a bare except: closing this generator
        # raises GeneratorExit at a yield, and that must not be swallowed and
        # answered with another yield.
        yield traceback.format_exc()
def _build_synthesizer(cpt):
    """Instantiate the synthesizer class matching a checkpoint's version and f0 flag.

    Returns None when the checkpoint's version is neither "v1" nor "v2".
    """
    if_f0 = cpt.get("f0", 1)
    version = cpt.get("version", "v1")
    if version == "v1":
        if if_f0 == 1:
            return SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
        return SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
    if version == "v2":
        if if_f0 == 1:
            return SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
        return SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
    return None


# Only one voice can be active globally per tab.
def get_vc(sid):
    """Load the voice model ``sid`` from the weights directory, or unload everything.

    An empty ``sid`` ("" or []) releases the loaded models and returns an
    update that hides the speaker control. Otherwise the checkpoint is loaded,
    the matching synthesizer is built and stored in the module globals, and
    the returned update carries the speaker count as ``maximum``.

    Raises:
        ValueError: if the checkpoint declares a version other than v1/v2
            (the original silently reused whatever ``net_g`` was loaded before).
    """
    global n_spk, tgt_sr, net_g, vc, cpt, version, hubert_model
    if sid == "" or sid == []:
        # The UI may poll, so only clean up when something was actually loaded.
        if hubert_model is not None:
            print("clean_empty_cache")
            # Rebinding to None drops the references just like the original
            # "del" did, without raising NameError for globals that were never
            # assigned.
            hubert_model = net_g = n_spk = vc = tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # The original notes that memory is not fully released unless the
            # model is rebuilt once and dropped again.
            loaded_cpt = globals().get("cpt")
            if loaded_cpt is not None:
                version = loaded_cpt.get("version", "v1")
                net_g = _build_synthesizer(loaded_cpt)
                net_g = None
            del loaded_cpt
            cpt = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        return {"visible": False, "__type__": "update"}
    person = "%s/%s" % (weight_root, sid)
    print("loading %s" % person)
    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # sources you trust.
    cpt = torch.load(person, map_location="cpu")
    tgt_sr = cpt["config"][-1]
    cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]  # n_spk
    version = cpt.get("version", "v1")
    net_g = _build_synthesizer(cpt)
    if net_g is None:
        raise ValueError("Unsupported model version: %s" % version)
    del net_g.enc_q
    print(net_g.load_state_dict(cpt["weight"], strict=False))
    net_g.eval().to(config.device)
    if config.is_half:
        net_g = net_g.half()
    else:
        net_g = net_g.float()
    vc = VC(tgt_sr, config)
    n_spk = cpt["config"][-3]
    return {"visible": False, "maximum": n_spk, "__type__": "update"}
def change_choices():
    """Rescan the weights and logs directories for the model/index dropdowns.

    Returns two update dicts: the sorted ``.pth`` file names found in
    ``weight_root``, and the sorted ``.index`` paths found below
    ``index_root`` whose file name does not contain "trained".
    """
    model_names = [
        entry for entry in os.listdir(weight_root) if entry.endswith(".pth")
    ]
    found_indexes = [
        "%s/%s" % (folder, entry)
        for folder, _subdirs, entries in os.walk(index_root, topdown=False)
        for entry in entries
        if entry.endswith(".index") and "trained" not in entry
    ]
    models_update = {"choices": sorted(model_names), "__type__": "update"}
    indexes_update = {"choices": sorted(found_indexes), "__type__": "update"}
    return models_update, indexes_update
def clean():
    """Return an update dict that sets a component's value to an empty string."""
    cleared = {"value": "", "__type__": "update"}
    return cleared
# Sample-rate labels ("32k", "40k", "48k") mapped to their value in Hz.
sr_dict = {"%dk" % khz: khz * 1000 for khz in (32, 40, 48)}
def if_done(done, p):
    """Block until process ``p`` has exited, then set ``done[0]`` to True.

    ``p.poll()`` is checked every half second; ``None`` means still running.
    """
    while p.poll() is None:
        sleep(0.5)
    done[0] = True
def if_done_multi(done, ps):
    """Block until every process in ``ps`` has exited, then set ``done[0]`` to True.

    A ``poll()`` result of ``None`` means the process is still running. As
    soon as one unfinished process is found the function sleeps half a second
    and starts checking again from the first process.
    """
    while any(proc.poll() is None for proc in ps):
        sleep(0.5)
    done[0] = True
def preprocess_dataset(trainset_dir, exp_dir, sr, n_p):
    """Run the dataset preprocessing script and stream its log file.

    Launches ``trainset_preprocess_pipeline_print.py`` in a shell, then yields
    the full content of ``logs/<exp_dir>/preprocess.log`` about once per
    second until the process has exited, and once more at the end.
    """
    sr = sr_dict[sr]
    os.makedirs("%s/logs/%s" % (now_dir, exp_dir), exist_ok=True)
    log_path = "%s/logs/%s/preprocess.log" % (now_dir, exp_dir)
    # Start from an empty log file.
    open(log_path, "w").close()
    cmd = (
        config.python_cmd
        + " trainset_preprocess_pipeline_print.py %s %s %s %s/logs/%s "
        % (trainset_dir, sr, n_p, now_dir, exp_dir)
        + str(config.noparallel)
    )
    print(cmd)
    p = Popen(cmd, shell=True)  # , stdin=PIPE, stdout=PIPE,stderr=PIPE,cwd=now_dir
    # Per the original note, reading the process output through gradio only
    # delivers it once the process has finished, so the log file is polled on
    # a timer instead while a watcher thread flags completion.
    done = [False]
    watcher = threading.Thread(target=if_done, args=(done, p))
    watcher.start()
    while True:
        with open(log_path, "r") as f:
            yield (f.read())
        sleep(1)
        if done[0]:
            break
    with open(log_path, "r") as f:
        log = f.read()
    print(log)
    yield log
# but2.click(extract_f0,[gpus6,np7,f0method8,if_f0_3,trainset_dir4],[info2]) | |
def extract_f0_feature(gpus, n_p, f0method, if_f0, exp_dir, version19, echl):
    """Run pitch extraction (optional) and feature extraction, streaming the log.

    ``gpus`` is a dash-separated list; one ``extract_feature_print.py``
    process is started per entry. While the child processes run, the content
    of ``logs/<exp_dir>/extract_f0_feature.log`` is yielded about once per
    second, and once more after they finished.
    """
    gpus = gpus.split("-")
    os.makedirs("%s/logs/%s" % (now_dir, exp_dir), exist_ok=True)
    log_path = "%s/logs/%s/extract_f0_feature.log" % (now_dir, exp_dir)
    # Start from an empty log file.
    open(log_path, "w").close()

    def _follow_log(done):
        # Per the original note, process output read through gradio only
        # arrives once the process has finished, so the log file is polled on
        # a timer until the watcher thread flags completion.
        while True:
            with open(log_path, "r") as f:
                yield (f.read())
            sleep(1)
            if done[0]:
                break
        with open(log_path, "r") as f:
            log = f.read()
        print(log)
        yield log

    if if_f0:
        cmd = config.python_cmd + " extract_f0_print.py %s/logs/%s %s %s %s" % (
            now_dir,
            exp_dir,
            n_p,
            f0method,
            echl,
        )
        print(cmd)
        p = Popen(cmd, shell=True, cwd=now_dir)  # , stdin=PIPE, stdout=PIPE,stderr=PIPE
        done = [False]
        threading.Thread(target=if_done, args=(done, p)).start()
        yield from _follow_log(done)

    # Start one process per part. Arguments passed to the child script, in
    # order: device, n_part, i_part, i_gpu, exp_dir, version. A note left in
    # the original shows the child exporting i_gpu as CUDA_VISIBLE_DEVICES.
    leng = len(gpus)
    ps = []
    for idx, n_g in enumerate(gpus):
        cmd = (
            config.python_cmd
            + " extract_feature_print.py %s %s %s %s %s/logs/%s %s"
            % (
                config.device,
                leng,
                idx,
                n_g,
                now_dir,
                exp_dir,
                version19,
            )
        )
        print(cmd)
        ps.append(Popen(cmd, shell=True, cwd=now_dir))
    done = [False]
    threading.Thread(target=if_done_multi, args=(done, ps)).start()
    yield from _follow_log(done)
def change_sr2(sr2, if_f0_3, version19):
    """Suggest pretrained generator/discriminator paths after a sample-rate change.

    Returns ``(generator_path, discriminator_path, update)`` where a path is
    replaced by an empty string (and a notice is printed) when the file does
    not exist, and ``update`` makes a component visible.
    """
    folder_suffix = "" if version19 == "v1" else "_v2"
    prefix = "f0" if if_f0_3 else ""
    generator = "pretrained%s/%sG%s.pth" % (folder_suffix, prefix, sr2)
    discriminator = "pretrained%s/%sD%s.pth" % (folder_suffix, prefix, sr2)
    chosen = []
    for candidate in (generator, discriminator):
        if os.access(candidate, os.F_OK):
            chosen.append(candidate)
        else:
            print(candidate, "not exist, will not use pretrained model")
            chosen.append("")
    return (chosen[0], chosen[1], {"visible": True, "__type__": "update"})
def change_version19(sr2, if_f0_3, version19):
    """Suggest pretrained generator/discriminator paths after a version change.

    Returns ``(generator_path, discriminator_path)``; a path is replaced by an
    empty string (and a notice is printed) when the file does not exist.
    """
    folder_suffix = "" if version19 == "v1" else "_v2"
    prefix = "f0" if if_f0_3 else ""
    generator = "pretrained%s/%sG%s.pth" % (folder_suffix, prefix, sr2)
    discriminator = "pretrained%s/%sD%s.pth" % (folder_suffix, prefix, sr2)
    chosen = []
    for candidate in (generator, discriminator):
        if os.access(candidate, os.F_OK):
            chosen.append(candidate)
        else:
            print(candidate, "not exist, will not use pretrained model")
            chosen.append("")
    return (chosen[0], chosen[1])
def change_f0(if_f0_3, sr2, version19):  # f0method8,pretrained_G14,pretrained_D15
    """Suggest pretrained model paths after the pitch-guidance switch changed.

    Returns ``(update, generator_path, discriminator_path)``. ``update`` shows
    a component when pitch guidance is on and hides it otherwise. A path is
    replaced by an empty string (and a notice is printed) when the file does
    not exist.

    The original always tested for the ``f0G``/``f0D`` files, even when it was
    about to return the non-f0 ``G``/``D`` paths, so those could be dropped
    although present, or returned although missing. The existence check now
    targets the files that are actually returned.
    """
    path_str = "" if version19 == "v1" else "_v2"
    f0_str = "f0" if if_f0_3 else ""
    generator = "pretrained%s/%sG%s.pth" % (path_str, f0_str, sr2)
    discriminator = "pretrained%s/%sD%s.pth" % (path_str, f0_str, sr2)
    chosen = []
    for candidate in (generator, discriminator):
        if os.access(candidate, os.F_OK):
            chosen.append(candidate)
        else:
            print(candidate, "not exist, will not use pretrained model")
            chosen.append("")
    return (
        {"visible": bool(if_f0_3), "__type__": "update"},
        chosen[0],
        chosen[1],
    )
global log_interval  # no effect at module scope; kept as in the original


def set_log_interval(exp_dir, batch_size12):
    """Derive the training log interval from the number of 16k wav files.

    Counts the ``.wav`` files in ``<exp_dir>/1_16k_wavs``. Without that folder
    or without wav files the interval is 1. Otherwise it is
    ``ceil(count / batch_size12)``, plus one whenever that result exceeds 1.
    """
    wav_dir = os.path.join(exp_dir, "1_16k_wavs")
    if not os.path.isdir(wav_dir):
        return 1
    wav_count = sum(1 for entry in os.listdir(wav_dir) if entry.endswith(".wav"))
    if wav_count == 0:
        return 1
    interval = math.ceil(wav_count / batch_size12)
    return interval + 1 if interval > 1 else interval
# but3.click(click_train,[exp_dir1,sr2,if_f0_3,save_epoch10,total_epoch11,batch_size12,if_save_latest13,pretrained_G14,pretrained_D15,gpus16]) | |
def click_train(
    exp_dir1,
    sr2,
    if_f0_3,
    spk_id5,
    save_epoch10,
    total_epoch11,
    batch_size12,
    if_save_latest13,
    pretrained_G14,
    pretrained_D15,
    gpus16,
    if_cache_gpu17,
    if_save_every_weights18,
    version19,
):
    """Write the training file list, run the training script and wait for it.

    The file list (``logs/<exp_dir1>/filelist.txt``) contains one line per
    sample that is present in every required feature directory, plus two
    "mute" entries, in shuffled order. The training process id is stored in
    the global ``PID``.

    Returns:
        A status message and two visibility updates (first hidden, second shown).
    """
    global PID
    CSVutil('csvdb/stop.csv', 'w+', 'formanting', False)

    # Build the file list.
    exp_dir = "%s/logs/%s" % (now_dir, exp_dir1)
    os.makedirs(exp_dir, exist_ok=True)
    gt_wavs_dir = "%s/0_gt_wavs" % (exp_dir)
    feature_dir = (
        "%s/3_feature256" % (exp_dir)
        if version19 == "v1"
        else "%s/3_feature768" % (exp_dir)
    )
    log_interval = set_log_interval(exp_dir, batch_size12)

    def stems(directory):
        # File names up to their first dot.
        return {name.split(".")[0] for name in os.listdir(directory)}

    def escaped(path):
        return path.replace("\\", "\\\\")

    names = stems(gt_wavs_dir) & stems(feature_dir)
    if if_f0_3:
        f0_dir = "%s/2a_f0" % (exp_dir)
        f0nsf_dir = "%s/2b-f0nsf" % (exp_dir)
        names = names & stems(f0_dir) & stems(f0nsf_dir)

    fea_dim = 256 if version19 == "v1" else 768
    opt = []
    if if_f0_3:
        for name in names:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s/%s.wav.npy|%s/%s.wav.npy|%s"
                % (
                    escaped(gt_wavs_dir),
                    name,
                    escaped(feature_dir),
                    name,
                    escaped(f0_dir),
                    name,
                    escaped(f0nsf_dir),
                    name,
                    spk_id5,
                )
            )
        mute_line = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s/logs/mute/2a_f0/mute.wav.npy|%s/logs/mute/2b-f0nsf/mute.wav.npy|%s"
            % (now_dir, sr2, now_dir, fea_dim, now_dir, now_dir, spk_id5)
        )
    else:
        for name in names:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s"
                % (
                    escaped(gt_wavs_dir),
                    name,
                    escaped(feature_dir),
                    name,
                    spk_id5,
                )
            )
        mute_line = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s"
            % (now_dir, sr2, now_dir, fea_dim, spk_id5)
        )
    opt.extend([mute_line, mute_line])
    shuffle(opt)
    with open("%s/filelist.txt" % exp_dir, "w") as f:
        f.write("\n".join(opt))
    print("write filelist done")

    # No config file needs to be generated.
    print("use gpus:", gpus16)
    if pretrained_G14 == "":
        print("no pretrained Generator")
    if pretrained_D15 == "":
        print("no pretrained Discriminator")

    # Optional arguments are simply left out. The original no-GPU branch put a
    # backspace character ("\b") on the command line in place of a missing
    # pretrained path, which the shell hands to the script as an argument.
    cmd_parts = [
        config.python_cmd,
        "train_nsf_sim_cache_sid_load_pretrain.py",
        "-e %s" % exp_dir1,
        "-sr %s" % sr2,
        "-f0 %s" % (1 if if_f0_3 else 0),
        "-bs %s" % batch_size12,
    ]
    if gpus16:
        cmd_parts.append("-g %s" % gpus16)
    cmd_parts.append("-te %s" % total_epoch11)
    cmd_parts.append("-se %s" % save_epoch10)
    if pretrained_G14 != "":
        cmd_parts.append("-pg %s" % pretrained_G14)
    if pretrained_D15 != "":
        cmd_parts.append("-pd %s" % pretrained_D15)
    # "== True" is kept on purpose: only a value equal to True enables a flag,
    # exactly as before.
    cmd_parts.append("-l %s" % (1 if if_save_latest13 == True else 0))
    cmd_parts.append("-c %s" % (1 if if_cache_gpu17 == True else 0))
    cmd_parts.append("-sw %s" % (1 if if_save_every_weights18 == True else 0))
    cmd_parts.append("-v %s" % version19)
    cmd_parts.append("-li %s" % log_interval)
    cmd = " ".join(cmd_parts)
    print(cmd)
    p = Popen(cmd, shell=True, cwd=now_dir)
    PID = p.pid
    p.wait()
    return ("训练结束, 您可查看控制台训练日志或实验文件夹下的train.log", {"visible": False, "__type__": "update"},
            {"visible": True, "__type__": "update"})
# but4.click(train_index, [exp_dir1], info3) | |
def train_index(exp_dir1, version19):
    """Build the faiss retrieval index for an experiment, yielding progress text.

    Loads every file in the experiment's feature directory, shuffles the
    concatenated rows, saves them as ``total_fea.npy``, trains an
    ``IVF<n>,Flat`` index and writes both the ``trained_...`` and the
    ``added_...`` index file into the experiment folder.

    This function is a generator. The original used ``return "<message>"``
    for the "extract features first" cases, which in a generator ends the
    iteration without ever delivering the message; it is now yielded.
    """
    exp_dir = "%s/logs/%s" % (now_dir, exp_dir1)
    os.makedirs(exp_dir, exist_ok=True)
    feature_dir = (
        "%s/3_feature256" % (exp_dir)
        if version19 == "v1"
        else "%s/3_feature768" % (exp_dir)
    )
    if not os.path.exists(feature_dir):
        yield "请先进行特征提取!"
        return
    listdir_res = list(os.listdir(feature_dir))
    if len(listdir_res) == 0:
        yield "请先进行特征提取!"
        return
    npys = [np.load("%s/%s" % (feature_dir, name)) for name in sorted(listdir_res)]
    big_npy = np.concatenate(npys, 0)
    # Shuffle the rows before saving and training.
    big_npy_idx = np.arange(big_npy.shape[0])
    np.random.shuffle(big_npy_idx)
    big_npy = big_npy[big_npy_idx]
    np.save("%s/total_fea.npy" % exp_dir, big_npy)
    # n_ivf = big_npy.shape[0] // 39
    n_ivf = min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39)
    infos = []
    infos.append("%s,%s" % (big_npy.shape, n_ivf))
    yield "\n".join(infos)
    index = faiss.index_factory(256 if version19 == "v1" else 768, "IVF%s,Flat" % n_ivf)
    # index = faiss.index_factory(256if version19=="v1"else 768, "IVF%s,PQ128x4fs,RFlat"%n_ivf)
    infos.append("training")
    yield "\n".join(infos)
    index_ivf = faiss.extract_index_ivf(index)
    index_ivf.nprobe = 1
    index.train(big_npy)
    faiss.write_index(
        index,
        "%s/trained_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (exp_dir, n_ivf, index_ivf.nprobe, exp_dir1, version19),
    )
    infos.append("adding")
    yield "\n".join(infos)
    # Add the vectors in batches.
    batch_size_add = 8192
    for i in range(0, big_npy.shape[0], batch_size_add):
        index.add(big_npy[i: i + batch_size_add])
    faiss.write_index(
        index,
        "%s/added_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (exp_dir, n_ivf, index_ivf.nprobe, exp_dir1, version19),
    )
    infos.append(
        "成功构建索引,added_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (n_ivf, index_ivf.nprobe, exp_dir1, version19)
    )
    yield "\n".join(infos)
# but5.click(train1key, [exp_dir1, sr2, if_f0_3, trainset_dir4, spk_id5, gpus6, np7, f0method8, save_epoch10, total_epoch11, batch_size12, if_save_latest13, pretrained_G14, pretrained_D15, gpus16, if_cache_gpu17], info3) | |
def train1key(
    exp_dir1,
    sr2,
    if_f0_3,
    trainset_dir4,
    spk_id5,
    np7,
    f0method8,
    save_epoch10,
    total_epoch11,
    batch_size12,
    if_save_latest13,
    pretrained_G14,
    pretrained_D15,
    gpus16,
    if_cache_gpu17,
    if_save_every_weights18,
    version19,
    echl
):
    """Run the whole pipeline: preprocess, extract, train the model, build the index.

    This is a generator; after every step it yields the accumulated progress
    text. Each external script is started through the shell and waited for
    before the next step begins.
    """
    infos = []

    def get_info_str(strr):
        infos.append(strr)
        return "\n".join(infos)

    def stems(directory):
        # File names up to their first dot.
        return {name.split(".")[0] for name in os.listdir(directory)}

    def escaped(path):
        return path.replace("\\", "\\\\")

    model_log_dir = "%s/logs/%s" % (now_dir, exp_dir1)
    preprocess_log_path = "%s/preprocess.log" % model_log_dir
    extract_f0_feature_log_path = "%s/extract_f0_feature.log" % model_log_dir
    gt_wavs_dir = "%s/0_gt_wavs" % model_log_dir
    feature_dir = (
        "%s/3_feature256" % model_log_dir
        if version19 == "v1"
        else "%s/3_feature768" % model_log_dir
    )
    os.makedirs(model_log_dir, exist_ok=True)

    ######### step1: process the data
    open(preprocess_log_path, "w").close()
    cmd = (
        config.python_cmd
        + " trainset_preprocess_pipeline_print.py %s %s %s %s "
        % (trainset_dir4, sr_dict[sr2], np7, model_log_dir)
        + str(config.noparallel)
    )
    yield get_info_str(i18n("step1:processing data"))
    yield get_info_str(cmd)
    p = Popen(cmd, shell=True)
    p.wait()
    with open(preprocess_log_path, "r") as f:
        print(f.read())

    ######### step2a: extract pitch
    # Truncate the log. The original left this file handle open.
    open(extract_f0_feature_log_path, "w").close()
    if if_f0_3:
        yield get_info_str("step2a:正在提取音高")
        cmd = config.python_cmd + " extract_f0_print.py %s %s %s %s" % (
            model_log_dir,
            np7,
            f0method8,
            echl
        )
        yield get_info_str(cmd)
        p = Popen(cmd, shell=True, cwd=now_dir)
        p.wait()
        with open(extract_f0_feature_log_path, "r") as f:
            print(f.read())
    else:
        yield get_info_str(i18n("step2a:无需提取音高"))

    ####### step2b: extract features, one process per entry in gpus16
    yield get_info_str(i18n("step2b:正在提取特征"))
    gpus = gpus16.split("-")
    leng = len(gpus)
    ps = []
    for idx, n_g in enumerate(gpus):
        cmd = config.python_cmd + " extract_feature_print.py %s %s %s %s %s %s" % (
            config.device,
            leng,
            idx,
            n_g,
            model_log_dir,
            version19,
        )
        yield get_info_str(cmd)
        ps.append(Popen(cmd, shell=True, cwd=now_dir))
    for p in ps:
        p.wait()
    with open(extract_f0_feature_log_path, "r") as f:
        print(f.read())

    ####### step3a: train the model
    yield get_info_str(i18n("step3a:正在训练模型"))
    # Build the file list from the samples present in every required directory.
    names = stems(gt_wavs_dir) & stems(feature_dir)
    if if_f0_3:
        f0_dir = "%s/2a_f0" % model_log_dir
        f0nsf_dir = "%s/2b-f0nsf" % model_log_dir
        names = names & stems(f0_dir) & stems(f0nsf_dir)

    fea_dim = 256 if version19 == "v1" else 768
    opt = []
    if if_f0_3:
        for name in names:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s/%s.wav.npy|%s/%s.wav.npy|%s"
                % (
                    escaped(gt_wavs_dir),
                    name,
                    escaped(feature_dir),
                    name,
                    escaped(f0_dir),
                    name,
                    escaped(f0nsf_dir),
                    name,
                    spk_id5,
                )
            )
        mute_line = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s/logs/mute/2a_f0/mute.wav.npy|%s/logs/mute/2b-f0nsf/mute.wav.npy|%s"
            % (now_dir, sr2, now_dir, fea_dim, now_dir, now_dir, spk_id5)
        )
    else:
        for name in names:
            opt.append(
                "%s/%s.wav|%s/%s.npy|%s"
                % (
                    escaped(gt_wavs_dir),
                    name,
                    escaped(feature_dir),
                    name,
                    spk_id5,
                )
            )
        mute_line = (
            "%s/logs/mute/0_gt_wavs/mute%s.wav|%s/logs/mute/3_feature%s/mute.npy|%s"
            % (now_dir, sr2, now_dir, fea_dim, spk_id5)
        )
    opt.extend([mute_line, mute_line])
    shuffle(opt)
    with open("%s/filelist.txt" % model_log_dir, "w") as f:
        f.write("\n".join(opt))
    yield get_info_str("write filelist done")

    # Optional arguments (-g, -pg, -pd) are appended only when they have a value.
    cmd_parts = [
        config.python_cmd,
        "train_nsf_sim_cache_sid_load_pretrain.py",
        "-e %s" % exp_dir1,
        "-sr %s" % sr2,
        "-f0 %s" % (1 if if_f0_3 else 0),
        "-bs %s" % batch_size12,
    ]
    if gpus16:
        cmd_parts.append("-g %s" % gpus16)
    cmd_parts.append("-te %s" % total_epoch11)
    cmd_parts.append("-se %s" % save_epoch10)
    if pretrained_G14 != "":
        cmd_parts.append("-pg %s" % pretrained_G14)
    if pretrained_D15 != "":
        cmd_parts.append("-pd %s" % pretrained_D15)
    # "== True" is kept on purpose: only a value equal to True enables a flag.
    cmd_parts.append("-l %s" % (1 if if_save_latest13 == True else 0))
    cmd_parts.append("-c %s" % (1 if if_cache_gpu17 == True else 0))
    cmd_parts.append("-sw %s" % (1 if if_save_every_weights18 == True else 0))
    cmd_parts.append("-v %s" % version19)
    cmd = " ".join(cmd_parts)
    yield get_info_str(cmd)
    p = Popen(cmd, shell=True, cwd=now_dir)
    p.wait()
    yield get_info_str(i18n("训练结束, 您可查看控制台训练日志或实验文件夹下的train.log"))

    ####### step3b: train the index
    listdir_res = list(os.listdir(feature_dir))
    npys = [np.load("%s/%s" % (feature_dir, name)) for name in sorted(listdir_res)]
    big_npy = np.concatenate(npys, 0)
    # Shuffle the rows before saving and training.
    big_npy_idx = np.arange(big_npy.shape[0])
    np.random.shuffle(big_npy_idx)
    big_npy = big_npy[big_npy_idx]
    np.save("%s/total_fea.npy" % model_log_dir, big_npy)
    # n_ivf = big_npy.shape[0] // 39
    n_ivf = min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39)
    yield get_info_str("%s,%s" % (big_npy.shape, n_ivf))
    index = faiss.index_factory(256 if version19 == "v1" else 768, "IVF%s,Flat" % n_ivf)
    yield get_info_str("training index")
    index_ivf = faiss.extract_index_ivf(index)
    index_ivf.nprobe = 1
    index.train(big_npy)
    faiss.write_index(
        index,
        "%s/trained_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (model_log_dir, n_ivf, index_ivf.nprobe, exp_dir1, version19),
    )
    yield get_info_str("adding index")
    # Add the vectors in batches.
    batch_size_add = 8192
    for i in range(0, big_npy.shape[0], batch_size_add):
        index.add(big_npy[i: i + batch_size_add])
    faiss.write_index(
        index,
        "%s/added_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (model_log_dir, n_ivf, index_ivf.nprobe, exp_dir1, version19),
    )
    yield get_info_str(
        "成功构建索引, added_IVF%s_Flat_nprobe_%s_%s_%s.index"
        % (n_ivf, index_ivf.nprobe, exp_dir1, version19)
    )
    yield get_info_str(i18n("全流程结束!"))
def whethercrepeornah(radio):
    """Build a Gradio update that toggles a component's visibility.

    The component is made visible only when ``radio`` is one of the two
    Mangio-Crepe method names.
    """
    is_mangio_crepe = radio in ('mangio-crepe', 'mangio-crepe-tiny')
    return {"visible": is_mangio_crepe, "__type__": "update"}
# ckpt_path2.change(change_info_,[ckpt_path2],[sr__,if_f0__]) | |
def change_info_(ckpt_path):
    """Read training settings from the ``train.log`` next to a checkpoint.

    Args:
        ckpt_path: Path of a checkpoint file; ``train.log`` is looked up in
            the same directory.

    Returns:
        ``(sample_rate, str(if_f0), version)`` taken from the first line of
        the log, or three no-op Gradio updates when the log is missing or
        cannot be parsed.
    """
    import ast  # local import: only this function needs it

    def _no_change():
        return {"__type__": "update"}, {"__type__": "update"}, {"__type__": "update"}

    # Build the log path from the directory. Replacing the basename inside the
    # whole path would also rewrite a parent folder that shares its name.
    log_path = os.path.join(os.path.dirname(ckpt_path), "train.log")
    if not os.path.exists(log_path):
        return _no_change()
    try:
        with open(log_path, "r") as f:
            first_line = f.read().strip("\n").split("\n")[0]
        # The last tab-separated field is expected to be the repr of a dict.
        # literal_eval parses it without executing code, unlike eval, so a
        # tampered log file cannot run arbitrary Python.
        info = ast.literal_eval(first_line.split("\t")[-1])
        sr, f0 = info["sample_rate"], info["if_f0"]
        version = "v2" if info.get("version") == "v2" else "v1"
        return sr, str(f0), version
    except Exception:
        traceback.print_exc()
        return _no_change()
from lib.infer_pack.models_onnx import SynthesizerTrnMsNSFsidM | |
def export_onnx(ModelPath, ExportedPath, MoeVS=True):
    """Export a trained checkpoint to an ONNX file.

    Loads the checkpoint at ``ModelPath`` on the CPU, rebuilds the synthesizer
    from its stored config and weights, and exports it to ``ExportedPath``
    using random dummy inputs. ``MoeVS`` is accepted but not used here.

    Returns:
        The string ``"Finished"``.
    """
    checkpoint = torch.load(ModelPath, map_location="cpu")
    weights = checkpoint["weight"]
    checkpoint["config"][-3] = weights["emb_g.weight"].shape[0]  # n_spk
    model_version = checkpoint.get("version", "v1")
    # hidden_channels: 256 for "v1", 768 otherwise (prepared for 768-dim features)
    hidden_channels = 256 if model_version == "v1" else 768

    frames = 200
    dummy_phone = torch.rand(1, frames, hidden_channels)  # hidden units
    dummy_phone_lengths = torch.tensor([frames]).long()  # hidden unit length (seems to be of little use)
    dummy_pitch = torch.randint(size=(1, frames), low=5, high=255)  # fundamental frequency (in Hz)
    dummy_pitchf = torch.rand(1, frames)  # NSF fundamental frequency
    dummy_speaker = torch.LongTensor([0])  # speaker id
    dummy_noise = torch.rand(1, 192, frames)  # noise (adds a random factor)
    export_device = "cpu"  # device used while exporting (does not affect later use of the model)

    # fp32 export (fp16 in C++ would need the memory re-laid out by hand, so it is not used for now)
    net_g = SynthesizerTrnMsNSFsidM(
        *checkpoint["config"], is_half=False, version=model_version
    )
    net_g.load_state_dict(weights, strict=False)

    # net_g.construct_spkmixmap(n_speaker)  -- multi-speaker mix track export
    dummy_inputs = tuple(
        tensor.to(export_device)
        for tensor in (
            dummy_phone,
            dummy_phone_lengths,
            dummy_pitch,
            dummy_pitchf,
            dummy_speaker,
            dummy_noise,
        )
    )
    torch.onnx.export(
        net_g,
        dummy_inputs,
        ExportedPath,
        dynamic_axes={"phone": [1], "pitch": [1], "pitchf": [1], "rnd": [2]},
        do_constant_folding=False,
        opset_version=16,
        verbose=False,
        input_names=["phone", "phone_lengths", "pitch", "pitchf", "ds", "rnd"],
        output_names=["audio"],
    )
    return "Finished"
# region RVC WebUI App | |
def get_presets():
    """Return the names of all presets stored in ``../inference-presets.json``."""
    with open('../inference-presets.json', 'r') as preset_file:
        document = json.load(preset_file)
    return [entry['name'] for entry in document['presets']]
def change_choices2():
    """Rescan ``./audios`` and build Gradio updates from what is found.

    Returns:
        Two update dicts: the first carries the sorted audio paths as new
        ``choices``, the second changes nothing.
    """
    audio_suffixes = ('.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac', '.mp4')
    found = [
        os.path.join('./audios', entry).replace('\\', '/')
        for entry in os.listdir("./audios")
        if entry.endswith(audio_suffixes)
    ]
    found.sort()
    return {"choices": found, "__type__": "update"}, {"__type__": "update"}
# Audio files present in ./audios when the module is loaded.
# The folder is created first so a missing ./audios does not abort start-up
# with FileNotFoundError from os.listdir.
os.makedirs("./audios", exist_ok=True)
audio_files = [
    os.path.join('./audios', filename).replace('\\', '/')
    for filename in os.listdir("./audios")
    if filename.endswith(('.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac', '.mp4'))
]
def get_index():
    """Return the path of an index file for the first model name, or ``''``.

    The first entry of the sorted ``names`` list (the text before its first
    dot) selects a folder under ``./logs``; the first ``.index`` file listed
    in that folder is returned. ``''`` is returned when there is no model
    name, no such folder, or no index file in it.
    """
    if check_for_name() == '':
        return ''
    model_stem = sorted(names)[0].split(".")[0]
    model_log_dir = "./logs/" + model_stem
    if not os.path.exists(model_log_dir):
        return ''
    index_entries = (
        entry for entry in os.listdir(model_log_dir) if entry.endswith(".index")
    )
    first_index = next(index_entries, None)
    if first_index is None:
        return ''
    return os.path.join(model_log_dir, first_index)
def get_indexes():
    """Collect every ``.index`` file found anywhere under ``./logs/``.

    Returns:
        The list of paths in ``os.walk`` order, or ``''`` when there are none.
    """
    found = [
        os.path.join(folder, entry)
        for folder, _subdirs, entries in os.walk("./logs/")
        for entry in entries
        if entry.endswith(".index")
    ]
    return found if found else ''
def get_name():
    """Return the alphabetically first entry of ``audio_files``, or ``''`` if it is empty."""
    return sorted(audio_files)[0] if len(audio_files) > 0 else ''
def save_to_wav(record_button):
    """Move a recorded audio file into ``./audios`` under a timestamped name.

    Args:
        record_button: Path of the recording, or ``None`` when there is none.

    Returns:
        The new ``./audios/<timestamp>.wav`` path, or ``None`` if no
        recording was given.
    """
    if record_button is None:
        return None
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    destination = './audios/' + timestamp + '.wav'
    shutil.move(record_button, destination)
    return destination
def save_to_wav2(dropbox):
    """Move an uploaded file into ``./audios`` and return its new path.

    Args:
        dropbox: Upload object exposing the temporary file path as ``.name``.

    Returns:
        ``./audios/<original file name>``.
    """
    source_path = dropbox.name
    os.makedirs('./audios', exist_ok=True)
    destination = os.path.join('./audios', os.path.basename(source_path))
    # Move onto the full destination path. Moving into the directory raises
    # shutil.Error when a file of the same name was uploaded before.
    shutil.move(source_path, destination)
    return destination
def match_index(sid0):
    """Find an index file for the selected model.

    Args:
        sid0: Selected model file name; the text before its first dot names
            the folder under ``./logs``.

    Returns:
        Path of the first ``.index`` file listed in that folder, or ``''``
        when the folder is missing or contains no index file.
    """
    folder = sid0.split(".")[0]
    parent_dir = "./logs/" + folder
    if not os.path.exists(parent_dir):
        return ''
    for filename in os.listdir(parent_dir):
        if filename.endswith(".index"):
            return os.path.join(parent_dir, filename)
    # The folder exists but holds no index: answer '' like the missing-folder
    # case instead of falling through to an implicit None.
    return ''
def check_for_name():
    """Return the alphabetically first entry of ``names``, or ``''`` if there are none."""
    if len(names) == 0:
        return ''
    return sorted(names)[0]
def download_from_url(url, model):
    """Download a zipped model and install its weights and index files.

    The archive is fetched into ``./zips`` (``gdown`` for Google Drive links,
    the Mega client for mega.nz links, ``wget`` otherwise) and unpacked into
    ``./unzips``. Every ``.index`` file is then copied to ``./logs/<model>``
    and every ``.pth`` file whose name contains neither ``G_`` nor ``D_`` is
    copied to ``./weights/<model>.pth``.

    Args:
        url: Link to a zip archive.
        model: Name to install the model under.

    Returns:
        A status message for the UI.
    """
    # Strip before validating so whitespace-only input counts as empty.
    url = (url or '').strip()
    model = (model or '').strip()
    if url == '':
        return "URL cannot be left empty."
    if model == '':
        return "You need to name your model. For example: My-Model"
    # The name becomes part of file paths; refuse anything that could point
    # outside ./logs or ./weights.
    if model in ('.', '..') or '/' in model or '\\' in model:
        return "The model name cannot contain path separators."

    for directory in ("zips", "unzips"):
        if os.path.exists(directory):
            shutil.rmtree(directory)
        os.makedirs(directory, exist_ok=True)

    zipfile_path = './zips/' + model + '.zip'
    try:
        # check=True turns a failed download into the generic error message
        # instead of carrying on with a missing or truncated archive.
        if "drive.google.com" in url:
            subprocess.run(["gdown", url, "--fuzzy", "-O", zipfile_path], check=True)
        elif "mega.nz" in url:
            Mega().download_url(url, './zips')
        else:
            subprocess.run(["wget", url, "-O", zipfile_path], check=True)

        # Only give up when no archive was downloaded at all; a stray non-zip
        # file next to the archive must not abort the installation.
        archives = [name for name in os.listdir("./zips") if name.endswith(".zip")]
        if not archives:
            return "No zipfile found."
        for name in archives:
            shutil.unpack_archive(os.path.join("./zips/", name), "./unzips", 'zip')

        for root, _dirs, files in os.walk('./unzips'):
            for file in files:
                file_path = os.path.join(root, file)
                if file.endswith(".index"):
                    # exist_ok: the folder may already exist, or the archive
                    # may contain more than one index file.
                    os.makedirs(f'./logs/{model}', exist_ok=True)
                    shutil.copy2(file_path, f'./logs/{model}')
                elif "G_" not in file and "D_" not in file and file.endswith(".pth"):
                    shutil.copy(file_path, f'./weights/{model}.pth')
        shutil.rmtree("zips")
        shutil.rmtree("unzips")
        return "Success."
    except Exception:
        # Keep the short UI message but leave the cause in the console.
        traceback.print_exc()
        return "There's been an error."
def success_message(face):
    """Return the upload status line for ``face`` and the literal string ``'None'``."""
    status = '{} has been uploaded.'.format(face.name)
    return status, 'None'
def mouth(size, face, voice, faces):
    """Lip-sync a character video to an audio file by running Wav2Lip.

    Args:
        size: ``'Half'`` selects resize factor 2; any other value selects 1.
        face: Uploaded character file (its ``.name`` is the path); used when
            ``faces`` does not name a bundled character.
        voice: Path of the audio file to animate.
        faces: Name of a bundled character, or ``'None'`` to use ``face``.

    Returns:
        ``(video_path, status)``. ``video_path`` is ``None`` when no
        character or audio is available or when the Wav2Lip process exits
        with a non-zero status.
    """
    bundled_characters = {
        'Ben Shapiro': '/content/wav2lip-HD/inputs/ben-shapiro-10.mp4',
        'Andrew Tate': '/content/wav2lip-HD/inputs/tate-7.mp4',
    }
    output_path = '/content/wav2lip-HD/outputs/result.mp4'
    resize_factor = 2 if size == 'Half' else 1

    # Resolve the character up front so an untouched dropdown (None) or a
    # missing upload yields a message instead of an unbound-variable crash.
    if faces in bundled_characters:
        character = bundled_characters[faces]
    elif face is not None:
        character = face.name
    else:
        return None, 'Please upload a character or choose one from the list.'
    if not voice:
        return None, 'Please convert or provide an audio file first.'

    # Argument list without a shell: the paths come from user uploads and
    # must not be interpreted (or split on spaces) by a shell.
    command = [
        "python", "inference.py",
        "--checkpoint_path", "checkpoints/wav2lip.pth",
        "--face", character,
        "--audio", voice,
        "--pads", "0", "20", "0", "0",
        "--outfile", output_path,
        "--fps", "24",
        "--resize_factor", str(resize_factor),
    ]
    completed = subprocess.run(command, cwd='/content/wav2lip-HD/Wav2Lip-master')
    if completed.returncode != 0:
        # Do not hand back a stale result file from an earlier run.
        return None, 'Animation failed.'
    return output_path, 'Animation completed.'
# Voice names mapped to the voice ids used in the ElevenLabs request URL.
chosen_voice = {
    'Adam': 'pNInz6obpgDQGcFmaJgB',
    'Antoni': 'ErXwobaYiN019PkySvjV',
    'Josh': 'TxGEqnHWrfWFTfGW9XjX',
    'Arnold': 'VR6AewLTigWG4xSOukaG',
    'Sam': 'yoZ06aMxZJJ28mfd3POQ',
    'Bella': 'EXAVITQu4vr4xnSDxMaL',
    'Rachel': '21m00Tcm4TlvDq8ikWAM',
    'Domi': 'AZnzlk1XvdvUeBnXmlld',
    'Elli': 'MF3mGyEYCl7XYWbV9V6O',
}
# Parallel lists, in the same order as the mapping above.
eleven_voices = list(chosen_voice)
eleven_voices_ids = list(chosen_voice.values())
def stoptraining(mim):
    """Return two visibility updates, stopping training first when asked to.

    Args:
        mim: When ``int(mim)`` is 1, ``'True'`` is written to
            ``csvdb/stop.csv`` through ``CSVutil`` and ``SIGTERM`` is sent to
            the process ``PID``; any failure there is only printed.

    Returns:
        Two Gradio updates: the first hides its component, the second shows
        its component.
    """
    stop_requested = int(mim) == 1
    if stop_requested:
        try:
            CSVutil('csvdb/stop.csv', 'w+', 'stop', 'True')
            os.kill(PID, signal.SIGTERM)
        except Exception as err:
            print("Couldn't click due to {}".format(err))
    hide_update = {"visible": False, "__type__": "update"}
    show_update = {"visible": True, "__type__": "update"}
    return hide_update, show_update
def elevenTTS(xiapi, text, id, lang):
    """Synthesize ``text`` to speech and store the audio through ``save_to_wav``.

    The ElevenLabs API is used when both an API key and a voice are given;
    otherwise gTTS is used.

    Args:
        xiapi: ElevenLabs API key; ``''`` selects gTTS.
        text: Text to speak.
        id: Voice name, a key of ``chosen_voice``; ``''`` selects gTTS.
        lang: Language code. ``'en'`` selects the monolingual ElevenLabs
            model, anything else the multilingual one; gTTS receives it
            unchanged.

    Returns:
        ``(aud_path, aud_path)``, the path returned by ``save_to_wav``, twice.

    Raises:
        requests.HTTPError: If the ElevenLabs API answers with an error status.
    """
    if xiapi != '' and id != '':
        voice_id = chosen_voice[id]
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": xiapi
        }
        # The two request bodies differ only in the model id.
        model_id = "eleven_monolingual_v1" if lang == 'en' else "eleven_multilingual_v1"
        data = {
            "text": text,
            "model_id": model_id,
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.5
            }
        }
        # A timeout keeps a stalled connection from blocking the worker forever.
        response = requests.post(url, json=data, headers=headers, stream=True, timeout=(10, 120))
        # Fail loudly on an error status instead of saving the error body as
        # if it were audio.
        response.raise_for_status()
        temp_path = './temp_eleven.mp3'
        with open(temp_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
    else:
        temp_path = './temp_gTTS.mp3'
        gTTS(text, lang=lang).save(temp_path)
    aud_path = save_to_wav(temp_path)
    return aud_path, aud_path
def upload_to_dataset(files, dir):
    """Copy uploaded files into a dataset folder.

    Args:
        files: Upload objects exposing their path as ``.name``.
        dir: Target folder; ``''`` means ``./dataset``. Created when missing.

    Returns:
        A status message with the number of files copied and the folder.
    """
    target_dir = './dataset' if dir == '' else dir
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    copied = 0
    for copied, upload in enumerate(files, start=1):
        shutil.copy2(upload.name, target_dir)
    return f' {copied} files uploaded to {target_dir}.'
def zip_downloader(model):
    """Collect the files of a trained model for download.

    Args:
        model: Voice name; ``./weights/<model>.pth`` and ``./logs/<model>``
            are looked up.

    Returns:
        ``(files, message)``. ``files`` is a no-op Gradio update when the
        weights are missing, the weights path alone when no index file whose
        name contains ``added`` exists, or ``[weights, index]`` otherwise.
    """
    weights_path = f'./weights/{model}.pth'
    if not os.path.exists(weights_path):
        return {"__type__": "update"}, f'Make sure the Voice Name is correct. I could not find {model}.pth'
    logs_dir = f'./logs/{model}'
    # A missing logs folder is handled like "no index" instead of letting
    # os.listdir raise FileNotFoundError.
    entries = sorted(os.listdir(logs_dir)) if os.path.isdir(logs_dir) else []
    added_indexes = [entry for entry in entries if entry.endswith('.index') and 'added' in entry]
    if added_indexes:
        return [weights_path, f'{logs_dir}/{added_indexes[-1]}'], "Done"
    return weights_path, "Could not find Index file."
with gr.Blocks(theme=gr.themes.Base(), title='Voice DeepFake 💻') as app: | |
# gr.LogoutButton() | |
with gr.Tabs(): | |
with gr.TabItem("Interfeys"): | |
# Page heading; the <center> element is closed with its own tag (it was closed with </span>).
gr.HTML("<center><h1> Voice DeepFake </h1></center>")
# gr.HTML("<center><h3> Если вы хотите использовать это пространство в частном порядке, я рекомендую продублировать его. </h3></span>") | |
# with gr.Row(): | |
# gr.Markdown( | |
# """ | |
# Если не работает, то пробуйте тут https://huggingface.co/spaces/Clebersla/RVC_V2_Huggingface_Version | |
# """ | |
# ) | |
# Inference Preset Row | |
# with gr.Row(): | |
# mangio_preset = gr.Dropdown(label="Inference Preset", choices=sorted(get_presets())) | |
# mangio_preset_name_save = gr.Textbox( | |
# label="Your preset name" | |
# ) | |
# mangio_preset_save_btn = gr.Button('Save Preset', variant="primary") | |
# Other RVC stuff | |
with gr.Row(): | |
sid0 = gr.Dropdown(label="1. Modelni tanlang.", choices=sorted(names), value=check_for_name()) | |
refresh_button = gr.Button("Yangilash", variant="primary") | |
if check_for_name() != '': | |
get_vc(sorted(names)[0]) | |
vc_transform0 = gr.Number( | |
label="Kengaytirilgan: Bu yerda siz ovoz balandligini o'zgartirishingiz yoki uni 0 da qoldirishingiz mumkin.", | |
value=0) | |
# clean_button = gr.Button(i18n("卸载音色省显存"), variant="primary") | |
spk_item = gr.Slider( | |
minimum=0, | |
maximum=2333, | |
step=1, | |
label=i18n("请选择说话人id"), | |
value=0, | |
visible=False, | |
interactive=True, | |
) | |
# clean_button.click(fn=clean, inputs=[], outputs=[sid0]) | |
sid0.change( | |
fn=get_vc, | |
inputs=[sid0], | |
outputs=[spk_item], | |
) | |
but0 = gr.Button("O'zgartiring", variant="primary") | |
with gr.Row(): | |
with gr.Column(): | |
with gr.Row(): | |
dropbox = gr.File( | |
label='Ovoz yozishingizni shu yerga yuboring va "Qayta yuklash" tugmasini bosing.') | |
with gr.Row(): | |
record_button = gr.Audio(source="microphone", label="Ovozni mikrofondan yozib oling.", | |
type="filepath") | |
with gr.Row(): | |
input_audio0 = gr.Dropdown( | |
label="2.Audio yozuvni tanlang.", | |
value="./audios/someguy.mp3", | |
choices=audio_files | |
) | |
dropbox.upload(fn=save_to_wav2, inputs=[dropbox], outputs=[input_audio0]) | |
dropbox.upload(fn=change_choices2, inputs=[], outputs=[input_audio0]) | |
refresh_button2 = gr.Button("Yangilash", variant="primary", size='sm') | |
record_button.change(fn=save_to_wav, inputs=[record_button], outputs=[input_audio0]) | |
record_button.change(fn=change_choices2, inputs=[], outputs=[input_audio0]) | |
# with gr.Row(): | |
# with gr.Accordion('Текст в речь', open=False): | |
# with gr.Column(): | |
# lang = gr.Radio(label='Выберите язык для озвучки.',choices=['en','es','ru','uk','pl','fr','de','tr'], value='en') | |
# elevenid=gr.Dropdown(label="Голос:", choices=eleven_voices) | |
# with gr.Column(): | |
# tfs = gr.Textbox(label="Введите свой текст", interactive=True, value="This is a test.") | |
# tts_button = gr.Button(value="Генерировать") | |
# tts_button.click(fn=elevenTTS, inputs=[tfs, elevenid, lang], outputs=[record_button, input_audio0]) | |
with gr.Row(): | |
with gr.Accordion('Wav2Lip', open=False, visible=False): | |
with gr.Row(): | |
size = gr.Radio(label='Resolution:', choices=['Half', 'Full']) | |
face = gr.UploadButton("Upload A Character", type='file') | |
faces = gr.Dropdown(label="OR Choose one:", | |
choices=['None', 'Ben Shapiro', 'Andrew Tate']) | |
with gr.Row(): | |
preview = gr.Textbox(label="Status:", interactive=False) | |
face.upload(fn=success_message, inputs=[face], outputs=[preview, faces]) | |
with gr.Row(): | |
animation = gr.Video(type='filepath') | |
refresh_button2.click(fn=change_choices2, inputs=[], outputs=[input_audio0, animation]) | |
with gr.Row(): | |
animate_button = gr.Button('Animate') | |
with gr.Column(): | |
with gr.Accordion("Indeks sozlamalari", open=False): | |
file_index1 = gr.Dropdown( | |
label="3. added.index fayliga yo'l (agar u avtomatik ravishda topilmasa).", | |
choices=get_indexes(), | |
value=get_index(), | |
interactive=True, | |
) | |
sid0.change(fn=match_index, inputs=[sid0], outputs=[file_index1]) | |
refresh_button.click( | |
fn=change_choices, inputs=[], outputs=[sid0, file_index1] | |
) | |
# file_big_npy1 = gr.Textbox( | |
# label=i18n("特征文件路径"), | |
# value="E:\\codes\py39\\vits_vc_gpu_train\\logs\\mi-test-1key\\total_fea.npy", | |
# interactive=True, | |
# ) | |
index_rate1 = gr.Slider( | |
minimum=0, | |
maximum=1, | |
label=i18n("Qidiruv funksiyasining nisbati (men uni 0 ga o'rnatishni tavsiya qilaman):"), | |
value=0.66, | |
interactive=True, | |
) | |
vc_output2 = gr.Audio( | |
label="Audio ma'lumotlarni chiqarish (yuklab olish uchun o'ng burchakdagi uchta nuqta ustiga bosing)", | |
type='filepath', | |
interactive=False, | |
) | |
animate_button.click(fn=mouth, inputs=[size, face, vc_output2, faces], outputs=[animation, preview]) | |
with gr.Accordion("Qo'shimcha sozlamalar", open=False): | |
f0method0 = gr.Radio( | |
label='Majburiy emas: Ovozni ajratib olish algoritmini o\'zgartiring.', | |
choices=["pm", "dio", "crepe-tiny", "mangio-crepe-tiny", "crepe", "harvest", "mangio-crepe", | |
"rmvpe"], # Fork Feature. Add Crepe-Tiny | |
value="rmvpe", | |
interactive=True, | |
) | |
crepe_hop_length = gr.Slider( | |
minimum=1, | |
maximum=512, | |
step=1, | |
label="Mangio-Crepe Hop Length. Kattaroq raqamlar tovush balandligining keskin o'zgarishi ehtimolini kamaytiradi, lekin pastroq raqamlar aniqlikni oshiradi. 64-192 - tajriba uchun yaxshi diapazon.", | |
value=120, | |
interactive=True, | |
visible=False, | |
) | |
f0method0.change(fn=whethercrepeornah, inputs=[f0method0], outputs=[crepe_hop_length]) | |
filter_radius0 = gr.Slider( | |
minimum=0, | |
maximum=7, | |
label=i18n( | |
"Agar >=3 boʻlsa: toʻplangan ovoz balandligi natijalariga median filtrlashni qoʻllang. Qiymat filtrlash radiusini ifodalaydi va nafas olishni qisqartirishi mumkin"), | |
value=3, | |
step=1, | |
interactive=True, | |
) | |
resample_sr0 = gr.Slider( | |
minimum=0, | |
maximum=48000, | |
label=i18n("后处理重采样至最终采样率,0为不进行重采样"), | |
value=0, | |
step=1, | |
interactive=True, | |
visible=False | |
) | |
rms_mix_rate0 = gr.Slider( | |
minimum=0, | |
maximum=1, | |
label=i18n( | |
"Chiqish ovozi konvertini almashtirish yoki aralashtirish uchun kirish ovozi konvertidan foydalaning. Bu nisbat 1 ga qanchalik yaqin bo'lsa, chiqish signali konvertidan shunchalik ko'p foydalaniladi:"), | |
value=0.21, | |
interactive=True, | |
) | |
protect0 = gr.Slider( | |
minimum=0, | |
maximum=0.5, | |
label=i18n( | |
"Elektron musiqadagi tanaffuslar kabi artefaktlarning oldini olish uchun ovozsiz undosh tovushlarni va nafas tovushlarini himoya qiladi. O'chirish uchun qiymatni 0,5 ga o'rnating. Xavfsizlikni oshirish uchun qiymatni kamaytiring, lekin bu indekslash aniqligini kamaytirishi mumkin:"), | |
value=0.33, | |
step=0.01, | |
interactive=True, | |
) | |
formanting = gr.Checkbox( | |
value=bool(DoFormant), | |
label="[EXPERIMENTAL] Formant shift inference audio", | |
info="Used for male to female and vice-versa conversions", | |
interactive=True, | |
visible=False, | |
) | |
formant_preset = gr.Dropdown( | |
value='', | |
choices=get_fshift_presets(), | |
label="browse presets for formanting", | |
visible=bool(DoFormant), | |
) | |
formant_refresh_button = gr.Button( | |
value='\U0001f504', | |
visible=bool(DoFormant), | |
variant='primary', | |
) | |
# formant_refresh_button = ToolButton( elem_id='1') | |
# create_refresh_button(formant_preset, lambda: {"choices": formant_preset}, "refresh_list_shiftpresets") | |
qfrency = gr.Slider( | |
value=Quefrency, | |
info="Default value is 1.0", | |
label="Quefrency for formant shifting", | |
minimum=0.0, | |
maximum=16.0, | |
step=0.1, | |
visible=bool(DoFormant), | |
interactive=True, | |
) | |
tmbre = gr.Slider( | |
value=Timbre, | |
info="Default value is 1.0", | |
label="Timbre for formant shifting", | |
minimum=0.0, | |
maximum=16.0, | |
step=0.1, | |
visible=bool(DoFormant), | |
interactive=True, | |
) | |
formant_preset.change(fn=preset_apply, inputs=[formant_preset, qfrency, tmbre], | |
outputs=[qfrency, tmbre]) | |
frmntbut = gr.Button("Apply", variant="primary", visible=bool(DoFormant)) | |
formanting.change(fn=formant_enabled, | |
inputs=[formanting, qfrency, tmbre, frmntbut, formant_preset, | |
formant_refresh_button], | |
outputs=[formanting, qfrency, tmbre, frmntbut, formant_preset, | |
formant_refresh_button]) | |
frmntbut.click(fn=formant_apply, inputs=[qfrency, tmbre], outputs=[qfrency, tmbre]) | |
formant_refresh_button.click(fn=update_fshift_presets, inputs=[formant_preset, qfrency, tmbre], | |
outputs=[formant_preset, qfrency, tmbre]) | |
with gr.Row(): | |
vc_output1 = gr.Textbox("") | |
f0_file = gr.File(label=i18n("F0曲线文件, 可选, 一行一个音高, 代替默认F0及升降调"), visible=False) | |
but0.click( | |
vc_single, | |
[ | |
spk_item, | |
input_audio0, | |
vc_transform0, | |
f0_file, | |
f0method0, | |
file_index1, | |
# file_index2, | |
# file_big_npy1, | |
index_rate1, | |
filter_radius0, | |
resample_sr0, | |
rms_mix_rate0, | |
protect0, | |
crepe_hop_length | |
], | |
[vc_output1, vc_output2], | |
) | |
with gr.Accordion("Batch Conversion", open=False, visible=False): | |
with gr.Row(): | |
with gr.Column(): | |
vc_transform1 = gr.Number( | |
label=i18n("变调(整数, 半音数量, 升八度12降八度-12)"), value=0 | |
) | |
opt_input = gr.Textbox(label=i18n("指定输出文件夹"), value="opt") | |
f0method1 = gr.Radio( | |
label=i18n( | |
"选择音高提取算法,输入歌声可用pm提速,harvest低音好但巨慢无比,crepe效果好但吃GPU" | |
), | |
choices=["pm", "harvest", "crepe", "rmvpe"], | |
value="rmvpe", | |
interactive=True, | |
) | |
filter_radius1 = gr.Slider( | |
minimum=0, | |
maximum=7, | |
label=i18n(">=3则使用对harvest音高识别的结果使用中值滤波,数值为滤波半径,使用可以削弱哑音"), | |
value=3, | |
step=1, | |
interactive=True, | |
) | |
with gr.Column(): | |
file_index3 = gr.Textbox( | |
label=i18n("特征检索库文件路径,为空则使用下拉的选择结果"), | |
value="", | |
interactive=True, | |
) | |
file_index4 = gr.Dropdown( | |
label=i18n("自动检测index路径,下拉式选择(dropdown)"), | |
choices=sorted(index_paths), | |
interactive=True, | |
) | |
refresh_button.click( | |
fn=lambda: change_choices()[1], | |
inputs=[], | |
outputs=file_index4, | |
) | |
# file_big_npy2 = gr.Textbox( | |
# label=i18n("特征文件路径"), | |
# value="E:\\codes\\py39\\vits_vc_gpu_train\\logs\\mi-test-1key\\total_fea.npy", | |
# interactive=True, | |
# ) | |
index_rate2 = gr.Slider( | |
minimum=0, | |
maximum=1, | |
label=i18n("检索特征占比"), | |
value=1, | |
interactive=True, | |
) | |
with gr.Column(): | |
resample_sr1 = gr.Slider( | |
minimum=0, | |
maximum=48000, | |
label=i18n("后处理重采样至最终采样率,0为不进行重采样"), | |
value=0, | |
step=1, | |
interactive=True, | |
) | |
rms_mix_rate1 = gr.Slider( | |
minimum=0, | |
maximum=1, | |
label=i18n("输入源音量包络替换输出音量包络融合比例,越靠近1越使用输出包络"), | |
value=1, | |
interactive=True, | |
) | |
protect1 = gr.Slider( | |
minimum=0, | |
maximum=0.5, | |
label=i18n( | |
"保护清辅音和呼吸声,防止电音撕裂等artifact,拉满0.5不开启,调低加大保护力度但可能降低索引效果" | |
), | |
value=0.33, | |
step=0.01, | |
interactive=True, | |
) | |
with gr.Column(): | |
# Text box for the folder of audio files to process. The default path has
# every backslash escaped: "\c" and "\p" are invalid escape sequences that
# newer Python versions warn about. The resulting string is unchanged.
dir_input = gr.Textbox(
    label=i18n("输入待处理音频文件夹路径(去文件管理器地址栏拷就行了)"),
    value="E:\\codes\\py39\\test-20230416b\\todo-songs",
)
inputs = gr.File( | |
file_count="multiple", label=i18n("也可批量输入音频文件, 二选一, 优先读文件夹") | |
) | |
with gr.Row(): | |
format1 = gr.Radio( | |
label=i18n("导出文件格式"), | |
choices=["wav", "flac", "mp3", "m4a"], | |
value="flac", | |
interactive=True, | |
) | |
but1 = gr.Button(i18n("转换"), variant="primary") | |
vc_output3 = gr.Textbox(label=i18n("输出信息")) | |
but1.click( | |
vc_multi, | |
[ | |
spk_item, | |
dir_input, | |
opt_input, | |
inputs, | |
vc_transform1, | |
f0method1, | |
file_index3, | |
file_index4, | |
# file_big_npy2, | |
index_rate2, | |
filter_radius1, | |
resample_sr1, | |
rms_mix_rate1, | |
protect1, | |
format1, | |
crepe_hop_length, | |
], | |
[vc_output3], | |
) | |
but1.click(fn=lambda: easy_uploader.clear()) | |
with gr.TabItem("Modelni yuklash"): | |
# with gr.Row(): | |
# gr.Markdown( | |
# """ | |
# Если не работает, то пробуйте тут https://huggingface.co/spaces/Clebersla/RVC_V2_Huggingface_Version | |
# """ | |
# ) | |
with gr.Row(): | |
url = gr.Textbox(label="Model URL manzilini kiriting:", | |
placeholder=".pth va .index fayllarni o'z ichiga olgan zipga havolani kiriting") | |
with gr.Row(): | |
model = gr.Textbox(label="Model nomi:") | |
download_button = gr.Button("Yuklash") | |
with gr.Row(): | |
status_bar = gr.Textbox(label="") | |
download_button.click(fn=download_from_url, inputs=[url, model], outputs=[status_bar]) | |
def has_two_files_in_pretrained_folder():
    """Tell whether ``./pretrained/`` exists and holds at least two entries."""
    folder = "./pretrained/"
    return os.path.exists(folder) and len(os.listdir(folder)) >= 2
if has_two_files_in_pretrained_folder(): | |
print("Pretrained weights are downloaded. Training tab enabled!\n-------------------------------") | |
with gr.TabItem("Train", visible=False): | |
with gr.Row(): | |
with gr.Column(): | |
exp_dir1 = gr.Textbox(label="Voice Name:", value="My-Voice") | |
sr2 = gr.Radio( | |
label=i18n("目标采样率"), | |
choices=["40k", "48k"], | |
value="40k", | |
interactive=True, | |
visible=False | |
) | |
if_f0_3 = gr.Radio( | |
label=i18n("模型是否带音高指导(唱歌一定要, 语音可以不要)"), | |
choices=[True, False], | |
value=True, | |
interactive=True, | |
visible=False | |
) | |
version19 = gr.Radio( | |
label="RVC version", | |
choices=["v1", "v2"], | |
value="v2", | |
interactive=True, | |
visible=False, | |
) | |
np7 = gr.Slider( | |
minimum=0, | |
maximum=config.n_cpu, | |
step=1, | |
label="# of CPUs for data processing (Leave as it is)", | |
value=config.n_cpu, | |
interactive=True, | |
visible=True | |
) | |
trainset_dir4 = gr.Textbox(label="Path to your dataset (audios, not zip):", value="./dataset") | |
easy_uploader = gr.Files( | |
label='OR Drop your audios here. They will be uploaded in your dataset path above.', | |
file_types=['audio']) | |
but1 = gr.Button("1. Process The Dataset", variant="primary") | |
info1 = gr.Textbox(label="Status (wait until it says 'end preprocess'):", value="") | |
easy_uploader.upload(fn=upload_to_dataset, inputs=[easy_uploader, trainset_dir4], | |
outputs=[info1]) | |
but1.click( | |
preprocess_dataset, [trainset_dir4, exp_dir1, sr2, np7], [info1] | |
) | |
with gr.Column(): | |
spk_id5 = gr.Slider( | |
minimum=0, | |
maximum=4, | |
step=1, | |
label=i18n("请指定说话人id"), | |
value=0, | |
interactive=True, | |
visible=False | |
) | |
with gr.Accordion('GPU Settings', open=False, visible=False): | |
gpus6 = gr.Textbox( | |
label=i18n("以-分隔输入使用的卡号, 例如 0-1-2 使用卡0和卡1和卡2"), | |
value=gpus, | |
interactive=True, | |
visible=False | |
) | |
gpu_info9 = gr.Textbox(label=i18n("显卡信息"), value=gpu_info) | |
f0method8 = gr.Radio( | |
label=i18n( | |
"选择音高提取算法:输入歌声可用pm提速,高质量语音但CPU差可用dio提速,harvest质量更好但慢" | |
), | |
choices=["harvest", "crepe", "mangio-crepe", "rmvpe"], | |
# Fork feature: Crepe on f0 extraction for training. | |
value="rmvpe", | |
interactive=True, | |
) | |
extraction_crepe_hop_length = gr.Slider( | |
minimum=1, | |
maximum=512, | |
step=1, | |
label=i18n("crepe_hop_length"), | |
value=128, | |
interactive=True, | |
visible=False, | |
) | |
f0method8.change(fn=whethercrepeornah, inputs=[f0method8], | |
outputs=[extraction_crepe_hop_length]) | |
but2 = gr.Button("2. Pitch Extraction", variant="primary") | |
info2 = gr.Textbox(label="Status(Check the Colab Notebook's cell output):", value="", | |
max_lines=8) | |
but2.click( | |
extract_f0_feature, | |
[gpus6, np7, f0method8, if_f0_3, exp_dir1, version19, extraction_crepe_hop_length], | |
[info2], | |
) | |
with gr.Row(): | |
with gr.Column(): | |
total_epoch11 = gr.Slider( | |
minimum=1, | |
maximum=5000, | |
step=10, | |
label="Total # of training epochs (IF you choose a value too high, your model will sound horribly overtrained.):", | |
value=250, | |
interactive=True, | |
) | |
butstop = gr.Button( | |
"Stop Training", | |
variant='primary', | |
visible=False, | |
) | |
but3 = gr.Button("3. Train Model", variant="primary", visible=True) | |
but3.click(fn=stoptraining, inputs=[gr.Number(value=0, visible=False)], | |
outputs=[but3, butstop]) | |
butstop.click(fn=stoptraining, inputs=[gr.Number(value=1, visible=False)], | |
outputs=[butstop, but3]) | |
but4 = gr.Button("4.Train Index", variant="primary") | |
info3 = gr.Textbox(label="Status(Check the Colab Notebook's cell output):", value="", | |
max_lines=10) | |
with gr.Accordion("Training Preferences (You can leave these as they are)", open=False): | |
# gr.Markdown(value=i18n("step3: 填写训练设置, 开始训练模型和索引")) | |
with gr.Column(): | |
save_epoch10 = gr.Slider( | |
minimum=1, | |
maximum=200, | |
step=1, | |
label="Backup every X amount of epochs:", | |
value=10, | |
interactive=True, | |
) | |
batch_size12 = gr.Slider( | |
minimum=1, | |
maximum=40, | |
step=1, | |
label="Batch Size (LEAVE IT unless you know what you're doing!):", | |
value=default_batch_size, | |
interactive=True, | |
) | |
if_save_latest13 = gr.Checkbox( | |
label="Save only the latest '.ckpt' file to save disk space.", | |
value=True, | |
interactive=True, | |
) | |
if_cache_gpu17 = gr.Checkbox( | |
label="Cache all training sets to GPU memory. Caching small datasets (less than 10 minutes) can speed up training, but caching large datasets will consume a lot of GPU memory and may not provide much speed improvement.", | |
value=False, | |
interactive=True, | |
) | |
if_save_every_weights18 = gr.Checkbox( | |
label="Save a small final model to the 'weights' folder at each save point.", | |
value=True, | |
interactive=True, | |
) | |
zip_model = gr.Button('5. Download Model') | |
zipped_model = gr.Files(label='Your Model and Index file can be downloaded here:') | |
zip_model.click(fn=zip_downloader, inputs=[exp_dir1], outputs=[zipped_model, info3]) | |
with gr.Group(): | |
with gr.Accordion("Base Model Locations:", open=False, visible=False): | |
pretrained_G14 = gr.Textbox( | |
label=i18n("加载预训练底模G路径"), | |
value="pretrained_v2/f0G40k.pth", | |
interactive=True, | |
) | |
pretrained_D15 = gr.Textbox( | |
label=i18n("加载预训练底模D路径"), | |
value="pretrained_v2/f0D40k.pth", | |
interactive=True, | |
) | |
gpus16 = gr.Textbox( | |
label=i18n("以-分隔输入使用的卡号, 例如 0-1-2 使用卡0和卡1和卡2"), | |
value=gpus, | |
interactive=True, | |
) | |
sr2.change( | |
change_sr2, | |
[sr2, if_f0_3, version19], | |
[pretrained_G14, pretrained_D15, version19], | |
) | |
version19.change( | |
change_version19, | |
[sr2, if_f0_3, version19], | |
[pretrained_G14, pretrained_D15], | |
) | |
if_f0_3.change( | |
change_f0, | |
[if_f0_3, sr2, version19], | |
[f0method8, pretrained_G14, pretrained_D15], | |
) | |
but5 = gr.Button(i18n("一键训练"), variant="primary", visible=False) | |
but3.click( | |
click_train, | |
[ | |
exp_dir1, | |
sr2, | |
if_f0_3, | |
spk_id5, | |
save_epoch10, | |
total_epoch11, | |
batch_size12, | |
if_save_latest13, | |
pretrained_G14, | |
pretrained_D15, | |
gpus16, | |
if_cache_gpu17, | |
if_save_every_weights18, | |
version19, | |
], | |
[ | |
info3, | |
butstop, | |
but3, | |
], | |
) | |
but4.click(train_index, [exp_dir1, version19], info3) | |
but5.click( | |
train1key, | |
[ | |
exp_dir1, | |
sr2, | |
if_f0_3, | |
trainset_dir4, | |
spk_id5, | |
np7, | |
f0method8, | |
save_epoch10, | |
total_epoch11, | |
batch_size12, | |
if_save_latest13, | |
pretrained_G14, | |
pretrained_D15, | |
gpus16, | |
if_cache_gpu17, | |
if_save_every_weights18, | |
version19, | |
extraction_crepe_hop_length | |
], | |
info3, | |
) | |
# Start the queued app behind a login. The credentials can be overridden with
# the WEBUI_USERNAME / WEBUI_PASSWORD environment variables; the former
# hard-coded pair remains only as the fallback so existing setups keep working.
app.queue(concurrency_count=511, max_size=1022).launch(
    auth=(
        os.environ.get("WEBUI_USERNAME", "admin"),
        os.environ.get("WEBUI_PASSWORD", "admin"),
    ),
    share=False,
    quiet=True,
)
# endregion | |