Kapitan dc3efc5d3c
All checks were successful
Gitea/kapitanbooru-uploader/pipeline/head This commit looks good
Fix uploaded count
2025-03-08 10:06:33 +01:00

1666 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import glob
import hashlib
import inspect
import os
import queue
import threading
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from typing import Dict, Tuple, Optional
import networkx as nx
import requests
from PIL import Image, ImageTk, PngImagePlugin
import wdtagger as wdt
from .I18N import _
from .ProgressFile import ProgressFile
from .TagsRepo import TagsRepo
from .autocomplete import TagManager
from .common import get_auth_token, login, open_tag_wiki_url, open_webbrowser
from .settings import Settings
from .tag_processing import TAG_FIXES, parse_parameters, process_tag
from .tagger_cache import TaggerCache
class ProcessingDialog:
def __init__(self, root, target_function, *args):
self.root = root
self.top = tk.Toplevel(root)
self.top.title(_("Processing..."))
self.top.geometry("300x150")
self.top.protocol("WM_DELETE_WINDOW", self.on_close)
self.label = tk.Label(self.top, text=_("Processing, please wait..."))
self.label.pack(pady=10)
# Start with indeterminate progress bar
self.progress = ttk.Progressbar(self.top, mode="indeterminate")
self.progress.pack(pady=10, fill="x")
self.progress.start(10)
sig = inspect.signature(target_function)
if "secondary_progress_queue" in sig.parameters:
self.sub_progress = ttk.Progressbar(self.top, mode="indeterminate")
self.sub_progress.pack(pady=10, fill="x")
self.sub_progress.start(10)
# Setup communication queue and periodic checker
self.queue = queue.Queue()
self.sub_queue = queue.Queue()
self.running = True
self.cancel_event = threading.Event() # Cancellation flag
self.thread = threading.Thread(
target=self.run_task, args=(target_function, *args)
)
self.thread.start()
self.top.after(100, self.process_queue)
def process_queue(self):
"""Process messages from the background thread"""
while self.running:
try:
msg = self.queue.get_nowait()
if msg[0] == "mode":
self.progress.config(mode=msg[1])
if msg[1] == "determinate":
self.progress["value"] = 0
self.progress.stop()
elif msg[1] == "indeterminate":
self.progress["value"] = 0
self.progress.start()
elif msg[0] == "max":
self.progress["maximum"] = msg[1]
elif msg[0] == "progress":
self.progress["value"] = msg[1]
elif msg[0] == "label":
self.label.config(text=msg[1])
self.top.update_idletasks()
except queue.Empty:
break
try:
msg = self.sub_queue.get_nowait()
if msg[0] == "mode":
self.sub_progress.config(mode=msg[1])
if msg[1] == "determinate":
self.sub_progress["value"] = 0
self.sub_progress.stop()
elif msg[1] == "indeterminate":
self.sub_progress["value"] = 0
self.sub_progress.start()
elif msg[0] == "max":
self.sub_progress["maximum"] = msg[1]
elif msg[0] == "progress":
self.sub_progress["value"] = msg[1]
self.top.update_idletasks()
except queue.Empty:
break
if self.running:
self.top.after(100, self.process_queue)
def run_task(self, target_function, *args):
"""Execute target function with progress queue if supported"""
try:
sig = inspect.signature(target_function)
kwargs = {}
if "progress_queue" in sig.parameters:
kwargs["progress_queue"] = self.queue
if "secondary_progress_queue" in sig.parameters:
kwargs["secondary_progress_queue"] = self.sub_queue
if "cancel_event" in sig.parameters:
kwargs["cancel_event"] = self.cancel_event
target_function(*args, **kwargs)
finally:
self.close_dialog()
def close_dialog(self):
"""Safely close the dialog"""
if self.running:
self.running = False
self.progress.stop()
self.top.after(0, self.top.destroy)
self.root.after(100, self.thread.join)
def on_close(self):
"""Handle manual window closure"""
self.running = False
self.cancel_event.set() # Notify target function that cancellation is requested
self.top.destroy()
class ImageBrowser(tk.Tk):
def __init__(self):
super().__init__()
self.title("Kapitanbooru Uploader")
self.geometry("900x600")
self.version = "0.4.2"
self.settings = Settings()
self.tags_repo = TagsRepo(self.settings)
self.implication_graph = self.load_implication_graph()
self.missing_tags = set() # Track tags not in the graph
# Dodatkowe ustawienia dla Taggera
self.tagger_name = "wdtagger"
self.tagger_version = (
"1.0" # możesz ustawić wersję dynamicznie, jeśli to możliwe
)
self.tagger_cache = TaggerCache(
self.settings, self.tagger_name, self.tagger_version
)
self.folder_path = ""
self.image_files = []
self.image_files_md5 = []
self.current_index = None
self.image_cache = None
self.tagger_thread_idx = 0
self.tagger = wdt.Tagger()
self.check_uploaded_files_stop_event = threading.Event()
self.check_uploaded_files_thread: Optional[threading.Thread] = None
self.process_tagger_queue_stop_event = threading.Event()
self.process_tagger_queue_thread: Optional[threading.Thread] = None
self.run_tagger_threads: Dict[str, threading.Thread] = {}
# Liczniki statusu
self.total_files = 0
self.tagger_processed = set()
self.upload_verified = 0
self.uploaded_count = 0
# Oryginalny obraz (do skalowania)
self.current_image_original = None
self.current_parameters = ""
# Mapa ratingów: wyświetlana nazwa -> wartość wysyłana
self.rating_map = {
"General": "g",
"Sensitive": "s",
"Questionable": "q",
"Explicit": "e",
"Unrated": "",
}
# Słowniki przechowujące stany tagów (dla PNG i Taggera)
self.png_tags_states = {}
self.tagger_tags_states = {}
# Ścieżki do ustawień i cache
# Ładujemy ustawienia
# Nowy słownik przechowujący informację, czy dany plik (ścieżka) został już uploadowany
self.uploaded = {} # key: file path, value: True/False
self.create_menu()
self.create_widgets()
self.bind_events()
def reload_ui(self):
"""Reload UI components with new language"""
# Destroy current widgets
for widget in self.winfo_children():
widget.destroy()
# Rebuild UI
self.create_menu()
self.create_widgets()
def load_implication_graph(self) -> nx.DiGraph:
G = nx.DiGraph()
conn = self.tags_repo.get_conn()
cursor = conn.cursor()
# Step 1: Add all tags from the 'tags' table
cursor.execute(
"""
SELECT
CASE category
WHEN 1 THEN 'artist:' || name
WHEN 3 THEN 'copyright:' || name
WHEN 4 THEN 'character:' || name
WHEN 5 THEN 'meta:' || name
ELSE name
END AS prefixed_name
FROM tags
"""
)
db_tags = {row[0] for row in cursor.fetchall()}
G.add_nodes_from(db_tags)
# Step 2: Add nodes from implications (antecedents/consequents not in 'tags' table)
cursor.execute("SELECT antecedent, consequent FROM tag_closure")
edge_tags = set()
for ant, cons in cursor.fetchall():
edge_tags.add(ant)
edge_tags.add(cons)
G.add_nodes_from(edge_tags - db_tags) # Add tags only in implications
# Step 3: Add edges
cursor.execute("SELECT antecedent, consequent FROM tag_closure")
G.add_edges_from(cursor.fetchall())
conn.close()
return G
def adjust_text_widget_height(self, widget):
"""
Ustawia wysokość widgetu Text na liczbę linii w jego treści,
ale nie więcej niż max_lines.
"""
content = widget.get("1.0", "end-1c")
num_lines = 0
cur_line_len = 0
text = content.split()
for word in text:
if cur_line_len + len(word) > 34:
num_lines += 1
cur_line_len = 0
cur_line_len += len(word) + 1
max_lines = (
self.png_tags_text.winfo_height()
+ self.tagger_tags_text.winfo_height()
+ self.final_tags_text.winfo_height()
- widget.winfo_height()
- 8
)
widget.config(height=min(num_lines, max_lines) if num_lines > 4 else 4)
def compute_final_tags_and_rating_for_file(self, file_path):
"""
Oblicza finalną listę tagów dla danego pliku oraz rating.
Łączy tagi z:
- pliku (PNG): parsowane przez parse_parameters,
- Taggera (wynik z cache lub wyliczony na bieżąco),
- ustawień (default tags),
- manualnych tagów (z pola manual_tags_entry),
oraz dodaje tag "meta:auto_upload".
Zwraca finalny ciąg tagów oraz rating.
"""
# Pobierz tagi z pliku
try:
img = Image.open(file_path)
parameters = ""
if isinstance(img, PngImagePlugin.PngImageFile):
parameters = img.info.get("parameters", "")
png_tags = set(parse_parameters(parameters, self.tags_repo).split())
img.close()
except Exception as e:
print(_("Błąd przy otwieraniu pliku"), file_path, ":", e)
png_tags = set()
# Pobierz tagi z Taggera sprawdzając cache
result = self.get_tagger_results(file_path)
tagger_tags = set()
rating = "Unrated"
tagger_tags.update(
(
TAG_FIXES[tag] if tag in TAG_FIXES else tag
for tag in result.general_tag_data.keys()
)
) # Zamień nieprawidłowe tagi na poprawne
for t in result.character_tags:
full_tag = "character:" + t.replace(" ", "_").replace("\\", "")
# Zamień nieprawidłowe tagi na poprawne
if full_tag in TAG_FIXES:
full_tag = TAG_FIXES[full_tag]
tagger_tags.add(full_tag)
rating = self.map_tagger_rating(result)
# Pobierz tagi z ustawień i manualne
default_tags = set(self.settings.default_tags.split())
manual_tags = set(self.manual_tags_manager.manual_tags)
# Finalna lista: suma wszystkich tagów
final_tags = default_tags.union(png_tags).union(tagger_tags).union(manual_tags)
final_tags.add("meta:auto_upload")
return " ".join(sorted(final_tags)), rating
def get_tagger_results(self, file_path) -> wdt.Result:
md5 = self.image_files_md5[file_path]
cached = self.tagger_cache[md5]
if cached:
self.tagger_processed.add(md5)
return cached["result"]
try:
with Image.open(file_path) as img:
result = self.tagger.tag(img)
self.tagger_cache[md5] = result
self.tagger_processed.add(md5)
self.after(0, self.update_status_bar)
print(_("Tagger przetworzył:"), f"{file_path}")
return result
except Exception as e:
print(_("Błąd Taggera dla"), file_path, ":", e)
def map_tagger_rating(self, result: wdt.Result) -> str:
"""
Mapuje rating z Taggera na wartość używaną w Kapitanbooru.
"""
if result.rating == "general":
new_rating = "General"
elif result.rating == "sensitive":
new_rating = "Sensitive"
elif result.rating == "questionable":
new_rating = "Questionable"
elif result.rating == "explicit":
new_rating = "Explicit"
else:
new_rating = "Unrated"
return new_rating
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Create file menu and store it as instance variable
self.file_menu = tk.Menu(menubar, tearoff=0)
# File menu items - create references for items we need to control
self.file_menu.add_command(label=_("Otwórz folder"), command=self.select_folder)
self.file_menu.add_separator()
self.file_menu.add_command(
label=_("Wyślij"), command=self.upload_current_image, state=tk.DISABLED
)
self.file_menu.add_command(
label=_("Wyślij wszystko"), command=self.upload_all_files, state=tk.DISABLED
)
self.file_menu.add_separator()
self.file_menu.add_command(
label=_("Podmień tagi"), command=self.edit_current_image, state=tk.DISABLED
)
self.file_menu.add_command(
label=_("Otwórz post"), command=self.view_current_post, state=tk.DISABLED
)
self.file_menu.add_separator()
self.file_menu.add_command(label=_("Zakończ"), command=self.quit)
menubar.add_cascade(label=_("Plik"), menu=self.file_menu)
# Options menu
options_menu = tk.Menu(menubar, tearoff=0)
options_menu.add_command(label=_("Ustawienia"), command=self.open_settings)
options_menu.add_separator()
options_menu.add_command(
label=_("Wyczyść cache Taggera"), command=self.clear_cache
)
options_menu.add_command(
label=_("Zregeneruj bazę tagów"), command=self.regenerate_tags_db
)
menubar.add_cascade(label=_("Opcje"), menu=options_menu)
help_menu = tk.Menu(menubar, tearoff=0)
help_menu.add_command(label=_("About"), command=self.show_about)
menubar.add_cascade(label=_("Help"), menu=help_menu)
def show_about(self):
"""Multilingual About window (updated version)"""
about = tk.Toplevel(self)
about.title(_("About Kapitanbooru Uploader"))
about.resizable(False, False)
# Window content
frame = ttk.Frame(about, padding=10)
frame.pack(fill="both", expand=True)
ttk.Label(
frame, text="Kapitanbooru Uploader", font=("TkDefaultFont", 16, "bold")
).grid(row=0, column=0, sticky=tk.W)
content = [
(f"Version {self.version}", 1),
("", 2),
(_("A GUI application for uploading images to KapitanBooru."), 3),
(_("Features include image upload, tag management, automatic"), 4),
(_("tagging with wdtagger, and cache management."), 5),
("", 6),
(_("Authors:"), 7),
("Michał Leśniak", 8),
("", 9),
(_("License: MIT License"), 10),
(f"Copyright © 2025 Michał Leśniak", 11),
("", 12),
]
for text, row in content:
ttk.Label(frame, text=text).grid(row=row, column=0, sticky=tk.W)
# Repository link
repo_frame = ttk.Frame(frame)
repo_frame.grid(row=13, sticky=tk.W)
ttk.Label(repo_frame, text=_("Repository:")).pack(side=tk.LEFT)
repo_url = "https://git.mlesniak.pl/kapitan/kapitanbooru-uploader"
repo = ttk.Label(repo_frame, text=repo_url, cursor="hand2", foreground="blue")
repo.pack(side=tk.LEFT, padx=(5, 0))
repo.bind("<Button-1>", lambda e: open_webbrowser(repo_url, self.settings))
# Website link
website_frame = ttk.Frame(frame)
website_frame.grid(row=14, sticky=tk.W)
ttk.Label(website_frame, text=_("Website:")).pack(side=tk.LEFT)
website_url = "https://booru.mlesniak.pl"
website = ttk.Label(
website_frame, text=website_url, cursor="hand2", foreground="blue"
)
website.pack(side=tk.LEFT, padx=(5, 0))
website.bind(
"<Button-1>", lambda e: open_webbrowser(website_url, self.settings)
)
# Close button
ttk.Button(about, text=_("Close"), command=about.destroy).pack(pady=10)
def regenerate_tags_db(self):
self.processing_dialog = ProcessingDialog(self, self.tags_repo.regenerate_db)
def clear_cache(self):
res, err = self.tagger_cache.clear_cache()
if res:
messagebox.showinfo(_("Cache"), _("Cache Taggera zostało wyczyszczone."))
else:
messagebox.showerror(
_("Cache"), f"{_('Błąd przy czyszczeniu cache:')} {err}"
)
def open_settings(self):
settings_window = tk.Toplevel(self)
settings_window.title(_("Ustawienia"))
settings_window.geometry("300x430") # Enlarged vertically
settings_window.resizable(False, False) # Disable resizing
settings_window.grab_set()
lbl_login = tk.Label(settings_window, text=_("Login:"))
lbl_login.pack(pady=(10, 0))
entry_login = tk.Entry(settings_window)
entry_login.pack(pady=(0, 10), padx=10, fill="x")
entry_login.insert(0, self.settings.username)
lbl_password = tk.Label(settings_window, text=_("Hasło:"))
lbl_password.pack(pady=(10, 0))
entry_password = tk.Entry(settings_window, show="*")
entry_password.pack(pady=(0, 10), padx=10, fill="x")
entry_password.insert(0, self.settings.password)
lbl_base_url = tk.Label(settings_window, text=_("Base URL:"))
lbl_base_url.pack(pady=(10, 0))
entry_base_url = tk.Entry(settings_window)
entry_base_url.pack(pady=(0, 10), padx=10, fill="x")
entry_base_url.insert(0, self.settings.base_url)
lbl_default_tags = tk.Label(settings_window, text=_("Default Tags:"))
lbl_default_tags.pack(pady=(10, 0))
entry_default_tags = tk.Entry(settings_window)
entry_default_tags.pack(pady=(0, 10), padx=10, fill="x")
entry_default_tags.insert(0, self.settings.default_tags)
lbl_browser = tk.Label(settings_window, text=_("Browser:"))
lbl_browser.pack(pady=(10, 0))
cb_browser = ttk.Combobox(
settings_window,
values=list(self.settings.installed_browsers.keys()),
state="readonly",
)
cb_browser.pack(pady=(0, 10), padx=10, fill="x")
cb_browser.set(
self.settings.installed_browsers_reverse.get(
self.settings.browser, "Default"
)
)
lbl_lang = tk.Label(settings_window, text=_("Language:"))
lbl_lang.pack(pady=(10, 0))
cb_lang = ttk.Combobox(
settings_window,
values=list(self.settings.i18n.languages.values()),
state="readonly",
)
cb_lang.pack(pady=(0, 10), padx=10, fill="x")
cb_lang.set(
self.settings.i18n.languages.get(self.settings.i18n.current_lang, "en")
)
def save_and_close():
self.settings.username = entry_login.get()
self.settings.password = entry_password.get()
self.settings.base_url = entry_base_url.get()
self.settings.default_tags = entry_default_tags.get()
self.settings.browser = self.settings.installed_browsers[cb_browser.get()]
self.settings.i18n.set_language(
next(
(
lang
for lang, name in self.settings.i18n.languages.items()
if name == cb_lang.get()
),
"en",
)
)
self.settings.save_settings()
settings_window.destroy()
self.reload_ui()
btn_save = tk.Button(settings_window, text=_("Zapisz"), command=save_and_close)
btn_save.pack(pady=10)
def create_widgets(self):
# Główna ramka trzy kolumny
main_frame = tk.Frame(self)
main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Lewa kolumna lista plików
left_frame = tk.Frame(main_frame)
left_frame.grid(row=0, column=0, sticky=tk.NS)
self.listbox = tk.Listbox(left_frame, width=30)
self.listbox.pack(side=tk.LEFT, fill=tk.Y)
self.listbox.bind("<<ListboxSelect>>", self.on_listbox_select)
scrollbar = tk.Scrollbar(
left_frame, orient="vertical", command=self.listbox.yview
)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.listbox.config(yscrollcommand=scrollbar.set)
# Środkowa kolumna podgląd obrazu
center_frame = tk.Frame(main_frame)
center_frame.grid(row=0, column=1, sticky=tk.NSEW, padx=10)
main_frame.grid_columnconfigure(1, weight=1)
main_frame.grid_rowconfigure(0, weight=1)
main_frame.grid_rowconfigure(1, weight=0)
self.image_label = tk.Label(center_frame)
self.image_label.pack(fill=tk.BOTH, expand=True)
self.image_label.bind("<Configure>", self.on_image_label_resize)
# Prawa kolumna panel tagów i uploadu (ograniczona szerokość)
right_frame = tk.Frame(main_frame, width=300)
right_frame.grid(row=0, column=2, sticky=tk.NSEW, padx=5)
right_frame.grid_propagate(False)
right_frame.grid_columnconfigure(0, weight=1)
# Ustal wiersze:
right_frame.grid_rowconfigure(0, weight=0) # PNG Tags naturalny rozmiar
right_frame.grid_rowconfigure(1, weight=0) # Tagger Tags naturalny rozmiar
right_frame.grid_rowconfigure(2, weight=0) # Manual Tags (jednolinijkowe)
right_frame.grid_rowconfigure(3, weight=1) # Final Tags rozszerza się
right_frame.grid_rowconfigure(4, weight=0) # Upload Panel
# PNG Tags widget Text z scrollbar
png_frame = tk.LabelFrame(right_frame, text=_("PNG Tags"))
png_frame.grid(row=0, column=0, sticky=tk.EW, padx=5, pady=5)
png_frame.grid_columnconfigure(0, weight=1)
self.png_tags_text = tk.Text(png_frame, wrap=tk.WORD)
self.png_tags_text.grid(row=0, column=0, sticky=tk.EW)
scrollbar_png = tk.Scrollbar(png_frame, command=self.png_tags_text.yview)
scrollbar_png.grid(row=0, column=1, sticky=tk.NS)
self.png_tags_text.config(
yscrollcommand=scrollbar_png.set, state=tk.DISABLED, height=4
)
# Tagger Tags widget Text z scrollbar
tagger_frame = tk.LabelFrame(right_frame, text=_("Tagger Tags"))
tagger_frame.grid(row=1, column=0, sticky=tk.EW, padx=5, pady=5)
tagger_frame.grid_columnconfigure(0, weight=1)
self.tagger_tags_text = tk.Text(tagger_frame, wrap=tk.WORD)
self.tagger_tags_text.grid(row=0, column=0, sticky=tk.EW)
scrollbar_tagger = tk.Scrollbar(
tagger_frame, command=self.tagger_tags_text.yview
)
scrollbar_tagger.grid(row=0, column=1, sticky=tk.NS)
self.tagger_tags_text.config(
yscrollcommand=scrollbar_tagger.set, state=tk.DISABLED, height=4
)
# Manual Tags Entry (stała wysokość)
manual_frame = tk.LabelFrame(right_frame, text=_("Manual Tags"))
manual_frame.grid(row=2, column=0, sticky=tk.NSEW, padx=5, pady=5)
self.manual_tags_manager = TagManager(
manual_frame, self.settings, self.tags_repo, self.update_final_tags
)
self.manual_tags_manager.pack(fill=tk.BOTH, expand=True)
# Final Tags widget Text z scrollbar, który rozszerza się
final_frame = tk.LabelFrame(right_frame, text=_("Final Tags"))
final_frame.grid(row=3, column=0, sticky=tk.NSEW, padx=5, pady=5)
final_frame.grid_rowconfigure(0, weight=1)
final_frame.grid_columnconfigure(0, weight=1)
self.final_tags_text = tk.Text(final_frame, state=tk.DISABLED, wrap=tk.WORD)
self.final_tags_text.grid(row=0, column=0, sticky=tk.NSEW)
scrollbar_final = tk.Scrollbar(final_frame, command=self.final_tags_text.yview)
scrollbar_final.grid(row=0, column=1, sticky=tk.NS)
self.final_tags_text.config(yscrollcommand=scrollbar_final.set)
# Panel uploadu i rating nie zmienia rozmiaru pionowo
upload_frame = tk.Frame(right_frame)
upload_frame.grid(row=4, column=0, sticky=tk.EW, padx=5, pady=5)
self.rating_var = tk.StringVar(value="Unrated")
rating_options = ["General", "Sensitive", "Questionable", "Explicit", "Unrated"]
self.rating_dropdown = tk.OptionMenu(
upload_frame, self.rating_var, *rating_options
)
self.rating_dropdown.pack(side=tk.LEFT, padx=5)
self.upload_button = tk.Button(
upload_frame, text=_("Wyślij"), command=self.upload_current_image
)
self.upload_button.pack(side=tk.LEFT, padx=5)
self.upload_button.config(state=tk.DISABLED)
self.view_post_button = tk.Button(
upload_frame, text=_("Wyświetl"), command=self.view_current_post
)
self.view_post_button.pack(side=tk.LEFT, padx=5)
self.view_post_button.config(state=tk.DISABLED)
btn_prev = tk.Button(upload_frame, text="<<", command=self.show_prev)
btn_prev.pack(side=tk.LEFT, padx=5)
btn_next = tk.Button(upload_frame, text=">>", command=self.show_next)
btn_next.pack(side=tk.LEFT, padx=5)
# Na końcu okna głównego dodaj status bar:
self.status_label = tk.Label(
main_frame, text="", bd=1, relief=tk.SUNKEN, anchor=tk.W
)
self.status_label.grid(row=1, column=0, columnspan=3, sticky=tk.EW)
def update_status_bar(self):
status_text = (
f"{_('Przetworzono tagi:')} {len(self.tagger_processed)}/{self.total_files} {_('plików')} | "
f"{_('Zweryfikowano status uploadu:')} {self.upload_verified}/{self.total_files} {_('plików')} | "
f"{_('Zuploadowano:')} {self.uploaded_count}/{self.upload_verified} {_('plików')}"
)
self.status_label.config(text=status_text)
def on_arrow_key(self, event):
"""
Obsługuje klawisze strzałek w lewo i w prawo.
"""
# Jeśli fokus jest na Manual Tags, nie przełączamy plików.
if self.focus_get() == self.manual_tags_manager:
return
if event.keysym == "Left":
self.show_prev()
elif event.keysym == "Right":
self.show_next()
def bind_events(self):
"""
Przypisuje zdarzenia do klawiszy strzałek w lewo i w prawo.
"""
self.bind("<Left>", self.on_arrow_key)
self.bind("<Right>", self.on_arrow_key)
def select_folder(self):
"""
Otwiera okno dialogowe wyboru folderu z obrazkami
i wczytuje pliki PNG z wybranego folderu.
"""
folder = filedialog.askdirectory(title=_("Wybierz folder z obrazkami PNG"))
if folder:
self.folder_path = folder
self.load_images()
def load_images(self):
"""
Ładuje pliki PNG z wybranego folderu.
"""
pattern = os.path.join(self.folder_path, "*.png")
self.image_files = sorted(glob.glob(pattern))
self.total_files = len(self.image_files)
self.image_files_md5 = {
file: self.compute_md5(file) for file in self.image_files
}
self.tagger_processed.clear()
for md5 in self.image_files_md5.values():
if self.tagger_cache[md5]:
self.tagger_processed.add(md5)
self.listbox.delete(0, tk.END)
self.uploaded.clear()
for file in self.image_files:
self.listbox.insert(tk.END, os.path.basename(file))
self.uploaded[file] = False
if self.image_files:
self.current_index = 0
self.listbox.select_set(0)
self.show_image(0)
self.post_load_processing()
else:
messagebox.showinfo(
_("Informacja"), _("Brak plików PNG w wybranym folderze.")
)
def post_load_processing(self):
"""
Po załadowaniu plików, sprawdza czy są jakieś pliki do uploadu oraz przetwarza Taggerem pliki.
"""
self.join_check_uploaded_files_thread()
self.check_uploaded_files_thread = threading.Thread(
target=self.check_uploaded_files
)
self.check_uploaded_files_thread.start()
self.join_process_tagger_queue_thread()
self.process_tagger_queue_thread = threading.Thread(
target=self.process_tagger_queue
)
self.process_tagger_queue_thread.start()
def join_check_uploaded_files_thread(self):
if self.check_uploaded_files_thread is not None:
self.check_uploaded_files_stop_event.set()
self.check_uploaded_files_thread.join()
self.check_uploaded_files_thread = None
self.check_uploaded_files_stop_event = threading.Event()
def join_process_tagger_queue_thread(self):
if self.process_tagger_queue_thread is not None:
self.process_tagger_queue_stop_event.set()
self.process_tagger_queue_thread.join()
self.process_tagger_queue_thread = None
self.process_tagger_queue_stop_event = threading.Event()
def check_uploaded_files(self):
"""
Dla każdego obrazu oblicza MD5, grupuje je w paczki (do 100 skrótów),
wysyła zapytanie do endpointa 'posts.json' dla każdej paczki,
a następnie na podstawie odpowiedzi ustawia w self.uploaded post id dla uploadowanych plików.
"""
file_md5_list = [
(idx, file, self.image_files_md5[file])
for idx, file in enumerate(self.image_files)
]
batch_size = 100
for i in range(0, len(file_md5_list), batch_size):
if self.check_uploaded_files_stop_event.is_set():
break
batch = file_md5_list[i : i + batch_size]
batch_md5 = [item[2] for item in batch]
md5_param = ",".join(batch_md5)
url = self.settings.base_url.rstrip("/") + "/posts.json"
try:
response = requests.get(url, params={"md5": md5_param})
root = response.json()
found = {}
for elem in root:
if self.check_uploaded_files_stop_event.is_set():
break
post_md5 = elem.get("md5", "").lower()
post_id = elem.get("id")
if post_md5 and post_id:
found[post_md5] = post_id
for idx, file_path, md5 in batch:
if self.check_uploaded_files_stop_event.is_set():
break
self.upload_verified += 1 # Każdy plik w batchu jest zweryfikowany
if md5.lower() in found:
self.uploaded[file_path] = found[md5.lower()]
self.uploaded_count += 1
self.after(
0,
lambda idx=idx: self.listbox.itemconfig(
idx, {"bg": "lightgray"}
),
)
# Jeśli aktualnie wybrany plik, zmień przycisk
if self.current_index == idx:
self.after(0, self.update_button_states)
else:
self.uploaded[file_path] = False
self.after(0, self.update_status_bar)
except Exception as e:
print(_("Błąd podczas sprawdzania paczki uploadu:"), e)
self.after(100, self.join_check_uploaded_files_thread)
def update_button_states(self):
"""
Update the state of UI elements based on current application state.
Synchronizes the enabled/disabled states of menu items and buttons with:
- Whether an image is currently selected (current_index exists)
- Whether the selected image has an existing post (post_id exists in uploaded)
State transitions:
- When no image is selected (current_index is None):
* Disables all upload-related buttons and menu items
- When an image is selected:
* Always enables 'Wyślij wszystko' (Send All)
* Enables 'Wyślij' (Send) if no post exists for the image
* Enables 'Podmień tagi' (Replace Tags) and 'Otwórz post' (Open Post)
if a post exists for the image
* Updates the upload button's text and command based on post existence
Modifies:
- File menu items: 'Wyślij', 'Wyślij wszystko', 'Podmień tagi', 'Otwórz post'
- Buttons: upload_button, view_post_button
Dependencies:
- Relies on self.current_index to determine selection state
- Checks self.uploaded against self.image_files for post existence
"""
has_current = self.current_index is not None
post_id = (
self.uploaded.get(self.image_files[self.current_index])
if has_current
else None
)
# Common state for all elements
send_all_state = tk.NORMAL if has_current else tk.DISABLED
post_ops_state = tk.NORMAL if post_id else tk.DISABLED
# Update menu items
self.file_menu.entryconfig(_("Wyślij wszystko"), state=send_all_state)
self.file_menu.entryconfig(_("Podmień tagi"), state=post_ops_state)
self.file_menu.entryconfig(_("Otwórz post"), state=post_ops_state)
# Update buttons
self.upload_button.config(
state=tk.NORMAL if has_current else tk.DISABLED,
text=_("Podmień tagi") if post_id else _("Wyślij"),
command=self.edit_current_image if post_id else self.upload_current_image,
)
self.view_post_button.config(state=post_ops_state)
# Special case for "Wyślij" menu item
wyślij_state = tk.DISABLED if post_id else tk.NORMAL
self.file_menu.entryconfig(
_("Wyślij"), state=wyślij_state if has_current else tk.DISABLED
)
def view_current_post(self):
"""Otwiera w przeglądarce URL posta."""
if self.current_index is None:
return
post_id = self.uploaded.get(self.image_files[self.current_index])
if post_id:
url = self.settings.base_url.rstrip("/") + "/post/view/" + str(post_id)
open_webbrowser(url, self.settings)
def compute_md5(self, file_path, chunk_size=8192):
"""Oblicza MD5 dla danego pliku."""
hash_md5 = hashlib.md5()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
hash_md5.update(chunk)
except Exception as e:
print(_("Błąd przy obliczaniu MD5:"), e)
return ""
return hash_md5.hexdigest()
def on_listbox_select(self, event):
"""
Wywoływane po wybraniu pliku z listy.
Wyświetla obrazek, aktualizuje tagi PNG, uruchamia Taggera, ustawia przycisk uploadu.
"""
if not self.listbox.curselection():
return
index = self.listbox.curselection()[0]
self.current_index = index
self.show_image(index)
self.update_button_states()
def upload_current_image(self):
"""
Uploaduje obrazek do serwera.
Wysyła POST z obrazkiem, tagami i ratingiem.
Po zakończeniu uploadu, ustawia przycisk na "Wyświetl".
"""
file_path = self.image_files[self.current_index]
if self.uploaded.get(file_path, False):
# Jeśli plik już uploadowany, ustaw przycisk na "Wyświetl"
self.update_button_states()
return
self.upload_button.config(state=tk.DISABLED)
self.processing_dialog = ProcessingDialog(self, self.upload_file, file_path)
def show_image(self, index):
"""
Wyświetla obrazek o podanym indeksie z listy.
Odczytuje tagi z pliku PNG, uruchamia Taggera, aktualizuje widgety z tagami.
"""
if index < 0 or index >= len(self.image_files):
return
file_path = self.image_files[index]
try:
img = Image.open(file_path)
parameters = ""
if isinstance(img, PngImagePlugin.PngImageFile):
parameters = img.info.get("parameters", "")
self.current_image_original = img.copy()
self.current_parameters = parameters
self.update_display_image()
parsed_parameters = parse_parameters(parameters, self.tags_repo)
# Uaktualnij widget PNG Tags
self.update_png_tags_widget(parsed_parameters.split())
# Uruchom Taggera w osobnym wątku
thread_name = f"tagger{self.tagger_thread_idx}"
thread = threading.Thread(target=self.run_tagger, args=(thread_name,))
self.run_tagger_threads[thread_name] = thread
self.tagger_thread_idx += 1
thread.start()
except Exception as e:
messagebox.showerror(_("Błąd"), f"{_('Nie można załadować obrazka:')}\n{e}")
def edit_current_image(self):
"""
Modyfikuje obrazek na serwerze.
Wysyła POST z md5 obrazka, tagami i ratingiem.
"""
file_path = self.image_files[self.current_index]
if not self.uploaded.get(file_path, False):
self.update_button_states()
return
self.upload_button.config(state=tk.DISABLED)
self.processing_dialog = ProcessingDialog(self, self.edit_file, file_path)
def update_display_image(self):
"""
Aktualizuje obrazek na podstawie aktualnego rozmiaru okna.
Skaluje obrazek, jeśli jego rozmiar jest większy niż dostępna przestrzeń.
"""
if self.current_image_original is None:
return
self.image_label.update_idletasks()
avail_width = self.image_label.winfo_width()
avail_height = self.image_label.winfo_height()
if avail_width <= 1 or avail_height <= 1:
avail_width, avail_height = 400, 400
orig_width, orig_height = self.current_image_original.size
scale = min(avail_width / orig_width, avail_height / orig_height, 1)
new_width = int(orig_width * scale)
new_height = int(orig_height * scale)
try:
resample_filter = Image.Resampling.LANCZOS
except AttributeError:
resample_filter = Image.LANCZOS
if scale < 1:
img = self.current_image_original.resize(
(new_width, new_height), resample=resample_filter
)
else:
img = self.current_image_original
self.image_cache = ImageTk.PhotoImage(img)
self.image_label.config(image=self.image_cache)
def update_listbox_item_color_by_rating(self, file_path, rating):
"""
Ustawia kolor tła dla pozycji w liście na podstawie ratingu, o ile plik nie został uploadowany.
Kolor: lightgreen dla General, yellow dla Sensitive,
darkorange dla Questionable, red dla Explicit. Jeśli plik uploadowany, nie zmieniamy (pozostaje lightgray).
"""
# Jeśli plik jest oznaczony jako uploadowany, nic nie robimy
if self.uploaded.get(file_path, False):
return
try:
index = self.image_files.index(file_path)
except ValueError:
return
# Mapowanie ratingu na kolor
if rating == "general":
color = "lightgreen"
elif rating == "sensitive":
color = "yellow"
elif rating == "questionable":
color = "darkorange"
elif rating == "explicit":
color = "red"
else:
color = "white"
self.after(
0, lambda idx=index, col=color: self.listbox.itemconfig(idx, {"bg": col})
)
def on_image_label_resize(self, event):
"""
Wywoływane przy zmianie rozmiaru okna.
Aktualizuje obrazek w zależności od nowego rozmiaru okna.
"""
self.update_display_image()
def show_prev(self):
"""
Przełącza na poprzedni obraz
"""
if self.current_index is None:
return
new_index = self.current_index - 1
if new_index < 0:
new_index = len(self.image_files) - 1
self.current_index = new_index
self.listbox.select_clear(0, tk.END)
self.listbox.select_set(new_index)
self.listbox.activate(new_index)
self.show_image(new_index)
self.update_button_states()
def show_next(self):
"""
Przełącza na następny obraz
"""
if self.current_index is None:
return
new_index = self.current_index + 1
if new_index >= len(self.image_files):
new_index = 0
self.current_index = new_index
self.listbox.select_clear(0, tk.END)
self.listbox.select_set(new_index)
self.listbox.activate(new_index)
self.show_image(new_index)
self.update_button_states()
# --- Metody obsługujące widgety z tagami ---
def update_png_tags_widget(self, tags_list):
"""
Aktualizuje widget z tagami z PNG.
Tworzy tagi jako klikalne, zaznaczone lub niezaznaczone.
"""
self.png_tags_text.config(state=tk.NORMAL)
self.png_tags_text.delete("1.0", tk.END)
self.png_tags_states = {}
for tag in tags_list:
self.png_tags_states[tag] = True
start_index = self.png_tags_text.index(tk.INSERT)
self.png_tags_text.insert(tk.INSERT, tag)
end_index = self.png_tags_text.index(tk.INSERT)
tag_name = "png_" + tag
self.png_tags_text.tag_add(tag_name, start_index, end_index)
self.png_tags_text.tag_configure(tag_name, foreground="blue")
self.png_tags_text.tag_bind(tag_name, "<Button-1>", self.toggle_png_tag)
self.png_tags_text.insert(tk.INSERT, " ")
self.png_tags_text.config(state=tk.DISABLED)
self.adjust_text_widget_height(self.png_tags_text)
self.update_final_tags()
def toggle_png_tag(self, event):
"""
Obsługuje kliknięcie na tag w PNG Tags.
Zmienia stan tagu (zaznaczony/niezaznaczony) i aktualizuje listę finalnych tagów.
"""
index = self.png_tags_text.index("@%d,%d" % (event.x, event.y))
for t in self.png_tags_text.tag_names(index):
if t.startswith("png_"):
actual_tag = t[len("png_") :]
self.png_tags_states[actual_tag] = not self.png_tags_states.get(
actual_tag, True
)
color = "blue" if self.png_tags_states[actual_tag] else "black"
self.png_tags_text.tag_configure(t, foreground=color)
break
self.update_visible_tagger_tags()
def get_visible_tags(self) -> list[tuple[str, bool, float]]:
"""
Zwraca listę tagów, które mają być widoczne w Tagger Tags.
Tagi zaznaczone są zawsze widoczne.
Tagi niezaznaczone są widoczne, jeśli nie są implikowane przez zaznaczone tagi.
"""
visible_tags = []
# character:amber_redmond cheat / WORKAROUND
if "character:amber_redmond" in self.manual_tags_manager.manual_tags:
for tag in [
"1girl",
"tan",
"black_hair",
"very_short_hair",
"short_hair",
"black_choker",
"heart_choker",
"red_hair",
"cutoffs",
"blue_shorts",
"short_shorts",
"short_sleeves",
"off_shoulder",
"black_footwear",
"feet",
"toenail_polish",
"toenails",
"fingernails",
"nail_polish",
]:
if tag in self.tagger_tags_states:
self.tagger_tags_states[tag] = (
True,
self.tagger_tags_states[tag][1],
)
for tag in [
"virtual_youtuber",
"dark_skin",
"dark-skinned_female",
"black_collar",
"collar",
"yellow_eyes",
]:
if tag in self.tagger_tags_states:
self.tagger_tags_states.pop(tag)
selected_tags = [
tag for tag, (selected, _) in self.tagger_tags_states.items() if selected
]
selected_png_tags = [
tag for tag, selected in self.png_tags_states.items() if selected
]
implied_by_selected = set()
# Safely collect implications from selected tags
for tag in selected_tags:
if tag in self.implication_graph:
implied_by_selected.update(nx.descendants(self.implication_graph, tag))
else:
print(
_("Warning: Tag '{tag}' not found in implication graph").format(
tag=tag
)
)
self.missing_tags.add(tag) # Log missing tags
for tag in selected_png_tags:
if tag in self.implication_graph:
implied_by_selected.update(nx.descendants(self.implication_graph, tag))
else:
print(
_("Warning: Tag '{tag}' not found in implication graph").format(
tag=tag
)
)
self.missing_tags.add(tag) # Log missing tags
# Build visible list
for tag, (selected, confidence) in self.tagger_tags_states.items():
if selected:
visible_tags.append((tag, True, confidence))
else:
if tag not in implied_by_selected:
visible_tags.append((tag, False, confidence))
return visible_tags
def update_tagger_tags_widget(self, result: wdt.Result):
"""
Aktualizuje widget z tagami z Taggera.
Zaznacza tagi, które są wspólne z tagami z PNG.
Ukrywa tagi, które są implikowane przez zaznaczone tagi.
"""
# Opróżniamy widget i słownik stanów
self.tagger_tags_states = {}
tags_dict = {}
# Przetwarzamy tagi postaci dodajemy prefix "character:" i zaznaczamy je
for tag, prob in result.character_tag_data.items():
full_tag = "character:" + tag
# Zamień nieprawidłowe tagi na poprawne
if full_tag in TAG_FIXES:
full_tag = TAG_FIXES[full_tag]
tags_dict[full_tag] = prob
# Przetwarzamy tagi general wszystkie ustawiamy jako zaznaczone
for tag, prob in result.general_tag_data.items():
# Zamień nieprawidłowe tagi na poprawne
if tag in TAG_FIXES:
tag = TAG_FIXES[tag]
tags_dict[tag] = prob
for tag, prob in tags_dict.items():
self.tagger_tags_states[tag] = (True, prob)
# Obliczamy przecięcie tagów z PNG i Taggera
common = set(self.png_tags_states.keys()).intersection(
set(self.tagger_tags_states.keys())
)
# Aktualizujemy stany w obu grupach tylko wspólne tagi pozostają zaznaczone (True)
for tag in self.png_tags_states:
if tag.startswith("character:") or tag.startswith("copyright:"):
continue # Pomijamy tagi postaci i praw autorskich, bo mamy do nich pewność
self.png_tags_states[tag] = tag in common
tag_name = "png_" + tag
color = "blue" if self.png_tags_states[tag] else "black"
self.png_tags_text.config(state=tk.NORMAL)
self.png_tags_text.tag_configure(tag_name, foreground=color)
self.png_tags_text.config(state=tk.DISABLED)
for tag in self.tagger_tags_states:
if tag.startswith("character:"):
continue # Pomijamy tagi postaci, bo mamy do nich pewność
self.tagger_tags_states[tag] = (
tag in common,
self.tagger_tags_states[tag][1],
)
self.update_visible_tagger_tags()
def update_visible_tagger_tags(self):
"""
Aktualizuje widget z tagami z Taggera.
Wyświetla tagi z Taggera w formacie: "tag (confidence %)".
Koloruje tagi na podstawie stanu (zaznaczony/niezaznaczony) i prawdopodobieństwa.
Podczas kliknięcia na tag, zmienia stan (zaznaczony/niezaznaczony).
Poprawia wysokość widgetu.
Aktualizuje listę finalnych tagów.
"""
self.tagger_tags_text.config(state=tk.NORMAL)
self.tagger_tags_text.delete("1.0", tk.END)
visible_tags = self.get_visible_tags()
for tag, selected, prob in visible_tags:
display_text = f"{tag} ({float(prob)*100:.1f}%)" # np. "Rem (99.9%)"
start_index = self.tagger_tags_text.index(tk.INSERT)
self.tagger_tags_text.insert(tk.INSERT, display_text)
end_index = self.tagger_tags_text.index(tk.INSERT)
tag_name = "tagger_" + tag
color = self.get_tagger_tag_color((selected, prob))
self.tagger_tags_text.tag_add(tag_name, start_index, end_index)
self.tagger_tags_text.tag_configure(tag_name, foreground=color)
self.tagger_tags_text.tag_bind(
tag_name, "<Button-1>", self.toggle_tagger_tag
)
self.tagger_tags_text.insert(tk.INSERT, " ")
self.tagger_tags_text.config(state=tk.DISABLED)
self.adjust_text_widget_height(self.tagger_tags_text)
self.update_final_tags()
def get_tagger_tag_color(self, tagger_tag_state: Tuple[bool, float]) -> str:
"""
Zwraca kolor dla tagu z Taggera na podstawie stanu (selected) i prawdopodobieństwa.
Kolor:
- niebieski, jeśli tag jest zaznaczony,
- czerwony, jeśli tag nie jest zaznaczony i prawdopodobieństwo < 0.3.
- żółty, jeśli tag nie jest zaznaczony i prawdopodobieństwo >= 0.3 i < 0.6.
- zielony, jeśli tag nie jest zaznaczony i prawdopodobieństwo >= 0.6.
"""
selected, prob = tagger_tag_state
if selected:
return "blue"
if prob < 0.3:
return "red"
if prob < 0.6:
return "darkorange"
return "green"
def toggle_tagger_tag(self, event):
"""
Obsługuje kliknięcie na tag z Taggera.
Zmienia stan tagu (zaznaczony/niezaznaczony) i aktualizuje kolor.
"""
index = self.tagger_tags_text.index("@%d,%d" % (event.x, event.y))
for t in self.tagger_tags_text.tag_names(index):
if t.startswith("tagger_"):
actual_tag = t[len("tagger_") :]
tag_state, tag_prob = self.tagger_tags_states.get(actual_tag, (True, 0))
self.tagger_tags_states[actual_tag] = (not tag_state, tag_prob)
color = self.get_tagger_tag_color(self.tagger_tags_states[actual_tag])
self.tagger_tags_text.tag_configure(t, foreground=color)
break
self.update_visible_tagger_tags()
def update_final_tags(self):
"""Buduje finalną listę tagów, uwzględniając default tags, PNG, Tagger i manualne.
Dla każdego tagu pobiera informacje z bazy i ustawia styl (kolor i podkreślenie):
- Deprecated: czerwony, podkreślony,
- Tag nie istnieje: żółty, podkreślony,
- Normalny: np. niebieski, bez podkreślenia.
"""
final_set = set()
if self.settings.default_tags:
final_set.update(self.settings.default_tags.split())
for tag, selected in self.png_tags_states.items():
if selected:
final_set.add(tag)
for tag, selected in self.tagger_tags_states.items():
if selected[0]:
final_set.add(tag)
manual = self.manual_tags_manager.manual_tags
if manual:
final_set.update(manual)
final_list = sorted(final_set)
self.final_tags_text.config(state=tk.NORMAL)
self.final_tags_text.delete("1.0", tk.END)
for tag in final_list:
_, deprecated = process_tag(tag, self.tags_repo)
# Ustal kolor i podkreślenie na podstawie wyniku
if deprecated is True:
color = "red"
underline = 1
elif deprecated is None:
color = "darkorange"
underline = 1
else:
color = "blue"
underline = 0
start_index = self.final_tags_text.index(tk.INSERT)
self.final_tags_text.insert(tk.INSERT, tag)
end_index = self.final_tags_text.index(tk.INSERT)
tag_name = "final_" + tag
self.final_tags_text.tag_add(tag_name, start_index, end_index)
self.final_tags_text.tag_configure(
tag_name, foreground=color, underline=underline
)
self.final_tags_text.tag_bind(
tag_name, "<Button-1>", self.open_final_tag_wiki_url
)
self.final_tags_text.insert(tk.INSERT, " ")
self.final_tags_text.config(state=tk.DISABLED)
def open_final_tag_wiki_url(self, event):
"""Otwiera w przeglądarce URL strony wiki dla klikniętego tagu.
Usuwa znane prefiksy, aby utworzyć poprawny URL.
"""
index = self.final_tags_text.index("@%d,%d" % (event.x, event.y))
for t in self.final_tags_text.tag_names(index):
if t.startswith("final_"):
actual_tag = t[len("final_") :]
open_tag_wiki_url(actual_tag, self.settings)
break
# --- Metody do cache'owania wyników Taggera ---
def process_tagger_for_image(self, file_path):
"""Przetwarza obrazek przy użyciu Taggera i zapisuje wynik do cache."""
result = self.get_tagger_results(file_path)
self.update_listbox_item_color_by_rating(file_path, result.rating)
def process_tagger_queue(self):
"""Przetwarza wszystkie obrazki w tle (pomijając aktualnie wybrany)."""
for file_path in self.image_files:
if self.process_tagger_queue_stop_event.is_set():
break
# Jeśli obrazek jest aktualnie wybrany, pomijamy on będzie przetwarzany w foreground
if (
self.current_index is not None
and file_path == self.image_files[self.current_index]
):
continue
self.process_tagger_for_image(file_path)
self.after(100, self.join_process_tagger_queue_thread)
def run_tagger(self, thread_name):
"""
Przetwarza aktualnie wybrany obrazek.
Jeśli wynik jest już w cache, wykorzystuje go; w przeciwnym razie uruchamia Taggera
i zapisuje wynik do cache.
"""
if self.current_index is None:
return
# Ustaw komunikat, że Tagger pracuje
self.tagger_tags_text.config(state=tk.NORMAL)
self.tagger_tags_text.delete("1.0", tk.END)
self.tagger_tags_text.insert("1.0", _("Tagger przetwarza..."))
self.tagger_tags_text.config(state=tk.DISABLED)
file_path = self.image_files[self.current_index]
result = self.get_tagger_results(file_path)
new_rating = self.map_tagger_rating(result)
self.rating_var.set(new_rating)
self.after(0, lambda: self.update_tagger_tags_widget(result))
self.update_listbox_item_color_by_rating(file_path, result.rating)
self.after(100, lambda: self.run_tagger_threads[thread_name].join())
def upload_file(
self,
file_path,
final_tags=None,
final_rating=None,
progress_queue: Optional[queue.Queue] = None,
cancel_event: Optional[threading.Event] = None,
):
base_file_name = os.path.basename(file_path)
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
progress_queue.put(
(
"label",
_("Wysyłam plik {base_file_name}...").format(
base_file_name=base_file_name
),
)
)
url = self.settings.base_url.rstrip("/") + "/api/danbooru/add_post"
tags = (
self.final_tags_text.get("1.0", tk.END).strip()
if final_tags is None
else final_tags
)
fields = {
"login": self.settings.username,
"password": self.settings.password,
"tags": tags,
"source": "",
}
rating_value = self.rating_map.get(
self.rating_var.get() if final_rating is None else final_rating, ""
)
if rating_value:
fields["rating"] = rating_value
try:
total_size = os.path.getsize(file_path)
def progress_callback(bytes_read, total_size):
if progress_queue:
percentage = int(bytes_read / total_size * 100)
progress_queue.put(("progress", percentage))
with open(file_path, "rb") as f:
wrapped_file = ProgressFile(
f, progress_callback, total_size, cancel_event
)
files = {"file": (base_file_name, wrapped_file, "image/png")}
response = requests.post(url, data=fields, files=files)
if progress_queue:
progress_queue.put(("progress", 100))
show_warn = False
post_url = None
if response.status_code in (200, 201):
message = _("Wysyłanie zakończone powodzeniem!")
post_url = response.headers.get("X-Danbooru-Location", None)
elif response.status_code == 409:
message = _(
"Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
).format(
status_code=response.status_code,
text=response.headers.get("X-Danbooru-Errors", ""),
)
post_url = response.headers.get("X-Danbooru-Location", None)
show_warn = True
else:
message = _(
"Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
).format(status_code=response.status_code, text=response.text)
show_warn = True
# Aktualizacja wyglądu listy musimy użyć domyślnych argumentów w lambdzie, aby zachować bieżący indeks
if show_warn:
if not final_tags:
messagebox.showwarning(_("Wysyłanie"), message)
else:
if not final_tags:
messagebox.showinfo(_("Wysyłanie"), message)
self.after(
0,
lambda idx=self.image_files.index(
file_path
): self.listbox.itemconfig(idx, {"bg": "lightgray"}),
)
if post_url:
post_id = post_url.split("/")[-1]
self.uploaded[file_path] = post_id
self.uploaded_count += 1
self.after(0, self.update_status_bar)
except Exception as e:
messagebox.showerror(_("Błąd wysyłania"), str(e))
finally:
self.upload_button.after(0, self.update_button_states)
def edit_file(
self,
file_path,
final_tags=None,
final_rating=None,
progress_queue=None,
cancel_event=None,
):
"""
Update tags and rating for an existing post without uploading the file.
"""
base_file_name = os.path.basename(file_path)
post_id = self.uploaded.get(file_path)
if not post_id:
messagebox.showerror(
_("Błąd edycji"), _("Post nie został znaleziony dla tego pliku")
)
return
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
progress_queue.put(
(
"label",
_("Aktualizuję tagi dla {base_file_name}...").format(
base_file_name=base_file_name
),
)
)
try:
# Check for cancellation before starting the operation.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Get authentication session and token
session = login(self.settings)
auth_token = get_auth_token(session, self.settings)
# Check cancellation after login if needed.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Prepare tags and rating
tags = (
self.final_tags_text.get("1.0", tk.END).strip()
if final_tags is None
else final_tags
)
rating_value = self.rating_map.get(
self.rating_var.get() if final_rating is None else final_rating, "?"
)
# Prepare API request
url = self.settings.base_url.rstrip("/") + "/post/set"
payload = {
"auth_token": auth_token,
"image_id": post_id,
"title": base_file_name,
"owner": self.settings.username,
"tags": tags,
"source": "",
"rating": rating_value,
}
if progress_queue:
progress_queue.put(("progress", 50))
# Check for cancellation before sending the update request.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Send update request
response = session.post(url, data=payload, allow_redirects=False)
# Handle 302 redirect as success case
if response.status_code == 302:
if progress_queue:
progress_queue.put(("progress", 100))
message = _("Tagi zostały zaktualizowane!")
if not final_tags: # Only show success if not bulk operation
messagebox.showinfo(_("Sukces edycji"), message)
# Update UI state
self.after(0, self.update_button_states)
return
# Handle other status codes
error_msg = _("Błąd podczas aktualizacji tagów\nStatus: {code}").format(
code=response.status_code
)
if response.text:
error_msg += f"\n{_('Treść:')} {response.text}"
messagebox.showerror("Błąd edycji", error_msg)
except Exception as e:
messagebox.showerror(_("Krytyczny błąd edycji"), str(e))
finally:
if progress_queue:
progress_queue.put(("progress", 100))
def upload_all_files(self):
"""
Metoda, która po potwierdzeniu przez użytkownika uploaduje wszystkie niewrzucone pliki.
Dla każdego pliku oblicza finalne tagi (przy użyciu compute_final_tags_for_file)
i wywołuje upload_file.
"""
if not messagebox.askyesno(
_("Potwierdzenie"),
_(
"Czy na pewno chcesz wrzucić wszystkie niewrzucone pliki?\nKażdy z nich zostanie oznaczony tagiem 'meta:auto_upload'.\nUpewnij się, że tagi są poprawne!"
),
):
return
def worker(
progress_queue: queue.Queue = None,
cancel_event: threading.Event = None,
secondary_progress_queue: queue.Queue = None,
):
files_count = len(self.image_files)
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
file_idx = 0
for file_path in self.image_files:
if progress_queue:
progress_queue.put(("progress", file_idx * 100 / files_count))
progress_queue.put(
("label", f"Wysyłam plik {file_idx+1}/{files_count}...")
)
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Anulowano operację!")))
return
if not self.uploaded.get(file_path, False):
final_tags, final_rating = (
self.compute_final_tags_and_rating_for_file(file_path)
)
print(
_(
"Wysyłanie {file_path} z tagami: {final_tags} i ratingiem: {final_rating}"
).format(
file_path=file_path,
final_tags=final_tags,
final_rating=final_rating,
)
)
self.upload_file(
file_path,
final_tags=final_tags,
final_rating=final_rating,
progress_queue=secondary_progress_queue,
cancel_event=cancel_event,
)
file_idx += 1
if progress_queue:
progress_queue.put(("label", _("Przesłano pliki!")))
progress_queue.put(("progress", 100))
self.processing_dialog = ProcessingDialog(self, worker)