|
import argparse |
|
import os |
|
import csv |
|
|
|
import torch |
|
|
|
import pandas as pd |
|
import numpy as np |
|
import pickle as pkl |
|
import decord |
|
import yaml |
|
|
|
from scipy import stats |
|
from sklearn.metrics import mean_squared_error |
|
from scipy.optimize import curve_fit |
|
from cover.datasets import UnifiedFrameSampler, spatial_temporal_view_decomposition |
|
from cover.models import COVER |
|
|
|
|
|
|
|
|
|
def save_to_csv(video_name, pre_smos, pre_tmos, pre_amos, pre_overall, filename):
    """Write per-video predicted scores to a CSV file.

    The five sequences are paired element-wise (``zip`` semantics, so the
    shortest sequence determines the number of rows) and written one video
    per row beneath a fixed header line.

    Args:
        video_name: Sequence of video identifiers (first column).
        pre_smos: Sequence of semantic scores.
        pre_tmos: Sequence of technical scores.
        pre_amos: Sequence of aesthetic scores.
        pre_overall: Sequence of overall/final scores.
        filename: Path of the CSV file to write; an existing file is
            overwritten.
    """
    header = ['Video', 'semantic score', 'technical score', 'aesthetic score', 'overall/final score']
    columns = (video_name, pre_smos, pre_tmos, pre_amos, pre_overall)

    # Materialize the rows before touching the file system.
    records = list(zip(*columns))

    # newline='' lets the csv module control line endings itself.
    with open(filename, 'w', newline='') as out_file:
        sheet = csv.writer(out_file)
        sheet.writerow(header)
        sheet.writerows(records)
|
|
|
# Three-element mean / standard deviation subtracted from and divided into the
# 'technical' and 'aesthetic' views in the evaluation loop of this script.
# NOTE(review): the values look like the usual ImageNet statistics scaled to
# the 0-255 range -- confirm against the model's training pipeline.
mean_cover = torch.tensor([123.675, 116.28, 103.53], dtype=torch.float32)
std_cover = torch.tensor([58.395, 57.12, 57.375], dtype=torch.float32)
|
|
|
# Three-element mean / standard deviation subtracted from and divided into the
# 'semantic' view in the evaluation loop of this script.
# NOTE(review): the values look like CLIP's preprocessing statistics scaled to
# the 0-255 range -- confirm against the semantic branch's backbone.
mean_clip = torch.tensor([122.77, 116.75, 104.09], dtype=torch.float32)
std_clip = torch.tensor([68.50, 66.63, 70.32], dtype=torch.float32)
|
|
|
def fuse_results(results: list):
    """Label the three branch scores and add their sum as the overall score.

    Args:
        results: Indexable whose first three items are, in order, the
            semantic, technical and aesthetic scores. Extra items are
            ignored.

    Returns:
        Dict with keys ``"semantic"``, ``"technical"``, ``"aesthetic"`` and
        ``"overall"`` (the plain sum of the three branch scores).
    """
    semantic = results[0]
    technical = results[1]
    aesthetic = results[2]

    fused = {
        "semantic": semantic,
        "technical": technical,
        "aesthetic": aesthetic,
    }
    fused["overall"] = semantic + technical + aesthetic
    return fused
|
|
|
|
|
def gaussian_rescale(pr):
    """Standardize scores to zero mean and unit standard deviation.

    Uses ``np.std`` with its default settings (population standard
    deviation). A constant input has zero spread, so the division then
    yields non-finite values.

    Args:
        pr: Array-like of scores.

    Returns:
        ``(pr - mean(pr)) / std(pr)`` as a NumPy array.
    """
    centre = np.mean(pr)
    spread = np.std(pr)
    return (pr - centre) / spread
|
|
|
|
|
def uniform_rescale(pr):
    """Replace each score by its rank divided by the number of scores.

    The double ``argsort`` yields each element's 0-based rank in ascending
    order, so the outputs are the values ``0, 1/n, ..., (n-1)/n`` arranged
    according to the input ordering.

    Args:
        pr: One-dimensional array-like of scores.

    Returns:
        Float NumPy array of rank fractions, same length as ``pr``.
    """
    total = len(pr)
    positions = np.arange(total)
    ranks = np.argsort(pr).argsort()
    return positions[ranks] / total
