
Find duplicate images Python script. Usage: doppelte-dateien-finden.py dir1 dir2 dir3 …

#!/usr/bin/env python3

# Finds duplicate images by size and, if EXIF data is present, by camera model / capture date.

# Example:
# 10 images -> Urlaub-Ostsee
# 6 months later -> all images copied from the SD card to -> Backup-SDKarte
# Then it finds this group:
# group index 0
# 0: UrlaubOstsee
# 1: Backup-SDKarte
# images: 10
# then the 10 images of group 0 can be deleted from directory 1


# Assume this test:
# create the directories
# mkdir -p find_groups/{a,b,c}
# create some contents
# echo 1 > find_groups/a/A
# echo 1 > find_groups/b/A
# echo 1 > find_groups/c/A
# echo 1 > find_groups/a/B
# echo 1 > find_groups/b/B
# echo 1 > find_groups/c/B
# 
# echo 2 > find_groups/a/Z
# echo 2 > find_groups/b/Z
#
# Then 2 groups will be found:

# GROUP INDEX=0
# directory 0: find_groups/a
# directory 1: find_groups/b
# directory 2: find_groups/c
# files A,B
#
# GROUP INDEX=1
# directory 0: find_groups/a
# directory 1: find_groups/b
# files Z
#
# Then you can say that group index 0 can be deleted from directory 1.
#
# EXIF data and file sizes are cached in cache.json. This way even 1 TB of data
# can be compared quickly and repeatedly.
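#
# For illustration, a hypothetical cache.json could look roughly like this
# (keys are "exif:<path>" and "size:<path>"; the paths and values below are made up):
#
# {
#     "exif:find_groups/a/A": "NO-EXIF",
#     "size:find_groups/a/A": 2,
#     "exif:Urlaub-Ostsee/IMG_0001.JPG": "2023-07-01 12:30:05:Canon EOS 70D",
#     "size:Urlaub-Ostsee/IMG_0001.JPG": 4321987
# }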



import signal
import sys
import exifread
from datetime import datetime
import json
from concurrent.futures import ThreadPoolExecutor
# from typing import Self
from tqdm import tqdm
import os
import traceback
from pathlib import Path

def files(path):
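  '''Recursively collect all regular files below path.'''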
  matches = []
  for root, dirnames, filenames in os.walk(path):
      for p in filenames:
          x = os.path.join(root, p)
          if os.path.isfile(x):
              matches.append(x)
  return matches


def get_exif_date_model(image_path):
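  '''Return [path, "<capture date>:<camera model>"], or [path, "NO-EXIF"] if no usable EXIF data is found.'''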
  try:
      with open(image_path, 'rb') as image_file:
          exif_tags = exifread.process_file(image_file, details=False)
          if 'EXIF DateTimeOriginal' in exif_tags:
              exif_date = exif_tags['EXIF DateTimeOriginal']
              exif_date = datetime.strptime(str(exif_date), '%Y:%m:%d %H:%M:%S')
              # A missing camera model should not throw away the capture date.
              model = exif_tags.get('Image Model', 'UNKNOWN-MODEL')
              return [image_path, f"{exif_date}:{model}"]
  except Exception as e:
      print(f"Error reading EXIF data: {e}")
      traceback.print_exc()
  return [image_path, "NO-EXIF"]

def get_size(path):
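  '''Return [path, file size in bytes].'''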
  return [path, os.path.getsize(path)]

def fill_cache(cache, paths, f, prefix):
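  '''Run f over all paths in a thread pool and store each [path, value] result in cache under "<prefix><path>".'''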
  with tqdm(total=len(paths)) as progress:
      with ThreadPoolExecutor() as executor:
          for x in executor.map(f, paths):
              progress.update()
              if x:
                  file, r = x
                  cache[f"{prefix}{file}"] = r


class UJsonStorage:
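  '''Tiny JSON-backed key/value cache: loads file_path on init, writes everything back on context exit.'''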

  def __init__(self, file_path):
      self.file_path = file_path
      self.data = {}
      try:
          with open(file_path, 'r') as file:
              self.data = json.load(file)
      except FileNotFoundError:
          pass

  def __enter__(self):
      # Even a Ctrl-C can be enough to leave the file written incompletely.
      self.set_sigterm_handler()
      return self

  def __exit__(self, exc_type, exc_value, tb):
      with open(self.file_path, 'w') as file:
          json.dump(self.data, file, indent=4)

  def get_or(self, k, f):
      if not k in self.data:
          self.data[k] = f()
      return self.data[k]

  def __getitem__(self, item):
      return self.data[item]

  def __setitem__(self, key, value):
      self.data[key] = value

  def __delitem__(self, key):
      del self.data[key]

  def set_sigterm_handler(self):
      '''Assigns a SIGTERM handler so the cache file still gets written on termination.'''
      def sigterm_handler(*args, **kwargs):
          # sys.exit() raises SystemExit, which unwinds the with-block so that
          # __exit__ writes cache.json before the process terminates.
          sys.exit(0)
      signal.signal(signal.SIGTERM, sigterm_handler)


def main(directories):

  with UJsonStorage("cache.json") as cache:
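      # Collect every file below every directory given on the command line.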
      files_ = [y  for x in directories for y in files(x)]
      print("files_")
      print(files_)
      print("e")

      exif_missing = []
      size_missing = []

      def exif_key(file):
          return f"exif:{file}"
      def size_key(file):
          return f"size:{file}"

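      # Determine which files do not have cached exif / size entries yet.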
      for x in files_:
          ek = exif_key(x)
          sk = size_key(x)
          if not ek in cache.data:
              exif_missing.append(x)
          if not sk in cache.data:
              size_missing.append(x)

      print('reading missing exif data')
      fill_cache(cache, exif_missing, get_exif_date_model, "exif:")
      print('reading missing sizes')
      fill_cache(cache, size_missing, get_size, "size:")

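      # Two files are considered duplicates if basename, size and exif date/model all match.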
      def key(path):
          size = cache[f"size:{path}"]
          exif = cache[f"exif:{path}"]
          return f"{os.path.basename(path)}:{size}:{exif}"

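      # For every duplicate key, collect the directories that contain that file.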
      bydirs = {}
      for f in files_:
          p = Path(f)
          k = key(f)
          if not k in bydirs:
              bydirs[k] = {"basename": p.name, "directories": []}
          bydirs[k]["directories"].append(str(p.parent))

      # print(bydirs)  # debug output, very noisy for large trees

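      # Files that live in exactly the same set of directories form one group.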
      groups = {}

      for k, v in bydirs.items():
          v["directories"].sort()
          d_str = "::".join(v["directories"])
          if not d_str in groups:
              groups[d_str] = []
          groups[d_str].append(v["basename"])

      group_list = []

      for k, v in groups.items():
          group_list.append({
              "directory_list": k.split("::"),
              "files": v
              })

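      # Keep only groups spanning more than one directory; sort so the largest groups are printed last.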
      group_list = [x for x in group_list if len(x["directory_list"]) > 1]

      group_list.sort(key = lambda x: len(x["files"]))

      def print_dirs(g):
          for i, d in enumerate(g["directory_list"]):
              print("%s: %s " % (i, d))

      for i, g in enumerate(group_list):
          print("")
          print("")
          print("GROUP [INDEX=%s] === count: %s" % (i, len(g["files"])))
          print("directories:")
          print_dirs(g)
          print("files:")
          print(g["files"])

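      # Interactive cleanup: pick a group, then the directory from which that group's files should be deleted.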
      while True:
          a = input("delete group [index|Q=quit]: ")
          if a == "Q":
              return
          group = group_list[int(a)]
          print_dirs(group)
          b = input("delete files from directory [index]: ")
          d = group["directory_list"][int(b)]
          for f in group["files"]:
              x = os.path.join(d, f)
              print("del %s" % x)
              os.unlink(x)

if __name__ == "__main__":
  directories = sys.argv[1:]
  print(directories)
  main(directories)