Python Script to solve Mobile Uploads Duplicates

My MobileUpload folder is full of duplicate images and movies created by the mobile client bugs.

I spent some time to generate and test a smart script with Claude that could eliminate this problem. This is the feature list.

  • dry run
  • multithreading
  • run time: limit the run time just to check logs and statistics after a while
  • detect corrupted images within duplicates group to avoi deleting good images
  • check similarity within duplicates, avoid deletion of manual edits
  • progress bar
  • log file

With a 5 minutes run I saved about 900MB

this is Claude dependencies analysis:

Sure! Here’s the dependency summary in English:

The script relies exclusively on Python’s standard library — no pip install required:

  • argparse, json, os, re, shutil, subprocess, sys, threading, collections, concurrent.futures, datetime, pathlib

External tools (install via system package manager):

sudo pacman -S imagemagick ffmpeg   # Arch / Garuda
sudo apt install imagemagick ffmpeg # Debian / Ubuntu

  • magick (ImageMagick v7) — image readability, average luminosity check, visual similarity

  • ffprobe (bundled with ffmpeg) — video readability and duration comparison

So to get everything running:

sudo pacman -S imagemagick ffmpeg
python nc_dedup.py --dry-run ~/Nextcloud2/MobileUploads

this is an output sample:

This is the code, as everything it could be improved.

#!/usr/bin/env python3
# =============================================================================
# nc_dedup.py
# Versione: 3.30.0 (2026-04-02)
#
# Pulizia duplicati generati dal bug del client mobile Nextcloud.
#
# Pattern duplicati: "filename (N).ext"  →  originale: "filename.ext"
# Gestisce duplicati annidati: "foto (2) (2) (2).jpg" → "foto.jpg"
# Supporta: immagini (jpg, jpeg, png, heic, webp, gif, bmp, tiff) e video (mp4, mov, avi, mkv, 3gp)
#
# Uso:
#   python nc_dedup.py [opzioni] <cartella_mobileuploads>
#
# Opzioni:
#   --dry-run          Simula tutto senza cancellare nulla (consigliato al primo run)
#   --workers N        Numero di thread paralleli (default: numero di CPU)
#   --similarity T     Soglia similarità immagini 0.0-1.0 (default: 0.90)
#                      Sotto soglia il duplicato NON viene eliminato.
#                      Esempi: 0.80 = più permissivo, 0.95 = più restrittivo
#   --max-time SEC     Tempo massimo di esecuzione in secondi (default: 300 = 5 min)
#                      Lo script si ferma al prossimo checkpoint e stampa le
#                      statistiche parziali. Usare 0 per run illimitato.
#                      Esempi:
#                        --max-time 120   →  2 minuti
#                        --max-time 600   → 10 minuti
#                        --max-time 3600  →  1 ora
#                        --max-time 0     →  nessun limite
#
# Esempi d'uso:
#   python nc_dedup.py --dry-run ~/Nextcloud2/MobileUploads
#   python nc_dedup.py --dry-run --max-time 120 ~/Nextcloud2/MobileUploads
#   python nc_dedup.py --max-time 600 --workers 4 ~/Nextcloud2/MobileUploads
#   python nc_dedup.py --max-time 0 ~/Nextcloud2/MobileUploads
#
# Dipendenze: Python 3.6+, imagemagick v7 (magick), ffprobe (ffmpeg per video)
# Installazione dipendenze su Arch/Garuda:
#   sudo pacman -S imagemagick ffmpeg
# =============================================================================

import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path

VERSION      = "3.30.0"
VERSION_DATE = "2026-04-02"

IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".heic", ".webp", ".gif", ".bmp", ".tiff", ".tif"}
VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"}

# ---------------------------------------------------------------------------
# Colori ANSI
# ---------------------------------------------------------------------------
class C:
    RED     = "\033[0;31m"
    YELLOW  = "\033[1;33m"
    GREEN   = "\033[0;32m"
    CYAN    = "\033[0;36m"
    MAGENTA = "\033[0;35m"
    BOLD    = "\033[1m"
    RESET   = "\033[0m"

def colored(color: str, text: str) -> str:
    return f"{color}{text}{C.RESET}"

def _ansi_up(n: int) -> None:
    if n > 0:
        import sys; sys.stdout.write(f"\x1b[{n}A\r")

def _print_line(text: str) -> None:
    import sys; sys.stdout.write(text + "\x1b[K\n")

def _flush_stdout() -> None:
    import sys; sys.stdout.flush()


def fmt_size(n_bytes: int) -> str:
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if n_bytes < 1024:
            return f"{n_bytes:.1f} {unit}"
        n_bytes /= 1024
    return f"{n_bytes:.1f} PB"

def fmt_duration(seconds: float) -> str:
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    return f"{h:02d}:{m:02d}:{s:02d}"

def file_type(path: Path) -> str:
    ext = path.suffix.lower()
    if ext in IMAGE_EXTS: return "image"
    if ext in VIDEO_EXTS: return "video"
    return "unknown"

# ---------------------------------------------------------------------------
# Logger thread-safe con scrittura atomica per gruppo
# Ogni gruppo accumula le proprie righe in un buffer locale (nessun lock
# durante l'elaborazione), poi le scarica tutte in una sola operazione.
# Questo garantisce che nel file di log i messaggi di ogni gruppo siano
# sempre raggruppati e mai intercalati con quelli di altri thread.
# ---------------------------------------------------------------------------
class Logger:
    def __init__(self, log_path: Path):
        self.log_path = log_path
        self._lock    = threading.Lock()
        self._print_lock = threading.Lock()
        log_path.touch()

    def _write_lines(self, lines: list[str]):
        """Scrive una lista di righe in modo atomico (un solo acquire del lock)."""
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with self._lock:
            with self.log_path.open("a", encoding="utf-8") as f:
                for line in lines:
                    f.write(f"[{ts}] {line}\n")

    def write_header(self, lines: list[str]):
        """Scrive righe di intestazione (usato per header/footer globali)."""
        self._write_lines(lines)

    def print_console(self, color: str, msg: str, live: "LiveStatus | None" = None):
        """Stampa un messaggio globale sopra il pannello live."""
        if live:
            live._flushing = True
        with self._print_lock:
            panel_h = LiveStatus.PANEL_LINES if live else 0
            if panel_h > 0:
                _ansi_up(panel_h)
            _print_line(colored(color, msg))
            if live:
                with live._lock:
                    panel_lines = live._build_panel()
                for line in panel_lines:
                    _print_line(line)
                LiveStatus.PANEL_LINES = len(panel_lines)
            _flush_stdout()
        if live:
            live._flushing = False

