Update Kapitanbooru Uploader to version 0.9.0 with significant changes

- Updated version number in pyproject.toml and messages.po files.
- Added new translations and updated existing ones in messages.po for better localization.
- Implemented core functionality in Core.py, including image processing, tagging, and upload logic.
- Enhanced the autotagging feature to support multiple image formats (PNG, JPEG, WebP, AVIF, GIF).
- Improved error handling and logging for file operations and network requests.
- Added functionality to check for uploaded files and manage tag updates for existing posts.
- Introduced threading for background processing to improve application responsiveness.
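A minimal sketch of how the new headless (non-GUI) flow could be driven, based on the Core API in this diff; the import path and the Settings construction are assumptions and may differ in the actual package:

    from kapitanbooru_uploader.Core import Core        # hypothetical package path
    from kapitanbooru_uploader.settings import Settings

    settings = Settings()                # assumes Settings loads the saved configuration
    core = Core(settings, gui_mode=False)
    core.autotag_dir("/path/to/images")  # tags and uploads every image not yet on the server
    core.wait_for_completion()           # join background threads, flush queued main-thread tasks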
2025-06-26 17:22:34 +02:00
parent 0987fbbed0
commit 67df3ea793
6 changed files with 1463 additions and 1123 deletions


@@ -0,0 +1,744 @@
import glob
import hashlib
import os
import threading
from typing import Any, Callable, Tuple
import concurrent.futures
from packaging.version import parse as parse_version
import queue
import time
import networkx as nx
import requests
from PIL import Image
import wdtagger as wdt
from .I18N import _
from .TagsRepo import TagsRepo
from .settings import Settings
from .common import login, get_auth_token
from .ProgressFile import ProgressFile
from .tag_processing import (
TAG_FIXES,
extract_parameters,
parse_parameters,
process_tag,
extract_artist_from_filename,
)
from .tagger_cache import TaggerCache
class Core:
"""
Core functionality for Kapitanbooru Uploader.
Handles image processing, tagging, and upload logic.
"""
def __init__(self, settings: Settings, gui_mode: bool = True):
self.version = "0.9.0"
self.acknowledged_version = parse_version(self.version)
self.settings = settings
self.tags_repo = TagsRepo(settings)
        # Additional settings for the Tagger
self.tagger_name = "wdtagger"
self.tagger_version = (
"1.0" # możesz ustawić wersję dynamicznie, jeśli to możliwe
)
self.tagger_cache = TaggerCache(
self.settings, self.tagger_name, self.tagger_version
)
self.gui_mode = gui_mode
self.main_thread_queue = queue.Queue()
self.after_events = []
self.implication_graph = self.load_implication_graph()
self.missing_tags = set() # Track tags not in the graph
self.check_uploaded_files_stop_event = threading.Event()
self.check_uploaded_files_thread: threading.Thread | None = None
self.process_tagger_queue_stop_event = threading.Event()
self.process_tagger_queue_thread: threading.Thread | None = None
self.check_uploaded_files_callback = None
self.update_status_bar_callback = None
self.process_tagger_for_image_callback = None
self.upload_file_success_callback: Callable[[str], Any] | None = None
self.upload_file_completed_callback: Callable | None = None
self.folder_path = ""
self.image_files = []
self.image_files_md5 = []
self.current_index = None
self.image_cache = None
self.tagger_thread_idx = 0
self.tagger = wdt.Tagger()
        # Status counters
self.total_files = 0
self.tagger_processed = set()
self.upload_verified = 0
self.uploaded_count = 0
        # Original image (kept for scaling)
self.current_image_original = None
self.current_parameters = ""
        # Rating map: displayed name -> value sent to the server
self.rating_map = {
"General": "g",
"Sensitive": "s",
"Questionable": "q",
"Explicit": "e",
"Unrated": "",
}
        # Dictionaries holding tag states (for PNG and Tagger tags)
self.png_tags_states = {}
self.tagger_tags_states = {}
        # Dictionary tracking whether a given file (path) has already been uploaded
        self.uploaded = {}  # key: file path, value: post id if uploaded, otherwise False
def schedule_in_main_thread(self, func, delay_ms=0):
"""Schedule a function to run in the main thread"""
if self.gui_mode:
# In GUI mode, use the after mechanism
self.after_events.append((time.time() + delay_ms / 1000, func))
else:
# In non-GUI mode, add to queue for immediate execution
self.main_thread_queue.put(func)
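    # Note: in GUI mode the scheduled callables only run when the GUI calls
    # process_main_thread_queue() periodically (presumably from its event loop);
    # in non-GUI mode they run on the next explicit process_main_thread_queue()
    # call, e.g. from wait_for_completion().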
def process_main_thread_queue(self):
"""Process pending main thread tasks"""
if self.gui_mode:
# Process scheduled events
now = time.time()
new_events = []
for schedule_time, func in self.after_events:
if now >= schedule_time:
try:
func()
except Exception as e:
print(f"Error in scheduled task: {e}")
else:
new_events.append((schedule_time, func))
self.after_events = new_events
else:
# Process all queued tasks in non-GUI mode
while not self.main_thread_queue.empty():
try:
func = self.main_thread_queue.get_nowait()
func()
except queue.Empty:
break
except Exception as e:
print(f"Error in main thread task: {e}")
def load_implication_graph(self) -> nx.DiGraph:
G = nx.DiGraph()
conn = self.tags_repo.get_conn()
cursor = conn.cursor()
# Step 1: Add all tags from the 'tags' table
cursor.execute(
"""
SELECT
CASE category
WHEN 1 THEN 'artist:' || name
WHEN 3 THEN 'copyright:' || name
WHEN 4 THEN 'character:' || name
WHEN 5 THEN 'meta:' || name
ELSE name
END AS prefixed_name
FROM tags
"""
)
db_tags = {row[0] for row in cursor.fetchall()}
G.add_nodes_from(db_tags)
# Step 2: Add nodes from implications (antecedents/consequents not in 'tags' table)
cursor.execute("SELECT antecedent, consequent FROM tag_closure")
edge_tags = set()
for ant, cons in cursor.fetchall():
edge_tags.add(ant)
edge_tags.add(cons)
G.add_nodes_from(edge_tags - db_tags) # Add tags only in implications
# Step 3: Add edges
cursor.execute("SELECT antecedent, consequent FROM tag_closure")
G.add_edges_from(cursor.fetchall())
conn.close()
return G
def compute_final_tags_and_rating_for_file(
self, file_path, update_status_callback, manual_tags=set()
) -> Tuple[str, str]:
"""
        Computes the final tag list and the rating for the given file.
        Combines tags from:
        - the file (PNG metadata): parsed via parse_parameters,
        - the Tagger (cached result or computed on the fly),
        - the settings (default tags),
        - manual tags (from the manual_tags_entry field),
        and adds the "meta:auto_upload" tag.
        Returns the final tag string and the rating.
"""
        # Get tags from the file
try:
img = Image.open(file_path)
parameters = extract_parameters(img, file_path)
artist_tag = extract_artist_from_filename(file_path)
png_tags = set(
[
x
for x in parse_parameters(parameters, self.tags_repo).split()
if process_tag(x, self.tags_repo)[1]
                    is not None  # skip tags that do not exist
]
)
if artist_tag:
png_tags.add("artist:" + artist_tag.replace(" ", "_").replace("\\", ""))
img.close()
except Exception as e:
print(_("Błąd przy otwieraniu pliku"), file_path, ":", e)
png_tags = set()
        # Get tags from the Tagger, checking the cache first
result = self.get_tagger_results(file_path, update_status_callback)
tagger_tags = set()
rating = "Unrated"
tagger_tags.update(
(
TAG_FIXES[tag] if tag in TAG_FIXES else tag
for tag in result.general_tag_data.keys()
)
        )  # replace invalid tags with their corrected forms
for t in result.character_tags:
full_tag = "character:" + t.replace(" ", "_").replace("\\", "")
            # replace invalid tags with their corrected forms
if full_tag in TAG_FIXES:
full_tag = TAG_FIXES[full_tag]
tagger_tags.add(full_tag)
rating = self.map_tagger_rating(result)
        # Get tags from the settings and the manual tags
default_tags = set(self.settings.default_tags.split())
        # Final list: the union of all tags
final_tags = default_tags.union(png_tags).union(tagger_tags).union(manual_tags)
final_tags.add("meta:auto_upload")
return " ".join(sorted(final_tags)), rating
def get_tagger_results(self, file_path, callback) -> wdt.Result:
md5 = self.image_files_md5[file_path]
cached = self.tagger_cache[md5]
if cached:
self.tagger_processed.add(md5)
return cached["result"]
try:
with Image.open(file_path) as img:
result = self.tagger.tag(img)
self.tagger_cache[md5] = result
self.tagger_processed.add(md5)
callback()
print(_("Tagger przetworzył:"), f"{file_path}")
return result
except Exception as e:
print(_("Błąd Taggera dla"), file_path, ":", e)
def map_tagger_rating(self, result: wdt.Result) -> str:
"""
        Maps the Tagger rating to the value used by Kapitanbooru.
"""
if result.rating == "general":
new_rating = "General"
elif result.rating == "sensitive":
new_rating = "Sensitive"
elif result.rating == "questionable":
new_rating = "Questionable"
elif result.rating == "explicit":
new_rating = "Explicit"
else:
new_rating = "Unrated"
return new_rating
def load_images(self):
"""
        Loads PNG, JPEG, WebP, AVIF, and GIF files from the selected folder.
"""
extensions = ("*.png", "*.jpg", "*.jpeg", "*.webp", "*.avif", "*.gif")
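        # Note: WebP/AVIF support depends on the installed Pillow build; older Pillow
        # versions may need the pillow-avif-plugin package to open AVIF files.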
self.image_files = sorted(
file
for ext in extensions
for file in glob.glob(os.path.join(self.folder_path, ext), recursive=True)
)
self.total_files = len(self.image_files)
self.image_files_md5 = {
file: md5
for file, md5 in zip(
self.image_files, self.compute_md5_parallel(self.image_files)
)
}
self.tagger_processed.clear()
for md5 in self.image_files_md5.values():
if self.tagger_cache[md5]:
self.tagger_processed.add(md5)
self.uploaded.clear()
self.upload_verified = 0
self.uploaded_count = 0
for file in self.image_files:
self.uploaded[file] = False
if self.image_files:
self.post_load_processing()
def compute_md5(self, file_path, chunk_size=8192):
"""Compute MD5 for a single file."""
hash_md5 = hashlib.md5()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
hash_md5.update(chunk)
except Exception as e:
print(_("Error computing MD5:"), e)
return ""
return hash_md5.hexdigest()
def compute_md5_parallel(self, file_paths):
"""Compute MD5 for multiple files in parallel."""
with concurrent.futures.ThreadPoolExecutor() as executor:
return list(executor.map(self.compute_md5, file_paths))
def post_load_processing(self):
"""
        After the files are loaded, checks which files still need to be uploaded and runs the Tagger on them.
"""
self.join_check_uploaded_files_thread()
self.check_uploaded_files_thread = threading.Thread(
target=self.check_uploaded_files
)
self.check_uploaded_files_thread.start()
self.join_process_tagger_queue_thread()
self.process_tagger_queue_thread = threading.Thread(
target=self.process_tagger_queue
)
self.process_tagger_queue_thread.start()
def wait_for_completion(self):
"""Wait for background threads to finish (non-GUI mode)"""
# Join the checking thread if running
if (
self.check_uploaded_files_thread
and self.check_uploaded_files_thread.is_alive()
):
self.check_uploaded_files_stop_event.set()
self.check_uploaded_files_thread.join()
# Join the tagger processing thread if running
if (
self.process_tagger_queue_thread
and self.process_tagger_queue_thread.is_alive()
):
self.process_tagger_queue_stop_event.set()
self.process_tagger_queue_thread.join()
# Process any remaining main thread tasks
self.process_main_thread_queue()
def join_check_uploaded_files_thread(self):
if self.check_uploaded_files_thread is not None:
self.check_uploaded_files_stop_event.set()
self.check_uploaded_files_thread.join()
self.check_uploaded_files_thread = None
self.check_uploaded_files_stop_event = threading.Event()
def join_process_tagger_queue_thread(self):
if self.process_tagger_queue_thread is not None:
self.process_tagger_queue_stop_event.set()
self.process_tagger_queue_thread.join()
self.process_tagger_queue_thread = None
self.process_tagger_queue_stop_event = threading.Event()
def process_tagger_for_image(self, file_path):
"""Przetwarza obrazek przy użyciu Taggera i zapisuje wynik do cache."""
result = self.get_tagger_results(file_path)
if self.process_tagger_for_image_callback:
self.process_tagger_for_image_callback(file_path, result.rating)
def process_tagger_queue(self):
"""Przetwarza wszystkie obrazki w tle (pomijając aktualnie wybrany)."""
for file_path in self.image_files:
if self.process_tagger_queue_stop_event.is_set():
break
            # If the image is currently selected, skip it; it will be processed in the foreground
if (
self.current_index is not None
and file_path == self.image_files[self.current_index]
):
continue
self.process_tagger_for_image(file_path)
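        # Once the queue is drained (or the stop event fires), hand the join back to the
        # main thread so the worker thread is cleaned up without joining itself.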
self.schedule_in_main_thread(self.join_process_tagger_queue_thread, 100)
def check_uploaded_files(self):
"""
        Takes the precomputed MD5 of each image, groups the hashes into batches (up to 100 per batch),
        sends a request to the 'posts.json' endpoint for each batch,
        and, based on the response, stores the post id in self.uploaded for files that are already uploaded.
"""
file_md5_list = [
(idx, file, self.image_files_md5[file])
for idx, file in enumerate(self.image_files)
]
batch_size = 100
for i in range(0, len(file_md5_list), batch_size):
if self.check_uploaded_files_stop_event.is_set():
break
batch = file_md5_list[i : i + batch_size]
batch_md5 = [item[2] for item in batch]
md5_param = ",".join(batch_md5)
url = self.settings.base_url.rstrip("/") + "/posts.json"
try:
response = requests.get(url, params={"md5": md5_param})
root = response.json()
found = {}
for elem in root:
if self.check_uploaded_files_stop_event.is_set():
break
post_md5 = elem.get("md5", "").lower()
post_id = elem.get("id")
if post_md5 and post_id:
found[post_md5] = post_id
for idx, file_path, md5 in batch:
if self.check_uploaded_files_stop_event.is_set():
break
                    self.upload_verified += 1  # every file in the batch counts as verified
if md5.lower() in found:
self.uploaded[file_path] = found[md5.lower()]
self.uploaded_count += 1
if self.check_uploaded_files_callback:
self.check_uploaded_files_callback(idx)
else:
self.uploaded[file_path] = False
if self.update_status_bar_callback:
self.update_status_bar_callback()
except Exception as e:
print(_("Błąd podczas sprawdzania paczki uploadu:"), e)
self.schedule_in_main_thread(self.join_check_uploaded_files_thread, 100)
def autotag_files(self, file_paths):
"""
        Auto-tags files using the Tagger and uploads them to the server.
"""
for file_path in file_paths:
if not os.path.isfile(file_path):
print(_("Plik nie istnieje:"), file_path)
continue
try:
tags, rating = self.compute_final_tags_and_rating_for_file(
file_path, lambda: None
)
print(_("Tagi dla pliku"), file_path, ":", tags, "Rating:", rating)
self.upload_file(
file_path,
final_tags=tags,
final_rating=rating,
progress_queue=None,
cancel_event=None,
)
except Exception as e:
print(_("Błąd podczas autotagowania pliku"), file_path, ":", e)
def autotag_dir(self, dir):
"""
        Auto-tags all files in the directory using the Tagger and uploads them to the server.
"""
if not os.path.isdir(dir):
print(_("Podana ścieżka nie jest katalogiem:"), dir)
return
self.folder_path = dir
self.load_images()
        if self.check_uploaded_files_thread:
            self.check_uploaded_files_thread.join()
        if self.process_tagger_queue_thread:
            self.process_tagger_queue_thread.join()
files_to_upload = [x for x in self.image_files if not self.uploaded[x]]
if not files_to_upload:
print(_("Brak obrazów do przetworzenia w katalogu:"), dir)
return
self.autotag_files(files_to_upload)
def upload_file(
self,
file_path,
final_tags=None,
final_rating=None,
progress_queue: queue.Queue | None = None,
cancel_event: threading.Event | None = None,
info_callback: Callable[[str], Any] | None = None,
warning_callback: Callable[[str], Any] | None = None,
error_callback: Callable[[str], Any] | None = None,
):
base_file_name = os.path.basename(file_path)
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
progress_queue.put(
(
"label",
_("Wysyłam plik {base_file_name}...").format(
base_file_name=base_file_name
),
)
)
url = self.settings.base_url.rstrip("/") + "/api/danbooru/add_post"
tags = final_tags
fields = {
"login": self.settings.username,
"password": self.settings.password,
"tags": tags,
"source": "",
}
rating_value = self.rating_map.get(final_rating, "")
if rating_value:
fields["rating"] = rating_value
try:
total_size = os.path.getsize(file_path)
def progress_callback(bytes_read, total_size):
if progress_queue:
percentage = int(bytes_read / total_size * 100)
progress_queue.put(("progress", percentage))
with open(file_path, "rb") as f:
wrapped_file = ProgressFile(
f, progress_callback, total_size, cancel_event
)
files = {"file": (base_file_name, wrapped_file, "image/png")}
response = requests.post(url, data=fields, files=files)
if progress_queue:
progress_queue.put(("progress", 100))
show_warn = False
post_url = None
if response.status_code in (200, 201):
message = _("Wysyłanie zakończone powodzeniem!")
post_url = response.headers.get("X-Danbooru-Location", None)
elif response.status_code == 409:
message = _(
"Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
).format(
status_code=response.status_code,
text=response.headers.get("X-Danbooru-Errors", ""),
)
post_url = response.headers.get("X-Danbooru-Location", None)
show_warn = True
else:
message = _(
"Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
).format(status_code=response.status_code, text=response.text)
show_warn = True
                # Updating the list display: default arguments must be used in the lambda to preserve the current index
if show_warn:
if not final_tags:
if warning_callback:
warning_callback(message)
else:
print("[WARN]", _("Wysyłanie"), message)
else:
if not final_tags:
if info_callback:
info_callback(message)
else:
print("[INFO]", _("Wysyłanie"), message)
if self.upload_file_success_callback:
self.upload_file_success_callback(file_path)
if post_url:
post_id = post_url.split("/")[-1]
self.uploaded[file_path] = post_id
self.uploaded_count += 1
                        if self.update_status_bar_callback:
                            self.schedule_in_main_thread(self.update_status_bar_callback)
except Exception as e:
if error_callback:
error_callback(str(e))
else:
print("[ERROR]", _("Błąd wysyłania pliku"), file_path, ":", e)
finally:
if self.upload_file_completed_callback:
self.upload_file_completed_callback()
def upload_all_files(
self,
        progress_queue: queue.Queue | None = None,
        cancel_event: threading.Event | None = None,
        secondary_progress_queue: queue.Queue | None = None,
update_status_callback=None,
manual_tags: set = set(),
info_callback: Callable[[str], Any] | None = None,
warning_callback: Callable[[str], Any] | None = None,
error_callback: Callable[[str], Any] | None = None,
):
files_to_upload = [x for x in self.image_files if not self.uploaded[x]]
files_count = len(files_to_upload)
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
file_idx = 0
for file_path in files_to_upload:
if progress_queue:
progress_queue.put(("progress", file_idx * 100 / files_count))
progress_queue.put(
("label", f"Wysyłam plik {file_idx+1}/{files_count}...")
)
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Anulowano operację!")))
return
if not self.uploaded.get(file_path, False):
final_tags, final_rating = self.compute_final_tags_and_rating_for_file(
file_path,
update_status_callback, # lambda: self.after(0, self.update_status_bar)
manual_tags, # set(self.manual_tags_manager.manual_tags)
)
print(
_(
"Wysyłanie {file_path} z tagami: {final_tags} i ratingiem: {final_rating}"
).format(
file_path=file_path,
final_tags=final_tags,
final_rating=final_rating,
)
)
self.upload_file(
file_path,
final_tags=final_tags,
final_rating=final_rating,
progress_queue=secondary_progress_queue,
cancel_event=cancel_event,
info_callback=info_callback,
warning_callback=warning_callback,
error_callback=error_callback,
)
file_idx += 1
if progress_queue:
progress_queue.put(("label", _("Przesłano pliki!")))
progress_queue.put(("progress", 100))
def edit_file(
self,
file_path,
final_tags=None,
final_rating=None,
progress_queue=None,
cancel_event=None,
info_callback: Callable[[str], Any] | None = None,
error_callback: Callable[[str], Any] | None = None,
):
"""
Update tags and rating for an existing post without uploading the file.
"""
base_file_name = os.path.basename(file_path)
post_id = self.uploaded.get(file_path)
if not post_id:
if error_callback:
error_callback(_("Post nie został znaleziony dla tego pliku"))
else:
print(
"[ERROR]",
_("Post nie został znaleziony dla tego pliku"),
file_path,
)
return
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
progress_queue.put(
(
"label",
_("Aktualizuję tagi dla {base_file_name}...").format(
base_file_name=base_file_name
),
)
)
try:
# Check for cancellation before starting the operation.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Get authentication session and token
session = login(self.settings)
auth_token = get_auth_token(session, self.settings)
# Check cancellation after login if needed.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Prepare tags and rating
tags = final_tags
rating_value = self.rating_map.get(final_rating, "?")
# Prepare API request
url = self.settings.base_url.rstrip("/") + "/post/set"
payload = {
"auth_token": auth_token,
"image_id": post_id,
"title": base_file_name,
"owner": self.settings.username,
"tags": tags,
"source": "",
"rating": rating_value,
}
if progress_queue:
progress_queue.put(("progress", 50))
# Check for cancellation before sending the update request.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Send update request
response = session.post(url, data=payload, allow_redirects=False)
# Handle 302 redirect as success case
if response.status_code == 302:
if progress_queue:
progress_queue.put(("progress", 100))
message = _("Tagi zostały zaktualizowane!")
if not final_tags: # Only show success if not bulk operation
if info_callback:
info_callback(message)
else:
print("[INFO]", _("Sukces edycji"), message)
# Update UI state
if self.upload_file_completed_callback:
self.upload_file_completed_callback()
return
# Handle other status codes
error_msg = _("Błąd podczas aktualizacji tagów\nStatus: {code}").format(
code=response.status_code
)
if response.text:
error_msg += f"\n{_('Treść:')} {response.text}"
if error_callback:
error_callback(error_msg)
else:
print("[ERROR]", _("Błąd edycji"), error_msg)
except Exception as e:
if error_callback:
error_callback(str(e))
else:
print("[ERROR]", _("Krytyczny błąd edycji"), file_path, ":", e)
finally:
if progress_queue:
progress_queue.put(("progress", 100))