from .common import open_tag_wiki_url, open_webbrowser from .ProgressFile import ProgressFile from .autocomplete import TagManager from .settings import Settings from .tag_processing import TAG_FIXES, parse_parameters, process_tag from .tagger_cache import TaggerCache from .TagsRepo import TagsRepo import networkx as nx import requests import wdtagger as wdt from PIL import Image, ImageTk, PngImagePlugin import glob import hashlib import os import threading import tkinter as tk from tkinter import filedialog, messagebox, ttk from typing import Tuple class ProcessingDialog: def __init__(self, root, target_function, *args): self.root = root self.top = tk.Toplevel(root) # Create a top-level window self.top.title("Processing...") self.top.geometry("300x150") self.top.protocol("WM_DELETE_WINDOW", self.on_close) # Handle close event # Create Label and Progress Bar (or rotating animation) self.label = tk.Label(self.top, text="Processing, please wait...") self.label.pack(pady=10) self.progress = ttk.Progressbar(self.top, mode="indeterminate") self.progress.pack(pady=10, fill="x") self.progress.start(10) # Start animation # Create a thread for the target function self.running = True self.thread = threading.Thread( target=self.run_task, args=(target_function, *args) ) self.thread.start() def run_task(self, target_function, *args): try: target_function(*args) # Run the function finally: self.close_dialog() # Close when done def close_dialog(self): """Safely close the dialog.""" if self.running: self.running = False self.top.after(0, self.top.destroy) # Ensure UI updates on the main thread def on_close(self): """User manually closed the dialog, terminate thread.""" self.running = False self.top.destroy() class ImageBrowser(tk.Tk): def __init__(self): super().__init__() self.title("Kapitanbooru Uploader") self.geometry("900x600") self.settings = Settings() self.tags_repo = TagsRepo(self.settings) self.implication_graph = self.load_implication_graph() self.missing_tags = set() # Track tags not in the graph # Dodatkowe ustawienia dla Taggera self.tagger_name = "wdtagger" self.tagger_version = ( "1.0" # możesz ustawić wersję dynamicznie, jeśli to możliwe ) self.tagger_cache = TaggerCache( self.settings, self.tagger_name, self.tagger_version ) self.folder_path = "" self.image_files = [] self.image_files_md5 = [] self.current_index = None self.image_cache = None # Liczniki statusu self.total_files = 0 self.tagger_processed = set() self.upload_verified = 0 self.uploaded_count = 0 # Oryginalny obraz (do skalowania) self.current_image_original = None self.current_parameters = "" # Mapa ratingów: wyświetlana nazwa -> wartość wysyłana self.rating_map = { "General": "g", "Sensitive": "s", "Questionable": "q", "Explicit": "e", "Unrated": "", } # Słowniki przechowujące stany tagów (dla PNG i Taggera) self.png_tags_states = {} self.tagger_tags_states = {} # Ścieżki do ustawień i cache # Ładujemy ustawienia # Nowy słownik przechowujący informację, czy dany plik (ścieżka) został już uploadowany self.uploaded = {} # key: file path, value: True/False self.create_menu() self.create_widgets() self.bind_events() def load_implication_graph(self) -> nx.DiGraph: G = nx.DiGraph() conn = self.tags_repo.get_conn() cursor = conn.cursor() # Step 1: Add all tags from the 'tags' table cursor.execute( """ SELECT CASE category WHEN 1 THEN 'artist:' || name WHEN 3 THEN 'copyright:' || name WHEN 4 THEN 'character:' || name WHEN 5 THEN 'meta:' || name ELSE name END AS prefixed_name FROM tags """ ) db_tags = {row[0] for row in cursor.fetchall()} G.add_nodes_from(db_tags) # Step 2: Add nodes from implications (antecedents/consequents not in 'tags' table) cursor.execute("SELECT antecedent, consequent FROM tag_closure") edge_tags = set() for ant, cons in cursor.fetchall(): edge_tags.add(ant) edge_tags.add(cons) G.add_nodes_from(edge_tags - db_tags) # Add tags only in implications # Step 3: Add edges cursor.execute("SELECT antecedent, consequent FROM tag_closure") G.add_edges_from(cursor.fetchall()) conn.close() return G def adjust_text_widget_height(self, widget): """ Ustawia wysokość widgetu Text na liczbę linii w jego treści, ale nie więcej niż max_lines. """ content = widget.get("1.0", "end-1c") num_lines = 0 cur_line_len = 0 text = content.split() for word in text: if cur_line_len + len(word) > 34: num_lines += 1 cur_line_len = 0 cur_line_len += len(word) + 1 max_lines = ( self.png_tags_text.winfo_height() + self.tagger_tags_text.winfo_height() + self.final_tags_text.winfo_height() - widget.winfo_height() - 8 ) widget.config(height=min(num_lines, max_lines) if num_lines > 4 else 4) def compute_final_tags_and_rating_for_file(self, file_path): """ Oblicza finalną listę tagów dla danego pliku oraz rating. Łączy tagi z: - pliku (PNG): parsowane przez parse_parameters, - Taggera (wynik z cache lub wyliczony na bieżąco), - ustawień (default tags), - manualnych tagów (z pola manual_tags_entry), oraz dodaje tag "meta:auto_upload". Zwraca finalny ciąg tagów oraz rating. """ # Pobierz tagi z pliku try: img = Image.open(file_path) parameters = "" if isinstance(img, PngImagePlugin.PngImageFile): parameters = img.info.get("parameters", "") png_tags = set(parse_parameters(parameters, self.tags_repo).split()) img.close() except Exception as e: print("Błąd przy otwieraniu pliku", file_path, ":", e) png_tags = set() # Pobierz tagi z Taggera – sprawdzając cache result = self.get_tagger_results(file_path) tagger_tags = set() rating = "Unrated" tagger_tags.update( ( TAG_FIXES[tag] if tag in TAG_FIXES else tag for tag in result.general_tag_data.keys() ) ) # Zamień nieprawidłowe tagi na poprawne for t in result.character_tags: full_tag = "character:" + t.replace(" ", "_").replace("\\", "") # Zamień nieprawidłowe tagi na poprawne if full_tag in TAG_FIXES: full_tag = TAG_FIXES[full_tag] tagger_tags.add(full_tag) rating = self.map_tagger_rating(result) # Pobierz tagi z ustawień i manualne default_tags = set(self.settings.default_tags.split()) manual_tags = set(self.manual_tags_manager.manual_tags) # Finalna lista: suma wszystkich tagów final_tags = default_tags.union(png_tags).union(tagger_tags).union(manual_tags) final_tags.add("meta:auto_upload") return " ".join(sorted(final_tags)), rating def get_tagger_results(self, file_path) -> wdt.Result: md5 = self.image_files_md5[file_path] cached = self.tagger_cache[md5] if cached: self.tagger_processed.add(md5) return cached["result"] try: tagger = wdt.Tagger() with Image.open(file_path) as img: result = tagger.tag(img) self.tagger_cache[md5] = result self.tagger_processed.add(md5) self.after(0, self.update_status_bar) print(f"Tagger przetworzył: {file_path}") return result except Exception as e: print("Błąd Taggera dla", file_path, ":", e) def map_tagger_rating(self, result: wdt.Result) -> str: """ Mapuje rating z Taggera na wartość używaną w Kapitanbooru. """ if result.rating == "general": new_rating = "General" elif result.rating == "sensitive": new_rating = "Sensitive" elif result.rating == "questionable": new_rating = "Questionable" elif result.rating == "explicit": new_rating = "Explicit" else: new_rating = "Unrated" return new_rating def create_menu(self): menubar = tk.Menu(self) self.config(menu=menubar) menubar.add_command(label="Ustawienia", command=self.open_settings) menubar.add_command(label="Wyczyść cache Taggera", command=self.clear_cache) menubar.add_command(label="Wrzuć wszystko", command=self.upload_all_files) menubar.add_command( label="Zregeneruj bazę tagów", command=self.regenerate_tags_db ) def regenerate_tags_db(self): self.processing_dialog = ProcessingDialog(self, self.tags_repo.regenerate_db) def clear_cache(self): res, err = self.tagger_cache.clear_cache() if res: messagebox.showinfo("Cache", "Cache Taggera zostało wyczyszczone.") else: messagebox.showerror("Cache", f"Błąd przy czyszczeniu cache: {err}") def open_settings(self): settings_window = tk.Toplevel(self) settings_window.title("Ustawienia") settings_window.geometry("300x350") settings_window.grab_set() lbl_login = tk.Label(settings_window, text="Login:") lbl_login.pack(pady=(10, 0)) entry_login = tk.Entry(settings_window) entry_login.pack(pady=(0, 10), padx=10, fill="x") entry_login.insert(0, self.settings.username) lbl_password = tk.Label(settings_window, text="Hasło:") lbl_password.pack(pady=(10, 0)) entry_password = tk.Entry(settings_window, show="*") entry_password.pack(pady=(0, 10), padx=10, fill="x") entry_password.insert(0, self.settings.password) lbl_base_url = tk.Label(settings_window, text="Base URL:") lbl_base_url.pack(pady=(10, 0)) entry_base_url = tk.Entry(settings_window) entry_base_url.pack(pady=(0, 10), padx=10, fill="x") entry_base_url.insert(0, self.settings.base_url) lbl_default_tags = tk.Label(settings_window, text="Default Tags:") lbl_default_tags.pack(pady=(10, 0)) entry_default_tags = tk.Entry(settings_window) entry_default_tags.pack(pady=(0, 10), padx=10, fill="x") entry_default_tags.insert(0, self.settings.default_tags) lbl_browser = tk.Label(settings_window, text="Browser:") lbl_browser.pack(pady=(10, 0)) cb_browser = ttk.Combobox( settings_window, values=list(self.settings.installed_browsers.keys()), state="readonly", ) cb_browser.pack(pady=(0, 10), padx=10, fill="x") cb_browser.set( self.settings.installed_browsers_reverse.get( self.settings.browser, "Default" ) ) def save_and_close(): self.settings.username = entry_login.get() self.settings.password = entry_password.get() self.settings.base_url = entry_base_url.get() self.settings.default_tags = entry_default_tags.get() self.settings.browser = self.settings.installed_browsers[cb_browser.get()] self.settings.save_settings() settings_window.destroy() btn_save = tk.Button(settings_window, text="Zapisz", command=save_and_close) btn_save.pack(pady=10) def create_widgets(self): # Górna ramka top_frame = tk.Frame(self) top_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5) btn_select_folder = tk.Button( top_frame, text="Wybierz folder", command=self.select_folder ) btn_select_folder.pack(side=tk.LEFT, padx=5) btn_prev = tk.Button(top_frame, text="<<", command=self.show_prev) btn_prev.pack(side=tk.LEFT, padx=5) btn_next = tk.Button(top_frame, text=">>", command=self.show_next) btn_next.pack(side=tk.LEFT, padx=5) # Główna ramka – trzy kolumny main_frame = tk.Frame(self) main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Lewa kolumna – lista plików left_frame = tk.Frame(main_frame) left_frame.grid(row=0, column=0, sticky="ns") self.listbox = tk.Listbox(left_frame, width=30) self.listbox.pack(side=tk.LEFT, fill=tk.Y) self.listbox.bind("<>", self.on_listbox_select) scrollbar = tk.Scrollbar( left_frame, orient="vertical", command=self.listbox.yview ) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.listbox.config(yscrollcommand=scrollbar.set) # Środkowa kolumna – podgląd obrazu center_frame = tk.Frame(main_frame) center_frame.grid(row=0, column=1, sticky="nsew", padx=10) main_frame.grid_columnconfigure(1, weight=1) main_frame.grid_rowconfigure(0, weight=1) main_frame.grid_rowconfigure(1, weight=0) self.image_label = tk.Label(center_frame) self.image_label.pack(fill=tk.BOTH, expand=True) self.image_label.bind("", self.on_image_label_resize) # Prawa kolumna – panel tagów i uploadu (ograniczona szerokość) right_frame = tk.Frame(main_frame, width=300) right_frame.grid(row=0, column=2, sticky="nsew", padx=5) right_frame.grid_propagate(False) right_frame.grid_columnconfigure(0, weight=1) # Ustal wiersze: right_frame.grid_rowconfigure(0, weight=0) # PNG Tags – naturalny rozmiar right_frame.grid_rowconfigure(1, weight=0) # Tagger Tags – naturalny rozmiar right_frame.grid_rowconfigure(2, weight=0) # Manual Tags (jednolinijkowe) right_frame.grid_rowconfigure(3, weight=1) # Final Tags – rozszerza się right_frame.grid_rowconfigure(4, weight=0) # Upload Panel # PNG Tags – widget Text z scrollbar png_frame = tk.LabelFrame(right_frame, text="PNG Tags") png_frame.grid(row=0, column=0, sticky="ew", padx=5, pady=5) png_frame.grid_columnconfigure(0, weight=1) self.png_tags_text = tk.Text(png_frame, wrap="word") self.png_tags_text.grid(row=0, column=0, sticky="ew") scrollbar_png = tk.Scrollbar(png_frame, command=self.png_tags_text.yview) scrollbar_png.grid(row=0, column=1, sticky="ns") self.png_tags_text.config( yscrollcommand=scrollbar_png.set, state="disabled", height=4 ) # Tagger Tags – widget Text z scrollbar tagger_frame = tk.LabelFrame(right_frame, text="Tagger Tags") tagger_frame.grid(row=1, column=0, sticky="ew", padx=5, pady=5) tagger_frame.grid_columnconfigure(0, weight=1) self.tagger_tags_text = tk.Text(tagger_frame, wrap="word") self.tagger_tags_text.grid(row=0, column=0, sticky="ew") scrollbar_tagger = tk.Scrollbar( tagger_frame, command=self.tagger_tags_text.yview ) scrollbar_tagger.grid(row=0, column=1, sticky="ns") self.tagger_tags_text.config( yscrollcommand=scrollbar_tagger.set, state="disabled", height=4 ) # Manual Tags – Entry (stała wysokość) manual_frame = tk.LabelFrame(right_frame, text="Manual Tags") manual_frame.grid(row=2, column=0, sticky="nsew", padx=5, pady=5) self.manual_tags_manager = TagManager( manual_frame, self.settings, self.tags_repo ) self.manual_tags_manager.pack(fill=tk.BOTH, expand=True) self.manual_tags_manager.bind( "", lambda e: self.update_final_tags(), add="+" ) self.manual_tags_manager.bind( "", lambda e: self.update_final_tags(), add="+" ) # Final Tags – widget Text z scrollbar, który rozszerza się final_frame = tk.LabelFrame(right_frame, text="Final Tags") final_frame.grid(row=3, column=0, sticky="nsew", padx=5, pady=5) final_frame.grid_rowconfigure(0, weight=1) final_frame.grid_columnconfigure(0, weight=1) self.final_tags_text = tk.Text(final_frame, state="disabled", wrap="word") self.final_tags_text.grid(row=0, column=0, sticky="nsew") scrollbar_final = tk.Scrollbar(final_frame, command=self.final_tags_text.yview) scrollbar_final.grid(row=0, column=1, sticky="ns") self.final_tags_text.config(yscrollcommand=scrollbar_final.set) # Panel uploadu i rating – nie zmienia rozmiaru pionowo upload_frame = tk.Frame(right_frame) upload_frame.grid(row=4, column=0, sticky="ew", padx=5, pady=5) self.rating_var = tk.StringVar(value="Unrated") rating_options = ["General", "Sensitive", "Questionable", "Explicit", "Unrated"] self.rating_dropdown = tk.OptionMenu( upload_frame, self.rating_var, *rating_options ) self.rating_dropdown.pack(side=tk.LEFT, padx=5) self.upload_button = tk.Button( upload_frame, text="Upload", command=self.upload_current_image ) self.upload_button.pack(side=tk.LEFT, padx=5) self.progress_var = tk.IntVar(value=0) self.progress_bar = ttk.Progressbar( upload_frame, orient="horizontal", mode="determinate", variable=self.progress_var, maximum=100, ) self.progress_bar.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5) # Na końcu okna głównego dodaj status bar: self.status_label = tk.Label( main_frame, text="", bd=1, relief=tk.SUNKEN, anchor=tk.W ) self.status_label.grid(row=1, column=0, columnspan=3, sticky="ew") def update_status_bar(self): status_text = ( f"Przetworzono tagi: {len(self.tagger_processed)}/{self.total_files} plików | " f"Zweryfikowano status uploadu: {self.upload_verified}/{self.total_files} plików | " f"Zuploadowano: {self.uploaded_count}/{self.upload_verified} plików" ) self.status_label.config(text=status_text) def on_arrow_key(self, event): """ Obsługuje klawisze strzałek w lewo i w prawo. """ # Jeśli fokus jest na Manual Tags, nie przełączamy plików. if self.focus_get() == self.manual_tags_manager: return if event.keysym == "Left": self.show_prev() elif event.keysym == "Right": self.show_next() def bind_events(self): """ Przypisuje zdarzenia do klawiszy strzałek w lewo i w prawo. """ self.bind("", self.on_arrow_key) self.bind("", self.on_arrow_key) def select_folder(self): """ Otwiera okno dialogowe wyboru folderu z obrazkami i wczytuje pliki PNG z wybranego folderu. """ folder = filedialog.askdirectory(title="Wybierz folder z obrazkami PNG") if folder: self.folder_path = folder self.load_images() def load_images(self): """ Ładuje pliki PNG z wybranego folderu. """ pattern = os.path.join(self.folder_path, "*.png") self.image_files = sorted(glob.glob(pattern)) self.total_files = len(self.image_files) self.image_files_md5 = { file: self.compute_md5(file) for file in self.image_files } self.tagger_processed.clear() for md5 in self.image_files_md5.values(): if self.tagger_cache[md5]: self.tagger_processed.add(md5) self.listbox.delete(0, tk.END) self.uploaded.clear() for file in self.image_files: self.listbox.insert(tk.END, os.path.basename(file)) self.uploaded[file] = False if self.image_files: self.current_index = 0 self.listbox.select_set(0) self.show_image(0) self.post_load_processing() else: messagebox.showinfo("Informacja", "Brak plików PNG w wybranym folderze.") def post_load_processing(self): """ Po załadowaniu plików, sprawdza czy są jakieś pliki do uploadu oraz przetwarza Taggerem pliki. """ threading.Thread(target=self.check_uploaded_files, daemon=True).start() threading.Thread(target=self.process_tagger_queue, daemon=True).start() def check_uploaded_files(self): """ Dla każdego obrazu oblicza MD5, grupuje je w paczki (do 100 skrótów), wysyła zapytanie do endpointa 'posts.json' dla każdej paczki, a następnie na podstawie odpowiedzi ustawia w self.uploaded post id dla uploadowanych plików. """ file_md5_list = [ (idx, file, self.image_files_md5[file]) for idx, file in enumerate(self.image_files) ] batch_size = 100 for i in range(0, len(file_md5_list), batch_size): batch = file_md5_list[i : i + batch_size] batch_md5 = [item[2] for item in batch] md5_param = ",".join(batch_md5) url = self.settings.base_url.rstrip("/") + "/posts.json" try: response = requests.get(url, params={"md5": md5_param}) root = response.json() found = {} for elem in root: post_md5 = elem.get("md5", "").lower() post_id = elem.get("id") if post_md5 and post_id: found[post_md5] = post_id for idx, file_path, md5 in batch: self.upload_verified += 1 # Każdy plik w batchu jest zweryfikowany if md5.lower() in found: self.uploaded[file_path] = found[md5.lower()] self.uploaded_count += 1 self.after( 0, lambda idx=idx: self.listbox.itemconfig( idx, {"bg": "lightgray"} ), ) # Jeśli aktualnie wybrany plik, zmień przycisk if self.current_index == idx: self.after(0, self.set_upload_button_to_view_or_upload) else: self.uploaded[file_path] = False self.after(0, self.update_status_bar) except Exception as e: print("Błąd podczas sprawdzania paczki uploadu:", e) def set_upload_button_to_view_or_upload(self): """ Ustawia przycisk uploadu na "Wyświetl" lub "Upload" w zależności od stan uploadu dla aktualnie wybranego pliku. """ if self.current_index is None: return post_id = self.uploaded.get(self.image_files[self.current_index]) if post_id: self.upload_button.config( text="Wyświetl", command=lambda: self.view_post(post_id) ) else: self.upload_button.config(text="Upload", command=self.upload_current_image) def view_post(self, post_id): """Otwiera w przeglądarce URL posta.""" url = self.settings.base_url.rstrip("/") + "/post/view/" + str(post_id) open_webbrowser(url, self.settings) def compute_md5(self, file_path, chunk_size=8192): """Oblicza MD5 dla danego pliku.""" hash_md5 = hashlib.md5() try: with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(chunk_size), b""): hash_md5.update(chunk) except Exception as e: print("Błąd przy obliczaniu MD5:", e) return "" return hash_md5.hexdigest() def on_listbox_select(self, event): """ Wywoływane po wybraniu pliku z listy. Wyświetla obrazek, aktualizuje tagi PNG, uruchamia Taggera, ustawia przycisk uploadu. """ if not self.listbox.curselection(): return index = self.listbox.curselection()[0] self.current_index = index self.show_image(index) self.set_upload_button_to_view_or_upload() def upload_current_image(self): """ Uploaduje obrazek do serwera. Wysyła POST z obrazkiem, tagami i ratingiem. Po zakończeniu uploadu, ustawia przycisk na "Wyświetl". """ file_path = self.image_files[self.current_index] if self.uploaded.get(file_path, False): # Jeśli plik już uploadowany, ustaw przycisk na "Wyświetl" self.set_upload_button_to_view_or_upload() return self.upload_button.config(state="disabled") self.progress_var.set(0) threading.Thread( target=self.upload_file, args=(file_path,), daemon=True ).start() def show_image(self, index): """ Wyświetla obrazek o podanym indeksie z listy. Odczytuje tagi z pliku PNG, uruchamia Taggera, aktualizuje widgety z tagami. """ if index < 0 or index >= len(self.image_files): return file_path = self.image_files[index] try: img = Image.open(file_path) parameters = "" if isinstance(img, PngImagePlugin.PngImageFile): parameters = img.info.get("parameters", "") self.current_image_original = img.copy() self.current_parameters = parameters self.update_display_image() parsed_parameters = parse_parameters(parameters, self.tags_repo) # Uaktualnij widget PNG Tags self.update_png_tags_widget(parsed_parameters.split()) # Uruchom Taggera w osobnym wątku threading.Thread(target=self.run_tagger, daemon=True).start() except Exception as e: messagebox.showerror("Błąd", f"Nie można załadować obrazka:\n{e}") def update_display_image(self): """ Aktualizuje obrazek na podstawie aktualnego rozmiaru okna. Skaluje obrazek, jeśli jego rozmiar jest większy niż dostępna przestrzeń. """ if self.current_image_original is None: return self.image_label.update_idletasks() avail_width = self.image_label.winfo_width() avail_height = self.image_label.winfo_height() if avail_width <= 1 or avail_height <= 1: avail_width, avail_height = 400, 400 orig_width, orig_height = self.current_image_original.size scale = min(avail_width / orig_width, avail_height / orig_height, 1) new_width = int(orig_width * scale) new_height = int(orig_height * scale) try: resample_filter = Image.Resampling.LANCZOS except AttributeError: resample_filter = Image.LANCZOS if scale < 1: img = self.current_image_original.resize( (new_width, new_height), resample=resample_filter ) else: img = self.current_image_original self.image_cache = ImageTk.PhotoImage(img) self.image_label.config(image=self.image_cache) def update_listbox_item_color_by_rating(self, file_path, rating): """ Ustawia kolor tła dla pozycji w liście na podstawie ratingu, o ile plik nie został uploadowany. Kolor: lightgreen dla General, yellow dla Sensitive, darkorange dla Questionable, red dla Explicit. Jeśli plik uploadowany, nie zmieniamy (pozostaje lightgray). """ # Jeśli plik jest oznaczony jako uploadowany, nic nie robimy if self.uploaded.get(file_path, False): return try: index = self.image_files.index(file_path) except ValueError: return # Mapowanie ratingu na kolor if rating == "general": color = "lightgreen" elif rating == "sensitive": color = "yellow" elif rating == "questionable": color = "darkorange" elif rating == "explicit": color = "red" else: color = "white" self.after( 0, lambda idx=index, col=color: self.listbox.itemconfig(idx, {"bg": col}) ) def on_image_label_resize(self, event): """ Wywoływane przy zmianie rozmiaru okna. Aktualizuje obrazek w zależności od nowego rozmiaru okna. """ self.update_display_image() def show_prev(self): """ Przełącza na poprzedni obraz """ if self.current_index is None: return new_index = self.current_index - 1 if new_index < 0: new_index = len(self.image_files) - 1 self.current_index = new_index self.listbox.select_clear(0, tk.END) self.listbox.select_set(new_index) self.listbox.activate(new_index) self.show_image(new_index) self.set_upload_button_to_view_or_upload() def show_next(self): """ Przełącza na następny obraz """ if self.current_index is None: return new_index = self.current_index + 1 if new_index >= len(self.image_files): new_index = 0 self.current_index = new_index self.listbox.select_clear(0, tk.END) self.listbox.select_set(new_index) self.listbox.activate(new_index) self.show_image(new_index) self.set_upload_button_to_view_or_upload() # --- Metody obsługujące widgety z tagami --- def update_png_tags_widget(self, tags_list): """ Aktualizuje widget z tagami z PNG. Tworzy tagi jako klikalne, zaznaczone lub niezaznaczone. """ self.png_tags_text.config(state="normal") self.png_tags_text.delete("1.0", tk.END) self.png_tags_states = {} for tag in tags_list: self.png_tags_states[tag] = True start_index = self.png_tags_text.index(tk.INSERT) self.png_tags_text.insert(tk.INSERT, tag) end_index = self.png_tags_text.index(tk.INSERT) tag_name = "png_" + tag self.png_tags_text.tag_add(tag_name, start_index, end_index) self.png_tags_text.tag_configure(tag_name, foreground="blue") self.png_tags_text.tag_bind(tag_name, "", self.toggle_png_tag) self.png_tags_text.insert(tk.INSERT, " ") self.png_tags_text.config(state="disabled") self.adjust_text_widget_height(self.png_tags_text) self.update_final_tags() def toggle_png_tag(self, event): """ Obsługuje kliknięcie na tag w PNG Tags. Zmienia stan tagu (zaznaczony/niezaznaczony) i aktualizuje listę finalnych tagów. """ index = self.png_tags_text.index("@%d,%d" % (event.x, event.y)) for t in self.png_tags_text.tag_names(index): if t.startswith("png_"): actual_tag = t[len("png_") :] self.png_tags_states[actual_tag] = not self.png_tags_states.get( actual_tag, True ) color = "blue" if self.png_tags_states[actual_tag] else "black" self.png_tags_text.tag_configure(t, foreground=color) break self.update_visible_tagger_tags() def get_visible_tags(self) -> list[tuple[str, bool, float]]: """ Zwraca listę tagów, które mają być widoczne w Tagger Tags. Tagi zaznaczone są zawsze widoczne. Tagi niezaznaczone są widoczne, jeśli nie są implikowane przez zaznaczone tagi. """ visible_tags = [] # character:amber_redmond cheat / WORKAROUND if "character:amber_redmond" in self.manual_tags_manager.manual_tags: for tag in [ "1girl", "tan", "black_hair", "very_short_hair", "short_hair", "black_choker", "heart_choker", "red_hair", "cutoffs", "blue_shorts", "short_shorts", "short_sleeves", "off_shoulder", "black_footwear", "feet", "toenail_polish", "toenails", "fingernails", "nail_polish", ]: if tag in self.tagger_tags_states: self.tagger_tags_states[tag] = ( True, self.tagger_tags_states[tag][1], ) for tag in [ "virtual_youtuber", "dark_skin", "dark-skinned_female", "black_collar", "collar", "yellow_eyes", ]: if tag in self.tagger_tags_states: self.tagger_tags_states.pop(tag) selected_tags = [ tag for tag, (selected, _) in self.tagger_tags_states.items() if selected ] selected_png_tags = [ tag for tag, selected in self.png_tags_states.items() if selected ] implied_by_selected = set() # Safely collect implications from selected tags for tag in selected_tags: if tag in self.implication_graph: implied_by_selected.update(nx.descendants(self.implication_graph, tag)) else: print(f"Warning: Tag '{tag}' not found in implication graph") self.missing_tags.add(tag) # Log missing tags for tag in selected_png_tags: if tag in self.implication_graph: implied_by_selected.update(nx.descendants(self.implication_graph, tag)) else: print(f"Warning: Tag '{tag}' not found in implication graph") self.missing_tags.add(tag) # Log missing tags # Build visible list for tag, (selected, confidence) in self.tagger_tags_states.items(): if selected: visible_tags.append((tag, True, confidence)) else: if tag not in implied_by_selected: visible_tags.append((tag, False, confidence)) return visible_tags def update_tagger_tags_widget(self, result: wdt.Result): """ Aktualizuje widget z tagami z Taggera. Zaznacza tagi, które są wspólne z tagami z PNG. Ukrywa tagi, które są implikowane przez zaznaczone tagi. """ # Opróżniamy widget i słownik stanów self.tagger_tags_states = {} tags_dict = {} # Przetwarzamy tagi postaci – dodajemy prefix "character:" i zaznaczamy je for tag, prob in result.character_tag_data.items(): full_tag = "character:" + tag # Zamień nieprawidłowe tagi na poprawne if full_tag in TAG_FIXES: full_tag = TAG_FIXES[full_tag] tags_dict[full_tag] = prob # Przetwarzamy tagi general – wszystkie ustawiamy jako zaznaczone for tag, prob in result.general_tag_data.items(): # Zamień nieprawidłowe tagi na poprawne if tag in TAG_FIXES: tag = TAG_FIXES[tag] tags_dict[tag] = prob for tag, prob in tags_dict.items(): self.tagger_tags_states[tag] = (True, prob) # Obliczamy przecięcie tagów z PNG i Taggera common = set(self.png_tags_states.keys()).intersection( set(self.tagger_tags_states.keys()) ) # Aktualizujemy stany w obu grupach – tylko wspólne tagi pozostają zaznaczone (True) for tag in self.png_tags_states: if tag.startswith("character:") or tag.startswith("copyright:"): continue # Pomijamy tagi postaci i praw autorskich, bo mamy do nich pewność self.png_tags_states[tag] = tag in common tag_name = "png_" + tag color = "blue" if self.png_tags_states[tag] else "black" self.png_tags_text.config(state="normal") self.png_tags_text.tag_configure(tag_name, foreground=color) self.png_tags_text.config(state="disabled") for tag in self.tagger_tags_states: if tag.startswith("character:"): continue # Pomijamy tagi postaci, bo mamy do nich pewność self.tagger_tags_states[tag] = ( tag in common, self.tagger_tags_states[tag][1], ) self.update_visible_tagger_tags() def update_visible_tagger_tags(self): """ Aktualizuje widget z tagami z Taggera. Wyświetla tagi z Taggera w formacie: "tag (confidence %)". Koloruje tagi na podstawie stanu (zaznaczony/niezaznaczony) i prawdopodobieństwa. Podczas kliknięcia na tag, zmienia stan (zaznaczony/niezaznaczony). Poprawia wysokość widgetu. Aktualizuje listę finalnych tagów. """ self.tagger_tags_text.config(state="normal") self.tagger_tags_text.delete("1.0", tk.END) visible_tags = self.get_visible_tags() for tag, selected, prob in visible_tags: display_text = f"{tag} ({float(prob)*100:.1f}%)" # np. "Rem (99.9%)" start_index = self.tagger_tags_text.index(tk.INSERT) self.tagger_tags_text.insert(tk.INSERT, display_text) end_index = self.tagger_tags_text.index(tk.INSERT) tag_name = "tagger_" + tag color = self.get_tagger_tag_color((selected, prob)) self.tagger_tags_text.tag_add(tag_name, start_index, end_index) self.tagger_tags_text.tag_configure(tag_name, foreground=color) self.tagger_tags_text.tag_bind( tag_name, "", self.toggle_tagger_tag ) self.tagger_tags_text.insert(tk.INSERT, " ") self.tagger_tags_text.config(state="disabled") self.adjust_text_widget_height(self.tagger_tags_text) self.update_final_tags() def get_tagger_tag_color(self, tagger_tag_state: Tuple[bool, float]) -> str: """ Zwraca kolor dla tagu z Taggera na podstawie stanu (selected) i prawdopodobieństwa. Kolor: - niebieski, jeśli tag jest zaznaczony, - czerwony, jeśli tag nie jest zaznaczony i prawdopodobieństwo < 0.3. - żółty, jeśli tag nie jest zaznaczony i prawdopodobieństwo >= 0.3 i < 0.6. - zielony, jeśli tag nie jest zaznaczony i prawdopodobieństwo >= 0.6. """ selected, prob = tagger_tag_state if selected: return "blue" if prob < 0.3: return "red" if prob < 0.6: return "darkorange" return "green" def toggle_tagger_tag(self, event): """ Obsługuje kliknięcie na tag z Taggera. Zmienia stan tagu (zaznaczony/niezaznaczony) i aktualizuje kolor. """ index = self.tagger_tags_text.index("@%d,%d" % (event.x, event.y)) for t in self.tagger_tags_text.tag_names(index): if t.startswith("tagger_"): actual_tag = t[len("tagger_") :] tag_state, tag_prob = self.tagger_tags_states.get(actual_tag, (True, 0)) self.tagger_tags_states[actual_tag] = (not tag_state, tag_prob) color = self.get_tagger_tag_color(self.tagger_tags_states[actual_tag]) self.tagger_tags_text.tag_configure(t, foreground=color) break self.update_visible_tagger_tags() def update_final_tags(self): """Buduje finalną listę tagów, uwzględniając default tags, PNG, Tagger i manualne. Dla każdego tagu pobiera informacje z bazy i ustawia styl (kolor i podkreślenie): - Deprecated: czerwony, podkreślony, - Tag nie istnieje: żółty, podkreślony, - Normalny: np. niebieski, bez podkreślenia. """ final_set = set() if self.settings.default_tags: final_set.update(self.settings.default_tags.split()) for tag, selected in self.png_tags_states.items(): if selected: final_set.add(tag) for tag, selected in self.tagger_tags_states.items(): if selected[0]: final_set.add(tag) manual = self.manual_tags_manager.manual_tags if manual: final_set.update(manual) final_list = sorted(final_set) self.final_tags_text.config(state="normal") self.final_tags_text.delete("1.0", tk.END) for tag in final_list: _, deprecated = process_tag(tag, self.tags_repo) # Ustal kolor i podkreślenie na podstawie wyniku if deprecated is True: color = "red" underline = 1 elif deprecated is None: color = "darkorange" underline = 1 else: color = "blue" underline = 0 start_index = self.final_tags_text.index(tk.INSERT) self.final_tags_text.insert(tk.INSERT, tag) end_index = self.final_tags_text.index(tk.INSERT) tag_name = "final_" + tag self.final_tags_text.tag_add(tag_name, start_index, end_index) self.final_tags_text.tag_configure( tag_name, foreground=color, underline=underline ) self.final_tags_text.tag_bind( tag_name, "", self.open_final_tag_wiki_url ) self.final_tags_text.insert(tk.INSERT, " ") self.final_tags_text.config(state="disabled") def open_final_tag_wiki_url(self, event): """Otwiera w przeglądarce URL strony wiki dla klikniętego tagu. Usuwa znane prefiksy, aby utworzyć poprawny URL. """ index = self.final_tags_text.index("@%d,%d" % (event.x, event.y)) for t in self.final_tags_text.tag_names(index): if t.startswith("final_"): actual_tag = t[len("final_") :] open_tag_wiki_url(actual_tag, self.settings) break # --- Metody do cache'owania wyników Taggera --- def process_tagger_for_image(self, file_path): """Przetwarza obrazek przy użyciu Taggera i zapisuje wynik do cache.""" result = self.get_tagger_results(file_path) self.update_listbox_item_color_by_rating(file_path, result.rating) def process_tagger_queue(self): """Przetwarza wszystkie obrazki w tle (pomijając aktualnie wybrany).""" for file_path in self.image_files: # Jeśli obrazek jest aktualnie wybrany, pomijamy – on będzie przetwarzany w foreground if ( self.current_index is not None and file_path == self.image_files[self.current_index] ): continue self.process_tagger_for_image(file_path) def run_tagger(self): """ Przetwarza aktualnie wybrany obrazek. Jeśli wynik jest już w cache, wykorzystuje go; w przeciwnym razie uruchamia Taggera i zapisuje wynik do cache. """ if self.current_index is None: return # Ustaw komunikat, że Tagger pracuje self.tagger_tags_text.config(state="normal") self.tagger_tags_text.delete("1.0", tk.END) self.tagger_tags_text.insert("1.0", "Tagger przetwarza...") self.tagger_tags_text.config(state="disabled") file_path = self.image_files[self.current_index] result = self.get_tagger_results(file_path) new_rating = self.map_tagger_rating(result) self.rating_var.set(new_rating) self.after(0, lambda: self.update_tagger_tags_widget(result)) self.update_listbox_item_color_by_rating(file_path, result.rating) def upload_file(self, file_path, final_tags=None, final_rating=None): url = self.settings.base_url.rstrip("/") + "/api/danbooru/add_post" tags = ( self.final_tags_text.get("1.0", tk.END).strip() if final_tags is None else final_tags ) fields = { "login": self.settings.username, "password": self.settings.password, "tags": tags, "source": "", } rating_value = self.rating_map.get( self.rating_var.get() if final_rating is None else final_rating, "" ) if rating_value: fields["rating"] = rating_value try: total_size = os.path.getsize(file_path) def progress_callback(bytes_read, total_size): percentage = int(bytes_read / total_size * 100) self.progress_bar.after(0, lambda: self.progress_var.set(percentage)) with open(file_path, "rb") as f: wrapped_file = ProgressFile(f, progress_callback, total_size) files = { "file": (os.path.basename(file_path), wrapped_file, "image/png") } response = requests.post(url, data=fields, files=files) self.progress_bar.after(0, lambda: self.progress_var.set(100)) show_warn = False post_url = None if response.status_code in (200, 201): message = "Upload zakończony powodzeniem!" post_url = response.headers.get("X-Danbooru-Location", None) elif response.status_code == 409: message = f"Upload zakończony błędem.\nStatus: 409\nTreść: {response.headers.get('X-Danbooru-Errors', '')}" post_url = response.headers.get("X-Danbooru-Location", None) show_warn = True else: message = f"Upload zakończony błędem.\nStatus: {response.status_code}\nTreść: {response.text}" show_warn = True self.upload_button.after( 0, lambda: self.upload_button.config(state="normal") ) # Aktualizacja wyglądu listy – musimy użyć domyślnych argumentów w lambdzie, aby zachować bieżący indeks if show_warn: if not final_tags: messagebox.showwarning("Upload", message) else: if not final_tags: messagebox.showinfo("Upload", message) self.after( 0, lambda idx=self.image_files.index( file_path ): self.listbox.itemconfig(idx, {"bg": "lightgray"}), ) if post_url: post_id = post_url.split("/")[-1] self.uploaded[file_path] = post_id self.uploaded_count += 1 self.after(0, self.update_status_bar) self.set_upload_button_to_view_or_upload() except Exception as e: self.upload_button.after( 0, lambda: self.upload_button.config(state="normal") ) messagebox.showerror("Błąd uploadu", str(e)) def upload_all_files(self): """ Metoda, która po potwierdzeniu przez użytkownika uploaduje wszystkie niewrzucone pliki. Dla każdego pliku oblicza finalne tagi (przy użyciu compute_final_tags_for_file) i wywołuje upload_file. """ if not messagebox.askyesno( "Potwierdzenie", "Czy na pewno chcesz wrzucić wszystkie niewrzucone pliki?\nKażdy z nich zostanie oznaczony tagiem 'meta:auto_upload'.\nUpewnij się, że tagi są poprawne!", ): return def worker(): for file_path in self.image_files: if not self.uploaded.get(file_path, False): final_tags, final_rating = ( self.compute_final_tags_and_rating_for_file(file_path) ) print( f"Uploading {file_path} z tagami: {final_tags} i ratingiem: {final_rating}" ) self.upload_file( file_path, final_tags=final_tags, final_rating=final_rating ) threading.Thread(target=worker, daemon=True).start()