\$\begingroup\$

The xstack filter lets you build a mosaic video file with ffmpeg.

https://ffmpeg.org/ffmpeg-filters.html#xstack

To automate its usage, I wrote a script that takes video files with mismatched resolutions, durations and aspect ratios, letterboxes them to fit, and encodes the mosaic in two passes, one for video and one for audio. I did not want intermediate files, so the whole thing runs in only 3 stages (scan, convert, mix_audio). I'm not sure whether it would run faster with intermediate files.

To spread and "balance" the videos across 4 sequences of similar total length, I use a greedy bin-packing heuristic (longest segment first):

1. Start with 4 empty mosaic lists of video segments.
2. Pick the longest video segment among those not yet inserted.
3. Insert it into the mosaic list that currently has the shortest total duration.
4. Repeat steps 2 and 3 until all videos have been inserted.
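
The steps above can be sketched as follows. This is a minimal standalone illustration rather than the script's own balanced_sums: the function name balance, its durations mapping and the use of a heap are assumptions made for the example.

```python
import heapq


def balance(durations, bins=4):
    """Greedy longest-first assignment.

    durations maps a clip name to its length in seconds (hypothetical input).
    Each clip, longest first, goes to the bin whose running total is smallest.
    """
    heap = [(0.0, i) for i in range(bins)]  # (running total, bin index)
    groups = [[] for _ in range(bins)]
    longest_first = sorted(durations.items(), key=lambda kv: kv[1], reverse=True)
    for name, dur in longest_first:
        total, idx = heapq.heappop(heap)  # bin with the shortest total so far
        groups[idx].append(name)
        heapq.heappush(heap, (total + dur, idx))
    return groups
```

The heap only avoids re-summing every list on each insertion; with 4 lists the difference is negligible, so a plain min() over running totals would do just as well.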

The code is not the cleanest, but I have used it several times and I think it works well enough. The script dumps the ffmpeg command it generates, so that it can be inspected to see whether each filter follows good "ffmpeg practices". Video encoding is not my specialty. It took me several attempts to fix audio/video desync problems and SAR/DAR issues, and to get letterboxing to work, which was not easy to understand. Folks on #ffmpeg on Libera and on Stack Overflow helped me a lot.

This script also generates an HTML report that lists, for every video file, its codecs, SAR/DAR, resolution, duration, etc.

Apart from the obvious messiness, I would like to know whether this script can be improved, and whether anyone thinks there are better ways to do this. I'm not 100% sure it uses ffmpeg correctly, since it's not always easy to hear desyncs or to spot problems. The command often generates warnings, but I'm not sure whether they are serious.

Feel free to test it!

There are other Python scripts that drive ffmpeg, but I don't think they can do everything this script does (I haven't tried them in detail, but I'm guessing they have limitations).

from os import listdir as ls
from shutil import copyfile as cp
from sys import argv
import subprocess
from pprint import pformat
import os, sys
import time
import datetime
import random
################ REQUIREMENTS ################
ffmpeg = r"c:/ffmpeg.exe"
ffprobe = r"c:/ffprobe.exe"
'''
################ USAGE ################
0. ALL VIDEO FILES MUST HAVE AN AUDIO TRACK
1. python ffmpeg-mosaic.py <VIDEO FOLDER PATH> scan
    Scans for video files, generates a HTML file to spot files without audio
2. python ffmpeg-mosaic.py <VIDEO FOLDER PATH> convert
    Video processing
    For powershell users, use a folder with short paths to be below the CMD BATCH size limit.
3. python ffmpeg-mosaic.py <VIDEO FOLDER PATH> mix_audio <VIDEO FOLDER PATH>\xstacked\<THE XSTACK VIDEO FILE>
    Does the same thing for audio, and merge audio and video. This stage is of course much faster
'''

def lessshortdate():
    return datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S")

def shortdate():
    return datetime.datetime.now().strftime("%Y-%m-%d--%H-%M")

def sizeof_fmt(num, suffix=''):
    for unit in ['','K','M','G','T','P','E','Z']:
        if abs(num) < 1024.0:
            numpart = "%.1f"%num
            if numpart[-2:] == '.0' or num >99: numpart = numpart[:-2]
            return "%s %s%s" % (numpart, unit, suffix)
            # return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    numpart = "%.1f"%num
    if numpart[-2:] == '.0': numpart = numpart[:-2]
    return "%s %s%s" % (numpart, 'Y', suffix)

# for html generation
def html_start(f, css=''):
    f.write("<html><head>")
    f.write("<meta http-equiv='Content-Type' content='text/html; charset=UTF-8' />")
    f.write('''
<script type="text/javascript">
    window.onload = function(){
        const getCellValue = (tr, idx) => tr.children[idx].innerText || tr.children[idx].textContent;
        const comparer = (idx, asc) => (a, b) => ((v1, v2) =>
            v1 !== '' && v2 !== '' && !isNaN(v1) && !isNaN(v2) ? v1 - v2 : v1.toString().localeCompare(v2)
            )(getCellValue(asc ? a : b, idx), getCellValue(asc ? b : a, idx));
        // do the work...
        document.querySelectorAll('th').forEach(th => th.addEventListener('click', (() => {
            const table = th.closest('table');
            Array.from(table.querySelectorAll('tr:nth-child(n+2)'))
                .sort(comparer(Array.from(th.parentNode.children).indexOf(th), this.asc = !this.asc))
                .forEach(tr => table.appendChild(tr) );
        })));
};
</script>''')
    f.write('''<style type='text/css'>
    .b{font-size:150%;} .b1{font-size:150%;} .b2{font-size:200%;} .b3{font-size:250%;}
    .s{font-size:75%;} .s1{font-size:50%;} .s2{font-size:25%;} .s3{font-size:75%;}
    .s3{font-size:50%;vertical-align:middle;}
    .al{vertical-align:middle;}
    .bgf{background: #bbb;}
    .r{text-align: left; font-family: sans-serif;}
    .node{padding-left:2px;padding-top:2px; border:1px solid black;float:right;}
    #w span { cursor: pointer; }
    #w span:hover { color:orange; }
    #w span:visited { color:green; }
    #w span:active { color:red; }
    .constantwidth {width:1000px; height:auto;}
    ''' + css +
    '''
    </style>
    </head>
    <body>
    ''')

def html_finish(f):
    f.write("</body></html>")
    f.close()

stage = sys.argv[2] if len(sys.argv) > 2 else None

def shfn(s, maxsize = 32):
    # if len(s)-a-b < 8:
    if len(s) < maxsize:
        return s
    return s[:int(maxsize*0.75)]+u'...'+s[-int(maxsize*0.25):]

# getting video info data
def get_info(path):
    # proc = subprocess.run([ffprobe]+args, stdout = subprocess.DEVNULL)
    # first show_format
    args = ["-show_format", "-print_format", "json", "-i", path]
    proc = subprocess.run([ffprobe]+args, capture_output=True)
    result = proc.stdout.decode("utf-8")
    show_format = eval(result)
    show_format = show_format['format']
    # print(show_format)
    if False: # spamming the terminal
        print(show_format['filename'])
        for a,b in show_format.items():
            print(' ', a, '\t', b)
    # then show_streams
    video = None
    audio = None
    args = ["-show_streams", "-print_format", "json", "-i", path]
    proc = subprocess.run([ffprobe]+args, capture_output=True)
    result = proc.stdout.decode("utf-8")
    streams = eval(result)
    streams = streams['streams']
    # print(len(streams))
    if len(streams) < 2:
        if 'codec_type' in streams[0] and streams[0]['codec_type'] == 'audio':
            return None, None
        video = streams[0]
    else:
        if 'codec_type' in streams[0]:
            if streams[0]['codec_type'] == 'audio':
                audio = streams[0]
            elif streams[0]['codec_type'] == 'video':
                video = streams[0]
        if 'codec_type' in streams[1]:
            if streams[1]['codec_type'] == 'audio':
                audio = streams[1]
            elif streams[1]['codec_type'] == 'video':
                video = streams[1]
    filename = os.path.basename(path)
    ret = {}
    # a if cond else b
    if 'duration' in video:
        ret['dur'] = time.strftime('%M:%S', time.gmtime(float(video['duration'])))
        ret['duration'] = float(video['duration'])
    elif 'duration' in show_format:
        ret['dur'] = time.strftime('%M:%S', time.gmtime(float(show_format['duration'])))
        ret['duration'] = float(show_format['duration'])
    elif 'tags' in video and 'DURATION' in video['tags']:
        ret['dur'] = video['tags']['DURATION']
        ret['duration'] = video['tags']['DURATION']
    else:
        ret['dur'] = '-'
        ret['duration'] = '-'
    if 'width' in video:
        ret['dim'] = str(video['width'])+'x'+str(video['height'])
        dim2 = (video['width'], video['height'])
        ret['ratio'] = str(round(dim2[0] / dim2[1], 3))
    else:
        ret['ratio'] = '-'
        ret['dim'] = '-'
    # ret['dur'] = time.strftime('%M:%S', time.gmtime(float(video['duration'])))
    # ret['dur'] = time.strftime('%M:%S', time.gmtime(float(dic['duration'])))
    ret['filesize'] = sizeof_fmt(os.path.getsize(path))
    # print(path)
    if 'avg_frame_rate' in video:
        framerate = video['avg_frame_rate']
        try:
            framerate_f = round(float(int(framerate.split('/')[0]) / int(framerate.split('/')[1])),3)
        except ZeroDivisionError:
            print(filename, framerate)
        framerate = "%s %s"%(str(framerate_f),framerate)
    else:
        framerate = '-'
    ret['framerate'] = framerate ### removed
    ret['codec'] = video['codec_name'] ### removed
    # ret['framerate'] = video['avg_frame_rate'] ### removed
    ret['audio'] = audio['codec_name'] if audio != None else '-'
    ret['filename'] = filename
    ret['path'] = path
    # ret['pix_fmt'] = video['pix_fmt'] ## removed
    # ret['ratio'] = video['display_aspect_ratio'].split(':')
    # ret['ratio'] = str(int(ret['ratio'][0])/int(ret['ratio'][1]))
    # print(ret)
    # print('finished')
    return ret, result

# printing a dict with return lines
def dirtyformat(dic):
    s = repr(dic)
    s = s.replace('{','{\n').replace('}', '}\n\n')
    return s

# html report
def write_html(data):
    tag = lambda s, t: "<%s>%s</%s>\n"%(t,s,t)
    trth = lambda ar: tag(''.join([tag(a, 'th') for a in ar]), 'tr')
    table = lambda s: tag(s, 'table')
    tr = lambda ar: tag(''.join([tag(a, 'td') for a in ar]), 'tr')
    keys = []
    for filename,stuff in data.items():
        # keys = stuff.keys()
        keys = [a for a,b in (stuff).items() if a != 'path']
        break
    # for filename,stuff in data.items():
    for a,b in data.items():
        b['filename'] = shfn(b['filename'],64)
    ar = [[b for a,b in (stuff).items() if a != 'path'] for filename, stuff in data.items()]
    # print(ar)
    # print(keys)
    f = open(argv[1]+"\\xstacked\\vid_data.nogit.html",'w', encoding = 'utf-8')
    html_start(f)
    f.write(table(
        trth(keys)+''.join([tr(line) for line in ar])
        ))
    html_finish(f)

# gathering video info, either scan or opening saved scan
def gather_vid_info(path, stage, segments_limit = False):
    print("###### PLEASE CHECK IF ALL FILES GOT AUDIO FIRST ! ######")
    import os
    from pprint import pformat
    vid_data = {}
    print('scanning dir', path)
    if 'file_list' in sys.argv:
        pass
    if stage == 'scan':
        raw = open(sys.argv[1]+"\\xstacked\\stdout.nogit.txt",'w',encoding='utf-8', newline='')
        i = 0
        for p in ls(path):
            if p[-4:] not in ['.avi','.mp4','webm']: continue
            sys.stdout.write("%d \r" % (i))
            sys.stdout.flush()
            abspath = os.path.abspath(path+'/'+p)
            if os.path.isdir(abspath):
                print('skipping', p)
                continue
            ret, result = get_info(abspath)
            if ret == None:
                print('failed, maybe only audio!')
                continue
            vid_data[p] = ret
            raw.write(abspath+'\n')
            raw.write(result.replace('\n\n','\n')+'\n\n--------------------\n\n')
            i+=1
        # saves the data
        open(sys.argv[1]+"\\xstacked\\vid_data.nogit.txt",'w', encoding='utf8').write(dirtyformat(vid_data))
    else:
        vid_data = eval(open(sys.argv[1]+"\\xstacked\\vid_data.nogit.txt", encoding='utf8').read())
        print('imported', len(vid_data))
    dic_with_audio = {k:data for k,data in vid_data.items() if data['audio'] != '-'}
    total = sum([data['duration'] for k, data in dic_with_audio.items()])
    print('total', time.strftime('%M:%S', time.gmtime(float(total))))
    write_html(vid_data)
    print("###### PLEASE CHECK IF ALL FILES GOT AUDIO FIRST ! ######")
    if segments_limit:
        print('segments_limit is ON, limiting to 15 files')
        vid_data_items = [(k,v) for k,v in vid_data.items()]
        vid_data = {k:v for k,v in vid_data_items[:15]}
    return vid_data

# re-encode video with width 360, framerate 30
def re_encode(data, new_folder):
    path = data['path']
    resize = False
    if int(data['dim'].split('x')[0]) > 500 and float(data['ratio']) < 1.8:
        resize = True
    args = [ffmpeg, '-i', path,] + "-r 30 -vf scale=540:-2".split() + [new_folder+data['filename']]
    subprocess.run(args)

# put n in the array with the smallest sum
def pickput(gr, n):
    sums = [(i,sum([t[0] for t in a])) for i,a in enumerate(gr)]
    smallest = sorted(sums, key = lambda a: a[1])[0][0] # oh cheesus
    gr[smallest].append(n)

# balance videos to minimize "holes" at the end
def balanced_sums(vid_data):
    durs = []
    for filename, data in vid_data.items():
        t = data['dur']
        t = int(t.split(':')[0])*60 + int(t.split(':')[1])
        durs.append((t, filename))
    durs.sort()
    gr = [[],[],[],[]]
    i = 0
    while len(durs):
        n = durs.pop()
        pickput(gr,n)
        i+=1
    f = open(sys.argv[1]+'\\xstacked\\vid_data-balanced.nogit.txt','w', encoding='utf8')
    for a in gr:
        f.write(repr((sum([t[0] for t in a]), [t[0] for t in a]))+'\n')
    f.write('\n\n---\n\n')
    random.seed(433)
    for a in gr:
        random.shuffle(a)
    for a in gr:
        f.write(repr((sum([t[0] for t in a]), [t[0] for t in a]))+'\n')
        for t in a:
            f.write(repr(t)+'\n')
        f.write('\n\n\n')
    remade = [[((j+1)*100+i, t[1], t[0]) for i,t in enumerate(a)] for j,a in enumerate(gr)]
    return remade

# build the xstack command arguments
def xstack(dic, mute = [], clean = False, lst = True):
    # target folder
    folder = next(iter(dic.values()))['path']
    folder = os.path.dirname(folder)
    folder+='\\xstacked\\'
    if not os.path.exists(folder):
        os.makedirs(folder)
    args = ['ffmpeg.exe'] if clean else [ffmpeg, '-report']
    # balancing into 4 lists with almost similar length, to reduce end-of-video "holes"
    if False:
        dic = {a:b for a,b in dic.items() if float(b['framerate'].split()[0]) < 40}
        removed = {a:b for a,b in dic.items() if float(b['framerate'].split()[0]) >= 40}
        print('removed',len(removed))
        for a,b in dic.items():
            print(b['framerate'],',', float(b['framerate'].split()[0])<40)
        for a,b in removed.items():
            print('removed', b['filename'])
    balanced = balanced_sums(dic)
    # inputs
    backmap = {} # to track the id ffmpeg will assign to inputs
    j = 0
    filelist = open(sys.argv[1]+'\\xstacked\\filelist.nogit.txt','w', encoding='utf-8')
    for li in balanced:
        for i,k, dur in li:
            backmap[k] = j # tracking video id because ffmpeg use an ordered list of id as aliases
            if not lst:
                args += ['-i', 'file%d.mp4'%j] if clean else ['-i', '%s'%dic[k]['path']] # I swear this is a coincidence. Let's be honest why this script exists
            filelist.write('file "%s"\n'%dic[k]['path'])
            j+=1
    filelist.close()
    # this works, I tried many things. look at the docs. ow is output width, so guess the rest. setsar will set the sample aspect ratio to 1.
    letterbox = 'scale=480:270:force_original_aspect_ratio=decrease,pad=480:270:(ow-iw)/2:(oh-ih)/2, setsar=1, setpts=PTS-STARTPTS'
    # xstack argument with the layout
    # 2x2
    # xstack = "[concat0][concat1][concat2][concat3]xstack=inputs=4:layout=0_0|w0_0|0_h0|w0_h0"
    xstack = "[cct0][cct1][cct2][cct3]xstack=inputs=4:layout=0_0|w0_0|0_h0|w0_h0"
    # 4x4
    # applying letterbox BEFORE the concatenate stage. those are the 4 "list" from the balanced output
    # rescalers = ['','','','']
    rescalers = []
    for v,i in backmap.items():
        # print(v,i)
        vol = ''
        for a in mute:
            if a in v:
                vol = 'volume=0.01,'
                print('muting', v)
                break
        # testing with both audio and video to avoid desync
        rescalers.append("[rsclbf%d]fps=24[rscl%d];[%d]%s%s[rsclbf%d];"%(i,i,i, vol, letterbox, i))
    # don't really worry if the ids magically seem to be ordered when they should not, it's because tralala look at the code you know?
    concats = ['','','',''] # 16
    for quad, li in enumerate(balanced):
        concats[quad] = ''.join(["[rscl%d]"%(backmap[v]) for i,v,dur in li])+'concat=n=%d[cct%d];'%(len(li),quad)
    if lst:
        args +=['-i', folder+'filelist.nogit.txt']
    open(folder+'filter_complex_script.txt', 'w',encoding='utf8').write(''.join(rescalers)+''.join(concats)+xstack)
    args+=['-filter_complex_script', f'{folder}filter_complex_script.txt']
    args+=['output.mp4'] if clean else [folder+'xstacked-%s.mp4'%shortdate()]
    cmds = open(argv[1]+'\\xstacked\\ffmpeg-command.nogit.txt','w', encoding='utf8')
    cmds_d = open(argv[1]+'\\xstacked\\ffmpeg-debug.nogit.txt','w', encoding='utf8')
    cmds.write('\n'.join([repr(a)+',' for a in args]))
    cmds_d.write('\n'.join(args))
    if clean: return
    print(args)
    subprocess.run(args, cwd=folder)

def mix_audio(dic, balanced, stacked):
    backmap = {} # to track the id ffmpeg will assign to inputs
    j = 0
    clean = False
    args = [ffmpeg, '-report']
    for li in balanced:
        for i,k, dur in li:
            backmap[k] = j # tracking video id because ffmpeg use an ordered list of id as aliases
            args += ['-i', 'file%d.mp4'%j] if clean else ['-i', '%s'%dic[k]['path']] # I swear this is a coincidence. Let's be honest why this script exists
            j+=1
    stk_id = len(backmap)
    args += ['-i', stacked]
    concats_a = ['', '', '', '']
    for quad, li in enumerate(balanced):
        concats_a[quad] = ''.join(
            ["[%d:a]"%(backmap[v]) for i,v,dur in li])+'concat=n=%d:v=0:a=1[cct_a%d];'%(len(li),quad)
    amix = '[cct_a0][cct_a1][cct_a2][cct_a3]amix=inputs=4[all_aud]'
    args += ['-filter_complex']
    args += [''.join(concats_a)+amix]
    args += [
        '-map', '%d:v'%stk_id,
        '-map',
        '[all_aud]',
        '-c:v', 'copy',
        ]
    args += [stacked.replace('.mp4','')+"_aud.mp4"]
    cmds2 = open(sys.argv[1]+'\\xstacked\\ffmpeg-command_aud.nogit.txt','w', encoding='utf8')
    cmds2.write('\n'.join([repr(a)+',' for a in args]))
    proc = subprocess.run(args)

# this calls ffprobe on all video files if 'scan' is in the command.
# if not it assumes scan was already done and reads from a cache file
if len(sys.argv) >1:
    folder = sys.argv[1]
    print(folder)
    folder+='\\xstacked\\'
    if not os.path.exists(folder):
        os.makedirs(folder)
vid_data = gather_vid_info(argv[1], stage, segments_limit = 'segments_limit' in sys.argv)
if "audio_check" in sys.argv:
    dic_with_audio = {k:data for k,data in vid_data.items() if data['audio'] != '-'}
    print('vid_data', len(vid_data))
    print('dic_with_audio', len(dic_with_audio))
    vid_data = dic_with_audio
if len(sys.argv) <3: exit('must have have args: <path> <convert|balance|scan|mix_audio>')
print('''remember, mix audio must be provided with a filepath''')

def fix_sar_dar(path):
    # old function to deal with exotic SAR/DAR values
    'ffmpeg -i <input> -vf scale=720x406,setdar=16:9 <output>'
    args = [ffmpeg]
    args+=['-i', path, '-vf', 'scale=640x360',os.path.dirname(path)+'/DONE2.mp4' ]
    subprocess.run(args)

def mute_audio(path):
    args = [ffmpeg]
    args+=['-i', path, '-filter:a', 'volume=0',os.path.dirname(path)+'/'+os.path.basename(path)+'muted.mp4' ]
    subprocess.run(args)

muted = []
if stage == 'convert':
    xstack(vid_data, muted, clean='clean' in sys.argv, lst=False)
elif stage == 'balance':
    bal = balanced_sums(vid_data)
    for l in bal:
        for a in l:
            print((a[0],a[2],a[1]))
elif stage == 'mute':
    mute_audio(sys.argv[3])
elif stage == 'mix_audio':
    bal = balanced_sums(vid_data)
    mix_audio(vid_data, bal, sys.argv[-1])
elif stage == 'custom':
    fix_sar_dar(argv[3])

asked Sep 30, 2022 at 18:41
\$\endgroup\$
  • \$\begingroup\$ Sorry, I did not take the time to test it since I removed dependencies. It's fixed. I tested it on a few video files and it works now. \$\endgroup\$ Commented Oct 1, 2022 at 8:53

1 Answer
\$\begingroup\$

You should not alias from os import listdir as ls and from shutil import copyfile as cp, but also you just shouldn't be using os.listdir at all. Use pathlib.Path methods instead.
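
As a rough sketch of what the pathlib version of the directory scan could look like: the helper name list_videos and the VIDEO_SUFFIXES set are made up for this example, and matching on the lower-cased suffix is my assumption about what the extension check is meant to do.

```python
from pathlib import Path

# Hypothetical constant: the three extensions the scan stage accepts.
VIDEO_SUFFIXES = {".avi", ".mp4", ".webm"}


def list_videos(folder):
    """Return the video files directly inside folder, sorted by path."""
    return sorted(
        p
        for p in Path(folder).iterdir()
        if p.is_file() and p.suffix.lower() in VIDEO_SUFFIXES
    )
```

Each returned item is a Path, so p.name, p.stem and p.resolve() replace the manual basename/abspath calls.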

Requiring that these executables exist at the root of the drive:

ffmpeg = r"c:/ffmpeg.exe"
ffprobe = r"c:/ffprobe.exe"

is quite inconvenient for your users. Instead, use shutil.which() to find the location of those executables from the operating system's search paths. And don't leave the executables in C:\; put them somewhere in a proper program path like C:\Program Files\ffmpeg and include that in PATH.
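
A minimal sketch of that lookup, assuming the tools are installed under their usual names; find_tool is a hypothetical helper, not something from the original script.

```python
import shutil


def find_tool(name):
    """Resolve an executable through the PATH search, failing loudly if it is absent."""
    path = shutil.which(name)
    if path is None:
        raise FileNotFoundError(f"{name} was not found on PATH")
    return path
```

The script would then call find_tool("ffmpeg") and find_tool("ffprobe") once at start-up instead of hard-coding the two locations.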

You have a USAGE comment block - that would be better translated to a help string included in argparse. Use argparse instead of one-off checks in argv.
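
Something along these lines would cover the three documented stages. It is only a sketch: build_parser is a hypothetical name, the help strings are paraphrased from the USAGE block, and the extra stages (balance, mute, custom) are left out.

```python
import argparse
from pathlib import Path


def build_parser():
    parser = argparse.ArgumentParser(
        description="Build a 2x2 xstack mosaic from a folder of videos."
    )
    parser.add_argument("folder", type=Path, help="folder containing the source videos")
    stages = parser.add_subparsers(dest="stage", required=True)
    stages.add_parser("scan", help="probe every video and write the HTML report")
    stages.add_parser("convert", help="encode the mosaic video")
    mix = stages.add_parser("mix_audio", help="mix the audio tracks into an existing mosaic")
    mix.add_argument("stacked", type=Path, help="mosaic file produced by the convert stage")
    return parser
```

With this, python ffmpeg-mosaic.py --help prints the usage text for free, and a missing stage or file path is rejected before any work starts.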

Add PEP 484 type hints to your function signatures.

The for unit in [...] loop should iterate over a tuple, not a list.

When comparing to < 1024.0 there's no need to write it as a float.

"%.1f"%num is better-expressed with string interpolation as f'{num:.1f}'. Same with "%s %s%s" % and so on.

sizeof_fmt doesn't need to loop. Just log10 to find the number of digits. Do not have Y as a special case; it should be in the format sequence with the other characters.
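
One loop-free way to do this is sketched below. Note that it swaps in the integer bit length instead of log10, because the units are powers of 1024 and the bit length gives the exponent exactly. The UNITS tuple is an assumption for the example, and the output always keeps one decimal, so it does not reproduce the original trimming of ".0".

```python
UNITS = ("", "K", "M", "G", "T", "P", "E", "Z", "Y")


def sizeof_fmt(num, suffix=""):
    """Format a byte count with a binary (1024-based) unit prefix."""
    magnitude = int(abs(num))
    if magnitude:
        # Each unit step is 2**10, so the exponent is floor(log2(magnitude)) // 10.
        exponent = min((magnitude.bit_length() - 1) // 10, len(UNITS) - 1)
    else:
        exponent = 0
    value = num / 1024 ** exponent
    return f"{value:.1f} {UNITS[exponent]}{suffix}"
```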

html_start and its markup should go away. This is a job for Jinja.

shfn is not a good name; I have no idea what that function does.

subprocess.run should be replaced with subprocess.check_output.
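
For illustration, a thin wrapper might look like this; run_and_capture is a hypothetical name. The point is that check_output raises CalledProcessError on a non-zero exit code instead of silently handing back empty output.

```python
import subprocess


def run_and_capture(args):
    """Run a command and return its stdout as text; raise if it exits non-zero."""
    return subprocess.check_output(args, text=True)
```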

Do not eval, and especially do not eval from the untrusted output of another process. This is disastrous from a security point of view, but is also not maintainable.
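
Since ffprobe is already asked for JSON, json.loads is the natural replacement. The sketch below assumes the -show_streams output shape used in the question (a top-level streams list whose entries carry codec_type); parse_streams is a hypothetical helper.

```python
import json


def parse_streams(probe_json):
    """Return the first video and first audio stream (or None) from ffprobe JSON text."""
    data = json.loads(probe_json)
    streams = data.get("streams", [])
    video = next((s for s in streams if s.get("codec_type") == "video"), None)
    audio = next((s for s in streams if s.get("codec_type") == "audio"), None)
    return video, audio
```

Picking streams by codec_type also removes the need for the streams[0]/streams[1] branching, and it keeps working for files with more than two streams.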

if False: # spamming the terminal should be replaced with calls to the standard logging module. If you don't want this output to be shown in normal operation, demote it to the debug level so that it only appears when you set the logger level to debug.
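
A small sketch of the idea, with a made-up logger name and helper (ffmpeg_mosaic and log_format are assumptions):

```python
import logging

logger = logging.getLogger("ffmpeg_mosaic")


def log_format(show_format):
    """Emit the ffprobe format fields at debug level instead of printing them."""
    logger.debug("%s", show_format.get("filename"))
    for key, value in show_format.items():
        logger.debug("  %s\t%s", key, value)
```

Calling logging.basicConfig(level=logging.DEBUG) once at start-up turns the output on; leaving the default level keeps it silent.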

time.strftime('%M:%S', time.gmtime should be replaced with datetime module equivalents.
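
For example, a timedelta-based formatter could look like the sketch below; format_duration is a hypothetical name. Unlike '%M:%S', it does not wrap around after 59 minutes, which matters for long clips if the formatted value is ever parsed back into seconds.

```python
from datetime import timedelta


def format_duration(seconds):
    """Format a duration in seconds as MM:SS, letting minutes exceed 59."""
    total = int(timedelta(seconds=seconds).total_seconds())
    minutes, secs = divmod(total, 60)
    return f"{minutes:02d}:{secs:02d}"
```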

Delete dead code like this:

# ret['dur'] = time.strftime('%M:%S', time.gmtime(float(video['duration'])))
# ret['dur'] = time.strftime('%M:%S', time.gmtime(float(dic['duration'])))

If you're worried about keeping a history of code evolution, that's what source control like git is for.

Move all of your imports to the top of the file.

You should avoid hard-coding Windows \ path separators. Based on context, these should be replaced with /, with os.sep, or with pathlib.Path / operators.
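
With pathlib, the output locations can be built once and reused. In this sketch the file names are taken from the question, while output_paths and its dictionary keys are assumptions.

```python
from pathlib import Path


def output_paths(video_folder):
    """Build the report and cache paths under <video_folder>/xstacked."""
    out_dir = Path(video_folder) / "xstacked"
    return {
        "dir": out_dir,
        "report": out_dir / "vid_data.nogit.html",
        "cache": out_dir / "vid_data.nogit.txt",
    }
```

out_dir.mkdir(parents=True, exist_ok=True) then replaces the os.path.exists / os.makedirs pair.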

From if len(sys.argv) >1: onward, that code should all be in a main() function.

Addressing these issues will go a long way to making your code sane. I encourage you to work on it, ask questions as comments on this response if needed, and then when you're ready ask a new question with your revised code.

answered Oct 1, 2022 at 13:55
\$\endgroup\$
  • \$\begingroup\$ shfn shortens filenames I assume. \$\endgroup\$ Commented Oct 2, 2022 at 1:21