# ---------------------------------------------------------------------------
# Buffer di log per un singolo gruppo — accumulato senza lock, poi scaricato
# ---------------------------------------------------------------------------
class GroupLog:
    def __init__(self, original_path: Path, dup_count: int):
        self._lines        = []
        self._console_msgs = []   # (color, msg)
        self._original     = original_path
        self._dup_count    = dup_count
        # intestazione del gruppo
        self._lines.append(f"[GROUP  ] {'─' * 56}")
        self._lines.append(f"[GROUP  ] ORIGINALE : {original_path}")
        self._lines.append(f"[GROUP  ] DUPLICATI : {dup_count} file")
        self._lines.append(f"[GROUP  ] {'─' * 56}")

    def add(self, level: str, msg: str, color: str = ""):
        """
        Aggiunge una riga al buffer locale (log) e alla coda console.
        Tutti i messaggi vanno sempre a console — il colore è opzionale
        (default grigio/reset se non specificato).
        """
        self._lines.append(f"[{level:<8}] {msg}")
        # Usa il colore fornito, oppure RESET (testo normale) come default
        self._console_msgs.append((color or C.RESET, f"[{level}] {msg}"))

    def flush(self, logger: Logger, live: "LiveStatus | None" = None):
        """
        Scarica il buffer sul logger globale in modo atomico.
        Stampa i messaggi del gruppo SOPRA il pannello live:
        1. Risale le righe del pannello corrente
        2. Stampa tutti i messaggi del gruppo (scorrono nel terminale)
        3. Ridisegna il pannello in fondo
        """
        self._lines.append(f"[GROUP  ] {'─' * 56}")
        logger._write_lines(self._lines)

        if not self._console_msgs:
            return

        if live:
            live._flushing = True
        with logger._print_lock:
            # Risali sopra il pannello corrente
            panel_h = LiveStatus.PANEL_LINES if live else 0
            if panel_h > 0:
                _ansi_up(panel_h)

            # Stampa tutti i messaggi del gruppo — scorroranno sopra il pannello
            for color, msg in self._console_msgs:
                _print_line(colored(color, msg))

            # Ridisegna il pannello in fondo
            if live:
                with live._lock:
                    panel_lines = live._build_panel()
                for line in panel_lines:
                    _print_line(line)
                LiveStatus.PANEL_LINES = len(panel_lines)
            _flush_stdout()
        if live:
            live._flushing = False

# ---------------------------------------------------------------------------
# LiveStatus — stato live dei thread + pannello aggiornato ogni secondo
# ---------------------------------------------------------------------------
class LiveStatus:
    """
    Traccia lo stato di ogni worker thread e le statistiche progressive.
    Un thread di rendering ridisegna ogni secondo il pannello a console,
    sovrascrivendo le righe precedenti con sequenze ANSI.
    """
    PANEL_LINES = 0  # quante righe occupa il pannello (aggiornato al primo render)

    def __init__(self, total: int, workers: int, logger: "Logger",
                 start_dt: "datetime" = None, max_time: int = 0):
        self.total      = total
        self.done       = 0
        self.workers    = workers
        self.logger     = logger
        self.start_dt   = start_dt or datetime.now()
        self.max_time   = max_time   # 0 = illimitato
        self._lock      = threading.Lock()
        self._width     = 36
        self._stop_render = threading.Event()

        # Stato per thread: tid → {file, fase, n_dups, done_dups}
        self._thread_state: dict[int, dict] = {}

        # Statistiche progressive (aggiornate dal main thread)
        self.counts = dict(
            originals_ok=0, originals_corrupt=0, promoted=0,
            deleted=0, skipped_corrupt=0, skipped_sim=0,
            errors=0, deleted_bytes=0,
            corrupt_files=[], resolved_corrupt=[], dissimilar=[],
        )

        # Avvia il thread di rendering
        self._render_thread = threading.Thread(target=self._render_loop, daemon=True)
        self._render_thread.start()

    # ------------------------------------------------------------------
    # API per i worker thread
    # ------------------------------------------------------------------
    def set_state(self, phase: str, file: str = "", n_dups: int = 0, done_dups: int = 0):
        """Chiamato dal worker per aggiornare il proprio stato."""
        tid = threading.get_ident()
        with self._lock:
            self._thread_state[tid] = {
                "phase": phase, "file": file,
                "n_dups": n_dups, "done_dups": done_dups,
            }

    def clear_state(self):
        """Chiamato dal worker quando ha finito un gruppo."""
        tid = threading.get_ident()
        with self._lock:
            self._thread_state.pop(tid, None)

    def advance(self, logger=None):
        """Incrementa il contatore gruppi completati."""
        with self._lock:
            self.done += 1

    # ------------------------------------------------------------------
    # Rendering del pannello
    # ------------------------------------------------------------------
    def _render_loop(self):
        first = True
        while not self._stop_render.is_set():
            self._draw(first)
            first = False
            self._stop_render.wait(timeout=1.0)
        self._draw(first=False)  # render finale

    def _draw(self, first: bool = False):
        with self._lock:
            lines = self._build_panel()

        with self.logger._print_lock:
            if not first and LiveStatus.PANEL_LINES > 0:
                # Risali alle righe precedenti e sovrascrivile
                _ansi_up(LiveStatus.PANEL_LINES)
            for line in lines:
                # Stampa la riga e cancella il resto della riga
                _print_line(line)
            LiveStatus.PANEL_LINES = len(lines)
            _flush_stdout()

    def _build_panel(self) -> list[str]:
        """Costruisce le righe del pannello (chiamato con _lock acquisito)."""
        lines = []
        W = 62

        # Tempo trascorso e rimanente
        elapsed = (datetime.now() - self.start_dt).total_seconds()
        elapsed_str = fmt_duration(elapsed)
        if self.max_time > 0:
            remaining = max(0.0, self.max_time - elapsed)
            remaining_str = fmt_duration(remaining)
            pct_time = min(1.0, elapsed / self.max_time)
            time_bar_w = 20
            time_filled = int(time_bar_w * pct_time)
            time_bar = "â–“" * time_filled + "â–‘" * (time_bar_w - time_filled)
            time_color = C.RED if remaining < 30 else (C.YELLOW if remaining < 60 else C.CYAN)
            time_line = (
                f"  {C.BOLD}Tempo:{C.RESET} "
                f"{C.CYAN}{elapsed_str}{C.RESET} trascorso  "
                f"[{time_color}{time_bar}{C.RESET}]  "
                f"{time_color}{remaining_str}{C.RESET} rimanenti"
            )
        else:
            time_line = f"  {C.BOLD}Tempo:{C.RESET} {C.CYAN}{elapsed_str}{C.RESET} trascorso"
        lines.append(time_line)

        # Barra progresso gruppi
        pct    = self.done / self.total if self.total else 0
        filled = int(self._width * pct)
        bar    = "â–ˆ" * filled + "â–‘" * (self._width - filled)
        lines.append(
            f"{C.BOLD}  [{bar}] {self.done}/{self.total} ({pct*100:.1f}%){C.RESET}"
        )

        # Statistiche sintetiche
        c = self.counts
        deleted_mb = fmt_size(c["deleted_bytes"])
        lines.append(
            f"  {C.GREEN}elim:{c['deleted']}{C.RESET}  "
            f"{C.CYAN}promo:{c['promoted']}{C.RESET}  "
            f"{C.YELLOW}skip:{c['skipped_corrupt'] + c['skipped_sim']}{C.RESET}  "
            f"{C.RED}err:{c['errors']}{C.RESET}  "
            f"{C.GREEN}liberati:{deleted_mb}{C.RESET}"
        )

        # Stato di ogni worker thread
        lines.append(f"  {C.BOLD}{'─' * (W-2)}{C.RESET}")
        active = dict(self._thread_state)
        if active:
            for i, (tid, st) in enumerate(active.items()):
                phase    = st["phase"]
                fname    = Path(st["file"]).name if st["file"] else ""
                n_dups   = st["n_dups"]
                done_dp  = st["done_dups"]
                dup_info = f" [{done_dp}/{n_dups} dup]" if n_dups else ""
                # Tronca il nome file se troppo lungo
                max_fname = W - len(phase) - len(dup_info) - 10
                if len(fname) > max_fname:
                    fname = "…" + fname[-(max_fname-1):]
                lines.append(
                    f"  {C.CYAN}T{i+1}{C.RESET} "
                    f"{C.YELLOW}[{phase}]{C.RESET} "
                    f"{fname}"
                    f"{C.BOLD}{dup_info}{C.RESET}"
                )
        else:
            lines.append(f"  {C.CYAN}(nessun thread attivo){C.RESET}")
        lines.append(f"  {C.BOLD}{'─' * (W-2)}{C.RESET}")
        return lines

    def stop_rendering(self):
        """Ferma il thread di rendering (chiamato alla fine)."""
        self._stop_render.set()
        self._render_thread.join(timeout=2)
        # Pulisce il pannello
        with self.logger._print_lock:
            if LiveStatus.PANEL_LINES > 0:
                _ansi_up(LiveStatus.PANEL_LINES)
                for _ in range(LiveStatus.PANEL_LINES):
                    _print_line("")  # newline advances to next panel line
            LiveStatus.PANEL_LINES = 0

