Find duplicate images, Python script. Usage: doppelte-dateien-finden.py dir1 dir2 dir3 ...

#!/usr/bin/env python
# Finds duplicate images by file size and, if EXIF data is present, by
# camera model / capture date.
#
# Example:
#   10 pictures -> Urlaub-Ostsee
#   6 months later -> all pictures copied from the SD card to -> Backup-SDKarte
#   The script then finds this group:
#     group index 0
#       0: Urlaub-Ostsee
#       1: Backup-SDKarte
#       pictures: 10
#   and the 10 pictures of group 0, directory 1 can then be deleted.
#
# Assume this test setup:
#
#   mkdir -p find_groups/{a,b,c}
#   echo 1 > find_groups/a/A
#   echo 1 > find_groups/b/A
#   echo 1 > find_groups/c/A
#   echo 1 > find_groups/a/B
#   echo 1 > find_groups/b/B
#   echo 1 > find_groups/c/B
#
#   echo 2 > find_groups/a/Z
#   echo 2 > find_groups/b/Z
#
# Then 2 groups will be found (groups are sorted by file count, smallest first):
#
#   GROUP INDEX=0
#     directory 0: find_groups/a
#     directory 1: find_groups/b
#     files: Z
#
#   GROUP INDEX=1
#     directory 0: find_groups/a
#     directory 1: find_groups/b
#     directory 2: find_groups/c
#     files: A, B
#
# Then you can tell the script to delete group index 1 from its directory 1
# (find_groups/b).
#
# EXIF data and sizes are cached in cache.json, so even terabytes of data can
# be compared quickly and repeatedly.
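# A possible invocation for the test setup above (illustrative; assumes the
# script was saved as doppelte-dateien-finden.py):
#
#   python doppelte-dateien-finden.py find_groups/a find_groups/b find_groups/c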
import signal
import sys
import exifread
from datetime import datetime
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import os
import traceback
from pathlib import Path


def files(path):
    """Recursively collect all regular files below path."""
    matches = []
    for root, dirnames, filenames in os.walk(path):
        for p in filenames:
            x = os.path.join(root, p)
            if os.path.isfile(x):
                matches.append(x)
    return matches


def get_exif_date_model(image_path):
    """Return [path, 'date:model'], or [path, 'NO-EXIF'] if no EXIF data is found."""
    try:
        with open(image_path, 'rb') as image_file:
            exif_tags = exifread.process_file(image_file, details=False)
            if 'EXIF DateTimeOriginal' in exif_tags:
                exif_date = exif_tags['EXIF DateTimeOriginal']
                exif_date = datetime.strptime(str(exif_date), '%Y:%m:%d %H:%M:%S')
                model = exif_tags.get('Image Model', 'UNKNOWN-MODEL')
                print([exif_date, model])
                return [image_path, f"{exif_date}:{model}"]
    except Exception as e:
        print(f"Error reading EXIF data: {e}")
        traceback.print_exc()
    return [image_path, "NO-EXIF"]


def get_size(path):
    return [path, os.path.getsize(path)]


def fill_cache(cache, paths, f, prefix):
    """Run f over all paths in a thread pool and store the results in the cache."""
    with tqdm(total=len(paths)) as progress:
        with ThreadPoolExecutor() as executor:
            for file, r in executor.map(f, paths):
                progress.update()
                cache[f"{prefix}{file}"] = r


class UJsonStorage:
    """A dict persisted to a JSON file; written back when the with-block exits."""

    def __init__(self, file_path):
        self.file_path = file_path
        self.data = {}
        try:
            with open(file_path, 'r') as file:
                self.data = json.load(file)
        except FileNotFoundError:
            pass

    def __enter__(self):
        # Without a handler, a SIGTERM would kill the process before __exit__
        # runs and the cache file could be written incompletely.
        self.set_sigterm_handler()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        with open(self.file_path, 'w') as file:
            json.dump(self.data, file, indent=4)

    def get_or(self, k, f):
        if not k in self.data:
            self.data[k] = f()
        return self.data[k]

    def __getitem__(self, item):
        return self.data[item]

    def __setitem__(self, key, value):
        self.data[key] = value

    def __delitem__(self, key):
        del self.data[key]

    def set_sigterm_handler(self):
        '''Assigns a SIGTERM handler so the with-block unwinds and the cache is dumped.'''
        def sigterm_handler(*args, **kwargs):
            # sys.exit() raises SystemExit, which propagates through the
            # with-block and triggers __exit__ (and thus the JSON dump).
            sys.exit(0)
        signal.signal(signal.SIGTERM, sigterm_handler)
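# A minimal usage sketch for UJsonStorage (illustrative only, hence kept as a
# comment; "demo.json" and the keys are made up):
#
#   with UJsonStorage("demo.json") as store:
#       store["size:/tmp/x"] = 1234             # plain dict-style access
#       v = store.get_or("answer", lambda: 42)  # compute and cache on miss
#
# On leaving the with-block (or on SIGTERM) the dict is written back to the
# JSON file, pretty-printed with indent=4.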
def main(directories):
    with UJsonStorage("cache.json") as cache:
        files_ = [y for x in directories for y in files(x)]
        print("%d files found" % len(files_))

        exif_missing = []
        size_missing = []

        def exif_key(file):
            return f"exif:{file}"

        def size_key(file):
            return f"size:{file}"

        # Only read EXIF data / sizes for files that are not cached yet.
        for x in files_:
            if not exif_key(x) in cache.data:
                exif_missing.append(x)
            if not size_key(x) in cache.data:
                size_missing.append(x)

        print('reading missing exif data')
        fill_cache(cache, exif_missing, get_exif_date_model, "exif:")
        print('reading missing sizes')
        fill_cache(cache, size_missing, get_size, "size:")

        def key(path):
            size = cache[f"size:{path}"]
            exif = cache[f"exif:{path}"]
            return f"{os.path.basename(path)}:{size}:{exif}"

        # Group files that look identical (same basename, size and EXIF key)
        # and record the directories each one appears in.
        bydirs = {}
        for f in files_:
            p = Path(f)
            k = key(f)
            if not k in bydirs:
                bydirs[k] = {"basename": p.name, "directories": []}
            bydirs[k]["directories"].append(str(p.parent))

        # Merge files that occur in exactly the same set of directories
        # into one group.
        groups = {}
        for k, v in bydirs.items():
            v["directories"].sort()
            d_str = "::".join(v["directories"])
            if not d_str in groups:
                groups[d_str] = []
            groups[d_str].append(v["basename"])

        group_list = []
        for k, v in groups.items():
            group_list.append({
                "directory_list": k.split("::"),
                "files": v
            })

        # Only groups that exist in more than one directory are duplicates.
        group_list = [x for x in group_list if len(x["directory_list"]) > 1]
        group_list.sort(key=lambda x: len(x["files"]))

        def print_dirs(g):
            for i, d in enumerate(g["directory_list"]):
                print("%s: %s" % (i, d))

        for i, g in enumerate(group_list):
            print("")
            print("")
            print("GROUP [INDEX=%s] === count: %s" % (i, len(g["files"])))
            print("directories:")
            print_dirs(g)
            print("files:")
            print(g["files"])

        while True:
            a = input("delete group [index|Q=quit]: ")
            if a == "Q":
                return
            group = group_list[int(a)]
            print_dirs(group)
            b = input("delete files from directory [index]: ")
            d = group["directory_list"][int(b)]
            for f in group["files"]:
                x = os.path.join(d, f)
                print("del %s" % x)
                os.unlink(x)


if __name__ == "__main__":
    directories = sys.argv[1:]
    print(directories)
    main(directories)
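For the find_groups test setup from the header, a session could look roughly
like this (progress bars and cache output omitted; the file order inside a
group may vary):

  $ python doppelte-dateien-finden.py find_groups/a find_groups/b find_groups/c

  GROUP [INDEX=0] === count: 1
  directories:
  0: find_groups/a
  1: find_groups/b
  files:
  ['Z']

  GROUP [INDEX=1] === count: 2
  directories:
  0: find_groups/a
  1: find_groups/b
  2: find_groups/c
  files:
  ['A', 'B']
  delete group [index|Q=quit]: 1
  0: find_groups/a
  1: find_groups/b
  2: find_groups/c
  delete files from directory [index]: 1
  del find_groups/b/A
  del find_groups/b/B
  delete group [index|Q=quit]: Q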