| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- # This code clips the raw videos based on claps detected by the user during the execution and stores them in a new file
- import subprocess
- import glob
- import sounddevice as sd
- import argparse
- import os
- from scipy.signal import butter, filtfilt
- from scipy.io import wavfile
- from pylab import *
def butter_lowpass(cutoff_low, cutoff_high, fs, order=5):
    """Design a Butterworth band-pass filter.

    Despite the historical name, this designs a *band-pass* filter between
    ``cutoff_low`` and ``cutoff_high`` (both in Hz), normalized by the
    Nyquist frequency of the sampling rate ``fs``.

    :param cutoff_low: lower cutoff frequency in Hz
    :param cutoff_high: upper cutoff frequency in Hz
    :param fs: sampling frequency in Hz
    :param order: filter order (default 5)
    :return: (b, a) numerator/denominator coefficient arrays of the IIR filter
    """
    nyquist = 0.5 * fs
    normalized_band = [cutoff_low / nyquist, cutoff_high / nyquist]
    return butter(order, normalized_band, btype='bandpass', analog=False)
def butter_lowpass_filter(data, cutoff_low, cutoff_high, fs, order=5):
    """Apply a zero-phase Butterworth band-pass filter to ``data``.

    The filter is designed by :func:`butter_lowpass` and applied with
    ``filtfilt`` (forward-backward filtering, so no phase shift).

    :param data: 1-D signal to filter
    :param cutoff_low: lower cutoff frequency in Hz
    :param cutoff_high: upper cutoff frequency in Hz
    :param fs: sampling frequency in Hz
    :param order: filter order (default 5)
    :return: filtered signal, same length as ``data``
    """
    numerator, denominator = butter_lowpass(cutoff_low, cutoff_high, fs,
                                            order=order)
    return filtfilt(numerator, denominator, data)
def find_start_frame_by_clap(path_to_audio_file, startt=0, stopt=500,
                             where=None):
    """
    Detect a clap in an audio file. The candidate sound is played back and
    the detection must be confirmed by pressing [d] or discarded by pressing
    [e]. [c] restarts the search, [a] plays the candidate again, [n] marks
    the clap as not found.
    :param
    path_to_audio_file: path to .wav file
    startt: default 0, time in seconds to start search
    stopt: default 500, time in seconds to end search; the window is
    widened by 10 seconds on every restart.
    where: values allowed are 'start' or 'end'; if provided the last half
    or first half resp. of the signal is zeroed so that only the wanted
    clap can be found.
    :return:
    second_of_clap: Second of .wav file in which clap takes place
    (0 if skipped with [e], 1 if marked not found with [n])
    """
    audioFreq, audio = wavfile.read(path_to_audio_file)
    # wavfile.read can return a read-only array; parts of it are zeroed below.
    audio.setflags(write=1)
    # set part of sequence to zero if searched for start / end clap
    if where is not None:
        if where == 'end':
            audio[0:int(len(audio) / 2)] = 0
        elif where == 'start':
            audio[int(len(audio) / 2):len(audio)] = 0
        else:
            print('where must be \'start\' or \'end\'')
    cutoff_low = 1000  # lower band edge of the clap filter, Hz
    cutoff_high = 1400  # upper band edge of the clap filter, Hz
    fs = 3000
    order = 5
    foundStartClap = False
    k = 0
    while not foundStartClap:
        # Widen the search window by 10 seconds on every restart so a
        # rejected candidate cannot exhaust the window.
        k += 10
        s = int(audioFreq * startt)
        e = int(audioFreq * (stopt + k))
        # Filter .wav file within given range of time and find argmax amplitude
        y = butter_lowpass_filter(audio[s:e, 0], cutoff_low,
                                  cutoff_high, fs, order)
        maxamp = np.argmax(y)
        # -/+ 2 * audioFreq is necessary to actually hear something
        # when the audio is played. Clamp the lower bound: a negative
        # slice start would wrap around and play the wrong segment.
        l = max(0, int(maxamp - 2 * audioFreq))
        u = int(maxamp + 2 * audioFreq)
        print(l / audioFreq, u / audioFreq)
        sd.play(audio[l:u], audioFreq, blocking=True)
        print('Clap playing second: ', np.ceil(maxamp / audioFreq))
        # Capture keyboard events
        while True:
            print("Press [d] if clap was detected; [c] if sound was no clap; "
                  "[a] to hear again; [e] to pass; press [n] if clap not "
                  "found at all, second of clap will be set to one")
            x = input()
            if x == 'd':
                second_of_clap = maxamp / audioFreq
                print('Clap accepted; takes place at second ', second_of_clap)
                foundStartClap = True
                break
            elif x == 'c':
                print('Restart clap search')
                # Zero the rejected candidate so argmax finds the next peak.
                audio[l:u] = 0
                break
            elif x == 'a':
                sd.play(audio[l:u], audioFreq, blocking=True)
            elif x == 'e':
                return 0
            elif x == 'n':
                second_of_clap = 1
                foundStartClap = True
                print('Clap not found, video will start at ', second_of_clap)
                break
            else:
                print('Key must be [d], [c], [a], [e] or [n]')
    return second_of_clap