# Alias per compatibilità con chiamate esistenti
Progress = LiveStatus

# ---------------------------------------------------------------------------
# Leggibilità e validità visiva
# ---------------------------------------------------------------------------

# Soglia per rilevare immagini uniformemente bianche o nere (0..65535).
# Un'immagine completamente bianca ha mean ~65535, nera ~0.
# Usiamo una banda di tolleranza del 2% su entrambi gli estremi.
_MEAN_BLACK_MAX =  65535 * 0.02   # < 2% → troppo scura/nera
_MEAN_WHITE_MIN =  65535 * 0.98   # > 98% → troppo chiara/bianca

def _mean_from_identify(path: Path) -> tuple[float | None, str]:
    """
    Estrae la luminosità media tramite identify.
    Ritorna (mean, descrizione) oppure (None, motivo_errore).
    """
    r = subprocess.run(
        ["magick", "identify", "-quiet", "-format", "%[mean]\n", str(path)],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    if r.returncode != 0:
        err = r.stderr.decode(errors="replace").strip().splitlines()
        # prendi la prima riga significativa dell'errore
        msg = next((l for l in err if l.strip()), "identify fallito senza messaggio")
        return None, f"identify fallito: {msg[:120]}"
    raw    = r.stdout.decode(errors="replace").strip()
    tokens = [t for t in raw.replace("\n", " ").split() if t]
    for tok in tokens:
        try:
            return float(tok), ""
        except ValueError:
            continue
    return None, f"mean non parsabile: {raw[:80]!r}"

def is_image_fully_valid(path: Path) -> tuple[bool, str]:
    """
    Verifica leggibilità tecnica + validità visiva di un'immagine.
    Ritorna (valido, motivo_dettagliato).

    Strategia:
      1. convert -regard-warnings: decodifica rigorosa completa.
         Se ok → valido, si passa al check visivo.
      2. Se convert fallisce: cattura stderr per il motivo preciso,
         poi verifica la luminosità media con identify.
         - luminosità nel range valido → visivamente integro nonostante
           il warning di convert (es. metadata EXIF non standard) → valido
         - luminosità fuori range o identify fallisce → corrotto per davvero,
           con messaggio dettagliato del perché
    """
    try:
        rc = subprocess.run(
            ["magick", str(path), "/dev/null"],
            stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
        )
        convert_ok     = rc.returncode == 0
        convert_stderr = rc.stderr.decode(errors="replace").strip()
        # Prima riga significativa dell'errore di convert
        convert_err_line = next(
            (l.strip() for l in convert_stderr.splitlines() if l.strip()), ""
        )

        if not convert_ok:
            # convert ha segnalato un problema: verifica luminosità
            mean, mean_err = _mean_from_identify(path)
            if mean is None:
                return False, (
                    f"convert: {convert_err_line or 'errore sconosciuto'} | "
                    f"identify: {mean_err}"
                )
            if mean <= _MEAN_BLACK_MAX:
                return False, (
                    f"convert: {convert_err_line} | "
                    f"immagine completamente nera (mean={mean:.0f})"
                )
            if mean >= _MEAN_WHITE_MIN:
                return False, (
                    f"convert: {convert_err_line} | "
                    f"immagine completamente bianca (mean={mean:.0f})"
                )
            # luminosità ok → il warning di convert era inoffensivo
            return True, (
                f"ok (mean={mean:.0f}) — convert warning ignorato: "
                f"{convert_err_line[:80]}"
            )

        # convert ok → check visivo con identify
        mean, mean_err = _mean_from_identify(path)
        if mean is None:
            # identify fallisce dopo convert ok: accettiamo come valido
            return True, f"ok (identify non disponibile per mean: {mean_err})"
        if mean <= _MEAN_BLACK_MAX:
            return False, f"immagine completamente nera (mean={mean:.0f})"
        if mean >= _MEAN_WHITE_MIN:
            return False, f"immagine completamente bianca (mean={mean:.0f})"
        return True, f"ok (mean={mean:.0f})"

    except FileNotFoundError:
        print(colored(C.RED, "Errore: 'magick' non trovato. sudo pacman -S imagemagick"))
        sys.exit(1)

def is_video_readable(path: Path) -> bool:
    if not shutil.which("ffprobe"):
        return True
    r = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "json", str(path)],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    return r.returncode == 0

