Files
kapitanbooru-uploader/kapitanbooru_uploader/Core.py
Kapitan 49d000003a
All checks were successful
Gitea/kapitanbooru-uploader/pipeline/head This commit looks good
Bump version to 0.9.5; update localization files and refactor file upload logic in Core
2025-06-26 18:12:41 +02:00

740 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import glob
import hashlib
import os
import threading
from typing import Any, Callable, Tuple
import concurrent.futures
from packaging.version import parse as parse_version
import queue
import time
import networkx as nx
import requests
from PIL import Image
import wdtagger as wdt
from .I18N import _
from .TagsRepo import TagsRepo
from .settings import Settings
from .common import login, get_auth_token
from .ProgressFile import ProgressFile
from .tag_processing import (
TAG_FIXES,
extract_parameters,
parse_parameters,
process_tag,
extract_artist_from_filename,
)
from .tagger_cache import TaggerCache
class Core:
    """
    Core functionality for Kapitanbooru Uploader.
    Handles image processing, tagging, and upload logic.
    """

    def __init__(self, settings: Settings, gui_mode: bool = True):
        """
        Set up repositories, caches, the tag implication graph and runtime state.

        :param settings: application settings (passed to TagsRepo and TaggerCache).
        :param gui_mode: selects how ``schedule_in_main_thread`` defers work: timed
            ``after_events`` entries (True) or the ``main_thread_queue`` FIFO (False).
        """
        self.version = "0.9.5"
        self.acknowledged_version = parse_version(self.version)
        self.settings = settings
        self.tags_repo = TagsRepo(settings)
        # Extra settings for the Tagger; name and version are handed to the TaggerCache.
        self.tagger_name = "wdtagger"
        self.tagger_version = "1.0"  # could be set dynamically, if that is possible
        self.tagger_cache = TaggerCache(
            self.settings, self.tagger_name, self.tagger_version
        )
        self.gui_mode = gui_mode
        self.main_thread_queue = queue.Queue()
        self.after_events = []  # (due_time, callable) pairs, used in GUI mode
        self.implication_graph = self.load_implication_graph()
        self.missing_tags = set()  # Track tags not in the graph
        # Background workers and the events used to ask them to stop.
        self.check_uploaded_files_stop_event = threading.Event()
        self.check_uploaded_files_thread: threading.Thread | None = None
        self.process_tagger_queue_stop_event = threading.Event()
        self.process_tagger_queue_thread: threading.Thread | None = None
        # Optional hooks installed by the front-end (GUI / CLI).
        self.check_uploaded_files_callback = None
        self.update_status_bar_callback = None
        self.process_tagger_for_image_callback = None
        self.upload_file_success_callback: Callable[[str], Any] | None = None
        self.upload_file_completed_callback: Callable | None = None
        self.folder_path = ""
        self.image_files = []
        # file path -> MD5 hex digest. Must be a dict (was a list): every reader
        # indexes it by file path.
        self.image_files_md5 = {}
        self.current_index = None
        self.image_cache = None
        self.tagger = wdt.Tagger()
        # Status counters
        self.total_files = 0
        self.tagger_processed = set()  # MD5s that already have Tagger results
        self.upload_verified = 0
        self.uploaded_count = 0
        # Rating map: display name -> value sent to the server
        self.rating_map = {
            "General": "g",
            "Sensitive": "s",
            "Questionable": "q",
            "Explicit": "e",
            "Unrated": "",
        }
        # Per file path: the post id once the file is known to be uploaded, else False
        self.uploaded = {}  # key: file path, value: post id / False
def schedule_in_main_thread(self, func, delay_ms=0):
    """
    Defer ``func`` so that ``process_main_thread_queue`` runs it later.

    GUI mode: stores ``func`` with a due time ``delay_ms`` milliseconds from now.
    Non-GUI mode: ``delay_ms`` is ignored and ``func`` is pushed onto the FIFO queue.
    """
    if not self.gui_mode:
        # Non-GUI: executed on the next drain of the queue.
        self.main_thread_queue.put(func)
        return
    due_at = time.time() + delay_ms / 1000
    self.after_events.append((due_at, func))
