From 7cdd498a3c0e2140daa69fa2ae47d06c06e303d6 Mon Sep 17 00:00:00 2001
From: eziolotta <35468578+eziolotta@users.noreply.github.com>
Date: Tue, 16 Mar 2021 15:32:03 +0100
Subject: [PATCH] Corpora collector (#129)

---
 .../mitads-speech-part1.yaml          |  41 ++
 MITADS-Speech/corpora_collector.py    | 644 +++++++++++++++---
 MITADS-Speech/utils/collector_util.py |  78 +++
 3 files changed, 674 insertions(+), 89 deletions(-)
 create mode 100644 MITADS-Speech/assets/corpora_collector/mitads-speech-part1.yaml
 create mode 100644 MITADS-Speech/utils/collector_util.py

diff --git a/MITADS-Speech/assets/corpora_collector/mitads-speech-part1.yaml b/MITADS-Speech/assets/corpora_collector/mitads-speech-part1.yaml
new file mode 100644
index 00000000..49f63bf6
--- /dev/null
+++ b/MITADS-Speech/assets/corpora_collector/mitads-speech-part1.yaml
@@ -0,0 +1,41 @@
+##
+name: 'mitads-speech-part1'
+version: '0.1'
+description: 'MITADS-Speech dataset; audio longer than 20 seconds is filtered out'
+
+## split the final corpus into parts so the result is easier to handle
+split_final_dataset: 18
+csv_rel_path_linux: True
+corpus2collect:
+  ##evalita2009:
+  ##  filter:
+  ##    max_duration: 20
+  ##mspka:
+  ##  filter:
+  ##    max_duration: 20
+  ##siwis:
+  ##  filter:
+  ##    max_duration: 20
+
+  m-ailabs:
+    filter:
+      max_duration: 20
+
+  mls:
+    filter:
+      max_duration: 20
+      comments_contains:
+      ## filter out older works by these authors
+      - Dante Alighieri
+      - Giovanni Francesco Straparola
+      - Niccolò Machiavelli
+      ## filter out book titles already present in m-ailabs
+      - Novelle per un anno
+      - Galatea
+      - Il fu Mattia Pascal
+      - Ritratto del Diavolo
+      - Contessa di Karolystria
+      - Le meraviglie del Duemila
+      - Malavoglia
\ No newline at end of file
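The YAML above is the whole contract of the collector: each key under corpus2collect names an already-imported corpus, and its filter block drops rows by duration or by substrings of the comments field. A minimal sketch of how such a file parses (the path is illustrative; loading mirrors load_corpora_config below):

    import yaml  # pip install PyYAML

    # Illustrative path; any config with the shape shown above parses the same way.
    with open('assets/corpora_collector/mitads-speech-part1.yaml') as stream:
        config = yaml.safe_load(stream)

    print(config['name'], config['version'])    # mitads-speech-part1 0.1
    print(config['split_final_dataset'])        # 18 -> emit the final corpus in 18 parts

    # Per-corpus filter rules:
    mls_filter = config['corpus2collect']['mls']['filter']
    print(mls_filter['max_duration'])           # 20 (seconds)
    print(mls_filter['comments_contains'][0])   # 'Dante Alighieri'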
diff --git a/MITADS-Speech/corpora_collector.py b/MITADS-Speech/corpora_collector.py
index 4445f359..a3ba9357 100644
--- a/MITADS-Speech/corpora_collector.py
+++ b/MITADS-Speech/corpora_collector.py
@@ -3,132 +3,598 @@
 import time
 import os
+import re
 from os import path, makedirs
+import ntpath
+from pathlib import Path
+import random
 import logging
 import progressbar
 from utils.downloader import SIMPLE_BAR
 from multiprocessing import Pool
 from shutil import copyfile
+import yaml ## pip install PyYAML
+from corpora_importer import BASE_OUTPUT_FOLDER_NAME,FIELDNAMES_CSV_MINIMAL,FIELDNAMES_CSV_FULL
+import csv
+import zipfile
+import argparse
+from random import randrange
+
+random.seed(10)
 
 logging.basicConfig(level=logging.DEBUG)
 
+FIELDNAMES_CSV = ["wav_filename", "wav_filesize", "transcript","speaker_id","duration"]
 
-corpus2collect = []
-######append
-corpus2collect = ['evalita2009','siwis','mspka']
-##if wav location is in onother device, fill this list
-corpus2collect_alt_dir = {}
+parser = argparse.ArgumentParser()
+subparsers = parser.add_subparsers(dest="subcommand")
+subparsers.required = True
+collector_parser = subparsers.add_parser('collector')
+collector_parser.add_argument('-c', '--config_file', type=str, default=os.path.join(os.path.dirname(__file__),'assets','corpora_collector', 'mitads-speech-v0.1.yaml'),
+                              help='Configuration file of the collector. '
+                                   'Default is assets/corpora_collector/mitads-speech-v0.1.yaml')
 
-corpora_output_dir = os.path.abspath('collected_corpora')
-corpora_wav_output_dir = os.path.join(corpora_output_dir,'audios')
+collector_parser.add_argument('-o', '--csv_folder', type=str, default=os.path.abspath(BASE_OUTPUT_FOLDER_NAME),
+                              help='Root folder of the per-corpus CSV datasets to collect; the output is also written here. '
+                                   'Default is root_project/MITADS-Speech-output')
 
-fout_train = None
-fout_test = None
-fout_dev = None
-fout_full_train = None
-all_filenames= set()
+collector_parser.add_argument('-z', '--zip_output', type=str, default='true',
+                              help='If true, collect files into a .zip archive. If false, files are copied to a folder inside csv_folder')
 
-def collect_datasets():
+def load_corpora_config(config_file_path):
+    config = None
+    with open(config_file_path, 'r') as stream:
+        try:
+            config = yaml.safe_load(stream)
+        except yaml.YAMLError as exc:
+            raise exc
+    return config
+
+
+def get_wav_filename_abs_path(csv_wav_filename,csv_corpus_rootdir,corpus_name):
+    if(not os.path.isabs(csv_wav_filename)):
+        ## relative path: resolve it against the corpus audio folder
+        file_name = csv_wav_filename.rsplit('/', 1)[-1]
+        return os.path.join(csv_corpus_rootdir,corpus_name,'audios',file_name)
+    else:
+        return csv_wav_filename
+
+
+def execute_dataset_balancing(corpus_rows,corpora_output_dir,save_vocab=True):
+    ## run the vocabulary-based balancing
+    from utils.collector_util import create_vocabulary,get_min_corpus_cover_vocab,save_vocabulary
+    vocab = create_vocabulary(corpus_rows)
+    if(save_vocab):
+        save_vocabulary(vocab,os.path.join(corpora_output_dir,'vocab.txt'))
+
+    corpus_rows = get_min_corpus_cover_vocab(corpus_rows,vocab)
+
+    return corpus_rows
+
+
+def speaker_filter(csv_rows,min_speaker_minute):
+
+    speakers_rows = {}
+    speakers_duration = {}
+    ret_rows = []
+    for row in csv_rows:
+        speaker_id = row[3]
+        duration = float(row[4])
+        if(speaker_id==None or speaker_id==''):
+            ## unknown speaker
+            continue
+
+        dur = speakers_duration.get(speaker_id,0)
+        dur += duration
+        curr_rows = speakers_rows.get(speaker_id,[])
+        curr_rows.append(row)
+        speakers_duration[speaker_id] = dur
+        speakers_rows[speaker_id] = curr_rows
+
+    ## keep only speakers with at least min_speaker_minute minutes of audio
+    for speaker_id, duration in speakers_duration.items():
+        if(duration >= min_speaker_minute*60):
+            ret_rows.extend(speakers_rows[speaker_id])
+
+    return ret_rows
+
+
+def get_info_and_stats(csv_rows):
+    ##############
+    ## get info and stats
+    total_duration = 0
+    ##total_size =
+    speakers_data = {}
+    corpus_stats = {}
+    min_audio_duration = 9999
+    max_audio_duration = 0
+    ####################
+    unknown_speaker_corpus = {}
+
+    ## walk the rows once, accumulating global, per-corpus and per-speaker stats
+    for row in csv_rows:
+
+        speaker_id = row[3]
+        duration = float(row[4])
+        corpus_name = row[7]
+
+        ######################
+        ## append info report
+        curr_corpus_stats = corpus_stats.get(corpus_name,{})
+        corpus_stats[corpus_name] = curr_corpus_stats
+
+        total_duration += duration
+
+        min_audio_duration = duration if duration<min_audio_duration else min_audio_duration
+        max_audio_duration = duration if duration>max_audio_duration else max_audio_duration
+
+        curr_speaker_data = None
+        if(speaker_id not in speakers_data):
+            curr_speaker_data = {'corpus':corpus_name,'minutes':0}
+            speakers_data[speaker_id] = curr_speaker_data
+        else:
+            curr_speaker_data = speakers_data[speaker_id]
+        curr_speaker_data['minutes'] = curr_speaker_data['minutes'] + duration/60
+
+        if(speaker_id==''):
+            # unknown speaker
+            unknown_speaker_corpus[corpus_name] = True
+        #################
+        c_duration = curr_corpus_stats.get('duration',0)
+        speakers_curr = curr_corpus_stats.get('speakers',set())
+        speakers_curr.add(speaker_id)
+        c_duration += duration
+        curr_corpus_stats['duration'] = c_duration
+        curr_corpus_stats['speakers'] = speakers_curr
+        ###########################
+
+    global_stats = {'total_duration':total_duration,'min_audio_duration':min_audio_duration,'max_audio_duration':max_audio_duration}
+
+    return (corpus_stats,speakers_data,unknown_speaker_corpus,global_stats)
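The helpers above operate on rows as plain lists in FIELDNAMES_CSV_FULL order (wav_filename, wav_filesize, transcript, speaker_id, duration, comments, ...), with the corpus name appended at index 7 by collect_datasets below. A small self-contained sketch with made-up values:

    # Two synthetic rows in the layout the helpers expect (all values invented);
    # index 3 = speaker_id, index 4 = duration in seconds, index 7 = corpus name.
    rows = [
        ['audios/a.wav', '320044', 'buongiorno', 'spk1', '4.0', '', '', 'mls'],
        ['audios/b.wav', '480044', 'buonasera',  'spk2', '6.0', '', '', 'm-ailabs'],
    ]

    # Drop speakers with less than 0.1 minutes (6 seconds) of total audio:
    kept = speaker_filter(rows, 0.1)          # only spk2 survives (6.0s >= 6s)

    corpus_stats, speakers_data, unknown, global_stats = get_info_and_stats(kept)
    print(global_stats['total_duration'])     # 6.0
    print(speakers_data['spk2']['minutes'])   # 0.1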
+def collect_datasets(config,args):
+
+    zip_output = True if args.zip_output.lower()=='true' else False
+
+    csv_corpus_rootdir = args.csv_folder
+    corpus2collect = config['corpus2collect']
+
+    ######append
+    #corpus2collect = ['mls'] ## 'evalita2009','siwis','mspka'
+    ##if wav location is on another device, fill this list
+    #corpus2collect_alt_dir = {}
+
+    #corpora_output_dir = os.path.abspath('collected_corpora')
+
+    split_final_dataset = config.get('split_final_dataset',"0")
+    split_final_dataset = int(split_final_dataset) if int(split_final_dataset)>=2 else None
+
+    csv_rel_path_linux = config.get('csv_rel_path_linux', True)
+
+    final_corpora_name = config['name']
+    final_corpora_version = config['version']
+    output_corpora_foldername = final_corpora_name + '_' + 'v' + final_corpora_version
+    corpora_output_dir = os.path.join(csv_corpus_rootdir, output_corpora_foldername)
+
     if not path.exists(corpora_output_dir):
         print('No path "%s" - creating ...' % corpora_output_dir)
         makedirs(corpora_output_dir)
-    if not path.exists(corpora_wav_output_dir):
-        print('No path "%s" - creating ...' % corpora_wav_output_dir)
-        makedirs(corpora_wav_output_dir)
-
-
-    fout_train =open(os.path.join(corpora_output_dir,'train.csv',),"a",encoding='utf-8')
-    fout_test =open(os.path.join(corpora_output_dir,'test.csv'),"a",encoding='utf-8')
-    fout_dev =open(os.path.join(corpora_output_dir,'dev.csv'),"a",encoding='utf-8')
-    fout_full_train =open(os.path.join(corpora_output_dir,'train_full.csv'),"a",encoding='utf-8')
-    csv_outputs = [fout_train,fout_test,fout_dev,fout_full_train]
-
-    print(f"Collect csv train/test/dev/train_full...")
-    all_wav_file_origin = []
-    all_filenames = set()
-    for corpus_name in corpus2collect:
-        files = collect_all_csv(corpus_name,csv_outputs)
-        ##unique name validation
-        curr_filenames = set([f.split('/')[-1] for f in files])
-        ##if intersection is not empty, is not unique names
-        if(len(all_filenames & curr_filenames))>0:
-            raise('not unique name, check it!')
+
+    csv_rows = []
+
+    count_filename_renamed = 0
+
+    for corpus_name in corpus2collect:
-        all_filenames.update(curr_filenames)
-        #########################
-        data_dir = corpus2collect_alt_dir.get(corpus_name,None)
-        for f_rel in files:
-            f_rel = f_rel.replace('/',os.path.sep)
-            dataset_path = os.path.abspath(corpus_name) if data_dir==None else os.path.join(data_dir, corpus_name)
-            origin_data_path = os.path.join(dataset_path, "origin") if data_dir==None else data_dir
-            wav_path = os.path.join(origin_data_path, f_rel)
-            all_wav_file_origin.append(wav_path)
-
-    fout_train.close()
-    fout_test.close()
-    fout_dev.close()
-    fout_full_train.close()
-    ####################################
-    ##copy file
-    print(f"Collect Wav files...")
-    pool = Pool()
-    bar = progressbar.ProgressBar(max_value=len(all_wav_file_origin), widgets=SIMPLE_BAR)
-    for i, _ in enumerate(pool.imap_unordered(_maybe_copy_one, all_wav_file_origin), start=1):
-        bar.update(i)
-
-    bar.update(len(all_wav_file_origin))
-    pool.close()
-    pool.join()
+        ## filter samples based on the configuration file - e.g. duration, comments
+        curr_corpus_csv_path = os.path.join(csv_corpus_rootdir,corpus_name,'train_full.csv')
+        if(not os.path.exists(curr_corpus_csv_path)):
+            raise ValueError('file not found: {}'.format(curr_corpus_csv_path))
+
+        print("Filter and Collect Corpus {}...".format(curr_corpus_csv_path))
+
+        corpus_rows = filter_corpus(curr_corpus_csv_path,config['corpus2collect'][corpus_name])
+
+        ########################################
+        ## resolve wav paths and collect the rows
+        for row in corpus_rows:
+            wav_filename_path = row[0]
+
+            ## check that the file exists
+            wav_filename_path = get_wav_filename_abs_path(wav_filename_path,csv_corpus_rootdir,corpus_name)
+            if(not os.path.exists(wav_filename_path)):
+                raise ValueError('file {} does not exist'.format(wav_filename_path))
+
+            ## override the path with the absolute one
+            row[0] = wav_filename_path
+            ################
+            row.append(corpus_name) ## needed for the stats report
+            # the final absolute wav destination path is appended temporarily when the
+            # wav files are collected, and removed again before the csv is written
+            #row.append(wav_destination_path)
+
+            ## append row
+            csv_rows.append(row)
+        ######################
+
+    ############
+    ## balance
+    if(config.get('vocabulary_balance',False)==True):
+        print("vocabulary_balance...")
+        csv_rows = execute_dataset_balancing(csv_rows,corpora_output_dir)
+
+    if(config.get('min_speaker_minute',None)!=None):
+        print("Filter Speakers...")
+        csv_rows = speaker_filter(csv_rows,config['min_speaker_minute'])
+
+    ##################################
+    #random.seed(76528)
+    random.shuffle(csv_rows)
+
+    ## get stats
+    corpus_stats,speakers_data,unknown_speaker_corpus,global_stats = get_info_and_stats(csv_rows)
+
+    ########################
+    all_filenames = []
+
+    dataset_parts = []
+    if(split_final_dataset==None):
+        ## generate one final dataset folder
+        dataset_parts.append((None,csv_rows))
+    else:
+        ## generate the final dataset split into parts
+        len_sub = int(len(csv_rows)/split_final_dataset)
+        for i in range(split_final_dataset):
+            #curr_len = len(csv_rows) - len_sub*(split_final_dataset-1) if i==split_final_dataset-1 else len_sub
+            if(i<split_final_dataset-1):
+                dataset_parts.append((i,csv_rows[i*len_sub:(i+1)*len_sub]))
+            else:
+                ## the last part also takes any remaining rows
+                dataset_parts.append((i,csv_rows[i*len_sub:]))
+
+    for part_index,part_rows in dataset_parts:
+        part_name = output_corpora_foldername if part_index==None else '{}_part{}'.format(output_corpora_foldername,part_index+1)
+        part_output_dir = corpora_output_dir if part_index==None else os.path.join(corpora_output_dir,part_name)
+        part_wav_output_dir = os.path.join(part_output_dir,'audios')
+        if not path.exists(part_wav_output_dir):
+            makedirs(part_wav_output_dir)
+
+        for row in part_rows:
+            filename = ntpath.basename(row[0])
+            ## rename on collision so wav filenames stay unique across parts
+            if(filename in all_filenames):
+                filename = '{}_{}'.format(randrange(100000),filename)
+                count_filename_renamed += 1
+            all_filenames.append(filename)
+            ## temporarily append the destination path, consumed by _maybe_copy_one
+            row.append(os.path.join(part_wav_output_dir,filename))
+
+        print("Collect wav files into {}...".format(part_output_dir))
+        pool = Pool()
+        bar = progressbar.ProgressBar(max_value=len(part_rows), widgets=SIMPLE_BAR)
+        for i, _ in enumerate(pool.imap_unordered(_maybe_copy_one, part_rows), start=1):
+            bar.update(i)
+        bar.update(len(part_rows))
+        pool.close()
+        pool.join()
+
+        ## rewrite wav_filename relative to the part folder and drop the temporary column
+        for row in part_rows:
+            destination_wav_path = row.pop()
+            rel_path = 'audios/'+ntpath.basename(destination_wav_path)
+            row[0] = rel_path if csv_rel_path_linux else rel_path.replace('/',os.path.sep)
+
+        write_csv(part_rows,part_output_dir)
+
+        if(zip_output):
+            print("Zip {}...".format(part_name))
+            zip_path = os.path.join(corpora_output_dir,part_name+'.zip')
+            with zipfile.ZipFile(zip_path,'w') as zf:
+                for folder,_,f_names in os.walk(part_output_dir):
+                    for f_name in f_names:
+                        f_path = os.path.join(folder,f_name)
+                        zf.write(f_path,os.path.relpath(f_path,part_output_dir))
+
+    print('Renamed {} duplicated wav filenames'.format(count_filename_renamed))
+
+    ## write the stats report
+    with open(os.path.join(corpora_output_dir,'report.txt'),'w',encoding='utf-8') as file:
+        file.write('CORPORA: {} v{}\n'.format(final_corpora_name,final_corpora_version))
+        file.write('Total duration (hours): {}\n'.format(round(global_stats['total_duration']/3600,2)))
+        file.write('Min/Max audio duration (seconds): {}/{}\n'.format(global_stats['min_audio_duration'],global_stats['max_audio_duration']))
+        for c_name,c_stats in corpus_stats.items():
+            file.write('Corpus {}: {} hours, {} speakers\n'.format(c_name,round(c_stats['duration']/3600,2),len(c_stats['speakers'])))
+        n_speakers = len(speakers_data)-1 if len(unknown_speaker_corpus)>0 else len(speakers_data)
+        file.write('Number of Speakers: {}\n'.format(n_speakers))
+        if len(unknown_speaker_corpus)>0:
+            middles_corpus = ','.join(unknown_speaker_corpus.keys())
+            file.write('No speakers identified in corpus: {} \n'.format(middles_corpus))
+        file.write('\n')
+        file.write('###################################################\n')
+        file.write('\n')
+        file.write('\n')
+        file.write('\n')
+        file.write('{}{}{}\n'.format('SPEAKER'.ljust(24),'CORPUS'.ljust(16),'MINUTES'))
+        file.write('\n')
+        for speakers_id, data in speakers_data.items():
+            file.write('{}{}{}'.format(speakers_id.ljust(24),data['corpus'].ljust(16),str(round(data['minutes'],2))))
+            file.write('\n')
+
+
+def write_csv(samples,output_path,train_size=0.8,test_size=0.1):
+
+    samples_len = len(samples)
+    train_len = int(samples_len*train_size)
+    test_len = int(samples_len*test_size)
+    if(samples_len<3):
+        raise ValueError('not enough samples to split into train/dev/test')
+
+    splits = [('train.csv',samples[:train_len]),
+              ('dev.csv',samples[train_len:samples_len-test_len]),
+              ('test.csv',samples[samples_len-test_len:]),
+              ('train_full.csv',samples)]
+    for csv_name,rows in splits:
+        with open(os.path.join(output_path,csv_name),'w',encoding='utf-8',newline='') as fcsv:
+            writer = csv.writer(fcsv,delimiter='\t')
+            writer.writerow(FIELDNAMES_CSV)
+            for row in rows:
+                writer.writerow(row[:len(FIELDNAMES_CSV)])
+
+
+def filter_row(row,max_duration_filter,comments_contains_filter):
+    ## return True if the row must be filtered out
+    duration = float(row[4])
+    comments = row[5]
+
+    if(max_duration_filter!=None and duration>max_duration_filter):
+        return True
+
+    if(comments_contains_filter!=None and comments!=''):
+        for text in comments_contains_filter:
+            if(text in comments):
+                return True
+
+    return False
+
+
+def filter_corpus(csv1_path,config):
+
+    filter_cfg = config.get('filter',{}) if config !=None else {}
+    max_duration_filter = filter_cfg.get('max_duration',None)
+    comments_contains_filter = filter_cfg.get('comments_contains',None)
+
     f = open(csv1_path,encoding='utf-8')
-    next(f) # skip the header
-    files = set()
+    next(f) # skip the header
+
+    output_rows = []
     for line in f:
-        wav_file_rel_path = line[0:line.find(',')]
-        filename = wav_file_rel_path.split('/')[-1]
-        if(wav_file_rel_path not in files):
-            files.add(wav_file_rel_path)
+        row = re.split('\t',line)
+        ## for the field order see corpora_importer FIELDNAMES_CSV_FULL
+        wav_filepath = row[0]
+        filesize = row[1]
+        transcript = row[2]
+        speaker_id = row[3]
+        duration = -1
+        try:
+            duration = float(row[4])
+        except Exception as e:
+            raise e
+        comments = row[5]
+
+        if(not filter_row(row,max_duration_filter,comments_contains_filter)):
+            ## keep the whole row
+            output_rows.append(row)
         else:
-            raise(f'not unique wav filename {wav_file_rel_path} - fix!')
+            ## filtered out
+            pass
 
-        new_file_path = 'audios/'+filename
-        new_line = line.replace(wav_file_rel_path,new_file_path)
-        file_output.write(new_line)
     f.close() # not really needed
-    return files
+    return output_rows
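With the mls entry of the YAML config, filter_corpus drops any row longer than max_duration seconds or whose comments mention a listed author or title. A tiny demonstration against filter_row as reconstructed above (all values invented):

    # Config shaped like config['corpus2collect']['mls'] from the YAML above.
    mls_config = {'filter': {'max_duration': 20,
                             'comments_contains': ['Dante Alighieri']}}
    filter_cfg = mls_config['filter']

    # row layout: wav_filename, filesize, transcript, speaker_id, duration, comments
    row = ['audios/x.wav', '100', 'nel mezzo del cammin', 'spk1', '12.0',
           'Dante Alighieri - Inferno']
    dropped = filter_row(row, filter_cfg['max_duration'], filter_cfg['comments_contains'])
    print(dropped)  # True: duration is fine (12 <= 20) but the comment matches an author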
+def _maybe_copy_one(sample):
 
-def _maybe_copy_one(origin_file_path):
-    ##need repeat define var, progress bar multiprocessor
-    corpora_output_dir = os.path.abspath('collected_corpora')
-    corpora_wav_output_dir = os.path.join(corpora_output_dir,'audios')
-    filename = os.path.split(origin_file_path)[-1]
+    original_wav_path = sample[0]
+    destination_wav_path = sample[-1] ## the destination path appended as last column in collect_datasets
 
-    dest_file = os.path.join(corpora_wav_output_dir,filename)
-    if(os.path.isfile(dest_file)):
+    if(os.path.isfile(destination_wav_path)):
         ## file exists, skip
         return
     ## copy file
-    copyfile(origin_file_path, dest_file)
-    #################################
+    copyfile(original_wav_path, destination_wav_path)
 
 
 if __name__ == "__main__":
-    collect_datasets()
+
+    ## parse the top-level parser so the 'collector' subcommand is consumed
+    args = parser.parse_args()
+
+    config = load_corpora_config(args.config_file)
+
+    collect_datasets(config,args)
\ No newline at end of file
diff --git a/MITADS-Speech/utils/collector_util.py b/MITADS-Speech/utils/collector_util.py
new file mode 100644
index 00000000..c740bade
--- /dev/null
+++ b/MITADS-Speech/utils/collector_util.py
@@ -0,0 +1,78 @@
+#from keras.preprocessing.text import text_to_word_sequence
+
+maketrans = str.maketrans
+
+def text_to_word_sequence(text,
+                          filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',
+                          lower=True, split=" "):
+    """Converts a text to a sequence of words (or tokens).
+    # Arguments
+        text: Input text (string).
+        filters: list (or concatenation) of characters to filter out, such as
+            punctuation. Default: ``!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\\t\\n``,
+            includes basic punctuation, tabs, and newlines.
+        lower: boolean. Whether to convert the input to lowercase.
+        split: str. Separator for word splitting.
+    # Returns
+        A list of words (or tokens).
+    """
+
+    if lower:
+        text = text.lower()
+
+    translate_dict = {c: split for c in filters}
+    translate_map = maketrans(translate_dict)
+    text = text.translate(translate_map)
+
+    seq = text.split(split)
+    return [i for i in seq if i]
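A quick sanity check of the tokenizer (lifted from Keras, as the commented-out import suggests), showing the default lowercasing and punctuation stripping:

    words = text_to_word_sequence("Nel mezzo del cammin di nostra vita...")
    print(words)  # ['nel', 'mezzo', 'del', 'cammin', 'di', 'nostra', 'vita']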
+
+
+def create_vocabulary(rows):
+
+    # estimate the size of the vocabulary
+    words = set()
+    for row in rows:
+        transcript = row[2] ## row['transcript']
+        _words = set(text_to_word_sequence(transcript))
+        words.update(_words)
+
+    vocab_size = len(words)
+    print('Generated vocab, size {}'.format(vocab_size))
+    # integer encode the document
+    #result = one_hot(text, round(vocab_size*1.3))
+    #print(result)
+
+    return words
+
+
+def save_vocabulary(vocab,file_path):
+    with open(file_path, 'w',encoding='utf-8') as out:
+        for word in vocab:
+            out.write(word + '\n')
+
+
+def get_min_corpus_cover_vocab(corpus_rows,vocab,max_speaker=1):
+    ## vocab is covered implicitly by consuming every new word; max_speaker is currently unused
+
+    consumed_words = set()
+    output_rows = []
+    for row in corpus_rows:
+
+        transcript = row[2] ## row['transcript']
+        #speaker_id = row[3]
+        words = set(text_to_word_sequence(transcript))
+
+        ## keep the row only if it still contributes words not yet covered
+        if(not words.issubset(consumed_words)):
+            output_rows.append(row)
+            consumed_words.update(words)
+        else:
+            pass
+
+    return output_rows
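Putting it together, a typical run of the new collector subcommand looks like this (paths are illustrative; the list form feeds parse_args the same tokens the shell would):

    # Shell equivalent, from the repo root:
    #   python MITADS-Speech/corpora_collector.py collector -c assets/corpora_collector/mitads-speech-part1.yaml -z true
    args = parser.parse_args(['collector',
                              '-c', 'assets/corpora_collector/mitads-speech-part1.yaml',
                              '-z', 'true'])
    config = load_corpora_config(args.config_file)
    collect_datasets(config, args)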