def clip_video_from_clap(video_path, output_path, start_time, end_time):
    """Cut the interval [start_time, end_time] out of a video with ffmpeg.

    Uses stream copy (``-c copy``), so no re-encoding is performed. The
    command is passed as an argument list with ``shell=False`` so that
    paths containing spaces or shell metacharacters are handled safely
    (the original shell-string version broke on such paths).

    :param video_path: path to the input video file
    :param output_path: path of the clipped output file
    :param start_time: clip start in seconds
    :param end_time: clip end in seconds
    """
    command_clip = [
        "ffmpeg", "-i", str(video_path),
        "-ss", str(start_time), "-to", str(end_time),
        "-c", "copy", str(output_path),
    ]
    subprocess.call(command_clip)
if __name__ == "__main__":
    """
    This script processes video all in once. The default directory for videos
    being processed is data/Videos. Frames are automatically extracted after
    clap detection finished. The following steps are performed:
    1) Create folder based on video name
    2) Extract audio file
    3) Clap detection (starting and ending clap) using low pass filter in
    video for synchronization of the three cameras.
    Make sure sound is turned on. Clap detection results are written to
    offset.csv. Headphones necessary to confirm detection.
    4) Extract video frames using ffmpeg
    Args:
    inputpath: Path to folder with videos.
    Returns:
    Folder containing audio file and video frames. Writes number of
    frame where clap takes place and second of clap to offset.csv.
    """
    parser = argparse.ArgumentParser(description='Processes all Videos in '
                                     '--inputpath at once')
    parser.add_argument('--inputpath',
                        default='/home/valapil/Project/ForkCausal_Adithya/raw_video/pair3',
                        help='Path to folder with videos to process')
    parser.add_argument('--outputpath',
                        default='/home/valapil/Project/ForkCausal_Adithya/clipped_vid',
                        help='Path to folder with pair-folders')
    # NOTE(review): action='store_false' means the default is True and
    # passing the flag *disables* extraction — the help text reads
    # inverted; confirm intended polarity.
    parser.add_argument('--ExtractFrames', action='store_false',
                        help='If true frames are extracted from video')
    args = parser.parse_args()
    print(args, args.ExtractFrames)
    # Comment in if the side view perspective is processed as well.
    # videofiles = glob.glob(args.inputpath + '/*.MTS')
    # videofiles.extend(glob.glob(args.inputpath + '/*.MOV'))
    # videofiles = sorted(videofiles)
    videofiles = sorted(glob.glob(args.inputpath + '/*.MTS'))
    print('Videos to process: ', args.inputpath, videofiles)
    # Opened in append mode so offsets from earlier runs are preserved.
    f = open(os.path.join(args.outputpath, 'offset.csv'), 'a')
    # Extract clap and audio for every video found above.
    for video_dir in videofiles:
        accept_Video = True
        print('Processing video: ', video_dir)
        video = os.path.basename(video_dir)
        video_file = os.path.basename(video).split('.')[0]
        # presumably maps raw-recording names ('r...') to output names
        # ('o...'); note this replaces *every* 'r' in the stem — TODO confirm
        video_file = video_file.replace('r', 'o')
        print('PROCESS ', video_dir)
        # Create directory for pair (first three characters of the name)
        pair = 'pair_' + str(video_file[0:3])
        create_folder_pair = os.path.join(args.outputpath, pair)
        command_folder = "mkdir -p " + create_folder_pair
        subprocess.call(command_folder, shell=True)
        # Create directory for images in outputpath
        create_folder_img = os.path.join(args.outputpath, pair,
                                         video_file, 'images')
        command_folder_img = "mkdir -p " + create_folder_img
        subprocess.call(command_folder_img, shell=True)
        # Create directory for audio file in outputpath
        create_folder_audio = os.path.join(args.outputpath, pair,
                                           video_file, 'audio')
        command_folder_audio = "mkdir -p " + create_folder_audio
        subprocess.call(command_folder_audio, shell=True)
        # Extract the audio track from the video (stereo, 44.1 kHz wav)
        aud = os.path.join(create_folder_audio,
                           video_file + "_audio.wav")
        command_audio = "ffmpeg -i " + video_dir + \
                        " -ab 160k -ac 2 -ar 44100 -vn " + aud
        subprocess.call(command_audio, shell=True)
        # Detect starting and ending claps in the video
        for i in ['start', 'end']:
            second_of_clap = find_start_frame_by_clap(path_to_audio_file
                                                      =aud, where=i)
            # Video camera from media department format
            if os.path.basename(video).split('.')[1] == 'MTS':
                frame_rate = 25
                frame_of_clap = int(second_of_clap * frame_rate)
            # iPad video format
            elif os.path.basename(video).split('.')[1] == 'MOV':
                # original frame rate of .MOV is 29.984664, but the side
                # view should be synchronized with the frontal perspective
                frame_rate = 25
                frame_of_clap = int(second_of_clap * frame_rate)
            print(video_file, i, second_of_clap, frame_of_clap)
            if i == 'start':
                start_clip = second_of_clap
            elif i == 'end':
                # identical start and end seconds mean detection failed;
                # reject the video instead of producing an empty clip
                if second_of_clap == start_clip:
                    accept_Video = False
                else:
                    end_clip = second_of_clap
        if accept_Video == True:
            clip_output_path = os.path.join(create_folder_pair, video_file, f"{video}_clipped.mp4")
            clip_video_from_clap(video_path=video_dir, output_path=clip_output_path, start_time=start_clip, end_time= end_clip)
        # f.write(f'{video_file} {i} {second_of_clap} {frame_of_clap}\n')
    f.close()
    # # Extract video frames to pair/condition/images
    # if args.ExtractFrames:
    #     print('Start extracting frames ... ')
    #     for video_dir in videofiles:
    #         video = os.path.basename(video_dir)
    #         video_file = os.path.basename(video).split('.')[0]
    #         # you can exclude videos (e.g. sideview)
    #         # processedVideos = ['I am already processed']
    #         # if np.isin(video_file, processedVideos, invert=True):
    #         pair = 'pair_' + str(video_file[0:3])
    #         create_folder_pair = os.path.join(args.outputpath, pair)
    #         create_folder = os.path.join(create_folder_pair, video_file,
    #                                      'images')
    #         vid = create_folder + '/' + video_file + '_%05d.png'
    #
    #         # Call from shell
    #         command_frames = "ffmpeg -i " + video_dir + " -r 25 " + vid
    #         subprocess.call(command_frames, shell=True)
|