def process_main_thread_queue(self):
    """
    Execute deferred main-thread work that is ready to run.

    GUI mode: walks ``after_events`` and calls every task whose due time has been
    reached; tasks not yet due are kept for a later call. Non-GUI mode: drains
    ``main_thread_queue``. A task that raises has its error printed and does not
    prevent the remaining tasks from running.
    """
    if not self.gui_mode:
        # Non-GUI: run everything queued so far.
        while not self.main_thread_queue.empty():
            try:
                task = self.main_thread_queue.get_nowait()
                task()
            except queue.Empty:
                break
            except Exception as err:
                print(f"Error in main thread task: {err}")
        return

    current_time = time.time()
    kept = []
    # Iterates the live list, so tasks appended by a running task are visited too.
    for due_at, task in self.after_events:
        ready = current_time >= due_at
        if not ready:
            kept.append((due_at, task))
            continue
        try:
            task()
        except Exception as err:
            print(f"Error in scheduled task: {err}")
    self.after_events = kept
def load_implication_graph(self) -> nx.DiGraph:
    """
    Build the tag implication graph from the tags database.

    Nodes are every tag in the ``tags`` table (category 1/3/4/5 names get the
    ``artist:``/``copyright:``/``character:``/``meta:`` prefix) plus every tag that
    appears in ``tag_closure``. Each ``tag_closure`` row becomes an edge
    ``antecedent -> consequent``. The database connection is always closed.
    """
    graph = nx.DiGraph()
    conn = self.tags_repo.get_conn()
    try:
        cursor = conn.cursor()
        # Step 1: every tag from the 'tags' table, with its namespace prefix.
        cursor.execute(
            """
            SELECT
                CASE category
                    WHEN 1 THEN 'artist:' || name
                    WHEN 3 THEN 'copyright:' || name
                    WHEN 4 THEN 'character:' || name
                    WHEN 5 THEN 'meta:' || name
                    ELSE name
                END AS prefixed_name
            FROM tags
            """
        )
        graph.add_nodes_from(row[0] for row in cursor.fetchall())
        # Step 2: implication edges, read once. add_edges_from also adds any
        # antecedent/consequent missing from the 'tags' table as a node.
        cursor.execute("SELECT antecedent, consequent FROM tag_closure")
        graph.add_edges_from(cursor.fetchall())
    finally:
        conn.close()
    return graph
def compute_final_tags_and_rating_for_file(
    self, file_path, update_status_callback, manual_tags=None
) -> Tuple[str, str]:
    """
    Compute the final tag string and rating for one file.

    Merges tags from:
      - the file itself (PNG parameters parsed by ``parse_parameters``, keeping only
        tags for which ``process_tag`` returns a non-None second element) plus an
        ``artist:`` tag derived from the file name,
      - the Tagger (cached or computed now; skipped if tagging failed),
      - ``settings.default_tags``,
      - ``manual_tags`` (any iterable of tags; defaults to none),
    and always adds ``meta:auto_upload``.

    :param update_status_callback: forwarded to ``get_tagger_results``.
    :returns: ``(space-separated sorted tags, rating display name)``; the rating is
        ``"Unrated"`` when no Tagger result is available.
    """
    # Tags embedded in the file.
    try:
        with Image.open(file_path) as img:
            parameters = extract_parameters(img, file_path)
            artist_tag = extract_artist_from_filename(file_path)
            png_tags = {
                tag
                for tag in parse_parameters(parameters, self.tags_repo).split()
                # Ignore tags that do not exist
                if process_tag(tag, self.tags_repo)[1] is not None
            }
            if artist_tag:
                png_tags.add(
                    "artist:" + artist_tag.replace(" ", "_").replace("\\", "")
                )
    except Exception as e:
        print(_("Błąd przy otwieraniu pliku"), file_path, ":", e)
        png_tags = set()

    # Tags from the Tagger (cache-aware). get_tagger_results returns None on failure.
    result = self.get_tagger_results(file_path, update_status_callback)
    tagger_tags = set()
    rating = "Unrated"
    if result is not None:
        # Replace known-wrong tags with their corrected form.
        tagger_tags.update(
            TAG_FIXES.get(tag, tag) for tag in result.general_tag_data.keys()
        )
        for name in result.character_tags:
            full_tag = "character:" + name.replace(" ", "_").replace("\\", "")
            tagger_tags.add(TAG_FIXES.get(full_tag, full_tag))
        rating = self.map_tagger_rating(result)

    # Default tags from settings plus manual tags.
    default_tags = set(self.settings.default_tags.split())
    final_tags = default_tags | png_tags | tagger_tags | set(manual_tags or ())
    final_tags.add("meta:auto_upload")
    return " ".join(sorted(final_tags)), rating