def is_readable(path: Path) -> tuple[bool, str]:
    """
    Verifica leggibilità + validità visiva.
    Per le immagini include anche il controllo bianco/nero.
    Ritorna (valido, motivo).
    """
    ft = file_type(path)
    if ft == "image":
        return is_image_fully_valid(path)
    if ft == "video":
        ok = is_video_readable(path)
        return (ok, "ok" if ok else "video non leggibile")
    return True, "tipo non controllato"

# ---------------------------------------------------------------------------
# Similarità immagini — hash percettivo via identify
#
# "compare" fallisce quando le immagini hanno dimensioni diverse (caso comune
# con i duplicati Nextcloud). Usiamo invece "identify -format %#" che calcola
# un hash SHA-256 del contenuto visivo normalizzato, oppure come fallback
# confrontiamo il Mean Absolute Error dopo aver ridimensionato entrambe le
# immagini alla stessa dimensione tramite "convert".
# ---------------------------------------------------------------------------

def _phash_via_identify(path: Path) -> str | None:
    """
    Estrae il perceptual hash di un'immagine usando identify.
    Ritorna una stringa hex o None in caso di errore.
    """
    try:
        r = subprocess.run(
            ["magick", "identify", "-quiet", "-format", "%#", str(path)],
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        )
        if r.returncode != 0:
            return None
        val = r.stdout.decode(errors="replace").strip()
        return val if val else None
    except Exception:
        return None

def _mae_similarity(a: Path, b: Path) -> float:
    """
    Confronto MAE dopo resize a 64x64 usando convert+compare.
    Ritorna score [0,1]. Più robusto di PHASH per immagini di dimensioni diverse.
    """
    try:
        r = subprocess.run(
            [
                "compare", "-metric", "MAE",
                "-resize", "64x64!",        # forza resize identico su entrambe
                str(a), str(b), "/dev/null",
            ],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        # returncode 0=identiche, 1=diverse, 2=errore
        if r.returncode == 2:
            return 0.0
        # MAE è su stderr, formato: "valore (normalizzato)"
        raw = r.stderr.decode(errors="replace").strip()
        # prende il valore normalizzato tra parentesi se presente, altrimenti il primo token
        import re as _re
        m = _re.search(r'\(([0-9.eE+\-]+)\)', raw)
        if m:
            mae_norm = float(m.group(1))   # range 0..1
        else:
            mae_norm = float(raw.split()[0]) / 65535.0  # raw è 0..65535
        return max(0.0, 1.0 - mae_norm)
    except Exception:
        return 0.0

def image_similarity(a: Path, b: Path) -> float:
    """
    Ritorna un punteggio di similarità [0.0, 1.0].
    Strategia:
      1. Prova hash SHA percettivo via identify: se identici → 1.0, se diversi → MAE
      2. Fallback MAE con resize 64x64 (funziona anche con dimensioni diverse)
    """
    # Tentativo 1: hash percettivo — veloce, identici → cortocircuito
    hash_a = _phash_via_identify(a)
    hash_b = _phash_via_identify(b)
    if hash_a and hash_b:
        if hash_a == hash_b:
            return 1.0
        # hash diversi ma potrebbe essere solo compressione: passiamo al MAE

    # Tentativo 2: MAE con resize
    return _mae_similarity(a, b)

# ---------------------------------------------------------------------------
# Similarità video (durata + dimensione)
# ---------------------------------------------------------------------------
def video_duration(path: Path) -> float:
    if not shutil.which("ffprobe"):
        return -1
    try:
        r = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "json", str(path)],
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        )
        data = json.loads(r.stdout.decode(errors="replace"))
        return float(data["format"]["duration"])
    except Exception:
        return -1

def video_similarity(a: Path, b: Path) -> float:
    try:
        size_a = a.stat().st_size
        size_b = b.stat().st_size
    except Exception:
        return 0.0
    size_ratio = min(size_a, size_b) / max(size_a, size_b) if size_a > 0 and size_b > 0 else 0.0
    dur_a, dur_b = video_duration(a), video_duration(b)
    if dur_a > 0 and dur_b > 0 and abs(dur_a - dur_b) >= 2.0:
        return 0.0
    return size_ratio

def compute_similarity(a: Path, b: Path) -> float:
    ft = file_type(a)
    if ft == "image": return image_similarity(a, b)
    if ft == "video": return video_similarity(a, b)
    return 1.0

# ---------------------------------------------------------------------------
# Pattern duplicati
# ---------------------------------------------------------------------------
DUPE_PATTERN = re.compile(r'^(.*) \(\d+\)$')

def strip_dupe_suffix(stem: str) -> str:
    while True:
        m = DUPE_PATTERN.match(stem)
        if m: stem = m.group(1)
        else: break
    return stem

def collect_groups(root: Path) -> dict:
    groups = defaultdict(list)
    for dirpath, _, filenames in os.walk(root):
        for fname in filenames:
            parts = fname.rsplit(".", 1)
            if len(parts) < 2: continue
            stem, ext_raw = parts
            ext = "." + ext_raw
            if not DUPE_PATTERN.match(stem): continue
            original_stem = strip_dupe_suffix(stem)
            original_path = Path(dirpath) / (original_stem + ext)
            groups[original_path].append(Path(dirpath) / fname)
    return dict(groups)

# ---------------------------------------------------------------------------
# Risultato gruppo
# ---------------------------------------------------------------------------
class GroupResult:
    def __init__(self):
        self.original_ok        = False
        self.original_corrupt   = False
        self.original_missing   = False
        self.promoted           = False
        self.skipped_corrupt    = False
        self.skipped_similarity = False
        self.deleted_files      = []
        self.deleted_bytes      = 0
        self.corrupt_files      = []
        self.resolved_corrupt   = []
        self.dissimilar_groups  = []
        self.errors             = 0

