My MobileUpload folder is full of duplicate images and movies created by the mobile client bugs.
I spent some time to generate and test a smart script with Claude that could eliminate this problem. This is the feature list.
- dry run
- multithreading
- run time: limit the run time just to check logs and statistics after a while
- detect corrupted images within duplicates group to avoi deleting good images
- check similarity within duplicates, avoid deletion of manual edits
- progress bar
- log file
With a 5 minutes run I saved about 900MB
this is Claude dependencies analysis:
Sure! Here’s the dependency summary in English:
The script relies exclusively on Python’s standard library — no pip install required:
argparse,json,os,re,shutil,subprocess,sys,threading,collections,concurrent.futures,datetime,pathlib
External tools (install via system package manager):
sudo pacman -S imagemagick ffmpeg # Arch / Garuda
sudo apt install imagemagick ffmpeg # Debian / Ubuntu
-
magick(ImageMagick v7) — image readability, average luminosity check, visual similarity -
ffprobe(bundled with ffmpeg) — video readability and duration comparison
So to get everything running:
sudo pacman -S imagemagick ffmpeg
python nc_dedup.py --dry-run ~/Nextcloud2/MobileUploads
this is an output sample:
This is the code, as everything it could be improved.
#!/usr/bin/env python3
# =============================================================================
# nc_dedup.py
# Versione: 3.30.0 (2026-04-02)
#
# Pulizia duplicati generati dal bug del client mobile Nextcloud.
#
# Pattern duplicati: "filename (N).ext" → originale: "filename.ext"
# Gestisce duplicati annidati: "foto (2) (2) (2).jpg" → "foto.jpg"
# Supporta: immagini (jpg, jpeg, png, heic, webp, gif, bmp, tiff) e video (mp4, mov, avi, mkv, 3gp)
#
# Uso:
# python nc_dedup.py [opzioni] <cartella_mobileuploads>
#
# Opzioni:
# --dry-run Simula tutto senza cancellare nulla (consigliato al primo run)
# --workers N Numero di thread paralleli (default: numero di CPU)
# --similarity T Soglia similarità immagini 0.0-1.0 (default: 0.90)
# Sotto soglia il duplicato NON viene eliminato.
# Esempi: 0.80 = più permissivo, 0.95 = più restrittivo
# --max-time SEC Tempo massimo di esecuzione in secondi (default: 300 = 5 min)
# Lo script si ferma al prossimo checkpoint e stampa le
# statistiche parziali. Usare 0 per run illimitato.
# Esempi:
# --max-time 120 → 2 minuti
# --max-time 600 → 10 minuti
# --max-time 3600 → 1 ora
# --max-time 0 → nessun limite
#
# Esempi d'uso:
# python nc_dedup.py --dry-run ~/Nextcloud2/MobileUploads
# python nc_dedup.py --dry-run --max-time 120 ~/Nextcloud2/MobileUploads
# python nc_dedup.py --max-time 600 --workers 4 ~/Nextcloud2/MobileUploads
# python nc_dedup.py --max-time 0 ~/Nextcloud2/MobileUploads
#
# Dipendenze: Python 3.6+, imagemagick v7 (magick), ffprobe (ffmpeg per video)
# Installazione dipendenze su Arch/Garuda:
# sudo pacman -S imagemagick ffmpeg
# =============================================================================
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
VERSION = "3.30.0"
VERSION_DATE = "2026-04-02"
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".heic", ".webp", ".gif", ".bmp", ".tiff", ".tif"}
VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"}
# ---------------------------------------------------------------------------
# Colori ANSI
# ---------------------------------------------------------------------------
class C:
RED = "\033[0;31m"
YELLOW = "\033[1;33m"
GREEN = "\033[0;32m"
CYAN = "\033[0;36m"
MAGENTA = "\033[0;35m"
BOLD = "\033[1m"
RESET = "\033[0m"
def colored(color: str, text: str) -> str:
return f"{color}{text}{C.RESET}"
def _ansi_up(n: int) -> None:
if n > 0:
import sys; sys.stdout.write(f"\x1b[{n}A\r")
def _print_line(text: str) -> None:
import sys; sys.stdout.write(text + "\x1b[K\n")
def _flush_stdout() -> None:
import sys; sys.stdout.flush()
def fmt_size(n_bytes: int) -> str:
for unit in ("B", "KB", "MB", "GB", "TB"):
if n_bytes < 1024:
return f"{n_bytes:.1f} {unit}"
n_bytes /= 1024
return f"{n_bytes:.1f} PB"
def fmt_duration(seconds: float) -> str:
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
return f"{h:02d}:{m:02d}:{s:02d}"
def file_type(path: Path) -> str:
ext = path.suffix.lower()
if ext in IMAGE_EXTS: return "image"
if ext in VIDEO_EXTS: return "video"
return "unknown"
# ---------------------------------------------------------------------------
# Logger thread-safe con scrittura atomica per gruppo
# Ogni gruppo accumula le proprie righe in un buffer locale (nessun lock
# durante l'elaborazione), poi le scarica tutte in una sola operazione.
# Questo garantisce che nel file di log i messaggi di ogni gruppo siano
# sempre raggruppati e mai intercalati con quelli di altri thread.
# ---------------------------------------------------------------------------
class Logger:
def __init__(self, log_path: Path):
self.log_path = log_path
self._lock = threading.Lock()
self._print_lock = threading.Lock()
log_path.touch()
def _write_lines(self, lines: list[str]):
"""Scrive una lista di righe in modo atomico (un solo acquire del lock)."""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with self._lock:
with self.log_path.open("a", encoding="utf-8") as f:
for line in lines:
f.write(f"[{ts}] {line}\n")
def write_header(self, lines: list[str]):
"""Scrive righe di intestazione (usato per header/footer globali)."""
self._write_lines(lines)
def print_console(self, color: str, msg: str, live: "LiveStatus | None" = None):
"""Stampa un messaggio globale sopra il pannello live."""
if live:
live._flushing = True
with self._print_lock:
panel_h = LiveStatus.PANEL_LINES if live else 0
if panel_h > 0:
_ansi_up(panel_h)
_print_line(colored(color, msg))
if live:
with live._lock:
panel_lines = live._build_panel()
for line in panel_lines:
_print_line(line)
LiveStatus.PANEL_LINES = len(panel_lines)
_flush_stdout()
if live:
live._flushing = False
# ---------------------------------------------------------------------------
# Buffer di log per un singolo gruppo — accumulato senza lock, poi scaricato
# ---------------------------------------------------------------------------
class GroupLog:
def __init__(self, original_path: Path, dup_count: int):
self._lines = []
self._console_msgs = [] # (color, msg)
self._original = original_path
self._dup_count = dup_count
# intestazione del gruppo
self._lines.append(f"[GROUP ] {'─' * 56}")
self._lines.append(f"[GROUP ] ORIGINALE : {original_path}")
self._lines.append(f"[GROUP ] DUPLICATI : {dup_count} file")
self._lines.append(f"[GROUP ] {'─' * 56}")
def add(self, level: str, msg: str, color: str = ""):
"""
Aggiunge una riga al buffer locale (log) e alla coda console.
Tutti i messaggi vanno sempre a console — il colore è opzionale
(default grigio/reset se non specificato).
"""
self._lines.append(f"[{level:<8}] {msg}")
# Usa il colore fornito, oppure RESET (testo normale) come default
self._console_msgs.append((color or C.RESET, f"[{level}] {msg}"))
def flush(self, logger: Logger, live: "LiveStatus | None" = None):
"""
Scarica il buffer sul logger globale in modo atomico.
Stampa i messaggi del gruppo SOPRA il pannello live:
1. Risale le righe del pannello corrente
2. Stampa tutti i messaggi del gruppo (scorrono nel terminale)
3. Ridisegna il pannello in fondo
"""
self._lines.append(f"[GROUP ] {'─' * 56}")
logger._write_lines(self._lines)
if not self._console_msgs:
return
if live:
live._flushing = True
with logger._print_lock:
# Risali sopra il pannello corrente
panel_h = LiveStatus.PANEL_LINES if live else 0
if panel_h > 0:
_ansi_up(panel_h)
# Stampa tutti i messaggi del gruppo — scorroranno sopra il pannello
for color, msg in self._console_msgs:
_print_line(colored(color, msg))
# Ridisegna il pannello in fondo
if live:
with live._lock:
panel_lines = live._build_panel()
for line in panel_lines:
_print_line(line)
LiveStatus.PANEL_LINES = len(panel_lines)
_flush_stdout()
if live:
live._flushing = False
# ---------------------------------------------------------------------------
# LiveStatus — stato live dei thread + pannello aggiornato ogni secondo
# ---------------------------------------------------------------------------
class LiveStatus:
"""
Traccia lo stato di ogni worker thread e le statistiche progressive.
Un thread di rendering ridisegna ogni secondo il pannello a console,
sovrascrivendo le righe precedenti con sequenze ANSI.
"""
PANEL_LINES = 0 # quante righe occupa il pannello (aggiornato al primo render)
def __init__(self, total: int, workers: int, logger: "Logger",
start_dt: "datetime" = None, max_time: int = 0):
self.total = total
self.done = 0
self.workers = workers
self.logger = logger
self.start_dt = start_dt or datetime.now()
self.max_time = max_time # 0 = illimitato
self._lock = threading.Lock()
self._width = 36
self._stop_render = threading.Event()
# Stato per thread: tid → {file, fase, n_dups, done_dups}
self._thread_state: dict[int, dict] = {}
# Statistiche progressive (aggiornate dal main thread)
self.counts = dict(
originals_ok=0, originals_corrupt=0, promoted=0,
deleted=0, skipped_corrupt=0, skipped_sim=0,
errors=0, deleted_bytes=0,
corrupt_files=[], resolved_corrupt=[], dissimilar=[],
)
# Avvia il thread di rendering
self._render_thread = threading.Thread(target=self._render_loop, daemon=True)
self._render_thread.start()
# ------------------------------------------------------------------
# API per i worker thread
# ------------------------------------------------------------------
def set_state(self, phase: str, file: str = "", n_dups: int = 0, done_dups: int = 0):
"""Chiamato dal worker per aggiornare il proprio stato."""
tid = threading.get_ident()
with self._lock:
self._thread_state[tid] = {
"phase": phase, "file": file,
"n_dups": n_dups, "done_dups": done_dups,
}
def clear_state(self):
"""Chiamato dal worker quando ha finito un gruppo."""
tid = threading.get_ident()
with self._lock:
self._thread_state.pop(tid, None)
def advance(self, logger=None):
"""Incrementa il contatore gruppi completati."""
with self._lock:
self.done += 1
# ------------------------------------------------------------------
# Rendering del pannello
# ------------------------------------------------------------------
def _render_loop(self):
first = True
while not self._stop_render.is_set():
self._draw(first)
first = False
self._stop_render.wait(timeout=1.0)
self._draw(first=False) # render finale
def _draw(self, first: bool = False):
with self._lock:
lines = self._build_panel()
with self.logger._print_lock:
if not first and LiveStatus.PANEL_LINES > 0:
# Risali alle righe precedenti e sovrascrivile
_ansi_up(LiveStatus.PANEL_LINES)
for line in lines:
# Stampa la riga e cancella il resto della riga
_print_line(line)
LiveStatus.PANEL_LINES = len(lines)
_flush_stdout()
def _build_panel(self) -> list[str]:
"""Costruisce le righe del pannello (chiamato con _lock acquisito)."""
lines = []
W = 62
# Tempo trascorso e rimanente
elapsed = (datetime.now() - self.start_dt).total_seconds()
elapsed_str = fmt_duration(elapsed)
if self.max_time > 0:
remaining = max(0.0, self.max_time - elapsed)
remaining_str = fmt_duration(remaining)
pct_time = min(1.0, elapsed / self.max_time)
time_bar_w = 20
time_filled = int(time_bar_w * pct_time)
time_bar = "â–“" * time_filled + "â–‘" * (time_bar_w - time_filled)
time_color = C.RED if remaining < 30 else (C.YELLOW if remaining < 60 else C.CYAN)
time_line = (
f" {C.BOLD}Tempo:{C.RESET} "
f"{C.CYAN}{elapsed_str}{C.RESET} trascorso "
f"[{time_color}{time_bar}{C.RESET}] "
f"{time_color}{remaining_str}{C.RESET} rimanenti"
)
else:
time_line = f" {C.BOLD}Tempo:{C.RESET} {C.CYAN}{elapsed_str}{C.RESET} trascorso"
lines.append(time_line)
# Barra progresso gruppi
pct = self.done / self.total if self.total else 0
filled = int(self._width * pct)
bar = "â–ˆ" * filled + "â–‘" * (self._width - filled)
lines.append(
f"{C.BOLD} [{bar}] {self.done}/{self.total} ({pct*100:.1f}%){C.RESET}"
)
# Statistiche sintetiche
c = self.counts
deleted_mb = fmt_size(c["deleted_bytes"])
lines.append(
f" {C.GREEN}elim:{c['deleted']}{C.RESET} "
f"{C.CYAN}promo:{c['promoted']}{C.RESET} "
f"{C.YELLOW}skip:{c['skipped_corrupt'] + c['skipped_sim']}{C.RESET} "
f"{C.RED}err:{c['errors']}{C.RESET} "
f"{C.GREEN}liberati:{deleted_mb}{C.RESET}"
)
# Stato di ogni worker thread
lines.append(f" {C.BOLD}{'─' * (W-2)}{C.RESET}")
active = dict(self._thread_state)
if active:
for i, (tid, st) in enumerate(active.items()):
phase = st["phase"]
fname = Path(st["file"]).name if st["file"] else ""
n_dups = st["n_dups"]
done_dp = st["done_dups"]
dup_info = f" [{done_dp}/{n_dups} dup]" if n_dups else ""
# Tronca il nome file se troppo lungo
max_fname = W - len(phase) - len(dup_info) - 10
if len(fname) > max_fname:
fname = "…" + fname[-(max_fname-1):]
lines.append(
f" {C.CYAN}T{i+1}{C.RESET} "
f"{C.YELLOW}[{phase}]{C.RESET} "
f"{fname}"
f"{C.BOLD}{dup_info}{C.RESET}"
)
else:
lines.append(f" {C.CYAN}(nessun thread attivo){C.RESET}")
lines.append(f" {C.BOLD}{'─' * (W-2)}{C.RESET}")
return lines
def stop_rendering(self):
"""Ferma il thread di rendering (chiamato alla fine)."""
self._stop_render.set()
self._render_thread.join(timeout=2)
# Pulisce il pannello
with self.logger._print_lock:
if LiveStatus.PANEL_LINES > 0:
_ansi_up(LiveStatus.PANEL_LINES)
for _ in range(LiveStatus.PANEL_LINES):
_print_line("") # newline advances to next panel line
LiveStatus.PANEL_LINES = 0
# Alias per compatibilità con chiamate esistenti
Progress = LiveStatus
# ---------------------------------------------------------------------------
# Leggibilità e validità visiva
# ---------------------------------------------------------------------------
# Soglia per rilevare immagini uniformemente bianche o nere (0..65535).
# Un'immagine completamente bianca ha mean ~65535, nera ~0.
# Usiamo una banda di tolleranza del 2% su entrambi gli estremi.
_MEAN_BLACK_MAX = 65535 * 0.02 # < 2% → troppo scura/nera
_MEAN_WHITE_MIN = 65535 * 0.98 # > 98% → troppo chiara/bianca
def _mean_from_identify(path: Path) -> tuple[float | None, str]:
"""
Estrae la luminosità media tramite identify.
Ritorna (mean, descrizione) oppure (None, motivo_errore).
"""
r = subprocess.run(
["magick", "identify", "-quiet", "-format", "%[mean]\n", str(path)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
if r.returncode != 0:
err = r.stderr.decode(errors="replace").strip().splitlines()
# prendi la prima riga significativa dell'errore
msg = next((l for l in err if l.strip()), "identify fallito senza messaggio")
return None, f"identify fallito: {msg[:120]}"
raw = r.stdout.decode(errors="replace").strip()
tokens = [t for t in raw.replace("\n", " ").split() if t]
for tok in tokens:
try:
return float(tok), ""
except ValueError:
continue
return None, f"mean non parsabile: {raw[:80]!r}"
def is_image_fully_valid(path: Path) -> tuple[bool, str]:
"""
Verifica leggibilità tecnica + validità visiva di un'immagine.
Ritorna (valido, motivo_dettagliato).
Strategia:
1. convert -regard-warnings: decodifica rigorosa completa.
Se ok → valido, si passa al check visivo.
2. Se convert fallisce: cattura stderr per il motivo preciso,
poi verifica la luminosità media con identify.
- luminosità nel range valido → visivamente integro nonostante
il warning di convert (es. metadata EXIF non standard) → valido
- luminosità fuori range o identify fallisce → corrotto per davvero,
con messaggio dettagliato del perché
"""
try:
rc = subprocess.run(
["magick", str(path), "/dev/null"],
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
)
convert_ok = rc.returncode == 0
convert_stderr = rc.stderr.decode(errors="replace").strip()
# Prima riga significativa dell'errore di convert
convert_err_line = next(
(l.strip() for l in convert_stderr.splitlines() if l.strip()), ""
)
if not convert_ok:
# convert ha segnalato un problema: verifica luminositÃ
mean, mean_err = _mean_from_identify(path)
if mean is None:
return False, (
f"convert: {convert_err_line or 'errore sconosciuto'} | "
f"identify: {mean_err}"
)
if mean <= _MEAN_BLACK_MAX:
return False, (
f"convert: {convert_err_line} | "
f"immagine completamente nera (mean={mean:.0f})"
)
if mean >= _MEAN_WHITE_MIN:
return False, (
f"convert: {convert_err_line} | "
f"immagine completamente bianca (mean={mean:.0f})"
)
# luminosità ok → il warning di convert era inoffensivo
return True, (
f"ok (mean={mean:.0f}) — convert warning ignorato: "
f"{convert_err_line[:80]}"
)
# convert ok → check visivo con identify
mean, mean_err = _mean_from_identify(path)
if mean is None:
# identify fallisce dopo convert ok: accettiamo come valido
return True, f"ok (identify non disponibile per mean: {mean_err})"
if mean <= _MEAN_BLACK_MAX:
return False, f"immagine completamente nera (mean={mean:.0f})"
if mean >= _MEAN_WHITE_MIN:
return False, f"immagine completamente bianca (mean={mean:.0f})"
return True, f"ok (mean={mean:.0f})"
except FileNotFoundError:
print(colored(C.RED, "Errore: 'magick' non trovato. sudo pacman -S imagemagick"))
sys.exit(1)
def is_video_readable(path: Path) -> bool:
if not shutil.which("ffprobe"):
return True
r = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "json", str(path)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
return r.returncode == 0
def is_readable(path: Path) -> tuple[bool, str]:
"""
Verifica leggibilità + validità visiva.
Per le immagini include anche il controllo bianco/nero.
Ritorna (valido, motivo).
"""
ft = file_type(path)
if ft == "image":
return is_image_fully_valid(path)
if ft == "video":
ok = is_video_readable(path)
return (ok, "ok" if ok else "video non leggibile")
return True, "tipo non controllato"
# ---------------------------------------------------------------------------
# Similarità immagini — hash percettivo via identify
#
# "compare" fallisce quando le immagini hanno dimensioni diverse (caso comune
# con i duplicati Nextcloud). Usiamo invece "identify -format %#" che calcola
# un hash SHA-256 del contenuto visivo normalizzato, oppure come fallback
# confrontiamo il Mean Absolute Error dopo aver ridimensionato entrambe le
# immagini alla stessa dimensione tramite "convert".
# ---------------------------------------------------------------------------
def _phash_via_identify(path: Path) -> str | None:
"""
Estrae il perceptual hash di un'immagine usando identify.
Ritorna una stringa hex o None in caso di errore.
"""
try:
r = subprocess.run(
["magick", "identify", "-quiet", "-format", "%#", str(path)],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
)
if r.returncode != 0:
return None
val = r.stdout.decode(errors="replace").strip()
return val if val else None
except Exception:
return None
def _mae_similarity(a: Path, b: Path) -> float:
"""
Confronto MAE dopo resize a 64x64 usando convert+compare.
Ritorna score [0,1]. Più robusto di PHASH per immagini di dimensioni diverse.
"""
try:
r = subprocess.run(
[
"compare", "-metric", "MAE",
"-resize", "64x64!", # forza resize identico su entrambe
str(a), str(b), "/dev/null",
],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
# returncode 0=identiche, 1=diverse, 2=errore
if r.returncode == 2:
return 0.0
# MAE è su stderr, formato: "valore (normalizzato)"
raw = r.stderr.decode(errors="replace").strip()
# prende il valore normalizzato tra parentesi se presente, altrimenti il primo token
import re as _re
m = _re.search(r'\(([0-9.eE+\-]+)\)', raw)
if m:
mae_norm = float(m.group(1)) # range 0..1
else:
mae_norm = float(raw.split()[0]) / 65535.0 # raw è 0..65535
return max(0.0, 1.0 - mae_norm)
except Exception:
return 0.0
def image_similarity(a: Path, b: Path) -> float:
"""
Ritorna un punteggio di similarità [0.0, 1.0].
Strategia:
1. Prova hash SHA percettivo via identify: se identici → 1.0, se diversi → MAE
2. Fallback MAE con resize 64x64 (funziona anche con dimensioni diverse)
"""
# Tentativo 1: hash percettivo — veloce, identici → cortocircuito
hash_a = _phash_via_identify(a)
hash_b = _phash_via_identify(b)
if hash_a and hash_b:
if hash_a == hash_b:
return 1.0
# hash diversi ma potrebbe essere solo compressione: passiamo al MAE
# Tentativo 2: MAE con resize
return _mae_similarity(a, b)
# ---------------------------------------------------------------------------
# Similarità video (durata + dimensione)
# ---------------------------------------------------------------------------
def video_duration(path: Path) -> float:
if not shutil.which("ffprobe"):
return -1
try:
r = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "json", str(path)],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
)
data = json.loads(r.stdout.decode(errors="replace"))
return float(data["format"]["duration"])
except Exception:
return -1
def video_similarity(a: Path, b: Path) -> float:
try:
size_a = a.stat().st_size
size_b = b.stat().st_size
except Exception:
return 0.0
size_ratio = min(size_a, size_b) / max(size_a, size_b) if size_a > 0 and size_b > 0 else 0.0
dur_a, dur_b = video_duration(a), video_duration(b)
if dur_a > 0 and dur_b > 0 and abs(dur_a - dur_b) >= 2.0:
return 0.0
return size_ratio
def compute_similarity(a: Path, b: Path) -> float:
ft = file_type(a)
if ft == "image": return image_similarity(a, b)
if ft == "video": return video_similarity(a, b)
return 1.0
# ---------------------------------------------------------------------------
# Pattern duplicati
# ---------------------------------------------------------------------------
DUPE_PATTERN = re.compile(r'^(.*) \(\d+\)$')
def strip_dupe_suffix(stem: str) -> str:
while True:
m = DUPE_PATTERN.match(stem)
if m: stem = m.group(1)
else: break
return stem
def collect_groups(root: Path) -> dict:
groups = defaultdict(list)
for dirpath, _, filenames in os.walk(root):
for fname in filenames:
parts = fname.rsplit(".", 1)
if len(parts) < 2: continue
stem, ext_raw = parts
ext = "." + ext_raw
if not DUPE_PATTERN.match(stem): continue
original_stem = strip_dupe_suffix(stem)
original_path = Path(dirpath) / (original_stem + ext)
groups[original_path].append(Path(dirpath) / fname)
return dict(groups)
# ---------------------------------------------------------------------------
# Risultato gruppo
# ---------------------------------------------------------------------------
class GroupResult:
def __init__(self):
self.original_ok = False
self.original_corrupt = False
self.original_missing = False
self.promoted = False
self.skipped_corrupt = False
self.skipped_similarity = False
self.deleted_files = []
self.deleted_bytes = 0
self.corrupt_files = []
self.resolved_corrupt = []
self.dissimilar_groups = []
self.errors = 0
# ---------------------------------------------------------------------------
# Elaborazione singolo gruppo
#
# Struttura in 4 fasi separate e sequenziali:
# FASE A — Analisi: verifica leggibilità originale e tutti i duplicati
# FASE B — Decisione: determina cosa tenere e cosa cancellare
# FASE C — Promozione: se originale non valido, copia il miglior duplicato
# FASE D — Cancella: elimina i duplicati solo dopo aver garantito
# che esiste almeno un file valido nel gruppo
# ---------------------------------------------------------------------------
def process_group(
original_path: Path,
dups: list,
dry_run: bool,
sim_thresh: float,
logger: Logger,
live: "LiveStatus",
stop_event: threading.Event | None = None,
) -> GroupResult:
# Se il segnale di stop è già attivo prima di iniziare, ritorna subito
if stop_event and stop_event.is_set():
res = GroupResult()
res.skipped_corrupt = True
live.clear_state()
return res
res = GroupResult()
gl = GroupLog(original_path, len(dups))
live.set_state("ANALISI", str(original_path), n_dups=len(dups))
# ==================================================================
# FASE A: analisi completa di tutti i file del gruppo
# ==================================================================
# Controlla originale
original_exists = original_path.is_file()
original_readable = False
if original_exists:
_ok, _reason = is_readable(original_path)
if _ok:
original_readable = True
res.original_ok = True
gl.add("CHECK_OK", f"Originale valido: {original_path} [{_reason}]", C.GREEN)
else:
res.original_corrupt = True
res.corrupt_files.append(original_path)
gl.add("CORROTTO", f"Originale NON valido: {original_path} [{_reason}]", C.RED)
else:
res.original_missing = True
gl.add("MANCANTE", f"Originale non esiste su disco: {original_path}", C.YELLOW)
# Controlla stop prima del loop sui duplicati (potenzialmente lungo)
if stop_event and stop_event.is_set():
gl.add("STOP", "Interruzione per timeout — gruppo saltato", C.YELLOW)
gl.flush(logger, live)
with live._lock:
live.counts["skipped_corrupt"] += 1
live.advance()
live.clear_state()
res.skipped_corrupt = True
return res
# Analizza ogni duplicato: validità + similaritÃ
# Risultato per ogni dup: ("delete"|"skip"|"promote_candidate", motivo, file_size)
dup_analysis = [] # (dup_path, action, reason, file_size, is_corrupt)
# Riferimento per il confronto similarità : l'originale se valido,
# altrimenti verrà aggiornato dopo la promozione
original_for_sim = original_path if original_readable else None
for dup in dups:
if not dup.is_file():
dup_analysis.append((dup, "delete", "file non trovato su disco", 0, False))
continue
try:
file_size = dup.stat().st_size
except Exception:
file_size = 0
# Check stop dentro il loop (ogni duplicato può richiedere secondi)
if stop_event and stop_event.is_set():
gl.add("STOP", f"Interruzione per timeout durante analisi duplicati — gruppo parziale", C.YELLOW)
gl.flush(logger, live)
with live._lock:
live.counts["skipped_corrupt"] += 1
live.advance()
live.clear_state()
res.skipped_corrupt = True
return res
live.set_state("DUP-CHECK", str(dup), n_dups=len(dups), done_dups=len(dup_analysis))
dup_ok, dup_reason = is_readable(dup)
if not dup_ok:
# Corrotto: da eliminare, nessun confronto necessario
res.corrupt_files.append(dup)
gl.add("CORROTTO_DUP",
f"Duplicato NON valido [{dup_reason}]: {dup}", C.RED)
dup_analysis.append((dup, "delete", f"corrotto: {dup_reason}", file_size, True))
continue
# Valido: se l'originale è corrotto/mancante, questo è un candidato promozione
if not original_readable:
gl.add("CANDIDATO",
f"Duplicato valido, candidato promozione [{dup_reason}]: {dup}", C.CYAN)
dup_analysis.append((dup, "promote_candidate", dup_reason, file_size, False))
continue
# Originale valido: confronto similaritÃ
try:
same_size = original_for_sim.stat().st_size == file_size
except Exception:
same_size = False
if same_size:
gl.add("SIMILARITY",
f"Dimensione identica ({fmt_size(file_size)}) → stesso file: {dup.name}")
dup_analysis.append((dup, "delete", "dimensione identica", file_size, False))
else:
score = compute_similarity(original_for_sim, dup)
gl.add("SIMILARITY",
f"Similarità : {original_for_sim.name} ↔ {dup.name}: "
f"score={score:.3f} (soglia: {sim_thresh:.2f})")
if score < sim_thresh:
res.skipped_similarity = True
res.dissimilar_groups.append((original_path, dup, score))
gl.add("SKIP_SIM",
f"[SKIP-SIMILARITÀ] score={score:.3f} — NON elimino:\n"
f" orig: {original_for_sim}\n dup : {dup}",
C.MAGENTA)
dup_analysis.append((dup, "skip", f"score={score:.3f}", file_size, False))
else:
dup_analysis.append((dup, "delete", f"simile (score={score:.3f})", file_size, False))
# ==================================================================
# FASE B: decisione — identifica il miglior candidato alla promozione
# ==================================================================
best_candidate = None
if not original_readable:
for dup, action, reason, fsize, is_corrupt in dup_analysis:
if action == "promote_candidate":
best_candidate = dup
break
if not best_candidate:
gl.add("SKIP",
"Nessun duplicato valido trovato — gruppo interamente saltato", C.RED)
res.skipped_corrupt = True
gl.flush(logger, live)
with live._lock:
live.counts["skipped_corrupt"] += 1
live.advance()
live.clear_state()
return res
res.promoted = True
original_for_sim = best_candidate
# L'originale corrotto verrà gestito (rinominato in .corrupt):
# spostalo da corrupt_files a resolved_corrupt così non appare
# nelle statistiche finali come problema irrisolto.
if original_path in res.corrupt_files:
res.corrupt_files.remove(original_path)
res.resolved_corrupt.append(original_path)
# Ora che abbiamo il promosso, rivalutiamo i candidati rimasti
# che non avevano ancora un riferimento valido per la similaritÃ
new_analysis = []
for dup, action, reason, fsize, is_corrupt in dup_analysis:
if action != "promote_candidate":
new_analysis.append((dup, action, reason, fsize, is_corrupt))
continue
if dup == best_candidate:
new_analysis.append((dup, "keep_promoted", "promosso come originale", fsize, False))
continue
# Altri candidati promozione: ora confronta con il promosso
try:
same_size = best_candidate.stat().st_size == fsize
except Exception:
same_size = False
if same_size:
gl.add("SIMILARITY",
f"Dimensione identica ({fmt_size(fsize)}) → stesso file del promosso: {dup.name}")
new_analysis.append((dup, "delete", "dimensione identica al promosso", fsize, False))
else:
score = compute_similarity(best_candidate, dup)
gl.add("SIMILARITY",
f"Similarità vs promosso: {best_candidate.name} ↔ {dup.name}: "
f"score={score:.3f} (soglia: {sim_thresh:.2f})")
if score < sim_thresh:
res.skipped_similarity = True
res.dissimilar_groups.append((original_path, dup, score))
gl.add("SKIP_SIM",
f"[SKIP-SIMILARITÀ] score={score:.3f} — NON elimino: {dup}", C.MAGENTA)
new_analysis.append((dup, "skip", f"score={score:.3f}", fsize, False))
else:
new_analysis.append((dup, "delete", f"simile al promosso (score={score:.3f})", fsize, False))
dup_analysis = new_analysis
# ==================================================================
# FASE C: promozione (solo ora che sappiamo che il gruppo è salvabile)
# ==================================================================
if best_candidate:
live.set_state("PROMOZIONE", str(best_candidate), n_dups=len(dups))
if dry_run:
gl.add("DRY_PROMO",
f"[DRY-RUN] Promuoverei: {best_candidate} → {original_path}", C.YELLOW)
if original_exists:
gl.add("DRY_PROMO",
f"[DRY-RUN] Eliminerei il corrotto: {original_path}", C.YELLOW)
else:
try:
if original_exists:
# Il file corrotto non serve: lo eliminiamo direttamente
original_path.unlink()
gl.add("ELIMINATO",
f"Corrotto eliminato: {original_path}", C.GREEN)
shutil.copy2(str(best_candidate), str(original_path))
gl.add("PROMOSSO",
f"Promosso: {best_candidate} → {original_path}", C.GREEN)
except Exception as e:
res.errors += 1
gl.add("ERRORE", f"Promozione fallita: {e}", C.RED)
# ==================================================================
# FASE D: cancellazione — solo ora che il gruppo è garantito sicuro
# ==================================================================
live.set_state("CANCELLAZIONE", str(original_path), n_dups=len(dups))
for dup, action, reason, file_size, is_corrupt in dup_analysis:
if action not in ("delete",):
continue
dup_status = "CORROTTO" if is_corrupt else "valido"
if not dup.is_file():
gl.add("SKIP", f"Non trovato su disco (già rimosso?): {dup}")
continue
if dry_run:
gl.add("DRY_DEL",
f"[DRY-RUN] Eliminerei [{dup_status}] ({fmt_size(file_size)}): {dup}",
C.YELLOW)
res.deleted_files.append(dup)
res.deleted_bytes += file_size
else:
try:
dup.unlink()
res.deleted_files.append(dup)
res.deleted_bytes += file_size
gl.add("ELIMINATO",
f"[{dup_status}] ({fmt_size(file_size)}): {dup}", C.GREEN)
except Exception as e:
res.errors += 1
gl.add("ERRORE", f"Impossibile eliminare {dup}: {e}", C.RED)
# Aggiorna i contatori live direttamente nel worker thread,
# così il pannello si aggiorna immediatamente senza aspettare
# che il main thread raccolga il future.result().
with live._lock:
c = live.counts
if res.original_ok: c["originals_ok"] += 1
if res.original_corrupt: c["originals_corrupt"] += 1
if res.promoted: c["promoted"] += 1
if res.skipped_corrupt: c["skipped_corrupt"] += 1
if res.skipped_similarity: c["skipped_sim"] += 1
c["deleted"] += len(res.deleted_files)
c["deleted_bytes"] += res.deleted_bytes
c["errors"] += res.errors
c["corrupt_files"] += res.corrupt_files
c["resolved_corrupt"] += res.resolved_corrupt
c["dissimilar"] += res.dissimilar_groups
gl.flush(logger, live)
live.advance()
live.clear_state()
return res
# ---------------------------------------------------------------------------
# Stampa statistiche (usata sia dal main thread che dal timer thread)
# ---------------------------------------------------------------------------
def _print_stats(
stopped_early, start_dt, total,
count_originals_ok, count_originals_corrupt, count_promoted,
count_deleted, count_skipped_corrupt, count_skipped_sim,
count_errors, total_deleted_bytes,
all_corrupt_files, all_resolved_corrupt, all_dissimilar,
mode_str, workers, sim_thresh, max_time, logger, log_path,
):
end_dt = datetime.now()
elapsed_sec = (end_dt - start_dt).total_seconds()
gruppi_elaborati = count_originals_ok + count_originals_corrupt + count_skipped_corrupt
stats = [
f"[STATS ] {'=' * 56}",
f"[STATS ] {'STATISTICHE PARZIALI' if stopped_early else 'STATISTICHE FINALI'} — nc_dedup.py v{VERSION} ({VERSION_DATE})",
f"[STATS ] {'⚠INTERRUZIONE ANTICIPATA — dati parziali' if stopped_early else 'Elaborazione completata'}",
f"[STATS ] Inizio elaborazione : {start_dt.strftime('%Y-%m-%d %H:%M:%S')}",
f"[STATS ] Fine elaborazione : {end_dt.strftime('%Y-%m-%d %H:%M:%S')}",
f"[STATS ] Tempo impiegato : {fmt_duration(elapsed_sec)} ({elapsed_sec:.1f}s)",
f"[STATS ] {'-' * 56}",
f"[STATS ] Gruppi totali trovati : {total}",
f"[STATS ] Gruppi elaborati : {gruppi_elaborati}",
f"[STATS ] Originali OK : {count_originals_ok}",
f"[STATS ] Originali corrotti : {count_originals_corrupt}",
f"[STATS ] Duplicati promossi : {count_promoted}",
f"[STATS ] Gruppi saltati (corrotti) : {count_skipped_corrupt}",
f"[STATS ] Coppie saltate (diversi) : {count_skipped_sim}",
f"[STATS ] File eliminati : {count_deleted}",
f"[STATS ] Dimensione liberata : {fmt_size(total_deleted_bytes)}",
f"[STATS ] Gruppi senza dup. valido : {len(all_corrupt_files)}",
f"[STATS ] Errori : {count_errors}",
f"[STATS ] Modalità : {mode_str}",
f"[STATS ] Thread : {workers}",
f"[STATS ] Soglia similarità : {sim_thresh:.2f}",
f"[STATS ] Tempo max : {'illimitato' if max_time == 0 else fmt_duration(max_time)}",
]
if all_corrupt_files:
stats.append(f"[STATS ] {'-' * 56}")
stats.append(f"[STATS ] âš GRUPPI SENZA DUPLICATO VALIDO (intervento manuale necessario):")
for f in all_corrupt_files:
stats.append(f"[STATS ] ! {f}")
if all_dissimilar:
stats.append(f"[STATS ] {'-' * 56}")
stats.append(f"[STATS ] COPPIE SKIPPATE PER BASSA SIMILARITÀ:")
for orig, dup, score in all_dissimilar:
stats.append(f"[STATS ] score={score:.3f} orig={orig.name} dup={dup.name}")
stats.append(f"[STATS ] orig_path={orig}")
stats.append(f"[STATS ] dup_path ={dup}")
stats.append(f"[STATS ] {'=' * 56}")
logger.write_header(stats)
print()
print(colored(C.BOLD, "=" * 60))
title_str = " STATISTICHE PARZIALI âš " if stopped_early else " STATISTICHE FINALI"
print(colored(C.YELLOW if stopped_early else C.BOLD, title_str))
if stopped_early:
print(colored(C.YELLOW, f" Interruzione dopo {fmt_duration(max_time)} — elaborati {gruppi_elaborati}/{total} gruppi"))
print(colored(C.BOLD, "-" * 60))
print(f" Inizio elaborazione : {colored(C.CYAN, start_dt.strftime('%Y-%m-%d %H:%M:%S'))}")
print(f" Fine elaborazione : {colored(C.CYAN, end_dt.strftime('%Y-%m-%d %H:%M:%S'))}")
print(f" Tempo impiegato : {colored(C.CYAN, fmt_duration(elapsed_sec))} ({elapsed_sec:.1f}s)")
print(colored(C.BOLD, "-" * 60))
print(f" Gruppi totali trovati : {colored(C.BOLD, str(total))}")
print(f" Gruppi elaborati : {colored(C.BOLD, str(gruppi_elaborati))}")
print(f" Originali OK : {colored(C.GREEN, str(count_originals_ok))}")
print(f" Originali corrotti : {colored(C.RED, str(count_originals_corrupt))}")
print(f" Duplicati promossi : {colored(C.CYAN, str(count_promoted))}")
print(f" Gruppi saltati (corrotti): {colored(C.YELLOW, str(count_skipped_corrupt))}")
print(f" Coppie saltate (diversi) : {colored(C.MAGENTA, str(count_skipped_sim))}")
print(f" File eliminati : {colored(C.GREEN, str(count_deleted))}")
print(f" Dimensione liberata : {colored(C.GREEN, fmt_size(total_deleted_bytes))}")
print(f" Gruppi senza dup. valido : {colored(C.RED, str(len(all_corrupt_files)))}")
print(f" Errori : {colored(C.RED, str(count_errors))}")
if all_corrupt_files:
print(colored(C.BOLD, "-" * 60))
print(colored(C.RED, " âš GRUPPI SENZA DUPLICATO VALIDO (intervento manuale necessario):"))
for cf in all_corrupt_files:
print(f" {colored(C.RED, '!')} {colored(C.YELLOW, str(cf))}")
if all_dissimilar:
print(colored(C.BOLD, "-" * 60))
print(colored(C.MAGENTA, " COPPIE SKIPPATE PER BASSA SIMILARITÀ:"))
for orig, dup, score in all_dissimilar:
print(f" {colored(C.YELLOW, f'score={score:.3f}')} orig={orig.name} dup={dup.name}")
print(f" {colored(C.CYAN, str(dup))}")
print(colored(C.BOLD, "=" * 60))
print(f" Log salvato in: {colored(C.CYAN, str(log_path))}")
print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
cpu_count = os.cpu_count() or 4
parser = argparse.ArgumentParser(
description=f"Nextcloud Duplicate Cleaner v{VERSION}"
)
parser.add_argument("target_dir")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--workers", type=int, default=cpu_count,
help=f"Thread paralleli (default: {cpu_count})")
parser.add_argument("--similarity", type=float, default=0.90,
help="Soglia similarità 0.0-1.0 (default: 0.90)")
parser.add_argument("--max-time", type=int, default=300,
help="Tempo massimo di esecuzione in secondi (default: 300 = 5 min, 0 = illimitato)")
args = parser.parse_args()
dry_run = args.dry_run
workers = max(1, args.workers)
sim_thresh = max(0.0, min(1.0, args.similarity))
max_time = args.max_time # 0 = illimitato
target_dir = Path(args.target_dir).expanduser().resolve()
if not target_dir.is_dir():
print(colored(C.RED, f"Errore: '{target_dir}' non esiste."))
sys.exit(1)
if not shutil.which("magick"):
print(colored(C.RED, "Errore: 'identify' non trovato. sudo pacman -S imagemagick"))
sys.exit(1)
if not shutil.which("magick"):
print(colored(C.YELLOW, "Attenzione: 'compare' non trovato — similarità immagini disabilitata."))
if not shutil.which("ffprobe"):
print(colored(C.YELLOW, "Attenzione: 'ffprobe' non trovato — verifica video disabilitata. sudo pacman -S ffmpeg"))
start_dt = datetime.now()
start_ts = start_dt.strftime("%Y%m%d_%H%M%S")
log_path = Path.cwd() / f"cleanup_dupes_{start_ts}.log"
logger = Logger(log_path)
mode_str = "DRY-RUN" if dry_run else "LIVE"
# Header log
logger.write_header([
f"[INFO ] {'=' * 56}",
f"[INFO ] nc_dedup.py v{VERSION} ({VERSION_DATE})",
f"[INFO ] Inizio : {start_dt.strftime('%Y-%m-%d %H:%M:%S')}",
f"[INFO ] Cartella : {target_dir}",
f"[INFO ] Modalità : {mode_str}",
f"[INFO ] Thread : {workers}",
f"[INFO ] Similarità : {sim_thresh:.2f}",
f"[INFO ] Tempo max : {'illimitato' if max_time == 0 else fmt_duration(max_time)}",
f"[INFO ] {'=' * 56}",
])
# Header console
print()
print(colored(C.BOLD, "=" * 60))
print(colored(C.BOLD, f" Nextcloud Duplicate Cleaner v{VERSION} ({VERSION_DATE})"))
print(f" Cartella : {colored(C.CYAN, str(target_dir))}")
if dry_run:
print(f" Modalità : {colored(C.YELLOW, 'DRY-RUN — nessun file verrà cancellato')}")
else:
print(f" Modalità : {colored(C.RED, 'LIVE — i duplicati verranno eliminati definitivamente')}")
print(f" Thread : {colored(C.CYAN, str(workers))}")
print(f" Similarità : {colored(C.CYAN, str(sim_thresh))} (sotto soglia → skip)")
max_time_str = "illimitato" if max_time == 0 else fmt_duration(max_time)
print(f" Tempo max : {colored(C.CYAN, max_time_str)}")
print(f" Log : {colored(C.CYAN, str(log_path))}")
print(colored(C.BOLD, "=" * 60))
print()
print("Scansione duplicati in corso...")
groups = collect_groups(target_dir)
total = len(groups)
print(f"Trovati {colored(C.BOLD, str(total))} gruppi di duplicati.\n")
logger.write_header([f"[INFO ] Trovati {total} gruppi di duplicati."])
if total == 0:
print(colored(C.GREEN, "Nessun duplicato trovato. Uscita."))
sys.exit(0)
live = LiveStatus(total, workers, logger, start_dt=start_dt, max_time=max_time)
count_originals_ok = 0
count_originals_corrupt = 0
count_promoted = 0
count_deleted = 0
count_skipped_corrupt = 0
count_skipped_sim = 0
count_errors = 0
total_deleted_bytes = 0
all_corrupt_files = []
all_resolved_corrupt = []
all_dissimilar = []
# ----------------------------------------------------------------
# stop_event : segnale condiviso tra timer thread e worker thread.
# Quando scade il timeout, il timer thread lo imposta;
# i worker lo controllano nei punti chiave e escono.
# done_event : il main thread lo imposta quando ha finito tutto,
# così il timer thread sa che può terminare senza fare nulla.
# ----------------------------------------------------------------
stop_event = threading.Event()
done_event = threading.Event()
stopped_early = False
# counts è ora dentro live.counts (LiveStatus)
# stats_printed_event: evita che sia il timer che il main stampino le stats
stats_printed_event = threading.Event()
def _timer_thread():
"""
Dorme per max_time secondi. Se scade:
1. Imposta stop_event → i worker escono al prossimo checkpoint
2. Stampa avviso a console
3. Aspetta done_event (max_time + 60s safety)
4. Imposta stats_printed_event per bloccare il main thread
5. Stampa le statistiche parziali
Se done_event scatta prima del timeout → esce senza fare nulla.
"""
nonlocal stopped_early
finished_naturally = done_event.wait(timeout=max_time)
if finished_naturally:
return # completato naturalmente, il main stampa le stats
# Timeout scaduto
stopped_early = True
stop_event.set()
msg = f"Timeout raggiunto ({fmt_duration(max_time)}) — attendo i checkpoint dei thread in corso..."
logger.write_header([f"[TIMEOUT ] {msg}"])
with logger._print_lock:
print(colored(C.YELLOW, f"\n âš {msg}"))
# Aspetta che il main thread segnali che i worker sono usciti
done_event.wait(timeout=max_time + 60)
# Prende il diritto di stampare le statistiche
stats_printed_event.set()
# Ferma il pannello live e stampa le statistiche
live.stop_rendering()
with live._lock:
_print_stats(
stopped_early=True,
start_dt=start_dt,
total=total,
count_originals_ok=live.counts["originals_ok"],
count_originals_corrupt=live.counts["originals_corrupt"],
count_promoted=live.counts["promoted"],
count_deleted=live.counts["deleted"],
count_skipped_corrupt=live.counts["skipped_corrupt"],
count_skipped_sim=live.counts["skipped_sim"],
count_errors=live.counts["errors"],
total_deleted_bytes=live.counts["deleted_bytes"],
all_corrupt_files=list(dict.fromkeys(live.counts["corrupt_files"])),
all_resolved_corrupt=list(dict.fromkeys(live.counts["resolved_corrupt"])),
all_dissimilar=live.counts["dissimilar"],
mode_str=mode_str,
workers=workers,
sim_thresh=sim_thresh,
max_time=max_time,
logger=logger,
log_path=log_path,
)
if max_time > 0:
t = threading.Thread(target=_timer_thread, daemon=True)
t.start()
print(f"Elaborazione con {workers} thread:\n")
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {
executor.submit(
process_group,
original_path, dups, dry_run, sim_thresh, logger, live,
stop_event,
): original_path
for original_path, dups in groups.items()
}
for future in as_completed(futures):
if stop_event.is_set():
# Timer ha già fatto scattare lo stop: cancella i pending
for f in futures:
f.cancel()
# Raccoglie i risultati dei worker già completati prima del break
# (i loro counts sono già stati aggiornati nel worker thread)
try:
future.result(timeout=0)
except Exception:
pass
break
try:
res = future.result()
except Exception as e:
import traceback as _tb
err_msg = f"Eccezione nel thread: {e}"
tb_str = _tb.format_exc().strip()
logger.write_header([f"[ERROR ] {err_msg}", f"[TRACEBACK] {tb_str}"])
logger.print_console(C.RED, f"[ERRORE] {err_msg}", live)
logger.print_console(C.RED, tb_str, live)
with live._lock:
live.counts["errors"] += 1
continue
# Counts già aggiornati nel worker thread — niente da fare qui
# Segnala al timer thread che i worker sono usciti
done_event.set()
# Se il timeout è scaduto, aspetta che il timer thread finisca di stampare
# le statistiche (stats_printed_event) prima di procedere
if stopped_early:
t.join(timeout=30) # attendi il timer thread (max 30s)
live.stop_rendering()
return # statistiche già stampate dal timer thread
# Run completato naturalmente prima del timeout: stampiamo noi le stats
live.stop_rendering()
if not stats_printed_event.is_set():
_print_stats(
stopped_early=False,
start_dt=start_dt,
total=total,
count_originals_ok=live.counts["originals_ok"],
count_originals_corrupt=live.counts["originals_corrupt"],
count_promoted=live.counts["promoted"],
count_deleted=live.counts["deleted"],
count_skipped_corrupt=live.counts["skipped_corrupt"],
count_skipped_sim=live.counts["skipped_sim"],
count_errors=live.counts["errors"],
total_deleted_bytes=live.counts["deleted_bytes"],
all_corrupt_files=list(dict.fromkeys(live.counts["corrupt_files"])),
all_resolved_corrupt=list(dict.fromkeys(live.counts["resolved_corrupt"])),
all_dissimilar=live.counts["dissimilar"],
mode_str=mode_str,
workers=workers,
sim_thresh=sim_thresh,
max_time=max_time,
logger=logger,
log_path=log_path,
)
if __name__ == "__main__":
main()
Cheers