|
|
|
|
|
def parse_args():
    """Parse the command-line options of the evaluation script.

    Returns:
        ``argparse.Namespace`` with the attributes ``opt`` (option file
        path), ``device``, ``target_set`` and ``output`` (CSV path for the
        predicted scores).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-o", "--opt",
        type=str,
        default="./cover.yml",
        help="the option file",
    )
    cli.add_argument(
        '-d', "--device",
        type=str,
        default="cuda:0",
        help='CUDA device id',
    )
    cli.add_argument(
        "-t", "--target_set",
        type=str,
        default="val-ytugc",
        help="target_set",
    )
    cli.add_argument(
        "--output",
        type=str,
        default="ytugc.csv",
        help='output file to store predict mos value',
    )
    return cli.parse_args()
|
|
|
|
|
def logistic_func(X, bayta1, bayta2, bayta3, bayta4):
    """Evaluate a four-parameter logistic curve.

    Computes::

        yhat = bayta2 + (bayta1 - bayta2) / (1 + exp(-(X - bayta3) / |bayta4|))

    Args:
        X: Input value(s), scalar or NumPy array.
        bayta1: Value the curve approaches as ``X`` grows large.
        bayta2: Value the curve approaches as ``X`` grows very negative.
        bayta3: Location of the curve's midpoint along ``X``.
        bayta4: Scale of the transition; only its absolute value is used.

    Returns:
        The curve evaluated at ``X`` (same shape as ``X``).
    """
    scaled_offset = (X - bayta3) / np.abs(bayta4)
    denominator = 1 + np.exp(-scaled_offset)
    return bayta2 + (bayta1 - bayta2) / denominator
|
|
|
|
|
def _load_dataset_index(target_set):
    """Return ``(videos_dir, files, mos)`` for a supported evaluation set.

    Reads the dataset's metadata CSV from its fixed path under ``./datasets``
    and derives the list of video file names and the matching MOS labels.

    Args:
        target_set: One of ``'val-livevqc'``, ``'val-kv1k'``, ``'val-ytugc'``.

    Raises:
        ValueError: If ``target_set`` is not one of the supported names.
    """
    if target_set == 'val-livevqc':
        videos_dir = './datasets/LIVE_VQC/Video/'
        datainfo = './datasets/LIVE_VQC/metainfo/LIVE_VQC_metadata.csv'
        df = pd.read_csv(datainfo)
        files = df['File'].tolist()
        mos = df['MOS'].tolist()
    elif target_set == 'val-kv1k':
        videos_dir = './datasets/KoNViD/KoNViD_1k_videos/'
        datainfo = './datasets/KoNViD/metainfo/KoNVid_metadata.csv'
        df = pd.read_csv(datainfo)
        # File names are the 'Filename' column with an '.mp4' suffix appended.
        files = [str(name) + '.mp4' for name in df['Filename'].tolist()]
        mos = df['MOS'].tolist()
    elif target_set == 'val-ytugc':
        videos_dir = './datasets/YouTubeUGC/'
        datainfo = './datasets/YouTubeUGC/../meta_info/Youtube-UGC_metadata.csv'
        df = pd.read_csv(datainfo)
        # File names are the 'filename' column plus a fixed encoding suffix.
        files = [str(name) + '_crf_10_ss_00_t_20.0.mp4' for name in df['filename'].tolist()]
        mos = df['MOSFull'].tolist()
    else:
        # Raise rather than ``assert``: asserts are removed under ``python -O``,
        # which would let execution continue with undefined variables.
        raise ValueError(
            "unsupported video dataset for evaluation: " + repr(target_set)
        )
    return videos_dir, files, mos


def _normalize_view(view, mean, std, num_clips, device):
    """Normalize one sampled view, split it into clips and move it to ``device``.

    Axis 0 is moved last so the 3-element ``mean``/``std`` broadcast along it
    (presumably the colour-channel axis -- confirm against the sampler), then
    moved back. Axis 1 is split into ``(num_clips, -1)`` and the clip axis is
    swapped to the front.
    """
    normalized = (view.permute(1, 2, 3, 0) - mean) / std
    return (
        normalized.permute(3, 0, 1, 2)
        .reshape(view.shape[0], num_clips, -1, *view.shape[2:])
        .transpose(0, 1)
        .to(device)
    )


if __name__ == '__main__':
    # Evaluate the COVER model on one labelled dataset: predict per-video
    # scores, report SROCC / KROCC / PLCC / RMSE against the ground-truth MOS,
    # and save the predictions to a CSV file.
    args = parse_args()

    with open(args.opt, "r") as f:
        opt = yaml.safe_load(f)

    # Build the model and load its weights.
    # NOTE: torch.load unpickles the checkpoint -- only load trusted files.
    evaluator = COVER(**opt["model"]["args"]).to(args.device)
    state_dict = torch.load(opt["test_load_path"], map_location=args.device)

    # strict=False tolerates keys that are missing from / unexpected in the
    # checkpoint instead of raising.
    evaluator.load_state_dict(state_dict['state_dict'], strict=False)
    # Inference only: switch layers such as dropout / batch-norm to eval mode.
    evaluator.eval()

    # One temporal sampler per configured sample type.
    dopt = opt["data"][args.target_set]["args"]
    temporal_samplers = {}
    for stype, sopt in dopt["sample_types"].items():
        temporal_samplers[stype] = UnifiedFrameSampler(
            sopt["clip_len"] // sopt["t_frag"],
            sopt["t_frag"],
            sopt["frame_interval"],
            sopt["num_clips"],
        )

    videos_dir, files, mos = _load_dataset_index(args.target_set)
    print(len(files))

    # Normalization statistics per view name; views under any other name are
    # passed to the model untouched.
    view_stats = {
        'technical': (mean_cover, std_cover),
        'aesthetic': (mean_cover, std_cover),
        'semantic': (mean_clip, std_clip),
    }

    num_videos = len(mos)
    pure_name_list = []
    pre_overall = np.zeros(num_videos)
    pre_smos = np.zeros(num_videos)
    pre_tmos = np.zeros(num_videos)
    pre_amos = np.zeros(num_videos)
    gt_mos = np.array(mos)

    # No gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time.
    with torch.no_grad():
        for vi, video in enumerate(files):
            pure_name = os.path.splitext(video)[0]
            video_path = os.path.join(videos_dir, video)

            views, _ = spatial_temporal_view_decomposition(
                video_path, dopt["sample_types"], temporal_samplers
            )

            # Replacing values of existing keys while iterating is safe.
            for k, v in views.items():
                num_clips = dopt["sample_types"][k].get("num_clips", 1)
                if k in view_stats:
                    mean, std = view_stats[k]
                    views[k] = _normalize_view(v, mean, std, num_clips, args.device)

            # One scalar per model output; fuse_results labels the first three
            # as semantic, technical and aesthetic.
            results = [r.mean().item() for r in evaluator(views)]

            pre_overall[vi] = fuse_results(results)['overall']
            pre_smos[vi] = results[0]
            pre_tmos[vi] = results[1]
            pre_amos[vi] = results[2]
            pure_name_list.append(pure_name)
            print("Process ", video, ", predicted quality score is ", pre_overall[vi])

    # Rank correlations are computed on the raw predictions.
    SROCC = stats.spearmanr(pre_overall, gt_mos)[0]
    # Use the public scipy.stats entry point; the ``scipy.stats.stats``
    # namespace is deprecated.
    KROCC = stats.kendalltau(pre_overall, gt_mos)[0]

    # Fit a 4-parameter logistic mapping from predictions to MOS before
    # computing the linear-correlation and error metrics.
    beta_init = [np.max(gt_mos), np.min(gt_mos), np.mean(pre_overall), 0.5]
    popt, _ = curve_fit(logistic_func, pre_overall, gt_mos, p0=beta_init, maxfev=int(1e8))
    pre_overall_logistic = logistic_func(pre_overall, *popt)

    PLCC = stats.pearsonr(gt_mos, pre_overall_logistic)[0]
    RMSE = np.sqrt(mean_squared_error(gt_mos, pre_overall_logistic))

    print("Test results: SROCC={:.4f}, KROCC={:.4f}, PLCC={:.4f}, RMSE={:.4f}"
          .format(SROCC, KROCC, PLCC, RMSE))

    save_to_csv(pure_name_list, pre_smos, pre_tmos, pre_amos, pre_overall, args.output)