import glob
import hashlib
import os
import threading
from typing import Any, Callable, Tuple
import concurrent.futures
from packaging.version import parse as parse_version
import queue
import time

import networkx as nx
import requests
from PIL import Image
import wdtagger as wdt

from .I18N import _
from .TagsRepo import TagsRepo
from .settings import Settings
from .common import login, get_auth_token
from .ProgressFile import ProgressFile
from .tag_processing import (
    TAG_FIXES,
    extract_parameters,
    parse_parameters,
    process_tag,
    extract_artist_from_filename,
)
from .tagger_cache import TaggerCache


class Core:
    """
    Core functionality for Kapitanbooru Uploader.

    Handles image processing, tagging, and upload logic.
    """

    def __init__(self, settings: Settings, gui_mode: bool = True):
        self.version = "0.9.3"
        self.acknowledged_version = parse_version(self.version)
        self.settings = settings
        self.tags_repo = TagsRepo(settings)
        # Additional settings for the Tagger
        self.tagger_name = "wdtagger"
        self.tagger_version = "1.0"  # the version could be set dynamically if possible
        self.tagger_cache = TaggerCache(
            self.settings, self.tagger_name, self.tagger_version
        )
        self.gui_mode = gui_mode
        self.main_thread_queue = queue.Queue()
        self.after_events = []
        self.implication_graph = self.load_implication_graph()
        self.missing_tags = set()  # Track tags not in the graph
        self.check_uploaded_files_stop_event = threading.Event()
        self.check_uploaded_files_thread: threading.Thread | None = None
        self.process_tagger_queue_stop_event = threading.Event()
        self.process_tagger_queue_thread: threading.Thread | None = None
        self.check_uploaded_files_callback = None
        self.update_status_bar_callback = None
        self.process_tagger_for_image_callback = None
        self.upload_file_success_callback: Callable[[str], Any] | None = None
        self.upload_file_completed_callback: Callable | None = None
        self.folder_path = ""
        self.image_files = []
        self.image_files_md5 = {}  # file path -> MD5 hex digest, filled by load_images()
        self.current_index = None
        self.image_cache = None
        self.tagger_thread_idx = 0
        self.tagger = wdt.Tagger()

        # Status counters
        self.total_files = 0
        self.tagger_processed = set()
        self.upload_verified = 0
        self.uploaded_count = 0

        # Rating map: display name -> value sent to the server
        self.rating_map = {
            "General": "g",
            "Sensitive": "s",
            "Questionable": "q",
            "Explicit": "e",
            "Unrated": "",
        }

        # Dictionary tracking whether a given file (path) has already been uploaded
        self.uploaded = {}  # key: file path, value: False or the post id once uploaded

    def schedule_in_main_thread(self, func, delay_ms=0):
        """Schedule a function to run in the main thread."""
        if self.gui_mode:
            # In GUI mode, use the after mechanism
            self.after_events.append((time.time() + delay_ms / 1000, func))
        else:
            # In non-GUI mode, add to queue for immediate execution
            self.main_thread_queue.put(func)

    def process_main_thread_queue(self):
        """Process pending main thread tasks."""
        if self.gui_mode:
            # Process scheduled events
            now = time.time()
            new_events = []
            for schedule_time, func in self.after_events:
                if now >= schedule_time:
                    try:
                        func()
                    except Exception as e:
                        print(f"Error in scheduled task: {e}")
                else:
                    new_events.append((schedule_time, func))
            self.after_events = new_events
        else:
            # Process all queued tasks in non-GUI mode
            while not self.main_thread_queue.empty():
                try:
                    func = self.main_thread_queue.get_nowait()
                    func()
                except queue.Empty:
                    break
                except Exception as e:
                    print(f"Error in main thread task: {e}")

    def load_implication_graph(self) -> nx.DiGraph:
        G = nx.DiGraph()
        conn = self.tags_repo.get_conn()
        cursor = conn.cursor()
        # Step 1: Add all tags from the 'tags' table
        cursor.execute(
            """
            SELECT
                CASE category
                    WHEN 1 THEN 'artist:' || name
                    WHEN 3 THEN 'copyright:' || name
                    WHEN 4 THEN 'character:' || name
                    WHEN 5 THEN 'meta:' || name
                    ELSE name
                END AS prefixed_name
            FROM tags
            """
        )
        db_tags = {row[0] for row in cursor.fetchall()}
        G.add_nodes_from(db_tags)

        # Step 2: Add nodes from implications (antecedents/consequents not in 'tags' table)
        cursor.execute("SELECT antecedent, consequent FROM tag_closure")
        closure_rows = cursor.fetchall()
        edge_tags = set()
        for ant, cons in closure_rows:
            edge_tags.add(ant)
            edge_tags.add(cons)
        G.add_nodes_from(edge_tags - db_tags)  # Add tags that only appear in implications

        # Step 3: Add edges
        G.add_edges_from(closure_rows)

        conn.close()
        return G

    def compute_final_tags_and_rating_for_file(
        self, file_path, update_status_callback, manual_tags=set()
    ) -> Tuple[str, str]:
        """
        Computes the final tag list and the rating for the given file.

        Combines tags from:
          - the (PNG) file: parsed via parse_parameters,
          - the Tagger (cached result or computed on the fly),
          - the settings (default tags),
          - manual tags (from the manual_tags_entry field),
        and adds the "meta:auto_upload" tag.

        Returns the final tag string and the rating.
        """
        # Get tags from the file
        try:
            img = Image.open(file_path)
            parameters = extract_parameters(img, file_path)
            artist_tag = extract_artist_from_filename(file_path)
            png_tags = {
                x
                for x in parse_parameters(parameters, self.tags_repo).split()
                if process_tag(x, self.tags_repo)[1] is not None  # Ignore non-existent tags
            }
            if artist_tag:
                png_tags.add("artist:" + artist_tag.replace(" ", "_").replace("\\", ""))
            img.close()
        except Exception as e:
            print(_("Błąd przy otwieraniu pliku"), file_path, ":", e)
            png_tags = set()

        # Get tags from the Tagger, checking the cache first
        result = self.get_tagger_results(file_path, update_status_callback)
        tagger_tags = set()
        rating = "Unrated"
        if result is not None:  # get_tagger_results() returns None when tagging failed
            # Replace invalid tags with their fixed equivalents
            tagger_tags.update(
                TAG_FIXES[tag] if tag in TAG_FIXES else tag
                for tag in result.general_tag_data.keys()
            )
            for t in result.character_tags:
                full_tag = "character:" + t.replace(" ", "_").replace("\\", "")
                # Replace invalid tags with their fixed equivalents
                if full_tag in TAG_FIXES:
                    full_tag = TAG_FIXES[full_tag]
                tagger_tags.add(full_tag)
            rating = self.map_tagger_rating(result)

        # Get tags from the settings and the manual tags
        default_tags = set(self.settings.default_tags.split())

        # Final list: the union of all tags
        final_tags = default_tags.union(png_tags).union(tagger_tags).union(manual_tags)
        final_tags.add("meta:auto_upload")
        return " ".join(sorted(final_tags)), rating

    def get_tagger_results(
        self, file_path, callback: Callable | None = None
    ) -> wdt.Result | None:
        md5 = self.image_files_md5[file_path]
        cached = self.tagger_cache[md5]
        if cached:
            self.tagger_processed.add(md5)
            return cached["result"]
        try:
            with Image.open(file_path) as img:
                result = self.tagger.tag(img)
                self.tagger_cache[md5] = result
                self.tagger_processed.add(md5)
                if callback:
                    callback()
                print(_("Tagger przetworzył:"), f"{file_path}")
                return result
        except Exception as e:
            print(_("Błąd Taggera dla"), file_path, ":", e)
            return None

    def map_tagger_rating(self, result: wdt.Result) -> str:
        """
        Maps the Tagger rating to the value used by Kapitanbooru.
        """
        if result.rating == "general":
            new_rating = "General"
        elif result.rating == "sensitive":
            new_rating = "Sensitive"
        elif result.rating == "questionable":
            new_rating = "Questionable"
        elif result.rating == "explicit":
            new_rating = "Explicit"
        else:
            new_rating = "Unrated"
        return new_rating

    def load_images(self):
        """
        Loads PNG, JPEG, WebP, AVIF and GIF files from the selected folder.
        """
        extensions = ("*.png", "*.jpg", "*.jpeg", "*.webp", "*.avif", "*.gif")
        self.image_files = sorted(
            file
            for ext in extensions
            for file in glob.glob(os.path.join(self.folder_path, ext), recursive=True)
        )
        self.total_files = len(self.image_files)
        self.image_files_md5 = {
            file: md5
            for file, md5 in zip(
                self.image_files, self.compute_md5_parallel(self.image_files)
            )
        }
        self.tagger_processed.clear()
        for md5 in self.image_files_md5.values():
            if self.tagger_cache[md5]:
                self.tagger_processed.add(md5)
        self.uploaded.clear()
        self.upload_verified = 0
        self.uploaded_count = 0
        for file in self.image_files:
            self.uploaded[file] = False
        if self.image_files:
            self.post_load_processing()

    def compute_md5(self, file_path, chunk_size=8192):
        """Compute MD5 for a single file."""
        hash_md5 = hashlib.md5()
        try:
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(chunk_size), b""):
                    hash_md5.update(chunk)
        except Exception as e:
            print(_("Error computing MD5:"), e)
            return ""
        return hash_md5.hexdigest()

    def compute_md5_parallel(self, file_paths):
        """Compute MD5 for multiple files in parallel."""
        with concurrent.futures.ThreadPoolExecutor() as executor:
            return list(executor.map(self.compute_md5, file_paths))

    def post_load_processing(self):
        """
        After the files have been loaded, checks whether any of them still need to be
        uploaded and runs the Tagger over the files.
        """
        self.join_check_uploaded_files_thread()
        self.check_uploaded_files_thread = threading.Thread(
            target=self.check_uploaded_files
        )
        self.check_uploaded_files_thread.start()
        self.join_process_tagger_queue_thread()
        self.process_tagger_queue_thread = threading.Thread(
            target=self.process_tagger_queue
        )
        self.process_tagger_queue_thread.start()

    def wait_for_completion(self):
        """Wait for background threads to finish (non-GUI mode)."""
        # Join the checking thread if running
        if (
            self.check_uploaded_files_thread
            and self.check_uploaded_files_thread.is_alive()
        ):
            self.check_uploaded_files_stop_event.set()
            self.check_uploaded_files_thread.join()

        # Join the tagger processing thread if running
        if (
            self.process_tagger_queue_thread
            and self.process_tagger_queue_thread.is_alive()
        ):
            self.process_tagger_queue_stop_event.set()
            self.process_tagger_queue_thread.join()

        # Process any remaining main thread tasks
        self.process_main_thread_queue()

    def join_check_uploaded_files_thread(self):
        if self.check_uploaded_files_thread is not None:
            self.check_uploaded_files_stop_event.set()
            self.check_uploaded_files_thread.join()
            self.check_uploaded_files_thread = None
            self.check_uploaded_files_stop_event = threading.Event()

    def join_process_tagger_queue_thread(self):
        if self.process_tagger_queue_thread is not None:
            self.process_tagger_queue_stop_event.set()
            self.process_tagger_queue_thread.join()
            self.process_tagger_queue_thread = None
            self.process_tagger_queue_stop_event = threading.Event()

    def process_tagger_for_image(self, file_path):
        """Processes an image with the Tagger and stores the result in the cache."""
        result = self.get_tagger_results(file_path, self.update_status_bar_callback)
        # The result may be None if the Tagger failed for this file
        if result is not None and self.process_tagger_for_image_callback:
            self.process_tagger_for_image_callback(file_path, result.rating)

    def process_tagger_queue(self):
        """Processes all images in the background (skipping the currently selected one)."""
        for file_path in self.image_files:
            if self.process_tagger_queue_stop_event.is_set():
                break
            # If the image is currently selected, skip it; it will be processed in the foreground
            if (
                self.current_index is not None
                and file_path == self.image_files[self.current_index]
            ):
                continue
            self.process_tagger_for_image(file_path)
        self.schedule_in_main_thread(self.join_process_tagger_queue_thread, 100)

    def check_uploaded_files(self):
        """
        For each image takes its MD5, groups the hashes into batches (up to 100 hashes),
        sends a request to the 'posts.json' endpoint for each batch, and based on the
        response stores the post id in self.uploaded for files that are already uploaded.
        """
        file_md5_list = [
            (idx, file, self.image_files_md5[file])
            for idx, file in enumerate(self.image_files)
        ]
        batch_size = 100
        for i in range(0, len(file_md5_list), batch_size):
            if self.check_uploaded_files_stop_event.is_set():
                break
            batch = file_md5_list[i : i + batch_size]
            batch_md5 = [item[2] for item in batch]
            md5_param = ",".join(batch_md5)
            url = self.settings.base_url.rstrip("/") + "/posts.json"
            try:
                response = requests.get(url, params={"md5": md5_param})
                root = response.json()
                found = {}
                for elem in root:
                    if self.check_uploaded_files_stop_event.is_set():
                        break
                    post_md5 = elem.get("md5", "").lower()
                    post_id = elem.get("id")
                    if post_md5 and post_id:
                        found[post_md5] = post_id
                for idx, file_path, md5 in batch:
                    if self.check_uploaded_files_stop_event.is_set():
                        break
                    self.upload_verified += 1  # Every file in the batch has been verified
                    if md5.lower() in found:
                        self.uploaded[file_path] = found[md5.lower()]
                        self.uploaded_count += 1
                        if self.check_uploaded_files_callback:
                            self.check_uploaded_files_callback(idx)
                    else:
                        self.uploaded[file_path] = False
                    if self.update_status_bar_callback:
                        self.update_status_bar_callback()
            except Exception as e:
                print(_("Błąd podczas sprawdzania paczki uploadu:"), e)
        self.schedule_in_main_thread(self.join_check_uploaded_files_thread, 100)

    def autotag_files(self, file_paths):
        """
        Auto-tags files using the Tagger and uploads them to the server.
        """
        for file_path in file_paths:
            if not os.path.isfile(file_path):
                print(_("Plik nie istnieje:"), file_path)
                continue
            try:
                tags, rating = self.compute_final_tags_and_rating_for_file(
                    file_path, lambda: None
                )
                print(_("Tagi dla pliku"), file_path, ":", tags, "Rating:", rating)
                self.upload_file(
                    file_path,
                    final_tags=tags,
                    final_rating=rating,
                    progress_queue=None,
                    cancel_event=None,
                )
            except Exception as e:
                print(_("Błąd podczas autotagowania pliku"), file_path, ":", e)

    def autotag_dir(self, dir):
        """
        Auto-tags all files in the directory using the Tagger and uploads them to the server.
        """
        if not os.path.isdir(dir):
            print(_("Podana ścieżka nie jest katalogiem:"), dir)
            return
        self.folder_path = dir
        self.load_images()
        # The background threads are only started when load_images() found any files
        if self.check_uploaded_files_thread:
            self.check_uploaded_files_thread.join()
        if self.process_tagger_queue_thread:
            self.process_tagger_queue_thread.join()
        files_to_upload = [x for x in self.image_files if not self.uploaded[x]]
        if not files_to_upload:
            print(_("Brak obrazów do przetworzenia w katalogu:"), dir)
            return
        self.autotag_files(files_to_upload)

    def upload_file(
        self,
        file_path,
        final_tags=None,
        final_rating=None,
        progress_queue: queue.Queue | None = None,
        cancel_event: threading.Event | None = None,
        info_callback: Callable[[str], Any] | None = None,
        warning_callback: Callable[[str], Any] | None = None,
        error_callback: Callable[[str], Any] | None = None,
    ):
        base_file_name = os.path.basename(file_path)
        if progress_queue:
            progress_queue.put(("mode", "determinate"))
            progress_queue.put(("max", 100))
            progress_queue.put(
                (
                    "label",
                    _("Wysyłam plik {base_file_name}...").format(
                        base_file_name=base_file_name
                    ),
                )
            )
        url = self.settings.base_url.rstrip("/") + "/api/danbooru/add_post"
        tags = final_tags
        fields = {
            "login": self.settings.username,
            "password": self.settings.password,
            "tags": tags,
            "source": "",
        }
        rating_value = self.rating_map.get(final_rating, "")
        if rating_value:
            fields["rating"] = rating_value
        try:
            total_size = os.path.getsize(file_path)

            def progress_callback(bytes_read, total_size):
                if progress_queue:
                    percentage = int(bytes_read / total_size * 100)
                    progress_queue.put(("progress", percentage))

            with open(file_path, "rb") as f:
                wrapped_file = ProgressFile(
                    f, progress_callback, total_size, cancel_event
                )
                files = {"file": (base_file_name, wrapped_file, "image/png")}
                response = requests.post(url, data=fields, files=files)
            if progress_queue:
                progress_queue.put(("progress", 100))
            show_warn = False
            post_url = None
            if response.status_code in (200, 201):
                message = _("Wysyłanie zakończone powodzeniem!")
                post_url = response.headers.get("X-Danbooru-Location", None)
            elif response.status_code == 409:
                message = _(
                    "Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
                ).format(
                    status_code=response.status_code,
                    text=response.headers.get("X-Danbooru-Errors", ""),
                )
                post_url = response.headers.get("X-Danbooru-Location", None)
                show_warn = True
            else:
                message = _(
                    "Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
                ).format(status_code=response.status_code, text=response.text)
                show_warn = True
            # Updating the list view: default arguments must be used in the lambda
            # to preserve the current index
            if show_warn:
                if not final_tags:
                    if warning_callback:
                        warning_callback(message)
                    else:
                        print("[WARN]", _("Wysyłanie"), message)
            else:
                if not final_tags:
                    if info_callback:
                        info_callback(message)
                    else:
                        print("[INFO]", _("Wysyłanie"), message)
                if self.upload_file_success_callback:
                    self.upload_file_success_callback(file_path)
            if post_url:
                post_id = post_url.split("/")[-1]
                self.uploaded[file_path] = post_id
                self.uploaded_count += 1
                # Core has no after()/update_status_bar(); schedule the registered
                # status-bar callback on the main thread instead.
                if self.update_status_bar_callback:
                    self.schedule_in_main_thread(self.update_status_bar_callback)
        except Exception as e:
            if error_callback:
                error_callback(str(e))
            else:
                print("[ERROR]", _("Błąd wysyłania pliku"), file_path, ":", e)
        finally:
            if self.upload_file_completed_callback:
                self.upload_file_completed_callback()

    def upload_all_files(
        self,
        progress_queue: queue.Queue | None = None,
        cancel_event: threading.Event | None = None,
        secondary_progress_queue: queue.Queue | None = None,
        update_status_callback=None,
        manual_tags: set = set(),
        info_callback: Callable[[str], Any] | None = None,
        warning_callback: Callable[[str], Any] | None = None,
        error_callback: Callable[[str], Any] | None = None,
    ):
        files_to_upload = [x for x in self.image_files if not self.uploaded[x]]
        files_count = len(files_to_upload)
        if progress_queue:
            progress_queue.put(("mode", "determinate"))
            progress_queue.put(("max", 100))
        file_idx = 0
        for file_path in files_to_upload:
            if progress_queue:
                progress_queue.put(("progress", file_idx * 100 / files_count))
                progress_queue.put(
                    ("label", f"Wysyłam plik {file_idx+1}/{files_count}...")
                )
            if cancel_event is not None and cancel_event.is_set():
                if progress_queue:
                    progress_queue.put(("label", _("Anulowano operację!")))
                return
            if not self.uploaded.get(file_path, False):
                final_tags, final_rating = self.compute_final_tags_and_rating_for_file(
                    file_path,
                    update_status_callback,  # lambda: self.after(0, self.update_status_bar)
                    manual_tags,  # set(self.manual_tags_manager.manual_tags)
                )
                print(
                    _(
                        "Wysyłanie {file_path} z tagami: {final_tags} i ratingiem: {final_rating}"
                    ).format(
                        file_path=file_path,
                        final_tags=final_tags,
                        final_rating=final_rating,
                    )
                )
                self.upload_file(
                    file_path,
                    final_tags=final_tags,
                    final_rating=final_rating,
                    progress_queue=secondary_progress_queue,
                    cancel_event=cancel_event,
                    info_callback=info_callback,
                    warning_callback=warning_callback,
                    error_callback=error_callback,
                )
            file_idx += 1
        if progress_queue:
            progress_queue.put(("label", _("Przesłano pliki!")))
            progress_queue.put(("progress", 100))

    def edit_file(
        self,
        file_path,
        final_tags=None,
        final_rating=None,
        progress_queue=None,
        cancel_event=None,
        info_callback: Callable[[str], Any] | None = None,
        error_callback: Callable[[str], Any] | None = None,
    ):
        """
        Update tags and rating for an existing post without uploading the file.
        """
        base_file_name = os.path.basename(file_path)
        post_id = self.uploaded.get(file_path)
        if not post_id:
            if error_callback:
                error_callback(_("Post nie został znaleziony dla tego pliku"))
            else:
                print(
                    "[ERROR]",
                    _("Post nie został znaleziony dla tego pliku"),
                    file_path,
                )
            return
        if progress_queue:
            progress_queue.put(("mode", "determinate"))
            progress_queue.put(("max", 100))
            progress_queue.put(
                (
                    "label",
                    _("Aktualizuję tagi dla {base_file_name}...").format(
                        base_file_name=base_file_name
                    ),
                )
            )
        try:
            # Check for cancellation before starting the operation.
            if cancel_event is not None and cancel_event.is_set():
                if progress_queue:
                    progress_queue.put(("label", _("Operacja anulowana")))
                return
            # Get authentication session and token
            session = login(self.settings)
            auth_token = get_auth_token(session, self.settings)
            # Check cancellation after login if needed.
            if cancel_event is not None and cancel_event.is_set():
                if progress_queue:
                    progress_queue.put(("label", _("Operacja anulowana")))
                return
            # Prepare tags and rating
            tags = final_tags
            rating_value = self.rating_map.get(final_rating, "?")
            # Prepare API request
            url = self.settings.base_url.rstrip("/") + "/post/set"
            payload = {
                "auth_token": auth_token,
                "image_id": post_id,
                "title": base_file_name,
                "owner": self.settings.username,
                "tags": tags,
                "source": "",
                "rating": rating_value,
            }
            if progress_queue:
                progress_queue.put(("progress", 50))
            # Check for cancellation before sending the update request.
            if cancel_event is not None and cancel_event.is_set():
                if progress_queue:
                    progress_queue.put(("label", _("Operacja anulowana")))
                return
            # Send update request
            response = session.post(url, data=payload, allow_redirects=False)
            # Handle 302 redirect as success case
            if response.status_code == 302:
                if progress_queue:
                    progress_queue.put(("progress", 100))
                message = _("Tagi zostały zaktualizowane!")
                if not final_tags:  # Only show success if not bulk operation
                    if info_callback:
                        info_callback(message)
                    else:
                        print("[INFO]", _("Sukces edycji"), message)
                # Update UI state
                if self.upload_file_completed_callback:
                    self.upload_file_completed_callback()
                return
            # Handle other status codes
            error_msg = _("Błąd podczas aktualizacji tagów\nStatus: {code}").format(
                code=response.status_code
            )
            if response.text:
                error_msg += f"\n{_('Treść:')} {response.text}"
            if error_callback:
                error_callback(error_msg)
            else:
                print("[ERROR]", _("Błąd edycji"), error_msg)
        except Exception as e:
            if error_callback:
                error_callback(str(e))
            else:
                print("[ERROR]", _("Krytyczny błąd edycji"), file_path, ":", e)
        finally:
            if progress_queue:
                progress_queue.put(("progress", 100))
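

# What follows is a minimal, hedged usage sketch (not part of the original module):
# it shows how Core can be driven without the GUI, e.g. to batch auto-tag a directory.
# It assumes the module is executed with `python -m <your_package>.Core <directory>`
# (the relative imports above require package context) and that Settings() can be
# constructed with its defaults and already points at a configured Kapitanbooru
# instance; adapt both assumptions to your setup.
if __name__ == "__main__":
    import sys

    if len(sys.argv) != 2:
        print("Usage: python -m <your_package>.Core <directory>")
        sys.exit(1)

    # gui_mode=False makes schedule_in_main_thread() push callbacks onto the queue
    # drained by process_main_thread_queue() instead of relying on a GUI event loop.
    core = Core(Settings(), gui_mode=False)
    core.autotag_dir(sys.argv[1])
    # Join any remaining background threads and flush pending main-thread tasks.
    core.wait_for_completion()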