# ---------------------------------------------------------------------------
# Elaborazione singolo gruppo
#
# Struttura in 4 fasi separate e sequenziali:
#   FASE A — Analisi:    verifica leggibilità originale e tutti i duplicati
#   FASE B — Decisione:  determina cosa tenere e cosa cancellare
#   FASE C — Promozione: se originale non valido, copia il miglior duplicato
#   FASE D — Cancella:   elimina i duplicati solo dopo aver garantito
#                        che esiste almeno un file valido nel gruppo
# ---------------------------------------------------------------------------
def process_group(
    original_path: Path,
    dups: list,
    dry_run: bool,
    sim_thresh: float,
    logger: Logger,
    live: "LiveStatus",
    stop_event: threading.Event | None = None,
) -> GroupResult:

    # Se il segnale di stop è già attivo prima di iniziare, ritorna subito
    if stop_event and stop_event.is_set():
        res = GroupResult()
        res.skipped_corrupt = True
        live.clear_state()
        return res

    res = GroupResult()
    gl  = GroupLog(original_path, len(dups))
    live.set_state("ANALISI", str(original_path), n_dups=len(dups))

    # ==================================================================
    # FASE A: analisi completa di tutti i file del gruppo
    # ==================================================================

    # Controlla originale
    original_exists   = original_path.is_file()
    original_readable = False

    if original_exists:
        _ok, _reason = is_readable(original_path)
        if _ok:
            original_readable = True
            res.original_ok   = True
            gl.add("CHECK_OK", f"Originale valido: {original_path} [{_reason}]", C.GREEN)
        else:
            res.original_corrupt = True
            res.corrupt_files.append(original_path)
            gl.add("CORROTTO", f"Originale NON valido: {original_path} [{_reason}]", C.RED)
    else:
        res.original_missing = True
        gl.add("MANCANTE", f"Originale non esiste su disco: {original_path}", C.YELLOW)

    # Controlla stop prima del loop sui duplicati (potenzialmente lungo)
    if stop_event and stop_event.is_set():
        gl.add("STOP", "Interruzione per timeout — gruppo saltato", C.YELLOW)
        gl.flush(logger, live)
        with live._lock:
            live.counts["skipped_corrupt"] += 1
        live.advance()
        live.clear_state()
        res.skipped_corrupt = True
        return res

    # Analizza ogni duplicato: validità + similarità
    # Risultato per ogni dup: ("delete"|"skip"|"promote_candidate", motivo, file_size)
    dup_analysis = []   # (dup_path, action, reason, file_size, is_corrupt)

    # Riferimento per il confronto similarità: l'originale se valido,
    # altrimenti verrà aggiornato dopo la promozione
    original_for_sim = original_path if original_readable else None

    for dup in dups:
        if not dup.is_file():
            dup_analysis.append((dup, "delete", "file non trovato su disco", 0, False))
            continue

        try:
            file_size = dup.stat().st_size
        except Exception:
            file_size = 0

        # Check stop dentro il loop (ogni duplicato può richiedere secondi)
        if stop_event and stop_event.is_set():
            gl.add("STOP", f"Interruzione per timeout durante analisi duplicati — gruppo parziale", C.YELLOW)
            gl.flush(logger, live)
            with live._lock:
                live.counts["skipped_corrupt"] += 1
            live.advance()
            live.clear_state()
            res.skipped_corrupt = True
            return res
        live.set_state("DUP-CHECK", str(dup), n_dups=len(dups), done_dups=len(dup_analysis))

        dup_ok, dup_reason = is_readable(dup)

        if not dup_ok:
            # Corrotto: da eliminare, nessun confronto necessario
            res.corrupt_files.append(dup)
            gl.add("CORROTTO_DUP",
                f"Duplicato NON valido [{dup_reason}]: {dup}", C.RED)
            dup_analysis.append((dup, "delete", f"corrotto: {dup_reason}", file_size, True))
            continue

        # Valido: se l'originale è corrotto/mancante, questo è un candidato promozione
        if not original_readable:
            gl.add("CANDIDATO",
                f"Duplicato valido, candidato promozione [{dup_reason}]: {dup}", C.CYAN)
            dup_analysis.append((dup, "promote_candidate", dup_reason, file_size, False))
            continue

        # Originale valido: confronto similarità
        try:
            same_size = original_for_sim.stat().st_size == file_size
        except Exception:
            same_size = False

        if same_size:
            gl.add("SIMILARITY",
                f"Dimensione identica ({fmt_size(file_size)}) → stesso file: {dup.name}")
            dup_analysis.append((dup, "delete", "dimensione identica", file_size, False))
        else:
            score = compute_similarity(original_for_sim, dup)
            gl.add("SIMILARITY",
                f"Similarità: {original_for_sim.name} ↔ {dup.name}: "
                f"score={score:.3f} (soglia: {sim_thresh:.2f})")
            if score < sim_thresh:
                res.skipped_similarity = True
                res.dissimilar_groups.append((original_path, dup, score))
                gl.add("SKIP_SIM",
                    f"[SKIP-SIMILARITÀ] score={score:.3f} — NON elimino:\n"
                    f"    orig: {original_for_sim}\n    dup : {dup}",
                    C.MAGENTA)
                dup_analysis.append((dup, "skip", f"score={score:.3f}", file_size, False))
            else:
                dup_analysis.append((dup, "delete", f"simile (score={score:.3f})", file_size, False))

    # ==================================================================
    # FASE B: decisione — identifica il miglior candidato alla promozione
    # ==================================================================
    best_candidate = None
    if not original_readable:
        for dup, action, reason, fsize, is_corrupt in dup_analysis:
            if action == "promote_candidate":
                best_candidate = dup
                break

        if not best_candidate:
            gl.add("SKIP",
                "Nessun duplicato valido trovato — gruppo interamente saltato", C.RED)
            res.skipped_corrupt = True
            gl.flush(logger, live)
            with live._lock:
                live.counts["skipped_corrupt"] += 1
            live.advance()
            live.clear_state()
            return res

        res.promoted     = True
        original_for_sim = best_candidate
        # L'originale corrotto verrà gestito (rinominato in .corrupt):
        # spostalo da corrupt_files a resolved_corrupt così non appare
        # nelle statistiche finali come problema irrisolto.
        if original_path in res.corrupt_files:
            res.corrupt_files.remove(original_path)
            res.resolved_corrupt.append(original_path)

        # Ora che abbiamo il promosso, rivalutiamo i candidati rimasti
        # che non avevano ancora un riferimento valido per la similarità
        new_analysis = []
        for dup, action, reason, fsize, is_corrupt in dup_analysis:
            if action != "promote_candidate":
                new_analysis.append((dup, action, reason, fsize, is_corrupt))
                continue
            if dup == best_candidate:
                new_analysis.append((dup, "keep_promoted", "promosso come originale", fsize, False))
                continue
            # Altri candidati promozione: ora confronta con il promosso
            try:
                same_size = best_candidate.stat().st_size == fsize
            except Exception:
                same_size = False

            if same_size:
                gl.add("SIMILARITY",
                    f"Dimensione identica ({fmt_size(fsize)}) → stesso file del promosso: {dup.name}")
                new_analysis.append((dup, "delete", "dimensione identica al promosso", fsize, False))
            else:
                score = compute_similarity(best_candidate, dup)
                gl.add("SIMILARITY",
                    f"Similarità vs promosso: {best_candidate.name} ↔ {dup.name}: "
                    f"score={score:.3f} (soglia: {sim_thresh:.2f})")
                if score < sim_thresh:
                    res.skipped_similarity = True
                    res.dissimilar_groups.append((original_path, dup, score))
                    gl.add("SKIP_SIM",
                        f"[SKIP-SIMILARITÀ] score={score:.3f} — NON elimino: {dup}", C.MAGENTA)
                    new_analysis.append((dup, "skip", f"score={score:.3f}", fsize, False))
                else:
                    new_analysis.append((dup, "delete", f"simile al promosso (score={score:.3f})", fsize, False))
        dup_analysis = new_analysis

    # ==================================================================
    # FASE C: promozione (solo ora che sappiamo che il gruppo è salvabile)
    # ==================================================================
    if best_candidate:
        live.set_state("PROMOZIONE", str(best_candidate), n_dups=len(dups))
        if dry_run:
            gl.add("DRY_PROMO",
                f"[DRY-RUN] Promuoverei: {best_candidate} → {original_path}", C.YELLOW)
            if original_exists:
                gl.add("DRY_PROMO",
                    f"[DRY-RUN] Eliminerei il corrotto: {original_path}", C.YELLOW)
        else:
            try:
                if original_exists:
                    # Il file corrotto non serve: lo eliminiamo direttamente
                    original_path.unlink()
                    gl.add("ELIMINATO",
                        f"Corrotto eliminato: {original_path}", C.GREEN)
                shutil.copy2(str(best_candidate), str(original_path))
                gl.add("PROMOSSO",
                    f"Promosso: {best_candidate} → {original_path}", C.GREEN)
            except Exception as e:
                res.errors += 1
                gl.add("ERRORE", f"Promozione fallita: {e}", C.RED)

    # ==================================================================
    # FASE D: cancellazione — solo ora che il gruppo è garantito sicuro
    # ==================================================================
    live.set_state("CANCELLAZIONE", str(original_path), n_dups=len(dups))
    for dup, action, reason, file_size, is_corrupt in dup_analysis:
        if action not in ("delete",):
            continue

        dup_status = "CORROTTO" if is_corrupt else "valido"

        if not dup.is_file():
            gl.add("SKIP", f"Non trovato su disco (già rimosso?): {dup}")
            continue

        if dry_run:
            gl.add("DRY_DEL",
                f"[DRY-RUN] Eliminerei [{dup_status}] ({fmt_size(file_size)}): {dup}",
                C.YELLOW)
            res.deleted_files.append(dup)
            res.deleted_bytes += file_size
        else:
            try:
                dup.unlink()
                res.deleted_files.append(dup)
                res.deleted_bytes += file_size
                gl.add("ELIMINATO",
                    f"[{dup_status}] ({fmt_size(file_size)}): {dup}", C.GREEN)
            except Exception as e:
                res.errors += 1
                gl.add("ERRORE", f"Impossibile eliminare {dup}: {e}", C.RED)

    # Aggiorna i contatori live direttamente nel worker thread,
    # così il pannello si aggiorna immediatamente senza aspettare
    # che il main thread raccolga il future.result().
    with live._lock:
        c = live.counts
        if res.original_ok:        c["originals_ok"]      += 1
        if res.original_corrupt:   c["originals_corrupt"]  += 1
        if res.promoted:           c["promoted"]           += 1
        if res.skipped_corrupt:    c["skipped_corrupt"]    += 1
        if res.skipped_similarity: c["skipped_sim"]        += 1
        c["deleted"]          += len(res.deleted_files)
        c["deleted_bytes"]    += res.deleted_bytes
        c["errors"]           += res.errors
        c["corrupt_files"]    += res.corrupt_files
        c["resolved_corrupt"] += res.resolved_corrupt
        c["dissimilar"]       += res.dissimilar_groups

    gl.flush(logger, live)
    live.advance()
    live.clear_state()
    return res

