# Clip the raw videos based on claps detected by the user during execution
# and store the clipped versions in a new directory tree.
#
# Pipeline per video:
#   1) create pair/<video>/{images,audio} output folders
#   2) extract the audio track with ffmpeg
#   3) interactively detect the start and end claps (band-pass filter +
#      amplitude argmax, confirmed by the operator over headphones)
#   4) clip the video between the two claps with ffmpeg (stream copy)

import argparse
import glob
import os
import subprocess

import numpy as np
from scipy.io import wavfile
from scipy.signal import butter, filtfilt


def butter_lowpass(cutoff_low, cutoff_high, fs, order=5):
    """Design a Butterworth *band-pass* filter (name kept for compatibility).

    :param cutoff_low: lower cutoff frequency in Hz
    :param cutoff_high: upper cutoff frequency in Hz
    :param fs: sampling frequency in Hz
    :param order: filter order
    :return: (b, a) transfer-function coefficients
    """
    nyq = 0.5 * fs
    # scipy expects cutoffs normalized to the Nyquist frequency.
    low = cutoff_low / nyq
    high = cutoff_high / nyq
    b, a = butter(order, [low, high], btype='bandpass', analog=False)
    return b, a


def butter_lowpass_filter(data, cutoff_low, cutoff_high, fs, order=5):
    """Apply a zero-phase band-pass filter to `data` (forward-backward filtfilt)."""
    b, a = butter_lowpass(cutoff_low, cutoff_high, fs, order=order)
    return filtfilt(b, a, data)


def find_start_frame_by_clap(path_to_audio_file, startt=0, stopt=500, where=None):
    """
    Detect a clap in an audio file.

    The candidate clap is played back and must be confirmed by pressing [d]
    or discarded by pressing [c]. [a] replays the sound, [e] skips (returns
    0), [n] falls back to second 1. Headphones recommended.

    :param path_to_audio_file: path to a stereo .wav file
    :param startt: default 0, time in seconds at which the search starts
    :param stopt: default 500, time in seconds at which the search ends
    :param where: 'start' or 'end'; if given, the second or first half of
        the signal respectively is zeroed so only that half is searched
    :return: second_of_clap: second of the .wav file in which the clap
        takes place (0 if skipped with [e], 1 if not found with [n])
    """
    # Imported here so the filter utilities above remain usable on machines
    # without an audio device / the sounddevice package installed.
    import sounddevice as sd

    audio_freq, audio = wavfile.read(path_to_audio_file)
    audio.setflags(write=1)

    # Blank out half of the signal when only the start/end clap is wanted.
    if where is not None:
        half = int(len(audio) / 2)
        if where == 'end':
            audio[0:half] = 0
        elif where == 'start':
            audio[half:len(audio)] = 0
        else:
            print('where must be \'start\' or \'end\'')

    cutoff_low = 1000   # lower cutoff frequency of the band-pass filter, Hz
    cutoff_high = 1400  # upper cutoff frequency, Hz
    fs = 3000
    order = 5

    found_start_clap = False
    k = 0
    while not found_start_clap:
        # Widen the search window by 10 s after every rejected candidate.
        k += 10
        s = int(audio_freq * startt)
        e = int(audio_freq * (stopt + k))

        # Band-pass the window and take the loudest sample as clap candidate.
        y = butter_lowpass_filter(audio[s:e, 0], cutoff_low, cutoff_high, fs, order)
        # np.argmax is relative to the window, so add the window offset `s`
        # to get an absolute sample index (fixes wrong playback/second for
        # startt > 0; harmless for the default startt=0).
        maxamp = s + int(np.argmax(y))

        # Play +/- 2 seconds around the candidate so something is audible;
        # clamp the lower bound so a clap near the file start is not lost
        # to a negative slice index.
        l = max(0, int(maxamp - 2 * audio_freq))
        u = int(maxamp + 2 * audio_freq)
        print(l / audio_freq, u / audio_freq)
        sd.play(audio[l:u], audio_freq, blocking=True)
        print('Clap playing second: ', np.ceil(maxamp / audio_freq))

        # Capture keyboard events.
        while True:
            print("Press [d] if clap was detected; [c] if sound was no clap; "
                  "[a] to hear again; [e] to pass; press [n] if clap not "
                  "found at all, second of clap will be set to one")
            x = input()
            if x == 'd':
                second_of_clap = maxamp / audio_freq
                print('Clap accepted; takes place at second ', second_of_clap)
                found_start_clap = True
                break
            elif x == 'c':
                print('Restart clap search')
                # Silence the rejected candidate so argmax finds the next peak.
                audio[l:u] = 0
                break
            elif x == 'a':
                sd.play(audio[l:u], audio_freq, blocking=True)
            elif x == 'e':
                return 0
            elif x == 'n':
                second_of_clap = 1
                found_start_clap = True
                print('Clap not found, video will start at ', second_of_clap)
                break
            else:
                print('Key must be [d],[c], [a] or [e]')
    return second_of_clap


def clip_video_from_clap(video_path, output_path, start_time, end_time):
    """Cut `video_path` between `start_time` and `end_time` (seconds) into
    `output_path` using stream copy (no re-encoding).

    List-form arguments with shell=False avoid quoting problems with paths
    containing spaces or shell metacharacters.
    """
    command_clip = [
        "ffmpeg", "-i", str(video_path),
        "-ss", str(start_time), "-to", str(end_time),
        "-c", "copy", str(output_path),
    ]
    subprocess.call(command_clip)