def get_tagger_results(
self, file_path, callback: Callable | None = None
) -> wdt.Result:
md5 = self.image_files_md5[file_path]
cached = self.tagger_cache[md5]
if cached:
self.tagger_processed.add(md5)
return cached["result"]
try:
with Image.open(file_path) as img:
result = self.tagger.tag(img)
self.tagger_cache[md5] = result
self.tagger_processed.add(md5)
if callback:
callback()
print(_("Tagger przetworzył:"), f"{file_path}")
return result
except Exception as e:
print(_("Błąd Taggera dla"), file_path, ":", e)
def map_tagger_rating(self, result: wdt.Result) -> str:
"""
Mapuje rating z Taggera na wartość używaną w Kapitanbooru.
"""
if result.rating == "general":
new_rating = "General"
elif result.rating == "sensitive":
new_rating = "Sensitive"
elif result.rating == "questionable":
new_rating = "Questionable"
elif result.rating == "explicit":
new_rating = "Explicit"
else:
new_rating = "Unrated"
return new_rating
def load_images(self):
    """
    Load PNG, JPEG, WebP, AVIF and GIF files from ``self.folder_path`` (non-recursive).

    Extensions are matched case-insensitively (``a.JPG`` is found on every OS), and the
    folder path is glob-escaped so names containing ``[``, ``]`` or ``*`` work.
    Resets the MD5 map, tagger/upload bookkeeping and counters, then starts the
    background workers via ``post_load_processing`` when at least one image was found.
    """
    extensions = {".png", ".jpg", ".jpeg", ".webp", ".avif", ".gif"}
    pattern = os.path.join(glob.escape(self.folder_path), "*")
    self.image_files = sorted(
        path
        for path in glob.glob(pattern)
        if os.path.splitext(path)[1].lower() in extensions
    )
    self.total_files = len(self.image_files)
    self.image_files_md5 = dict(
        zip(self.image_files, self.compute_md5_parallel(self.image_files))
    )
    # Files whose Tagger result is already cached count as processed.
    self.tagger_processed.clear()
    for md5 in self.image_files_md5.values():
        if self.tagger_cache[md5]:
            self.tagger_processed.add(md5)
    self.uploaded.clear()
    self.upload_verified = 0
    self.uploaded_count = 0
    for file in self.image_files:
        self.uploaded[file] = False
    if self.image_files:
        self.post_load_processing()
def compute_md5(self, file_path, chunk_size=8192):
    """
    Return the hex MD5 digest of ``file_path``, reading it ``chunk_size`` bytes at a time.

    Any error while reading is printed and an empty string is returned instead.
    """
    digest = hashlib.md5()
    try:
        with open(file_path, "rb") as stream:
            while block := stream.read(chunk_size):
                digest.update(block)
    except Exception as e:
        print(_("Error computing MD5:"), e)
        return ""
    return digest.hexdigest()
