import glob import hashlib import inspect import os import queue import threading import tkinter as tk from tkinter import filedialog, messagebox, ttk from typing import Dict, Tuple, Optional from packaging.version import parse as parse_version import itertools import networkx as nx import requests from PIL import Image, ImageTk, PngImagePlugin import wdtagger as wdt import tomli from .I18N import _ from .ProgressFile import ProgressFile from .TagsRepo import TagsRepo from .autocomplete import TagManager from .common import get_auth_token, login, open_tag_wiki_url, open_webbrowser from .settings import Settings from .tag_processing import TAG_FIXES, parse_parameters, process_tag from .tagger_cache import TaggerCache class ProcessingDialog: def __init__(self, root, target_function, *args): self.root = root self.top = tk.Toplevel(root) self.top.title(_("Processing...")) self.top.geometry("300x150") self.top.protocol("WM_DELETE_WINDOW", self.on_close) self.label = tk.Label(self.top, text=_("Processing, please wait...")) self.label.pack(pady=10) # Start with indeterminate progress bar self.progress = ttk.Progressbar(self.top, mode="indeterminate") self.progress.pack(pady=10, fill="x") self.progress.start(10) sig = inspect.signature(target_function) if "secondary_progress_queue" in sig.parameters: self.sub_progress = ttk.Progressbar(self.top, mode="indeterminate") self.sub_progress.pack(pady=10, fill="x") self.sub_progress.start(10) # Setup communication queue and periodic checker self.queue = queue.Queue() self.sub_queue = queue.Queue() self.running = True self.cancel_event = threading.Event() # Cancellation flag self.thread = threading.Thread( target=self.run_task, args=(target_function, *args) ) self.thread.start() self.top.after(100, self.process_queue) def process_queue(self): """Process messages from the background thread""" while self.running: try: msg = self.queue.get_nowait() if msg[0] == "mode": self.progress.config(mode=msg[1]) if msg[1] == "determinate": self.progress["value"] = 0 self.progress.stop() elif msg[1] == "indeterminate": self.progress["value"] = 0 self.progress.start() elif msg[0] == "max": self.progress["maximum"] = msg[1] elif msg[0] == "progress": self.progress["value"] = msg[1] elif msg[0] == "label": self.label.config(text=msg[1]) self.top.update_idletasks() except queue.Empty: break try: msg = self.sub_queue.get_nowait() if msg[0] == "mode": self.sub_progress.config(mode=msg[1]) if msg[1] == "determinate": self.sub_progress["value"] = 0 self.sub_progress.stop() elif msg[1] == "indeterminate": self.sub_progress["value"] = 0 self.sub_progress.start() elif msg[0] == "max": self.sub_progress["maximum"] = msg[1] elif msg[0] == "progress": self.sub_progress["value"] = msg[1] self.top.update_idletasks() except queue.Empty: break if self.running: self.top.after(100, self.process_queue) def run_task(self, target_function, *args): """Execute target function with progress queue if supported""" try: sig = inspect.signature(target_function) kwargs = {} if "progress_queue" in sig.parameters: kwargs["progress_queue"] = self.queue if "secondary_progress_queue" in sig.parameters: kwargs["secondary_progress_queue"] = self.sub_queue if "cancel_event" in sig.parameters: kwargs["cancel_event"] = self.cancel_event target_function(*args, **kwargs) finally: self.close_dialog() def close_dialog(self): """Safely close the dialog""" if self.running: self.running = False self.progress.stop() self.top.after(0, self.top.destroy) self.root.after(100, self.thread.join) def on_close(self): """Handle manual window closure""" self.running = False self.cancel_event.set() # Notify target function that cancellation is requested self.top.destroy() class ImageBrowser(tk.Tk): def __init__(self): super().__init__() self.title("Kapitanbooru Uploader") self.geometry("900x600") self.version = "0.5.0" self.acknowledged_version = parse_version(self.version) self.settings = Settings() self.tags_repo = TagsRepo(self.settings) self.implication_graph = self.load_implication_graph() self.missing_tags = set() # Track tags not in the graph # Dodatkowe ustawienia dla Taggera self.tagger_name = "wdtagger" self.tagger_version = ( "1.0" # możesz ustawić wersję dynamicznie, jeśli to możliwe ) self.tagger_cache = TaggerCache( self.settings, self.tagger_name, self.tagger_version ) self.folder_path = "" self.image_files = [] self.image_files_md5 = [] self.current_index = None self.image_cache = None self.tagger_thread_idx = 0 self.tagger = wdt.Tagger() self.check_uploaded_files_stop_event = threading.Event() self.check_uploaded_files_thread: Optional[threading.Thread] = None self.process_tagger_queue_stop_event = threading.Event() self.process_tagger_queue_thread: Optional[threading.Thread] = None self.run_tagger_threads: Dict[str, threading.Thread] = {} # Liczniki statusu self.total_files = 0 self.tagger_processed = set() self.upload_verified = 0 self.uploaded_count = 0 # Oryginalny obraz (do skalowania) self.current_image_original = None self.current_parameters = "" # Mapa ratingów: wyświetlana nazwa -> wartość wysyłana self.rating_map = { "General": "g", "Sensitive": "s", "Questionable": "q", "Explicit": "e", "Unrated": "", } # Słowniki przechowujące stany tagów (dla PNG i Taggera) self.png_tags_states = {} self.tagger_tags_states = {} # Ścieżki do ustawień i cache # Ładujemy ustawienia # Nowy słownik przechowujący informację, czy dany plik (ścieżka) został już uploadowany self.uploaded = {} # key: file path, value: True/False self.create_menu() self.create_widgets() self.bind_events() # Schedule first update check self.after(1000, self._schedule_update_check) def _schedule_update_check(self): """Schedule periodic update checks""" self._check_for_update() # Check every 5 minutes (300,000 ms) self.after(300000, self._schedule_update_check) def _check_for_update(self): """Check for updates in a background thread""" def check_thread(): try: # Fetch pyproject.toml using requests response = requests.get( "https://git.mlesniak.pl/kapitan/kapitanbooru-uploader/raw/branch/main/pyproject.toml", timeout=10, # Add reasonable timeout ) response.raise_for_status() # Raise exception for HTTP errors remote_toml = tomli.loads(response.text) remote_version_str = remote_toml["project"]["version"] remote_version = parse_version(remote_version_str) current_version = parse_version(self.version) if ( remote_version > current_version and remote_version > self.acknowledged_version ): self.after(0, lambda: self._notify_user(remote_version_str)) self.acknowledged_version = remote_version except requests.exceptions.RequestException as e: print(_("Update check failed: {error}").format(error=e)) except KeyError as e: print(_("Malformed pyproject.toml: {error}").format(error=e)) except Exception as e: print( _("Unexpected error during update check: {error}").format(error=e) ) threading.Thread(target=check_thread, daemon=True).start() def _notify_user(self, new_version): """Show update notification with translated messages""" title = _("Update Available") message_template = _( "A new version {new_version} is available!\n" "You have version {current_version}.\n\n" "Update using: {update_command}" ) formatted_message = message_template.format( new_version=new_version, current_version=self.version, update_command="pip install --upgrade kapitanbooru-uploader", ) messagebox.showinfo(title, formatted_message) def reload_ui(self): """Reload UI components with new language""" # Destroy current widgets for widget in self.winfo_children(): widget.destroy() # Rebuild UI self.create_menu() self.create_widgets() def load_implication_graph(self) -> nx.DiGraph: G = nx.DiGraph() conn = self.tags_repo.get_conn() cursor = conn.cursor() # Step 1: Add all tags from the 'tags' table cursor.execute( """ SELECT CASE category WHEN 1 THEN 'artist:' || name WHEN 3 THEN 'copyright:' || name WHEN 4 THEN 'character:' || name WHEN 5 THEN 'meta:' || name ELSE name END AS prefixed_name FROM tags """ ) db_tags = {row[0] for row in cursor.fetchall()} G.add_nodes_from(db_tags) # Step 2: Add nodes from implications (antecedents/consequents not in 'tags' table) cursor.execute("SELECT antecedent, consequent FROM tag_closure") edge_tags = set() for ant, cons in cursor.fetchall(): edge_tags.add(ant) edge_tags.add(cons) G.add_nodes_from(edge_tags - db_tags) # Add tags only in implications # Step 3: Add edges cursor.execute("SELECT antecedent, consequent FROM tag_closure") G.add_edges_from(cursor.fetchall()) conn.close() return G def adjust_text_widget_height(self, widget): """ Ustawia wysokość widgetu Text na liczbę linii w jego treści, ale nie więcej niż max_lines. """ content = widget.get("1.0", "end-1c") num_lines = 0 cur_line_len = 0 text = content.split() for word in text: if cur_line_len + len(word) > 34: num_lines += 1 cur_line_len = 0 cur_line_len += len(word) + 1 max_lines = ( self.png_tags_text.winfo_height() + self.tagger_tags_text.winfo_height() + self.final_tags_text.winfo_height() - widget.winfo_height() - 8 ) widget.config(height=min(num_lines, max_lines) if num_lines > 4 else 4) def compute_final_tags_and_rating_for_file(self, file_path): """ Oblicza finalną listę tagów dla danego pliku oraz rating. Łączy tagi z: - pliku (PNG): parsowane przez parse_parameters, - Taggera (wynik z cache lub wyliczony na bieżąco), - ustawień (default tags), - manualnych tagów (z pola manual_tags_entry), oraz dodaje tag "meta:auto_upload". Zwraca finalny ciąg tagów oraz rating. """ # Pobierz tagi z pliku try: img = Image.open(file_path) parameters = "" if isinstance(img, PngImagePlugin.PngImageFile): parameters = img.info.get("parameters", "") png_tags = set( [ x for x in parse_parameters(parameters, self.tags_repo).split() if process_tag(x, self.tags_repo)[1] is not None # Ignoruj nieistniejące tagi ] ) img.close() except Exception as e: print(_("Błąd przy otwieraniu pliku"), file_path, ":", e) png_tags = set() # Pobierz tagi z Taggera – sprawdzając cache result = self.get_tagger_results(file_path) tagger_tags = set() rating = "Unrated" tagger_tags.update( ( TAG_FIXES[tag] if tag in TAG_FIXES else tag for tag in result.general_tag_data.keys() ) ) # Zamień nieprawidłowe tagi na poprawne for t in result.character_tags: full_tag = "character:" + t.replace(" ", "_").replace("\\", "") # Zamień nieprawidłowe tagi na poprawne if full_tag in TAG_FIXES: full_tag = TAG_FIXES[full_tag] tagger_tags.add(full_tag) rating = self.map_tagger_rating(result) # Pobierz tagi z ustawień i manualne default_tags = set(self.settings.default_tags.split()) manual_tags = set(self.manual_tags_manager.manual_tags) # Finalna lista: suma wszystkich tagów final_tags = default_tags.union(png_tags).union(tagger_tags).union(manual_tags) final_tags.add("meta:auto_upload") return " ".join(sorted(final_tags)), rating def get_tagger_results(self, file_path) -> wdt.Result: md5 = self.image_files_md5[file_path] cached = self.tagger_cache[md5] if cached: self.tagger_processed.add(md5) return cached["result"] try: with Image.open(file_path) as img: result = self.tagger.tag(img) self.tagger_cache[md5] = result self.tagger_processed.add(md5) self.after(0, self.update_status_bar) print(_("Tagger przetworzył:"), f"{file_path}") return result except Exception as e: print(_("Błąd Taggera dla"), file_path, ":", e) def map_tagger_rating(self, result: wdt.Result) -> str: """ Mapuje rating z Taggera na wartość używaną w Kapitanbooru. """ if result.rating == "general": new_rating = "General" elif result.rating == "sensitive": new_rating = "Sensitive" elif result.rating == "questionable": new_rating = "Questionable" elif result.rating == "explicit": new_rating = "Explicit" else: new_rating = "Unrated" return new_rating def create_menu(self): menubar = tk.Menu(self) self.config(menu=menubar) # Create file menu and store it as instance variable self.file_menu = tk.Menu(menubar, tearoff=0) # File menu items - create references for items we need to control self.file_menu.add_command(label=_("Otwórz folder"), command=self.select_folder) self.file_menu.add_separator() self.file_menu.add_command( label=_("Wyślij"), command=self.upload_current_image, state=tk.DISABLED ) self.file_menu.add_command( label=_("Wyślij wszystko"), command=self.upload_all_files, state=tk.DISABLED ) self.file_menu.add_separator() self.file_menu.add_command( label=_("Podmień tagi"), command=self.edit_current_image, state=tk.DISABLED ) self.file_menu.add_command( label=_("Otwórz post"), command=self.view_current_post, state=tk.DISABLED ) self.file_menu.add_separator() self.file_menu.add_command(label=_("Zakończ"), command=self.quit) menubar.add_cascade(label=_("Plik"), menu=self.file_menu) # Options menu options_menu = tk.Menu(menubar, tearoff=0) options_menu.add_command(label=_("Ustawienia"), command=self.open_settings) options_menu.add_separator() options_menu.add_command( label=_("Wyczyść cache Taggera"), command=self.clear_cache ) options_menu.add_command( label=_("Zregeneruj bazę tagów"), command=self.regenerate_tags_db ) menubar.add_cascade(label=_("Opcje"), menu=options_menu) help_menu = tk.Menu(menubar, tearoff=0) help_menu.add_command(label=_("About"), command=self.show_about) menubar.add_cascade(label=_("Help"), menu=help_menu) def show_about(self): """Multilingual About window (updated version)""" about = tk.Toplevel(self) about.title(_("About Kapitanbooru Uploader")) about.resizable(False, False) # Window content frame = ttk.Frame(about, padding=10) frame.pack(fill="both", expand=True) row_counter = itertools.count(0) ttk.Label( frame, text="Kapitanbooru Uploader", font=("TkDefaultFont", 16, "bold") ).grid(row=next(row_counter), column=0, sticky=tk.W) current_version = parse_version(self.version) if current_version < self.acknowledged_version: ttk.Label( frame, text=_("A new version {new_version} is available!").format( new_version=self.acknowledged_version ), foreground="red", ).grid(row=next(row_counter), column=0, sticky=tk.W) content = [ _("Current version: {version}").format(version=self.version), "", _("A GUI application for uploading images to KapitanBooru."), _("Features include image upload, tag management, automatic"), _("tagging with wdtagger, and cache management."), "", _("Authors:"), "Michał Leśniak", "", _("License: MIT License"), f"Copyright © 2025 Michał Leśniak", "", ] for text in content: ttk.Label(frame, text=text).grid(row=next(row_counter), column=0, sticky=tk.W) # Repository link repo_frame = ttk.Frame(frame) repo_frame.grid(row=next(row_counter), sticky=tk.W) ttk.Label(repo_frame, text=_("Repository:")).pack(side=tk.LEFT) repo_url = "https://git.mlesniak.pl/kapitan/kapitanbooru-uploader" repo = ttk.Label(repo_frame, text=repo_url, cursor="hand2", foreground="blue") repo.pack(side=tk.LEFT, padx=(5, 0)) repo.bind("", lambda e: open_webbrowser(repo_url, self.settings)) # Website link website_frame = ttk.Frame(frame) website_frame.grid(row=next(row_counter), sticky=tk.W) ttk.Label(website_frame, text=_("Website:")).pack(side=tk.LEFT) website_url = "https://booru.mlesniak.pl" website = ttk.Label( website_frame, text=website_url, cursor="hand2", foreground="blue" ) website.pack(side=tk.LEFT, padx=(5, 0)) website.bind( "", lambda e: open_webbrowser(website_url, self.settings) ) # Close button ttk.Button(about, text=_("Close"), command=about.destroy).pack(pady=10) def regenerate_tags_db(self): self.processing_dialog = ProcessingDialog(self, self.tags_repo.regenerate_db) def clear_cache(self): res, err = self.tagger_cache.clear_cache() if res: messagebox.showinfo(_("Cache"), _("Cache Taggera zostało wyczyszczone.")) else: messagebox.showerror( _("Cache"), f"{_('Błąd przy czyszczeniu cache:')} {err}" ) def open_settings(self): settings_window = tk.Toplevel(self) settings_window.title(_("Ustawienia")) settings_window.geometry("300x430") # Enlarged vertically settings_window.resizable(False, False) # Disable resizing settings_window.grab_set() lbl_login = tk.Label(settings_window, text=_("Login:")) lbl_login.pack(pady=(10, 0)) entry_login = tk.Entry(settings_window) entry_login.pack(pady=(0, 10), padx=10, fill="x") entry_login.insert(0, self.settings.username) lbl_password = tk.Label(settings_window, text=_("Hasło:")) lbl_password.pack(pady=(10, 0)) entry_password = tk.Entry(settings_window, show="*") entry_password.pack(pady=(0, 10), padx=10, fill="x") entry_password.insert(0, self.settings.password) lbl_base_url = tk.Label(settings_window, text=_("Base URL:")) lbl_base_url.pack(pady=(10, 0)) entry_base_url = tk.Entry(settings_window) entry_base_url.pack(pady=(0, 10), padx=10, fill="x") entry_base_url.insert(0, self.settings.base_url) lbl_default_tags = tk.Label(settings_window, text=_("Default Tags:")) lbl_default_tags.pack(pady=(10, 0)) entry_default_tags = tk.Entry(settings_window) entry_default_tags.pack(pady=(0, 10), padx=10, fill="x") entry_default_tags.insert(0, self.settings.default_tags) lbl_browser = tk.Label(settings_window, text=_("Browser:")) lbl_browser.pack(pady=(10, 0)) cb_browser = ttk.Combobox( settings_window, values=list(self.settings.installed_browsers.keys()), state="readonly", ) cb_browser.pack(pady=(0, 10), padx=10, fill="x") cb_browser.set( self.settings.installed_browsers_reverse.get( self.settings.browser, "Default" ) ) lbl_lang = tk.Label(settings_window, text=_("Language:")) lbl_lang.pack(pady=(10, 0)) cb_lang = ttk.Combobox( settings_window, values=list(self.settings.i18n.languages.values()), state="readonly", ) cb_lang.pack(pady=(0, 10), padx=10, fill="x") cb_lang.set( self.settings.i18n.languages.get(self.settings.i18n.current_lang, "en") ) def save_and_close(): self.settings.username = entry_login.get() self.settings.password = entry_password.get() self.settings.base_url = entry_base_url.get() self.settings.default_tags = entry_default_tags.get() self.settings.browser = self.settings.installed_browsers[cb_browser.get()] self.settings.i18n.set_language( next( ( lang for lang, name in self.settings.i18n.languages.items() if name == cb_lang.get() ), "en", ) ) self.settings.save_settings() settings_window.destroy() self.reload_ui() btn_save = tk.Button(settings_window, text=_("Zapisz"), command=save_and_close) btn_save.pack(pady=10) def create_widgets(self): # Główna ramka – trzy kolumny main_frame = tk.Frame(self) main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Lewa kolumna – lista plików left_frame = tk.Frame(main_frame) left_frame.grid(row=0, column=0, sticky=tk.NS) self.listbox = tk.Listbox(left_frame, width=30) self.listbox.pack(side=tk.LEFT, fill=tk.Y) self.listbox.bind("<>", self.on_listbox_select) scrollbar = tk.Scrollbar( left_frame, orient="vertical", command=self.listbox.yview ) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.listbox.config(yscrollcommand=scrollbar.set) # Środkowa kolumna – podgląd obrazu center_frame = tk.Frame(main_frame) center_frame.grid(row=0, column=1, sticky=tk.NSEW, padx=10) main_frame.grid_columnconfigure(1, weight=1) main_frame.grid_rowconfigure(0, weight=1) main_frame.grid_rowconfigure(1, weight=0) self.image_label = tk.Label(center_frame) self.image_label.pack(fill=tk.BOTH, expand=True) self.image_label.bind("", self.on_image_label_resize) # Prawa kolumna – panel tagów i uploadu (ograniczona szerokość) right_frame = tk.Frame(main_frame, width=300) right_frame.grid(row=0, column=2, sticky=tk.NSEW, padx=5) right_frame.grid_propagate(False) right_frame.grid_columnconfigure(0, weight=1) # Ustal wiersze: right_frame.grid_rowconfigure(0, weight=0) # PNG Tags – naturalny rozmiar right_frame.grid_rowconfigure(1, weight=0) # Tagger Tags – naturalny rozmiar right_frame.grid_rowconfigure(2, weight=0) # Manual Tags (jednolinijkowe) right_frame.grid_rowconfigure(3, weight=1) # Final Tags – rozszerza się right_frame.grid_rowconfigure(4, weight=0) # Upload Panel # PNG Tags – widget Text z scrollbar png_frame = tk.LabelFrame(right_frame, text=_("PNG Tags")) png_frame.grid(row=0, column=0, sticky=tk.EW, padx=5, pady=5) png_frame.grid_columnconfigure(0, weight=1) self.png_tags_text = tk.Text(png_frame, wrap=tk.WORD) self.png_tags_text.grid(row=0, column=0, sticky=tk.EW) scrollbar_png = tk.Scrollbar(png_frame, command=self.png_tags_text.yview) scrollbar_png.grid(row=0, column=1, sticky=tk.NS) self.png_tags_text.config( yscrollcommand=scrollbar_png.set, state=tk.DISABLED, height=4 ) # Tagger Tags – widget Text z scrollbar tagger_frame = tk.LabelFrame(right_frame, text=_("Tagger Tags")) tagger_frame.grid(row=1, column=0, sticky=tk.EW, padx=5, pady=5) tagger_frame.grid_columnconfigure(0, weight=1) self.tagger_tags_text = tk.Text(tagger_frame, wrap=tk.WORD) self.tagger_tags_text.grid(row=0, column=0, sticky=tk.EW) scrollbar_tagger = tk.Scrollbar( tagger_frame, command=self.tagger_tags_text.yview ) scrollbar_tagger.grid(row=0, column=1, sticky=tk.NS) self.tagger_tags_text.config( yscrollcommand=scrollbar_tagger.set, state=tk.DISABLED, height=4 ) # Manual Tags – Entry (stała wysokość) manual_frame = tk.LabelFrame(right_frame, text=_("Manual Tags")) manual_frame.grid(row=2, column=0, sticky=tk.NSEW, padx=5, pady=5) self.manual_tags_manager = TagManager( manual_frame, self.settings, self.tags_repo, self.update_final_tags ) self.manual_tags_manager.pack(fill=tk.BOTH, expand=True) # Final Tags – widget Text z scrollbar, który rozszerza się final_frame = tk.LabelFrame(right_frame, text=_("Final Tags")) final_frame.grid(row=3, column=0, sticky=tk.NSEW, padx=5, pady=5) final_frame.grid_rowconfigure(0, weight=1) final_frame.grid_columnconfigure(0, weight=1) self.final_tags_text = tk.Text(final_frame, state=tk.DISABLED, wrap=tk.WORD) self.final_tags_text.grid(row=0, column=0, sticky=tk.NSEW) scrollbar_final = tk.Scrollbar(final_frame, command=self.final_tags_text.yview) scrollbar_final.grid(row=0, column=1, sticky=tk.NS) self.final_tags_text.config(yscrollcommand=scrollbar_final.set) # Panel uploadu i rating – nie zmienia rozmiaru pionowo upload_frame = tk.Frame(right_frame) upload_frame.grid(row=4, column=0, sticky=tk.EW, padx=5, pady=5) self.rating_var = tk.StringVar(value="Unrated") rating_options = ["General", "Sensitive", "Questionable", "Explicit", "Unrated"] self.rating_dropdown = tk.OptionMenu( upload_frame, self.rating_var, *rating_options ) self.rating_dropdown.pack(side=tk.LEFT, padx=5) self.upload_button = tk.Button( upload_frame, text=_("Wyślij"), command=self.upload_current_image ) self.upload_button.pack(side=tk.LEFT, padx=5) self.upload_button.config(state=tk.DISABLED) self.view_post_button = tk.Button( upload_frame, text=_("Wyświetl"), command=self.view_current_post ) self.view_post_button.pack(side=tk.LEFT, padx=5) self.view_post_button.config(state=tk.DISABLED) btn_prev = tk.Button(upload_frame, text="<<", command=self.show_prev) btn_prev.pack(side=tk.LEFT, padx=5) btn_next = tk.Button(upload_frame, text=">>", command=self.show_next) btn_next.pack(side=tk.LEFT, padx=5) # Na końcu okna głównego dodaj status bar: self.status_label = tk.Label( main_frame, text="", bd=1, relief=tk.SUNKEN, anchor=tk.W ) self.status_label.grid(row=1, column=0, columnspan=3, sticky=tk.EW) def update_status_bar(self): status_text = ( f"{_('Przetworzono tagi:')} {len(self.tagger_processed)}/{self.total_files} {_('plików')} | " f"{_('Zweryfikowano status uploadu:')} {self.upload_verified}/{self.total_files} {_('plików')} | " f"{_('Zuploadowano:')} {self.uploaded_count}/{self.upload_verified} {_('plików')}" ) self.status_label.config(text=status_text) def on_arrow_key(self, event): """ Obsługuje klawisze strzałek w lewo i w prawo. """ # Jeśli fokus jest na Manual Tags, nie przełączamy plików. if self.focus_get() == self.manual_tags_manager: return if event.keysym == "Left": self.show_prev() elif event.keysym == "Right": self.show_next() def bind_events(self): """ Przypisuje zdarzenia do klawiszy strzałek w lewo i w prawo. """ self.bind("", self.on_arrow_key) self.bind("", self.on_arrow_key) def select_folder(self): """ Otwiera okno dialogowe wyboru folderu z obrazkami i wczytuje pliki PNG z wybranego folderu. """ folder = filedialog.askdirectory(title=_("Wybierz folder z obrazkami PNG")) if folder: self.folder_path = folder self.load_images() def load_images(self): """ Ładuje pliki PNG z wybranego folderu. """ pattern = os.path.join(self.folder_path, "*.png") self.image_files = sorted(glob.glob(pattern)) self.total_files = len(self.image_files) self.image_files_md5 = { file: self.compute_md5(file) for file in self.image_files } self.tagger_processed.clear() for md5 in self.image_files_md5.values(): if self.tagger_cache[md5]: self.tagger_processed.add(md5) self.listbox.delete(0, tk.END) self.uploaded.clear() self.upload_verified = 0 self.uploaded_count = 0 for file in self.image_files: self.listbox.insert(tk.END, os.path.basename(file)) self.uploaded[file] = False if self.image_files: self.current_index = 0 self.listbox.select_set(0) self.show_image(0) self.post_load_processing() else: messagebox.showinfo( _("Informacja"), _("Brak plików PNG w wybranym folderze.") ) def post_load_processing(self): """ Po załadowaniu plików, sprawdza czy są jakieś pliki do uploadu oraz przetwarza Taggerem pliki. """ self.join_check_uploaded_files_thread() self.check_uploaded_files_thread = threading.Thread( target=self.check_uploaded_files ) self.check_uploaded_files_thread.start() self.join_process_tagger_queue_thread() self.process_tagger_queue_thread = threading.Thread( target=self.process_tagger_queue ) self.process_tagger_queue_thread.start() def join_check_uploaded_files_thread(self): if self.check_uploaded_files_thread is not None: self.check_uploaded_files_stop_event.set() self.check_uploaded_files_thread.join() self.check_uploaded_files_thread = None self.check_uploaded_files_stop_event = threading.Event() def join_process_tagger_queue_thread(self): if self.process_tagger_queue_thread is not None: self.process_tagger_queue_stop_event.set() self.process_tagger_queue_thread.join() self.process_tagger_queue_thread = None self.process_tagger_queue_stop_event = threading.Event() def check_uploaded_files(self): """ Dla każdego obrazu oblicza MD5, grupuje je w paczki (do 100 skrótów), wysyła zapytanie do endpointa 'posts.json' dla każdej paczki, a następnie na podstawie odpowiedzi ustawia w self.uploaded post id dla uploadowanych plików. """ file_md5_list = [ (idx, file, self.image_files_md5[file]) for idx, file in enumerate(self.image_files) ] batch_size = 100 for i in range(0, len(file_md5_list), batch_size): if self.check_uploaded_files_stop_event.is_set(): break batch = file_md5_list[i : i + batch_size] batch_md5 = [item[2] for item in batch] md5_param = ",".join(batch_md5) url = self.settings.base_url.rstrip("/") + "/posts.json" try: response = requests.get(url, params={"md5": md5_param}) root = response.json() found = {} for elem in root: if self.check_uploaded_files_stop_event.is_set(): break post_md5 = elem.get("md5", "").lower() post_id = elem.get("id") if post_md5 and post_id: found[post_md5] = post_id for idx, file_path, md5 in batch: if self.check_uploaded_files_stop_event.is_set(): break self.upload_verified += 1 # Każdy plik w batchu jest zweryfikowany if md5.lower() in found: self.uploaded[file_path] = found[md5.lower()] self.uploaded_count += 1 self.after( 0, lambda idx=idx: self.listbox.itemconfig( idx, {"bg": "lightgray"} ), ) # Jeśli aktualnie wybrany plik, zmień przycisk if self.current_index == idx: self.after(0, self.update_button_states) else: self.uploaded[file_path] = False self.after(0, self.update_status_bar) except Exception as e: print(_("Błąd podczas sprawdzania paczki uploadu:"), e) self.after(100, self.join_check_uploaded_files_thread) def update_button_states(self): """ Update the state of UI elements based on current application state. Synchronizes the enabled/disabled states of menu items and buttons with: - Whether an image is currently selected (current_index exists) - Whether the selected image has an existing post (post_id exists in uploaded) State transitions: - When no image is selected (current_index is None): * Disables all upload-related buttons and menu items - When an image is selected: * Always enables 'Wyślij wszystko' (Send All) * Enables 'Wyślij' (Send) if no post exists for the image * Enables 'Podmień tagi' (Replace Tags) and 'Otwórz post' (Open Post) if a post exists for the image * Updates the upload button's text and command based on post existence Modifies: - File menu items: 'Wyślij', 'Wyślij wszystko', 'Podmień tagi', 'Otwórz post' - Buttons: upload_button, view_post_button Dependencies: - Relies on self.current_index to determine selection state - Checks self.uploaded against self.image_files for post existence """ has_current = self.current_index is not None post_id = ( self.uploaded.get(self.image_files[self.current_index]) if has_current else None ) # Common state for all elements send_all_state = tk.NORMAL if has_current else tk.DISABLED post_ops_state = tk.NORMAL if post_id else tk.DISABLED # Update menu items self.file_menu.entryconfig(_("Wyślij wszystko"), state=send_all_state) self.file_menu.entryconfig(_("Podmień tagi"), state=post_ops_state) self.file_menu.entryconfig(_("Otwórz post"), state=post_ops_state) # Update buttons self.upload_button.config( state=tk.NORMAL if has_current else tk.DISABLED, text=_("Podmień tagi") if post_id else _("Wyślij"), command=self.edit_current_image if post_id else self.upload_current_image, ) self.view_post_button.config(state=post_ops_state) # Special case for "Wyślij" menu item wyślij_state = tk.DISABLED if post_id else tk.NORMAL self.file_menu.entryconfig( _("Wyślij"), state=wyślij_state if has_current else tk.DISABLED ) def view_current_post(self): """Otwiera w przeglądarce URL posta.""" if self.current_index is None: return post_id = self.uploaded.get(self.image_files[self.current_index]) if post_id: url = self.settings.base_url.rstrip("/") + "/post/view/" + str(post_id) open_webbrowser(url, self.settings) def compute_md5(self, file_path, chunk_size=8192): """Oblicza MD5 dla danego pliku.""" hash_md5 = hashlib.md5() try: with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(chunk_size), b""): hash_md5.update(chunk) except Exception as e: print(_("Błąd przy obliczaniu MD5:"), e) return "" return hash_md5.hexdigest() def on_listbox_select(self, event): """ Wywoływane po wybraniu pliku z listy. Wyświetla obrazek, aktualizuje tagi PNG, uruchamia Taggera, ustawia przycisk uploadu. """ if not self.listbox.curselection(): return index = self.listbox.curselection()[0] self.current_index = index self.show_image(index) self.update_button_states() def upload_current_image(self): """ Uploaduje obrazek do serwera. Wysyła POST z obrazkiem, tagami i ratingiem. Po zakończeniu uploadu, ustawia przycisk na "Wyświetl". """ file_path = self.image_files[self.current_index] if self.uploaded.get(file_path, False): # Jeśli plik już uploadowany, ustaw przycisk na "Wyświetl" self.update_button_states() return self.upload_button.config(state=tk.DISABLED) self.processing_dialog = ProcessingDialog(self, self.upload_file, file_path) def show_image(self, index): """ Wyświetla obrazek o podanym indeksie z listy. Odczytuje tagi z pliku PNG, uruchamia Taggera, aktualizuje widgety z tagami. """ if index < 0 or index >= len(self.image_files): return file_path = self.image_files[index] try: img = Image.open(file_path) parameters = "" if isinstance(img, PngImagePlugin.PngImageFile): parameters = img.info.get("parameters", "") self.current_image_original = img.copy() self.current_parameters = parameters self.update_display_image() parsed_parameters = parse_parameters(parameters, self.tags_repo) # Uaktualnij widget PNG Tags self.update_png_tags_widget(parsed_parameters.split()) # Uruchom Taggera w osobnym wątku thread_name = f"tagger{self.tagger_thread_idx}" thread = threading.Thread(target=self.run_tagger, args=(thread_name,)) self.run_tagger_threads[thread_name] = thread self.tagger_thread_idx += 1 thread.start() except Exception as e: messagebox.showerror(_("Błąd"), f"{_('Nie można załadować obrazka:')}\n{e}") def edit_current_image(self): """ Modyfikuje obrazek na serwerze. Wysyła POST z md5 obrazka, tagami i ratingiem. """ file_path = self.image_files[self.current_index] if not self.uploaded.get(file_path, False): self.update_button_states() return self.upload_button.config(state=tk.DISABLED) self.processing_dialog = ProcessingDialog(self, self.edit_file, file_path) def update_display_image(self): """ Aktualizuje obrazek na podstawie aktualnego rozmiaru okna. Skaluje obrazek, jeśli jego rozmiar jest większy niż dostępna przestrzeń. """ if self.current_image_original is None: return self.image_label.update_idletasks() avail_width = self.image_label.winfo_width() avail_height = self.image_label.winfo_height() if avail_width <= 1 or avail_height <= 1: avail_width, avail_height = 400, 400 orig_width, orig_height = self.current_image_original.size scale = min(avail_width / orig_width, avail_height / orig_height, 1) new_width = int(orig_width * scale) new_height = int(orig_height * scale) try: resample_filter = Image.Resampling.LANCZOS except AttributeError: resample_filter = Image.LANCZOS if scale < 1: img = self.current_image_original.resize( (new_width, new_height), resample=resample_filter ) else: img = self.current_image_original self.image_cache = ImageTk.PhotoImage(img) self.image_label.config(image=self.image_cache) def update_listbox_item_color_by_rating(self, file_path, rating): """ Ustawia kolor tła dla pozycji w liście na podstawie ratingu, o ile plik nie został uploadowany. Kolor: lightgreen dla General, yellow dla Sensitive, darkorange dla Questionable, red dla Explicit. Jeśli plik uploadowany, nie zmieniamy (pozostaje lightgray). """ # Jeśli plik jest oznaczony jako uploadowany, nic nie robimy if self.uploaded.get(file_path, False): return try: index = self.image_files.index(file_path) except ValueError: return # Mapowanie ratingu na kolor if rating == "general": color = "lightgreen" elif rating == "sensitive": color = "yellow" elif rating == "questionable": color = "darkorange" elif rating == "explicit": color = "red" else: color = "white" self.after( 0, lambda idx=index, col=color: self.listbox.itemconfig(idx, {"bg": col}) ) def on_image_label_resize(self, event): """ Wywoływane przy zmianie rozmiaru okna. Aktualizuje obrazek w zależności od nowego rozmiaru okna. """ self.update_display_image() def show_prev(self): """ Przełącza na poprzedni obraz """ if self.current_index is None: return new_index = self.current_index - 1 if new_index < 0: new_index = len(self.image_files) - 1 self.current_index = new_index self.listbox.select_clear(0, tk.END) self.listbox.select_set(new_index) self.listbox.activate(new_index) self.show_image(new_index) self.update_button_states() def show_next(self): """ Przełącza na następny obraz """ if self.current_index is None: return new_index = self.current_index + 1 if new_index >= len(self.image_files): new_index = 0 self.current_index = new_index self.listbox.select_clear(0, tk.END) self.listbox.select_set(new_index) self.listbox.activate(new_index) self.show_image(new_index) self.update_button_states() # --- Metody obsługujące widgety z tagami --- def update_png_tags_widget(self, tags_list): """ Aktualizuje widget z tagami z PNG. Tworzy tagi jako klikalne, zaznaczone lub niezaznaczone. """ self.png_tags_text.config(state=tk.NORMAL) self.png_tags_text.delete("1.0", tk.END) self.png_tags_states = {} for tag in tags_list: self.png_tags_states[tag] = True start_index = self.png_tags_text.index(tk.INSERT) self.png_tags_text.insert(tk.INSERT, tag) end_index = self.png_tags_text.index(tk.INSERT) tag_name = "png_" + tag self.png_tags_text.tag_add(tag_name, start_index, end_index) self.png_tags_text.tag_configure(tag_name, foreground="blue") self.png_tags_text.tag_bind(tag_name, "", self.toggle_png_tag) self.png_tags_text.insert(tk.INSERT, " ") self.png_tags_text.config(state=tk.DISABLED) self.adjust_text_widget_height(self.png_tags_text) self.update_final_tags() def toggle_png_tag(self, event): """ Obsługuje kliknięcie na tag w PNG Tags. Zmienia stan tagu (zaznaczony/niezaznaczony) i aktualizuje listę finalnych tagów. """ index = self.png_tags_text.index("@%d,%d" % (event.x, event.y)) for t in self.png_tags_text.tag_names(index): if t.startswith("png_"): actual_tag = t[len("png_") :] self.png_tags_states[actual_tag] = not self.png_tags_states.get( actual_tag, True ) color = "blue" if self.png_tags_states[actual_tag] else "black" self.png_tags_text.tag_configure(t, foreground=color) break self.update_visible_tagger_tags() def get_visible_tags(self) -> list[tuple[str, bool, float]]: """ Zwraca listę tagów, które mają być widoczne w Tagger Tags. Tagi zaznaczone są zawsze widoczne. Tagi niezaznaczone są widoczne, jeśli nie są implikowane przez zaznaczone tagi. """ visible_tags = [] # character:amber_redmond cheat / WORKAROUND if "character:amber_redmond" in self.manual_tags_manager.manual_tags: for tag in [ "1girl", "tan", "black_hair", "very_short_hair", "short_hair", "black_choker", "heart_choker", "red_hair", "cutoffs", "blue_shorts", "short_shorts", "short_sleeves", "off_shoulder", "black_footwear", "feet", "toenail_polish", "toenails", "fingernails", "nail_polish", ]: if tag in self.tagger_tags_states: self.tagger_tags_states[tag] = ( True, self.tagger_tags_states[tag][1], ) for tag in [ "virtual_youtuber", "dark_skin", "dark-skinned_female", "black_collar", "collar", "yellow_eyes", ]: if tag in self.tagger_tags_states: self.tagger_tags_states.pop(tag) selected_tags = [ tag for tag, (selected, _) in self.tagger_tags_states.items() if selected ] selected_png_tags = [ tag for tag, selected in self.png_tags_states.items() if selected ] implied_by_selected = set() # Safely collect implications from selected tags for tag in selected_tags: if tag in self.implication_graph: implied_by_selected.update(nx.descendants(self.implication_graph, tag)) else: print( _("Warning: Tag '{tag}' not found in implication graph").format( tag=tag ) ) self.missing_tags.add(tag) # Log missing tags for tag in selected_png_tags: if tag in self.implication_graph: implied_by_selected.update(nx.descendants(self.implication_graph, tag)) else: print( _("Warning: Tag '{tag}' not found in implication graph").format( tag=tag ) ) self.missing_tags.add(tag) # Log missing tags # Build visible list for tag, (selected, confidence) in self.tagger_tags_states.items(): if selected: visible_tags.append((tag, True, confidence)) else: if tag not in implied_by_selected: visible_tags.append((tag, False, confidence)) return visible_tags def update_tagger_tags_widget(self, result: wdt.Result): """ Aktualizuje widget z tagami z Taggera. Zaznacza tagi, które są wspólne z tagami z PNG. Ukrywa tagi, które są implikowane przez zaznaczone tagi. """ # Opróżniamy widget i słownik stanów self.tagger_tags_states = {} tags_dict = {} # Przetwarzamy tagi postaci – dodajemy prefix "character:" i zaznaczamy je for tag, prob in result.character_tag_data.items(): full_tag = "character:" + tag # Zamień nieprawidłowe tagi na poprawne if full_tag in TAG_FIXES: full_tag = TAG_FIXES[full_tag] tags_dict[full_tag] = prob # Przetwarzamy tagi general – wszystkie ustawiamy jako zaznaczone for tag, prob in result.general_tag_data.items(): # Zamień nieprawidłowe tagi na poprawne if tag in TAG_FIXES: tag = TAG_FIXES[tag] tags_dict[tag] = prob for tag, prob in tags_dict.items(): self.tagger_tags_states[tag] = (True, prob) # Obliczamy przecięcie tagów z PNG i Taggera common = set(self.png_tags_states.keys()).intersection( set(self.tagger_tags_states.keys()) ) # Aktualizujemy stany w obu grupach – tylko wspólne tagi pozostają zaznaczone (True) for tag in self.png_tags_states: if tag.startswith("character:") or tag.startswith("copyright:"): continue # Pomijamy tagi postaci i praw autorskich, bo mamy do nich pewność self.png_tags_states[tag] = tag in common tag_name = "png_" + tag color = "blue" if self.png_tags_states[tag] else "black" self.png_tags_text.config(state=tk.NORMAL) self.png_tags_text.tag_configure(tag_name, foreground=color) self.png_tags_text.config(state=tk.DISABLED) for tag in self.tagger_tags_states: if tag.startswith("character:"): continue # Pomijamy tagi postaci, bo mamy do nich pewność self.tagger_tags_states[tag] = ( tag in common, self.tagger_tags_states[tag][1], ) self.update_visible_tagger_tags() def update_visible_tagger_tags(self): """ Aktualizuje widget z tagami z Taggera. Wyświetla tagi z Taggera w formacie: "tag (confidence %)". Koloruje tagi na podstawie stanu (zaznaczony/niezaznaczony) i prawdopodobieństwa. Podczas kliknięcia na tag, zmienia stan (zaznaczony/niezaznaczony). Poprawia wysokość widgetu. Aktualizuje listę finalnych tagów. """ self.tagger_tags_text.config(state=tk.NORMAL) self.tagger_tags_text.delete("1.0", tk.END) visible_tags = self.get_visible_tags() for tag, selected, prob in visible_tags: display_text = f"{tag} ({float(prob)*100:.1f}%)" # np. "Rem (99.9%)" start_index = self.tagger_tags_text.index(tk.INSERT) self.tagger_tags_text.insert(tk.INSERT, display_text) end_index = self.tagger_tags_text.index(tk.INSERT) tag_name = "tagger_" + tag color = self.get_tagger_tag_color((selected, prob)) self.tagger_tags_text.tag_add(tag_name, start_index, end_index) self.tagger_tags_text.tag_configure(tag_name, foreground=color) self.tagger_tags_text.tag_bind( tag_name, "", self.toggle_tagger_tag ) self.tagger_tags_text.insert(tk.INSERT, " ") self.tagger_tags_text.config(state=tk.DISABLED) self.adjust_text_widget_height(self.tagger_tags_text) self.update_final_tags() def get_tagger_tag_color(self, tagger_tag_state: Tuple[bool, float]) -> str: """ Zwraca kolor dla tagu z Taggera na podstawie stanu (selected) i prawdopodobieństwa. Kolor: - niebieski, jeśli tag jest zaznaczony, - czerwony, jeśli tag nie jest zaznaczony i prawdopodobieństwo < 0.3. - żółty, jeśli tag nie jest zaznaczony i prawdopodobieństwo >= 0.3 i < 0.6. - zielony, jeśli tag nie jest zaznaczony i prawdopodobieństwo >= 0.6. """ selected, prob = tagger_tag_state if selected: return "blue" if prob < 0.3: return "red" if prob < 0.6: return "darkorange" return "green" def toggle_tagger_tag(self, event): """ Obsługuje kliknięcie na tag z Taggera. Zmienia stan tagu (zaznaczony/niezaznaczony) i aktualizuje kolor. """ index = self.tagger_tags_text.index("@%d,%d" % (event.x, event.y)) for t in self.tagger_tags_text.tag_names(index): if t.startswith("tagger_"): actual_tag = t[len("tagger_") :] tag_state, tag_prob = self.tagger_tags_states.get(actual_tag, (True, 0)) self.tagger_tags_states[actual_tag] = (not tag_state, tag_prob) color = self.get_tagger_tag_color(self.tagger_tags_states[actual_tag]) self.tagger_tags_text.tag_configure(t, foreground=color) break self.update_visible_tagger_tags() def update_final_tags(self): """Buduje finalną listę tagów, uwzględniając default tags, PNG, Tagger i manualne. Dla każdego tagu pobiera informacje z bazy i ustawia styl (kolor i podkreślenie): - Deprecated: czerwony, podkreślony, - Tag nie istnieje: żółty, podkreślony, - Normalny: np. niebieski, bez podkreślenia. """ final_set = set() if self.settings.default_tags: final_set.update(self.settings.default_tags.split()) for tag, selected in self.png_tags_states.items(): if selected: final_set.add(tag) for tag, selected in self.tagger_tags_states.items(): if selected[0]: final_set.add(tag) manual = self.manual_tags_manager.manual_tags if manual: final_set.update(manual) final_list = sorted(final_set) self.final_tags_text.config(state=tk.NORMAL) self.final_tags_text.delete("1.0", tk.END) for tag in final_list: _, deprecated = process_tag(tag, self.tags_repo) # Ustal kolor i podkreślenie na podstawie wyniku if deprecated is True: color = "red" underline = 1 elif deprecated is None: color = "darkorange" underline = 1 else: color = "blue" underline = 0 start_index = self.final_tags_text.index(tk.INSERT) self.final_tags_text.insert(tk.INSERT, tag) end_index = self.final_tags_text.index(tk.INSERT) tag_name = "final_" + tag self.final_tags_text.tag_add(tag_name, start_index, end_index) self.final_tags_text.tag_configure( tag_name, foreground=color, underline=underline ) self.final_tags_text.tag_bind( tag_name, "", self.open_final_tag_wiki_url ) self.final_tags_text.insert(tk.INSERT, " ") self.final_tags_text.config(state=tk.DISABLED) def open_final_tag_wiki_url(self, event): """Otwiera w przeglądarce URL strony wiki dla klikniętego tagu. Usuwa znane prefiksy, aby utworzyć poprawny URL. """ index = self.final_tags_text.index("@%d,%d" % (event.x, event.y)) for t in self.final_tags_text.tag_names(index): if t.startswith("final_"): actual_tag = t[len("final_") :] open_tag_wiki_url(actual_tag, self.settings) break # --- Metody do cache'owania wyników Taggera --- def process_tagger_for_image(self, file_path): """Przetwarza obrazek przy użyciu Taggera i zapisuje wynik do cache.""" result = self.get_tagger_results(file_path) self.update_listbox_item_color_by_rating(file_path, result.rating) def process_tagger_queue(self): """Przetwarza wszystkie obrazki w tle (pomijając aktualnie wybrany).""" for file_path in self.image_files: if self.process_tagger_queue_stop_event.is_set(): break # Jeśli obrazek jest aktualnie wybrany, pomijamy – on będzie przetwarzany w foreground if ( self.current_index is not None and file_path == self.image_files[self.current_index] ): continue self.process_tagger_for_image(file_path) self.after(100, self.join_process_tagger_queue_thread) def run_tagger(self, thread_name): """ Przetwarza aktualnie wybrany obrazek. Jeśli wynik jest już w cache, wykorzystuje go; w przeciwnym razie uruchamia Taggera i zapisuje wynik do cache. """ if self.current_index is None: return # Ustaw komunikat, że Tagger pracuje self.tagger_tags_text.config(state=tk.NORMAL) self.tagger_tags_text.delete("1.0", tk.END) self.tagger_tags_text.insert("1.0", _("Tagger przetwarza...")) self.tagger_tags_text.config(state=tk.DISABLED) file_path = self.image_files[self.current_index] result = self.get_tagger_results(file_path) new_rating = self.map_tagger_rating(result) self.rating_var.set(new_rating) self.after(0, lambda: self.update_tagger_tags_widget(result)) self.update_listbox_item_color_by_rating(file_path, result.rating) self.after(100, lambda: self.run_tagger_threads[thread_name].join()) def upload_file( self, file_path, final_tags=None, final_rating=None, progress_queue: Optional[queue.Queue] = None, cancel_event: Optional[threading.Event] = None, ): base_file_name = os.path.basename(file_path) if progress_queue: progress_queue.put(("mode", "determinate")) progress_queue.put(("max", 100)) progress_queue.put( ( "label", _("Wysyłam plik {base_file_name}...").format( base_file_name=base_file_name ), ) ) url = self.settings.base_url.rstrip("/") + "/api/danbooru/add_post" tags = ( self.final_tags_text.get("1.0", tk.END).strip() if final_tags is None else final_tags ) fields = { "login": self.settings.username, "password": self.settings.password, "tags": tags, "source": "", } rating_value = self.rating_map.get( self.rating_var.get() if final_rating is None else final_rating, "" ) if rating_value: fields["rating"] = rating_value try: total_size = os.path.getsize(file_path) def progress_callback(bytes_read, total_size): if progress_queue: percentage = int(bytes_read / total_size * 100) progress_queue.put(("progress", percentage)) with open(file_path, "rb") as f: wrapped_file = ProgressFile( f, progress_callback, total_size, cancel_event ) files = {"file": (base_file_name, wrapped_file, "image/png")} response = requests.post(url, data=fields, files=files) if progress_queue: progress_queue.put(("progress", 100)) show_warn = False post_url = None if response.status_code in (200, 201): message = _("Wysyłanie zakończone powodzeniem!") post_url = response.headers.get("X-Danbooru-Location", None) elif response.status_code == 409: message = _( "Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}" ).format( status_code=response.status_code, text=response.headers.get("X-Danbooru-Errors", ""), ) post_url = response.headers.get("X-Danbooru-Location", None) show_warn = True else: message = _( "Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}" ).format(status_code=response.status_code, text=response.text) show_warn = True # Aktualizacja wyglądu listy – musimy użyć domyślnych argumentów w lambdzie, aby zachować bieżący indeks if show_warn: if not final_tags: messagebox.showwarning(_("Wysyłanie"), message) else: if not final_tags: messagebox.showinfo(_("Wysyłanie"), message) self.after( 0, lambda idx=self.image_files.index( file_path ): self.listbox.itemconfig(idx, {"bg": "lightgray"}), ) if post_url: post_id = post_url.split("/")[-1] self.uploaded[file_path] = post_id self.uploaded_count += 1 self.after(0, self.update_status_bar) except Exception as e: messagebox.showerror(_("Błąd wysyłania"), str(e)) finally: self.upload_button.after(0, self.update_button_states) def edit_file( self, file_path, final_tags=None, final_rating=None, progress_queue=None, cancel_event=None, ): """ Update tags and rating for an existing post without uploading the file. """ base_file_name = os.path.basename(file_path) post_id = self.uploaded.get(file_path) if not post_id: messagebox.showerror( _("Błąd edycji"), _("Post nie został znaleziony dla tego pliku") ) return if progress_queue: progress_queue.put(("mode", "determinate")) progress_queue.put(("max", 100)) progress_queue.put( ( "label", _("Aktualizuję tagi dla {base_file_name}...").format( base_file_name=base_file_name ), ) ) try: # Check for cancellation before starting the operation. if cancel_event is not None and cancel_event.is_set(): if progress_queue: progress_queue.put(("label", _("Operacja anulowana"))) return # Get authentication session and token session = login(self.settings) auth_token = get_auth_token(session, self.settings) # Check cancellation after login if needed. if cancel_event is not None and cancel_event.is_set(): if progress_queue: progress_queue.put(("label", _("Operacja anulowana"))) return # Prepare tags and rating tags = ( self.final_tags_text.get("1.0", tk.END).strip() if final_tags is None else final_tags ) rating_value = self.rating_map.get( self.rating_var.get() if final_rating is None else final_rating, "?" ) # Prepare API request url = self.settings.base_url.rstrip("/") + "/post/set" payload = { "auth_token": auth_token, "image_id": post_id, "title": base_file_name, "owner": self.settings.username, "tags": tags, "source": "", "rating": rating_value, } if progress_queue: progress_queue.put(("progress", 50)) # Check for cancellation before sending the update request. if cancel_event is not None and cancel_event.is_set(): if progress_queue: progress_queue.put(("label", _("Operacja anulowana"))) return # Send update request response = session.post(url, data=payload, allow_redirects=False) # Handle 302 redirect as success case if response.status_code == 302: if progress_queue: progress_queue.put(("progress", 100)) message = _("Tagi zostały zaktualizowane!") if not final_tags: # Only show success if not bulk operation messagebox.showinfo(_("Sukces edycji"), message) # Update UI state self.after(0, self.update_button_states) return # Handle other status codes error_msg = _("Błąd podczas aktualizacji tagów\nStatus: {code}").format( code=response.status_code ) if response.text: error_msg += f"\n{_('Treść:')} {response.text}" messagebox.showerror("Błąd edycji", error_msg) except Exception as e: messagebox.showerror(_("Krytyczny błąd edycji"), str(e)) finally: if progress_queue: progress_queue.put(("progress", 100)) def upload_all_files(self): """ Metoda, która po potwierdzeniu przez użytkownika uploaduje wszystkie niewrzucone pliki. Dla każdego pliku oblicza finalne tagi (przy użyciu compute_final_tags_for_file) i wywołuje upload_file. """ if not messagebox.askyesno( _("Potwierdzenie"), _( "Czy na pewno chcesz wrzucić wszystkie niewrzucone pliki?\nKażdy z nich zostanie oznaczony tagiem 'meta:auto_upload'.\nUpewnij się, że tagi są poprawne!" ), ): return def worker( progress_queue: queue.Queue = None, cancel_event: threading.Event = None, secondary_progress_queue: queue.Queue = None, ): files_to_upload = [x for x in self.image_files if not self.uploaded[x]] files_count = len(files_to_upload) if progress_queue: progress_queue.put(("mode", "determinate")) progress_queue.put(("max", 100)) file_idx = 0 for file_path in files_to_upload: if progress_queue: progress_queue.put(("progress", file_idx * 100 / files_count)) progress_queue.put( ("label", f"Wysyłam plik {file_idx+1}/{files_count}...") ) if cancel_event is not None and cancel_event.is_set(): if progress_queue: progress_queue.put(("label", _("Anulowano operację!"))) return if not self.uploaded.get(file_path, False): final_tags, final_rating = ( self.compute_final_tags_and_rating_for_file(file_path) ) print( _( "Wysyłanie {file_path} z tagami: {final_tags} i ratingiem: {final_rating}" ).format( file_path=file_path, final_tags=final_tags, final_rating=final_rating, ) ) self.upload_file( file_path, final_tags=final_tags, final_rating=final_rating, progress_queue=secondary_progress_queue, cancel_event=cancel_event, ) file_idx += 1 if progress_queue: progress_queue.put(("label", _("Przesłano pliki!"))) progress_queue.put(("progress", 100)) self.processing_dialog = ProcessingDialog(self, worker)