Skip to content

Commit

Permalink
Cache ExifToolHelper and pass it to Muxer class, speeds up directory …
Browse files Browse the repository at this point in the history
…mode by order of magnitude. Fixes #14
  • Loading branch information
PetrVys committed Nov 13, 2024
1 parent 503daba commit 8304c60
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 155 deletions.
177 changes: 85 additions & 92 deletions Muxer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
self,
image_fpath: str,
video_fpath: str,
exiftool: exiftool.ExifToolHelper,
output_fpath: str = None,
output_directory: str = None,
delete_video: bool = False,
Expand All @@ -50,6 +51,7 @@ def __init__(
self.overwrite = overwrite
self.delete_video = delete_video
self.no_xmp = no_xmp
self.exiftool = exiftool

if os.path.isfile(self.image_fpath) is False:
self.logger.error("Image file doesn't exist")
Expand Down Expand Up @@ -187,101 +189,92 @@ def merge_xmp(self, xmp: str):
def mux(self):
self.logger.info("Processing %s", self.image_fpath)

# Add script directory to PATH in case it contains exiftool.exe - useful for packaging, since script path will not be argv[0] nor current directory.
exiftool_path = Path(__file__).parent.resolve() / "exiftool.exe"
if exiftool_path.is_file():
os.environ["PATH"] += os.pathsep + f"{Path(__file__).parent.resolve()}"

with exiftool.ExifToolHelper(
encoding="utf-8",
logger=self.logger if self.verbose is True else None
) as et:
image_metadata, video_metadata = et.get_metadata(
[self.image_fpath, self.video_fpath]
image_metadata, video_metadata = self.exiftool.get_metadata(
[self.image_fpath, self.video_fpath]
)

for ns in const.NAMESPACES:
etree.register_namespace(ns, const.NAMESPACES[ns])

image_type = self.validate_image(self.image_fpath, metadata=image_metadata)
self.fix_output_fpath(image_metadata)
self.validate_video(self.video_fpath, metadata=video_metadata)

if self.no_xmp is False:
result = self.exiftool.execute(
*[
"-X",
"-ee",
"-n",
"-QuickTime:StillImageTime",
"-QuickTime:TrackDuration",
f"{self.video_fpath}",
]
)

for ns in const.NAMESPACES:
etree.register_namespace(ns, const.NAMESPACES[ns])

image_type = self.validate_image(self.image_fpath, metadata=image_metadata)
self.fix_output_fpath(image_metadata)
self.validate_video(self.video_fpath, metadata=video_metadata)

if self.no_xmp is False:
result = et.execute(
*[
"-X",
"-ee",
"-n",
"-QuickTime:StillImageTime",
"-QuickTime:TrackDuration",
f"{self.video_fpath}",
]
)

try:
track_number = extract_track_number(result)
self.logger.info("Live Photo keyframe track number: %s", track_number)

track_duration = extract_track_duration(track_number, result)
self.logger.info("Live Photo keyframe: %sus", track_duration)

self.xmp.find(".//rdf:Description", const.NAMESPACES).set(
const.GCAMER_TIMESTAMP_US,
str(track_duration),
)
except:
track_duration = -1
self.logger.info("Could not read Live Photo keyframe (source video is probably not from Live Photo). No keyframe will be set.")
try:
track_number = extract_track_number(result)
self.logger.info("Live Photo keyframe track number: %s", track_number)

video_data = read_file(self.video_fpath)
samsung_tail = SamsungTags(video_data, image_type)
track_duration = extract_track_duration(track_number, result)
self.logger.info("Live Photo keyframe: %sus", track_duration)

result = et.execute(*["-XMP", "-b", f"{self.image_fpath}"])
if result == "":
self.logger.warning("XMP of original file is empty")
else:
self.merge_xmp(result)

self.change_xmpresource(str(samsung_tail.get_video_size()), attribute=const.ITEM_LENGTH, semantic="MotionPhoto")
self.change_xmpresource(str(samsung_tail.get_image_padding()), attribute=const.ITEM_PADDING, semantic="Primary")

xmp_updated = self.output_fpath + ".XMP"
with open(xmp_updated, "wb") as f:
f.write(etree.tostring(self.xmp, pretty_print=True))

xmp_image = enrich_fname(self.output_fpath, "XMP")
shutil.copyfile(self.image_fpath, xmp_image)
et.execute(
*[
"-overwrite_original",
"-tagsfromfile",
xmp_updated,
"-xmp",
xmp_image,
]
self.xmp.find(".//rdf:Description", const.NAMESPACES).set(
const.GCAMER_TIMESTAMP_US,
str(track_duration),
)

merged_bytes = read_file(xmp_image)
except:
track_duration = -1
self.logger.info("Could not read Live Photo keyframe (source video is probably not from Live Photo). No keyframe will be set.")

video_data = read_file(self.video_fpath)
samsung_tail = SamsungTags(video_data, image_type)

result = self.exiftool.execute(*["-XMP", "-b", f"{self.image_fpath}"])
if result == "":
self.logger.warning("XMP of original file is empty")
else:
video_data = read_file(self.video_fpath)
samsung_tail = SamsungTags(video_data, image_type)
merged_bytes = read_file(self.image_fpath)
samsung_tail.set_image_size(len(merged_bytes))
video_footer = samsung_tail.video_footer()
merged_bytes += video_footer

self.logger.info("Writing output file: %s", self.output_fpath)
with open(self.output_fpath, "wb") as binary_file:
binary_file.write(merged_bytes)
shutil.copystat(self.image_fpath, self.output_fpath)

if self.delete_temp is True:
os.remove(xmp_updated)
self.logger.debug("Delete: %s", xmp_updated)
os.remove(xmp_image)
self.logger.debug("Delete: %s", xmp_image)

if self.delete_video is True:
os.remove(self.video_fpath)
self.logger.debug("Delete: %s", self.video_fpath)
self.merge_xmp(result)

self.change_xmpresource(str(samsung_tail.get_video_size()), attribute=const.ITEM_LENGTH, semantic="MotionPhoto")
self.change_xmpresource(str(samsung_tail.get_image_padding()), attribute=const.ITEM_PADDING, semantic="Primary")

xmp_updated = self.output_fpath + ".XMP"
with open(xmp_updated, "wb") as f:
f.write(etree.tostring(self.xmp, pretty_print=True))

xmp_image = enrich_fname(self.output_fpath, "XMP")
shutil.copyfile(self.image_fpath, xmp_image)
self.exiftool.execute(
*[
"-overwrite_original",
"-tagsfromfile",
xmp_updated,
"-xmp",
xmp_image,
]
)

merged_bytes = read_file(xmp_image)
else:
video_data = read_file(self.video_fpath)
samsung_tail = SamsungTags(video_data, image_type)
merged_bytes = read_file(self.image_fpath)
samsung_tail.set_image_size(len(merged_bytes))
video_footer = samsung_tail.video_footer()
merged_bytes += video_footer

self.logger.info("Writing output file: %s", self.output_fpath)
with open(self.output_fpath, "wb") as binary_file:
binary_file.write(merged_bytes)
shutil.copystat(self.image_fpath, self.output_fpath)

if self.delete_temp is True:
os.remove(xmp_updated)
self.logger.debug("Delete: %s", xmp_updated)
os.remove(xmp_image)
self.logger.debug("Delete: %s", xmp_image)

if self.delete_video is True:
os.remove(self.video_fpath)
self.logger.debug("Delete: %s", self.video_fpath)
145 changes: 82 additions & 63 deletions motionphoto2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
import argparse
import sys
import os
import exiftool
import logging

from pathlib import Path

from Muxer import Muxer

logging.basicConfig(
handlers=[logging.StreamHandler(sys.stdout)],
level=logging.DEBUG,
format="[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
datefmt="%d/%m/%Y %H:%M:%S"
)

if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="MotionPhoto2",
Expand Down Expand Up @@ -84,66 +93,76 @@
output_directory = f"{Path(args.output_directory).resolve()}"
if os.path.exists(output_directory) is False:
os.mkdir(output_directory)

if args.input_directory is not None:
print (f"Converting files in {args.input_directory}")
print("=" * 25)
input_directory = f"{Path(args.input_directory).resolve()}"
# Going to search couples of file with ext (".heic", ".heif", ".avif", ".jpg", ".jpeg") (".mp4", ".mov")
files = [
os.path.join(Path(pathv).relative_to(input_directory), file)
for pathv, directories, files in os.walk(input_directory)
for file in files
]
videos = [
f"{Path(f)}"
for f in files
if os.path.isfile(os.path.join(input_directory, f))
and Path(f).suffix.lower() in [".mp4", ".mov"]
]
images = [
f"{Path(f)}"
for f in files
if os.path.isfile(os.path.join(input_directory, f))
and Path(f).suffix.lower() in [".heic", ".heif", ".avif", ".jpg", ".jpeg"]
]

for image in images:
fname = f"{Path(image).with_suffix("")}"
for ext in [".mp4", ".mov", ".MP4", ".MOV"]:
if f"{Path(fname).with_suffix(ext)}" in videos:
video = videos.pop(videos.index(fname + ext))

input_image = os.path.join(input_directory, image)
input_video = os.path.join(input_directory, video)

output_subdirectory = args.output_directory
if output_subdirectory is not None:
output_subdirectory = f"{Path(os.path.join(output_subdirectory, os.path.dirname(fname))).resolve()}"
if os.path.exists(output_subdirectory) is False:
os.makedirs(output_subdirectory)

Muxer(
image_fpath=input_image,
video_fpath=input_video,
output_directory=output_subdirectory,
delete_video=args.delete_video,
delete_temp=not args.keep_temp,
overwrite=args.overwrite,
no_xmp=args.no_xmp,
verbose=args.verbose,
).mux()
print("=" * 25)
break
else:
Muxer(
image_fpath=args.input_image,
video_fpath=args.input_video,
output_fpath=args.output_file,
output_directory=args.output_directory,
delete_video=args.delete_video,
delete_temp=not args.keep_temp,
overwrite=args.overwrite,
no_xmp=args.no_xmp,
verbose=args.verbose,
).mux()

logger = logging.getLogger("ExifTool")
logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

with exiftool.ExifToolHelper(
encoding="utf-8",
logger=logger if args.verbose is True else None
) as et:

if args.input_directory is not None:
print (f"Converting files in {args.input_directory}")
print("=" * 25)
input_directory = f"{Path(args.input_directory).resolve()}"
# Going to search couples of file with ext (".heic", ".heif", ".avif", ".jpg", ".jpeg") (".mp4", ".mov")
files = [
os.path.join(Path(pathv).relative_to(input_directory), file)
for pathv, directories, files in os.walk(input_directory)
for file in files
]
videos = [
f"{Path(f)}"
for f in files
if os.path.isfile(os.path.join(input_directory, f))
and Path(f).suffix.lower() in [".mp4", ".mov"]
]
images = [
f"{Path(f)}"
for f in files
if os.path.isfile(os.path.join(input_directory, f))
and Path(f).suffix.lower() in [".heic", ".heif", ".avif", ".jpg", ".jpeg"]
]

for image in images:
fname = f"{Path(image).with_suffix("")}"
for ext in [".mp4", ".mov", ".MP4", ".MOV"]:
if f"{Path(fname).with_suffix(ext)}" in videos:
video = videos.pop(videos.index(fname + ext))

input_image = os.path.join(input_directory, image)
input_video = os.path.join(input_directory, video)

output_subdirectory = args.output_directory
if output_subdirectory is not None:
output_subdirectory = f"{Path(os.path.join(output_subdirectory, os.path.dirname(fname))).resolve()}"
if os.path.exists(output_subdirectory) is False:
os.makedirs(output_subdirectory)

Muxer(
image_fpath=input_image,
video_fpath=input_video,
exiftool=et,
output_directory=output_subdirectory,
delete_video=args.delete_video,
delete_temp=not args.keep_temp,
overwrite=args.overwrite,
no_xmp=args.no_xmp,
verbose=args.verbose,
).mux()
print("=" * 25)
break
else:
Muxer(
image_fpath=args.input_image,
video_fpath=args.input_video,
exiftool=et,
output_fpath=args.output_file,
output_directory=args.output_directory,
delete_video=args.delete_video,
delete_temp=not args.keep_temp,
overwrite=args.overwrite,
no_xmp=args.no_xmp,
verbose=args.verbose,
).mux()

0 comments on commit 8304c60

Please sign in to comment.