def compute_md5_parallel(self, file_paths):
    """
    Compute MD5 digests for ``file_paths`` on a thread pool.

    :returns: a list of digests in the same order as ``file_paths``.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [pool.submit(self.compute_md5, path) for path in file_paths]
        return [job.result() for job in pending]
def post_load_processing(self):
    """
    Restart the background workers after a new set of files has been loaded.

    Stops and joins any previous upload-check thread and starts a new one running
    ``check_uploaded_files``; then does the same for the tagger thread running
    ``process_tagger_queue``.
    """
    # Worker that asks the server which files are already uploaded.
    self.join_check_uploaded_files_thread()
    checker = threading.Thread(target=self.check_uploaded_files)
    self.check_uploaded_files_thread = checker
    checker.start()

    # Worker that runs the Tagger over all loaded images.
    self.join_process_tagger_queue_thread()
    tagger_worker = threading.Thread(target=self.process_tagger_queue)
    self.process_tagger_queue_thread = tagger_worker
    tagger_worker.start()
def wait_for_completion(self):
    """
    Shut down the background workers and flush deferred tasks (non-GUI mode).

    For the upload-check thread and then the tagger thread: if it exists and is still
    alive, its stop event is set and the thread is joined. Afterwards any pending
    main-thread tasks are run via ``process_main_thread_queue``.

    NOTE(review): the stop events are set before joining, so running workers end
    early rather than finishing; to wait for a full run, join the threads directly
    (as ``autotag_files`` / ``autotag_dir`` do).
    """
    def stop_and_join(worker, stop_event):
        if worker is None or not worker.is_alive():
            return
        stop_event.set()
        worker.join()

    stop_and_join(
        self.check_uploaded_files_thread, self.check_uploaded_files_stop_event
    )
    stop_and_join(
        self.process_tagger_queue_thread, self.process_tagger_queue_stop_event
    )
    # Run whatever the workers scheduled for the main thread.
    self.process_main_thread_queue()
def join_check_uploaded_files_thread(self):
    """
    Stop and join the upload-check worker (if any) and reset its bookkeeping.

    A fresh, unset stop event is always installed so the next worker starts clean.
    """
    worker = self.check_uploaded_files_thread
    if worker is not None:
        self.check_uploaded_files_stop_event.set()
        worker.join()
        self.check_uploaded_files_thread = None
    self.check_uploaded_files_stop_event = threading.Event()
def join_process_tagger_queue_thread(self):
    """
    Stop and join the background tagger worker (if any) and reset its bookkeeping.

    A fresh, unset stop event is always installed so the next worker starts clean.
    """
    worker = self.process_tagger_queue_thread
    if worker is not None:
        self.process_tagger_queue_stop_event.set()
        worker.join()
        self.process_tagger_queue_thread = None
    self.process_tagger_queue_stop_event = threading.Event()
def process_tagger_for_image(self, file_path):
    """
    Tag ``file_path`` (using the cache when possible) and report its rating.

    ``update_status_bar_callback`` is forwarded to ``get_tagger_results``. On success
    ``process_tagger_for_image_callback(file_path, rating)`` is called if set. When
    tagging failed (``get_tagger_results`` returned None) the callback is skipped;
    previously this raised AttributeError and aborted the background tagging loop.
    """
    result = self.get_tagger_results(file_path, self.update_status_bar_callback)
    if result is None:
        return
    if self.process_tagger_for_image_callback:
        self.process_tagger_for_image_callback(file_path, result.rating)
def process_tagger_queue(self):
    """
    Run the Tagger over every loaded image in the background.

    Skips the image currently selected in the GUI (``current_index``), because it is
    processed in the foreground. Stops early once ``process_tagger_queue_stop_event``
    is set. When finished, schedules a main-thread cleanup (100 ms) that joins this
    worker, but only if it is still the registered tagger thread. Otherwise a newer
    worker started in the meantime (e.g. after loading another folder) would be
    stopped by this stale cleanup.
    """
    finished_worker = threading.current_thread()
    for file_path in self.image_files:
        if self.process_tagger_queue_stop_event.is_set():
            break
        # The currently selected image is tagged in the foreground; skip it here.
        if (
            self.current_index is not None
            and file_path == self.image_files[self.current_index]
        ):
            continue
        self.process_tagger_for_image(file_path)

    def _join_if_still_current():
        if self.process_tagger_queue_thread is finished_worker:
            self.join_process_tagger_queue_thread()

    self.schedule_in_main_thread(_join_if_still_current, 100)
def check_uploaded_files(self):
    """
    Find out which loaded files already exist on the server.

    The MD5s of ``image_files`` are sent in batches of up to 100 to
    ``<base_url>/posts.json``. For every file of a successful batch
    ``upload_verified`` is incremented, and ``self.uploaded[file]`` is set either to the
    matching post id or to False. A match also increments ``uploaded_count`` and calls
    ``check_uploaded_files_callback(idx)``. After each processed batch
    ``update_status_bar_callback`` is called. A failing batch (network error, HTTP
    error, bad JSON) is printed and skipped. Stops early when
    ``check_uploaded_files_stop_event`` is set.

    When finished, schedules a main-thread cleanup (100 ms) that joins this worker,
    but only if it is still the registered upload-check thread, so a newer check
    started meanwhile is not stopped.
    """
    finished_worker = threading.current_thread()
    file_md5_list = [
        (idx, file, self.image_files_md5[file])
        for idx, file in enumerate(self.image_files)
    ]
    batch_size = 100
    url = self.settings.base_url.rstrip("/") + "/posts.json"
    for start in range(0, len(file_md5_list), batch_size):
        if self.check_uploaded_files_stop_event.is_set():
            break
        batch = file_md5_list[start : start + batch_size]
        md5_param = ",".join(item[2] for item in batch)
        try:
            # Without a timeout a stalled server would block this thread (and any
            # join on it) forever.
            response = requests.get(url, params={"md5": md5_param}, timeout=30)
            # Report HTTP errors instead of trying to parse an error page as JSON.
            response.raise_for_status()
            root = response.json()
            found = {}
            for elem in root:
                if self.check_uploaded_files_stop_event.is_set():
                    break
                # "md5" may be missing or null; both mean "no hash".
                post_md5 = (elem.get("md5") or "").lower()
                post_id = elem.get("id")
                if post_md5 and post_id:
                    found[post_md5] = post_id
            for idx, file_path, md5 in batch:
                if self.check_uploaded_files_stop_event.is_set():
                    break
                self.upload_verified += 1  # every file in the batch is now verified
                if md5.lower() in found:
                    self.uploaded[file_path] = found[md5.lower()]
                    self.uploaded_count += 1
                    if self.check_uploaded_files_callback:
                        self.check_uploaded_files_callback(idx)
                else:
                    self.uploaded[file_path] = False
            if self.update_status_bar_callback:
                self.update_status_bar_callback()
        except Exception as e:
            print(_("Błąd podczas sprawdzania paczki uploadu:"), e)

    def _join_if_still_current():
        if self.check_uploaded_files_thread is finished_worker:
            self.join_check_uploaded_files_thread()

    self.schedule_in_main_thread(_join_if_still_current, 100)
def autotag_files(self, file_paths):
    """
    Tag the given files with the Tagger and upload them to the server (blocking).

    :param file_paths: any iterable of image paths. It is copied into a new list, so
        generators work and the caller's list is not aliased as ``self.image_files``.

    Resets the MD5 map and tagger/upload bookkeeping. Then, if there is anything to
    do, starts the background workers, waits for both to finish and uploads every file
    not found on the server.
    """
    self.image_files = list(file_paths)
    self.total_files = len(self.image_files)
    self.image_files_md5 = dict(
        zip(self.image_files, self.compute_md5_parallel(self.image_files))
    )
    # Files whose Tagger result is already cached count as processed.
    self.tagger_processed.clear()
    for md5 in self.image_files_md5.values():
        if self.tagger_cache[md5]:
            self.tagger_processed.add(md5)
    self.uploaded.clear()
    self.upload_verified = 0
    self.uploaded_count = 0
    for file in self.image_files:
        self.uploaded[file] = False
    if not self.image_files:
        return
    # Starts the upload-check and tagger threads; both must finish before uploading.
    self.post_load_processing()
    self.check_uploaded_files_thread.join()
    self.process_tagger_queue_thread.join()
    self.upload_all_files()
def autotag_dir(self, dir):
    """
    Tag every image in directory ``dir`` with the Tagger and upload it (blocking).

    Prints an error and returns if ``dir`` is not a directory. Otherwise loads the
    images, waits for the upload-check and tagger workers and uploads what is
    missing. (``dir`` shadows the builtin but is kept for keyword callers.)
    """
    if not os.path.isdir(dir):
        print(_("Podana ścieżka nie jest katalogiem:"), dir)
        return
    self.folder_path = dir
    self.load_images()
    if not self.image_files:
        # load_images starts the worker threads only when it finds images; with
        # none, the thread attributes are None (or stale) and there is nothing to do.
        return
    self.check_uploaded_files_thread.join()
    self.process_tagger_queue_thread.join()
    self.upload_all_files()
def upload_file(
    self,
    file_path,
    final_tags=None,
    final_rating=None,
    progress_queue: queue.Queue | None = None,
    cancel_event: threading.Event | None = None,
    info_callback: Callable[[str], Any] | None = None,
    warning_callback: Callable[[str], Any] | None = None,
    error_callback: Callable[[str], Any] | None = None,
):
    """
    Upload ``file_path`` to ``<base_url>/api/danbooru/add_post``.

    :param final_tags: space-separated tags sent as the ``tags`` field. When it is
        empty/None, result messages go to ``info_callback``/``warning_callback`` (or
        are printed if the callback is missing). With tags set (bulk upload) they are
        not reported.
    :param final_rating: rating display name (a key of ``rating_map``); omitted from
        the request when it maps to an empty value or is unknown.
    :param progress_queue: receives ``("mode" | "max" | "label" | "progress", value)``.
    :param cancel_event: handed to ``ProgressFile`` together with the file stream.
    :param error_callback: receives ``str(exception)`` if the upload raises;
        otherwise the error is printed.

    On 200/201 ``upload_file_success_callback(file_path)`` is called. On 200/201 and on
    409 the post id from the ``X-Danbooru-Location`` header (when present) is stored
    in ``self.uploaded[file_path]``, ``uploaded_count`` is incremented and
    ``update_status_bar_callback`` is called. ``upload_file_completed_callback`` is
    always called at the end.
    """
    import mimetypes  # function-scope import keeps this block self-contained

    base_file_name = os.path.basename(file_path)
    if progress_queue:
        progress_queue.put(("mode", "determinate"))
        progress_queue.put(("max", 100))
        progress_queue.put(
            (
                "label",
                _("Wysyłam plik {base_file_name}...").format(
                    base_file_name=base_file_name
                ),
            )
        )
    url = self.settings.base_url.rstrip("/") + "/api/danbooru/add_post"
    fields = {
        "login": self.settings.username,
        "password": self.settings.password,
        "tags": final_tags,
        "source": "",
    }
    rating_value = self.rating_map.get(final_rating, "")
    if rating_value:
        fields["rating"] = rating_value
    # Send the real content type (previously always image/png, also for JPEG/WebP/GIF);
    # fall back to image/png when mimetypes does not know the extension.
    content_type = mimetypes.guess_type(base_file_name)[0] or "image/png"
    # Bulk uploads pass final_tags; only single uploads report their result.
    interactive = not final_tags
    try:
        total_size = os.path.getsize(file_path)

        def progress_callback(bytes_read, total_size):
            # total_size is 0 for an empty file; avoid ZeroDivisionError.
            if progress_queue and total_size:
                percentage = int(bytes_read / total_size * 100)
                progress_queue.put(("progress", percentage))

        with open(file_path, "rb") as f:
            wrapped_file = ProgressFile(
                f, progress_callback, total_size, cancel_event
            )
            files = {"file": (base_file_name, wrapped_file, content_type)}
            response = requests.post(url, data=fields, files=files)
        if progress_queue:
            progress_queue.put(("progress", 100))

        show_warn = True
        post_url = None
        if response.status_code in (200, 201):
            message = _("Wysyłanie zakończone powodzeniem!")
            post_url = response.headers.get("X-Danbooru-Location", None)
            show_warn = False
        elif response.status_code == 409:
            # Conflict: the server may still report the location of a post.
            message = _(
                "Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
            ).format(
                status_code=response.status_code,
                text=response.headers.get("X-Danbooru-Errors", ""),
            )
            post_url = response.headers.get("X-Danbooru-Location", None)
        else:
            message = _(
                "Wysyłanie zakończone błędem.\nStatus: {status_code}\nTreść: {text}"
            ).format(status_code=response.status_code, text=response.text)

        if show_warn:
            if interactive:
                if warning_callback:
                    warning_callback(message)
                else:
                    print("[WARN]", _("Wysyłanie"), message)
        else:
            if interactive:
                if info_callback:
                    info_callback(message)
                else:
                    print("[INFO]", _("Wysyłanie"), message)
            if self.upload_file_success_callback:
                self.upload_file_success_callback(file_path)

        if post_url:
            # rstrip: a trailing slash must not yield an empty post id.
            self.uploaded[file_path] = post_url.rstrip("/").split("/")[-1]
            self.uploaded_count += 1
            # Was self.after(0, self.update_status_bar): Core has neither attribute,
            # so every upload that returned a post URL was reported as an error.
            if self.update_status_bar_callback:
                self.update_status_bar_callback()
    except Exception as e:
        if error_callback:
            error_callback(str(e))
        else:
            print("[ERROR]", _("Błąd wysyłania pliku"), file_path, ":", e)
    finally:
        if self.upload_file_completed_callback:
            self.upload_file_completed_callback()
def upload_all_files(
self,
progress_queue: queue.Queue = None,
cancel_event: threading.Event = None,
secondary_progress_queue: queue.Queue = None,
update_status_callback=None,
manual_tags: set = set(),
info_callback: Callable[[str], Any] | None = None,
warning_callback: Callable[[str], Any] | None = None,
error_callback: Callable[[str], Any] | None = None,
):
files_to_upload = [x for x in self.image_files if not self.uploaded[x]]
files_count = len(files_to_upload)
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
file_idx = 0
for file_path in files_to_upload:
if progress_queue:
progress_queue.put(("progress", file_idx * 100 / files_count))
progress_queue.put(
("label", f"Wysyłam plik {file_idx+1}/{files_count}...")
)
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Anulowano operację!")))
return
if not self.uploaded.get(file_path, False):
final_tags, final_rating = self.compute_final_tags_and_rating_for_file(
file_path,
update_status_callback, # lambda: self.after(0, self.update_status_bar)
manual_tags, # set(self.manual_tags_manager.manual_tags)
)
print(
_(
"Wysyłanie {file_path} z tagami: {final_tags} i ratingiem: {final_rating}"
).format(
file_path=file_path,
final_tags=final_tags,
final_rating=final_rating,
)
)
self.upload_file(
file_path,
final_tags=final_tags,
final_rating=final_rating,
progress_queue=secondary_progress_queue,
cancel_event=cancel_event,
info_callback=info_callback,
warning_callback=warning_callback,
error_callback=error_callback,
)
file_idx += 1
if progress_queue:
progress_queue.put(("label", _("Przesłano pliki!")))
progress_queue.put(("progress", 100))
def edit_file(
self,
file_path,
final_tags=None,
final_rating=None,
progress_queue=None,
cancel_event=None,
info_callback: Callable[[str], Any] | None = None,
error_callback: Callable[[str], Any] | None = None,
):
"""
Update tags and rating for an existing post without uploading the file.
"""
base_file_name = os.path.basename(file_path)
post_id = self.uploaded.get(file_path)
if not post_id:
if error_callback:
error_callback(_("Post nie został znaleziony dla tego pliku"))
else:
print(
"[ERROR]",
_("Post nie został znaleziony dla tego pliku"),
file_path,
)
return
if progress_queue:
progress_queue.put(("mode", "determinate"))
progress_queue.put(("max", 100))
progress_queue.put(
(
"label",
_("Aktualizuję tagi dla {base_file_name}...").format(
base_file_name=base_file_name
),
)
)
try:
# Check for cancellation before starting the operation.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Get authentication session and token
session = login(self.settings)
auth_token = get_auth_token(session, self.settings)
# Check cancellation after login if needed.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Prepare tags and rating
tags = final_tags
rating_value = self.rating_map.get(final_rating, "?")
# Prepare API request
url = self.settings.base_url.rstrip("/") + "/post/set"
payload = {
"auth_token": auth_token,
"image_id": post_id,
"title": base_file_name,
"owner": self.settings.username,
"tags": tags,
"source": "",
"rating": rating_value,
}
if progress_queue:
progress_queue.put(("progress", 50))
# Check for cancellation before sending the update request.
if cancel_event is not None and cancel_event.is_set():
if progress_queue:
progress_queue.put(("label", _("Operacja anulowana")))
return
# Send update request
response = session.post(url, data=payload, allow_redirects=False)
# Handle 302 redirect as success case
if response.status_code == 302:
if progress_queue:
progress_queue.put(("progress", 100))
message = _("Tagi zostały zaktualizowane!")
if not final_tags: # Only show success if not bulk operation
if info_callback:
info_callback(message)
else:
print("[INFO]", _("Sukces edycji"), message)
# Update UI state
if self.upload_file_completed_callback:
self.upload_file_completed_callback()
return
# Handle other status codes
error_msg = _("Błąd podczas aktualizacji tagów\nStatus: {code}").format(
code=response.status_code
)
if response.text:
error_msg += f"\n{_('Treść:')} {response.text}"
if error_callback:
error_callback(error_msg)
else:
print("[ERROR]", _("Błąd edycji"), error_msg)
except Exception as e:
if error_callback:
error_callback(str(e))
else:
print("[ERROR]", _("Krytyczny błąd edycji"), file_path, ":", e)
finally:
if progress_queue:
progress_queue.put(("progress", 100))