if __name__ == "__main__":
    """
    This script processes all videos at once. The default directory for
    videos being processed is data/Videos. Frames are automatically
    extracted after clap detection finished. The following steps are
    performed:
        1) Create folder based on video name
        2) Extract audio file
        3) Clap detection (starting and ending clap) using band pass filter
           in video for synchronization of the three cameras. Make sure
           sound is turned on. Clap detection results are written to
           offset.csv. Headphones necessary to confirm detection.
        4) Extract video frames using ffmpeg

    Args:
        inputpath: Path to folder with videos.
    Returns:
        Folder containing audio file and video frames. Writes number of
        frame where clap takes place and second of clap to offset.csv.
    """
    parser = argparse.ArgumentParser(description='Processes all Videos in '
                                                 '--inputpath at once')
    parser.add_argument('--inputpath',
                        default='/home/valapil/Project/ForkCausal_Adithya/raw_video/pair3',
                        help='Path to folder with videos to process')
    parser.add_argument('--outputpath',
                        default='/home/valapil/Project/ForkCausal_Adithya/clipped_vid',
                        help='Path to folder with pair-folders')
    # store_true (was store_false, which inverted the documented meaning):
    # passing the flag now enables extraction as the help text describes.
    parser.add_argument('--ExtractFrames', action='store_true',
                        help='If true frames are extracted from video')
    args = parser.parse_args()
    print(args, args.ExtractFrames)

    # Comment in if side view perspective is processed as well.
    # videofiles = glob.glob(args.inputpath + '/*.MTS')
    # videofiles.extend(glob.glob(args.inputpath + '/*.MOV'))
    # videofiles = sorted(videofiles)
    videofiles = sorted(glob.glob(args.inputpath + '/*.MTS'))
    print('Videos to process: ', args.inputpath, videofiles)

    # `with` guarantees offset.csv is closed even if a video fails midway.
    with open(os.path.join(args.outputpath, 'offset.csv'), 'a') as f:
        # Extract clap and audio.
        for video_dir in videofiles:
            accept_Video = True
            print('Processing video: ', video_dir)
            video = os.path.basename(video_dir)
            video_file = os.path.splitext(video)[0]
            # NOTE(review): this replaces *every* 'r' in the name with 'o'
            # (e.g. '123r' -> '123o'); confirm the naming rule is intended.
            video_file = video_file.replace('r', 'o')
            print('PROCESS ', video_dir)

            # Create the pair/<condition>/{images,audio} directory tree.
            # os.makedirs(..., exist_ok=True) replaces the former shell
            # `mkdir -p` calls (no shell=True, path-safe).
            pair = 'pair_' + str(video_file[0:3])
            create_folder_pair = os.path.join(args.outputpath, pair)
            create_folder_img = os.path.join(create_folder_pair, video_file,
                                             'images')
            create_folder_audio = os.path.join(create_folder_pair, video_file,
                                               'audio')
            os.makedirs(create_folder_img, exist_ok=True)
            os.makedirs(create_folder_audio, exist_ok=True)

            # Extract the audio track from the video.
            aud = os.path.join(create_folder_audio, video_file + "_audio.wav")
            command_audio = ["ffmpeg", "-i", video_dir, "-ab", "160k",
                             "-ac", "2", "-ar", "44100", "-vn", aud]
            subprocess.call(command_audio)

            # Detect starting and ending claps in the video.
            for i in ['start', 'end']:
                second_of_clap = find_start_frame_by_clap(
                    path_to_audio_file=aud, where=i)
                # .MTS (media department camera) is natively 25 fps; .MOV
                # (iPad) is originally 29.97 fps but the side view is
                # synchronized with the frontal perspective, so both use 25.
                frame_rate = 25
                frame_of_clap = int(second_of_clap * frame_rate)
                print(video_file, i, second_of_clap, frame_of_clap)
                if i == 'start':
                    start_clip = second_of_clap
                elif i == 'end':
                    if second_of_clap == start_clip:
                        # End clap equals start clap: detection failed, so
                        # skip clipping this video.
                        accept_Video = False
                    else:
                        end_clip = second_of_clap

            if accept_Video:
                clip_output_path = os.path.join(create_folder_pair,
                                                video_file,
                                                f"{video}_clipped.mp4")
                clip_video_from_clap(video_path=video_dir,
                                     output_path=clip_output_path,
                                     start_time=start_clip,
                                     end_time=end_clip)
            # f.write(f'{video_file} {i} {second_of_clap} {frame_of_clap}\n')

    # # Extract video frames to pair/condition/images
    # if args.ExtractFrames:
    #     print('Start extracting frames ... ')
    #     for video_dir in videofiles:
    #         video = os.path.basename(video_dir)
    #         video_file = os.path.basename(video).split('.')[0]
    #         # you can exclude videos (e.g. sideview)
    #         # processedVideos = ['I am already processed']
    #         # if np.isin(video_file, processedVideos, invert=True):
    #         pair = 'pair_' + str(video_file[0:3])
    #         create_folder_pair = os.path.join(args.outputpath, pair)
    #         create_folder = os.path.join(create_folder_pair, video_file,
    #                                      'images')
    #         vid = create_folder + '/' + video_file + '_%05d.png'
    #
    #         # Call from shell
    #         command_frames = "ffmpeg -i " + video_dir + " -r 25 " + vid
    #         subprocess.call(command_frames, shell=True)