# ---------------------------------------------------------------------------
# Stampa statistiche (usata sia dal main thread che dal timer thread)
# ---------------------------------------------------------------------------
def _print_stats(
    stopped_early, start_dt, total,
    count_originals_ok, count_originals_corrupt, count_promoted,
    count_deleted, count_skipped_corrupt, count_skipped_sim,
    count_errors, total_deleted_bytes,
    all_corrupt_files, all_resolved_corrupt, all_dissimilar,
    mode_str, workers, sim_thresh, max_time, logger, log_path,
):
    end_dt      = datetime.now()
    elapsed_sec = (end_dt - start_dt).total_seconds()
    gruppi_elaborati = count_originals_ok + count_originals_corrupt + count_skipped_corrupt

    stats = [
        f"[STATS   ] {'=' * 56}",
        f"[STATS   ] {'STATISTICHE PARZIALI' if stopped_early else 'STATISTICHE FINALI'}  — nc_dedup.py v{VERSION} ({VERSION_DATE})",
        f"[STATS   ] {'⚠ INTERRUZIONE ANTICIPATA — dati parziali' if stopped_early else 'Elaborazione completata'}",
        f"[STATS   ] Inizio elaborazione       : {start_dt.strftime('%Y-%m-%d %H:%M:%S')}",
        f"[STATS   ] Fine elaborazione         : {end_dt.strftime('%Y-%m-%d %H:%M:%S')}",
        f"[STATS   ] Tempo impiegato           : {fmt_duration(elapsed_sec)} ({elapsed_sec:.1f}s)",
        f"[STATS   ] {'-' * 56}",
        f"[STATS   ] Gruppi totali trovati     : {total}",
        f"[STATS   ] Gruppi elaborati          : {gruppi_elaborati}",
        f"[STATS   ] Originali OK              : {count_originals_ok}",
        f"[STATS   ] Originali corrotti        : {count_originals_corrupt}",
        f"[STATS   ] Duplicati promossi        : {count_promoted}",
        f"[STATS   ] Gruppi saltati (corrotti) : {count_skipped_corrupt}",
        f"[STATS   ] Coppie saltate (diversi)  : {count_skipped_sim}",
        f"[STATS   ] File eliminati            : {count_deleted}",
        f"[STATS   ] Dimensione liberata       : {fmt_size(total_deleted_bytes)}",
        f"[STATS   ] Gruppi senza dup. valido  : {len(all_corrupt_files)}",
        f"[STATS   ] Errori                    : {count_errors}",
        f"[STATS   ] Modalità                  : {mode_str}",
        f"[STATS   ] Thread                    : {workers}",
        f"[STATS   ] Soglia similarità         : {sim_thresh:.2f}",
        f"[STATS   ] Tempo max                 : {'illimitato' if max_time == 0 else fmt_duration(max_time)}",
    ]
    if all_corrupt_files:
        stats.append(f"[STATS   ] {'-' * 56}")
        stats.append(f"[STATS   ] âš  GRUPPI SENZA DUPLICATO VALIDO (intervento manuale necessario):")
        for f in all_corrupt_files:
            stats.append(f"[STATS   ]   ! {f}")
    if all_dissimilar:
        stats.append(f"[STATS   ] {'-' * 56}")
        stats.append(f"[STATS   ] COPPIE SKIPPATE PER BASSA SIMILARITÀ:")
        for orig, dup, score in all_dissimilar:
            stats.append(f"[STATS   ]   score={score:.3f}  orig={orig.name}  dup={dup.name}")
            stats.append(f"[STATS   ]     orig_path={orig}")
            stats.append(f"[STATS   ]     dup_path ={dup}")
    stats.append(f"[STATS   ] {'=' * 56}")
    logger.write_header(stats)

    print()
    print(colored(C.BOLD, "=" * 60))
    title_str = "  STATISTICHE PARZIALI âš " if stopped_early else "  STATISTICHE FINALI"
    print(colored(C.YELLOW if stopped_early else C.BOLD, title_str))
    if stopped_early:
        print(colored(C.YELLOW, f"  Interruzione dopo {fmt_duration(max_time)} — elaborati {gruppi_elaborati}/{total} gruppi"))
    print(colored(C.BOLD,    "-" * 60))
    print(f"  Inizio elaborazione      : {colored(C.CYAN,    start_dt.strftime('%Y-%m-%d %H:%M:%S'))}")
    print(f"  Fine elaborazione        : {colored(C.CYAN,    end_dt.strftime('%Y-%m-%d %H:%M:%S'))}")
    print(f"  Tempo impiegato          : {colored(C.CYAN,    fmt_duration(elapsed_sec))} ({elapsed_sec:.1f}s)")
    print(colored(C.BOLD,    "-" * 60))
    print(f"  Gruppi totali trovati    : {colored(C.BOLD,    str(total))}")
    print(f"  Gruppi elaborati         : {colored(C.BOLD,    str(gruppi_elaborati))}")
    print(f"  Originali OK             : {colored(C.GREEN,   str(count_originals_ok))}")
    print(f"  Originali corrotti       : {colored(C.RED,     str(count_originals_corrupt))}")
    print(f"  Duplicati promossi       : {colored(C.CYAN,    str(count_promoted))}")
    print(f"  Gruppi saltati (corrotti): {colored(C.YELLOW,  str(count_skipped_corrupt))}")
    print(f"  Coppie saltate (diversi) : {colored(C.MAGENTA, str(count_skipped_sim))}")
    print(f"  File eliminati           : {colored(C.GREEN,   str(count_deleted))}")
    print(f"  Dimensione liberata      : {colored(C.GREEN,   fmt_size(total_deleted_bytes))}")
    print(f"  Gruppi senza dup. valido : {colored(C.RED,     str(len(all_corrupt_files)))}")
    print(f"  Errori                   : {colored(C.RED,     str(count_errors))}")

    if all_corrupt_files:
        print(colored(C.BOLD, "-" * 60))
        print(colored(C.RED, "  âš  GRUPPI SENZA DUPLICATO VALIDO (intervento manuale necessario):"))
        for cf in all_corrupt_files:
            print(f"    {colored(C.RED, '!')} {colored(C.YELLOW, str(cf))}")

    if all_dissimilar:
        print(colored(C.BOLD, "-" * 60))
        print(colored(C.MAGENTA, "  COPPIE SKIPPATE PER BASSA SIMILARITÀ:"))
        for orig, dup, score in all_dissimilar:
            print(f"    {colored(C.YELLOW, f'score={score:.3f}')}  orig={orig.name}  dup={dup.name}")
            print(f"      {colored(C.CYAN, str(dup))}")

    print(colored(C.BOLD, "=" * 60))
    print(f"  Log salvato in: {colored(C.CYAN, str(log_path))}")
    print()


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    cpu_count = os.cpu_count() or 4

    parser = argparse.ArgumentParser(
        description=f"Nextcloud Duplicate Cleaner v{VERSION}"
    )
    parser.add_argument("target_dir")
    parser.add_argument("--dry-run",    action="store_true")
    parser.add_argument("--workers",    type=int,   default=cpu_count,
                        help=f"Thread paralleli (default: {cpu_count})")
    parser.add_argument("--similarity", type=float, default=0.90,
                        help="Soglia similarità 0.0-1.0 (default: 0.90)")
    parser.add_argument("--max-time",   type=int,   default=300,
                        help="Tempo massimo di esecuzione in secondi (default: 300 = 5 min, 0 = illimitato)")
    args = parser.parse_args()

    dry_run    = args.dry_run
    workers    = max(1, args.workers)
    sim_thresh = max(0.0, min(1.0, args.similarity))
    max_time   = args.max_time  # 0 = illimitato
    target_dir = Path(args.target_dir).expanduser().resolve()

    if not target_dir.is_dir():
        print(colored(C.RED, f"Errore: '{target_dir}' non esiste."))
        sys.exit(1)
    if not shutil.which("magick"):
        print(colored(C.RED, "Errore: 'identify' non trovato. sudo pacman -S imagemagick"))
        sys.exit(1)
    if not shutil.which("magick"):
        print(colored(C.YELLOW, "Attenzione: 'compare' non trovato — similarità immagini disabilitata."))
    if not shutil.which("ffprobe"):
        print(colored(C.YELLOW, "Attenzione: 'ffprobe' non trovato — verifica video disabilitata. sudo pacman -S ffmpeg"))

    start_dt = datetime.now()
    start_ts = start_dt.strftime("%Y%m%d_%H%M%S")
    log_path = Path.cwd() / f"cleanup_dupes_{start_ts}.log"
    logger   = Logger(log_path)
    mode_str = "DRY-RUN" if dry_run else "LIVE"

    # Header log
    logger.write_header([
        f"[INFO    ] {'=' * 56}",
        f"[INFO    ] nc_dedup.py  v{VERSION} ({VERSION_DATE})",
        f"[INFO    ] Inizio     : {start_dt.strftime('%Y-%m-%d %H:%M:%S')}",
        f"[INFO    ] Cartella   : {target_dir}",
        f"[INFO    ] Modalità   : {mode_str}",
        f"[INFO    ] Thread     : {workers}",
        f"[INFO    ] Similarità : {sim_thresh:.2f}",
        f"[INFO    ] Tempo max  : {'illimitato' if max_time == 0 else fmt_duration(max_time)}",
        f"[INFO    ] {'=' * 56}",
    ])

    # Header console
    print()
    print(colored(C.BOLD, "=" * 60))
    print(colored(C.BOLD, f"  Nextcloud Duplicate Cleaner  v{VERSION} ({VERSION_DATE})"))
    print(f"  Cartella    : {colored(C.CYAN, str(target_dir))}")
    if dry_run:
        print(f"  Modalità    : {colored(C.YELLOW, 'DRY-RUN — nessun file verrà cancellato')}")
    else:
        print(f"  Modalità    : {colored(C.RED, 'LIVE — i duplicati verranno eliminati definitivamente')}")
    print(f"  Thread      : {colored(C.CYAN, str(workers))}")
    print(f"  Similarità  : {colored(C.CYAN, str(sim_thresh))} (sotto soglia → skip)")
    max_time_str = "illimitato" if max_time == 0 else fmt_duration(max_time)
    print(f"  Tempo max   : {colored(C.CYAN, max_time_str)}")
    print(f"  Log         : {colored(C.CYAN, str(log_path))}")
    print(colored(C.BOLD, "=" * 60))
    print()

    print("Scansione duplicati in corso...")
    groups = collect_groups(target_dir)
    total  = len(groups)
    print(f"Trovati {colored(C.BOLD, str(total))} gruppi di duplicati.\n")
    logger.write_header([f"[INFO    ] Trovati {total} gruppi di duplicati."])

    if total == 0:
        print(colored(C.GREEN, "Nessun duplicato trovato. Uscita."))
        sys.exit(0)

    live = LiveStatus(total, workers, logger, start_dt=start_dt, max_time=max_time)

    count_originals_ok      = 0
    count_originals_corrupt = 0
    count_promoted          = 0
    count_deleted           = 0
    count_skipped_corrupt   = 0
    count_skipped_sim       = 0
    count_errors            = 0
    total_deleted_bytes     = 0
    all_corrupt_files       = []
    all_resolved_corrupt    = []
    all_dissimilar          = []

    # ----------------------------------------------------------------
    # stop_event : segnale condiviso tra timer thread e worker thread.
    #              Quando scade il timeout, il timer thread lo imposta;
    #              i worker lo controllano nei punti chiave e escono.
    # done_event : il main thread lo imposta quando ha finito tutto,
    #              così il timer thread sa che può terminare senza fare nulla.
    # ----------------------------------------------------------------
    stop_event    = threading.Event()
    done_event    = threading.Event()
    stopped_early = False
    # counts è ora dentro live.counts (LiveStatus)

    # stats_printed_event: evita che sia il timer che il main stampino le stats
    stats_printed_event = threading.Event()

    def _timer_thread():
        """
        Dorme per max_time secondi. Se scade:
        1. Imposta stop_event → i worker escono al prossimo checkpoint
        2. Stampa avviso a console
        3. Aspetta done_event (max_time + 60s safety)
        4. Imposta stats_printed_event per bloccare il main thread
        5. Stampa le statistiche parziali
        Se done_event scatta prima del timeout → esce senza fare nulla.
        """
        nonlocal stopped_early

        finished_naturally = done_event.wait(timeout=max_time)
        if finished_naturally:
            return  # completato naturalmente, il main stampa le stats

        # Timeout scaduto
        stopped_early = True
        stop_event.set()

        msg = f"Timeout raggiunto ({fmt_duration(max_time)}) — attendo i checkpoint dei thread in corso..."
        logger.write_header([f"[TIMEOUT ] {msg}"])
        with logger._print_lock:
            print(colored(C.YELLOW, f"\n  âš   {msg}"))

        # Aspetta che il main thread segnali che i worker sono usciti
        done_event.wait(timeout=max_time + 60)

        # Prende il diritto di stampare le statistiche
        stats_printed_event.set()

        # Ferma il pannello live e stampa le statistiche
        live.stop_rendering()
        with live._lock:
            _print_stats(
                stopped_early=True,
                start_dt=start_dt,
                total=total,
                count_originals_ok=live.counts["originals_ok"],
                count_originals_corrupt=live.counts["originals_corrupt"],
                count_promoted=live.counts["promoted"],
                count_deleted=live.counts["deleted"],
                count_skipped_corrupt=live.counts["skipped_corrupt"],
                count_skipped_sim=live.counts["skipped_sim"],
                count_errors=live.counts["errors"],
                total_deleted_bytes=live.counts["deleted_bytes"],
                all_corrupt_files=list(dict.fromkeys(live.counts["corrupt_files"])),
                all_resolved_corrupt=list(dict.fromkeys(live.counts["resolved_corrupt"])),
                all_dissimilar=live.counts["dissimilar"],
                mode_str=mode_str,
                workers=workers,
                sim_thresh=sim_thresh,
                max_time=max_time,
                logger=logger,
                log_path=log_path,
            )

    if max_time > 0:
        t = threading.Thread(target=_timer_thread, daemon=True)
        t.start()

    print(f"Elaborazione con {workers} thread:\n")
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(
                process_group,
                original_path, dups, dry_run, sim_thresh, logger, live,
                stop_event,
            ): original_path
            for original_path, dups in groups.items()
        }

        for future in as_completed(futures):
            if stop_event.is_set():
                # Timer ha già fatto scattare lo stop: cancella i pending
                for f in futures:
                    f.cancel()
                # Raccoglie i risultati dei worker già completati prima del break
                # (i loro counts sono già stati aggiornati nel worker thread)
                try:
                    future.result(timeout=0)
                except Exception:
                    pass
                break

            try:
                res = future.result()
            except Exception as e:
                import traceback as _tb
                err_msg = f"Eccezione nel thread: {e}"
                tb_str  = _tb.format_exc().strip()
                logger.write_header([f"[ERROR   ] {err_msg}", f"[TRACEBACK] {tb_str}"])
                logger.print_console(C.RED, f"[ERRORE] {err_msg}", live)
                logger.print_console(C.RED, tb_str, live)
                with live._lock:
                    live.counts["errors"] += 1
                continue

            # Counts già aggiornati nel worker thread — niente da fare qui

    # Segnala al timer thread che i worker sono usciti
    done_event.set()

    # Se il timeout è scaduto, aspetta che il timer thread finisca di stampare
    # le statistiche (stats_printed_event) prima di procedere
    if stopped_early:
        t.join(timeout=30)   # attendi il timer thread (max 30s)
        live.stop_rendering()
        return               # statistiche già stampate dal timer thread

    # Run completato naturalmente prima del timeout: stampiamo noi le stats
    live.stop_rendering()
    if not stats_printed_event.is_set():
        _print_stats(
            stopped_early=False,
            start_dt=start_dt,
            total=total,
            count_originals_ok=live.counts["originals_ok"],
            count_originals_corrupt=live.counts["originals_corrupt"],
            count_promoted=live.counts["promoted"],
            count_deleted=live.counts["deleted"],
            count_skipped_corrupt=live.counts["skipped_corrupt"],
            count_skipped_sim=live.counts["skipped_sim"],
            count_errors=live.counts["errors"],
            total_deleted_bytes=live.counts["deleted_bytes"],
            all_corrupt_files=list(dict.fromkeys(live.counts["corrupt_files"])),
            all_resolved_corrupt=list(dict.fromkeys(live.counts["resolved_corrupt"])),
            all_dissimilar=live.counts["dissimilar"],
            mode_str=mode_str,
            workers=workers,
            sim_thresh=sim_thresh,
            max_time=max_time,
            logger=logger,
            log_path=log_path,
        )

if __name__ == "__main__":
    main()

Cheers