actually works
Some checks failed
Gitea/kapitanbooru-uploader/pipeline/head There was a failure building this commit

Now with tagger

Miejsce na zdjęcie

Linki do wiki

Zapis ustawień

Tagger działa w tle

Kolorujemy pliki po ratingu

Tagger cache

Tagi w bazie

Pobranie implikacji tagów

Autocomplete

Podział na pliki i skrypty + nowe API

Structure for package

Version 0.1.0
This commit is contained in:
Michał Leśniak 2025-02-13 22:11:35 +01:00
commit 5a97d610a7
18 changed files with 3069 additions and 0 deletions

174
.gitignore vendored Normal file
View File

@ -0,0 +1,174 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

54
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,54 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Module",
"type": "debugpy",
"request": "launch",
"module": "kapitanbooru_uploader"
},
{
"name": "Python: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Python: Attach using Process ID",
"type": "debugpy",
"request": "attach",
"processId": "${command:pickProcess}"
},
{
"name": "Python: Remote Attach",
"type": "debugpy",
"request": "attach"
},
{
"name": "Python: Terminal (external)",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "externalTerminal"
},
{
"name": "Python: Terminal (integrated)",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Python main",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/main.py",
"console": "integratedTerminal"
}
]
}

38
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,38 @@
// CI pipeline: build the Python package in a virtualenv on a Raspberry Pi agent
// and publish the resulting distribution to a local PyPI index.
pipeline {
    agent { label 'Pi4' } // Use Raspberry Pi 4 agent
    environment {
        PIP_EXTRA_INDEX_URL = 'http://localhost:8090/simple/' // Local PyPI repo
        PACKAGE_NAME = 'kapitanbooru_uploader' // Your package name
    }
    stages {
        stage('Checkout') {
            steps {
                checkout scm
            }
        }
        stage('Setup Python') {
            steps {
                sh 'python3.13 -m venv venv' // Create a virtual environment
                sh '. venv/bin/activate && pip install --upgrade pip build twine'
            }
        }
        stage('Build Package') {
            steps {
                sh '. venv/bin/activate && python -m build' // Builds the package
            }
        }
        stage('Publish to Local PyPI') {
            steps {
                sh '. venv/bin/activate && twine upload --repository-url http://localhost:8090/ dist/*'
            }
        }
    }
    post {
        cleanup {
            // Runs regardless of build result so stale artifacts never leak between builds.
            sh 'rm -rf venv dist build *.egg-info' // Clean up build artifacts
        }
    }
}

7
LICENSE Normal file
View File

@ -0,0 +1,7 @@
Copyright 2025 Michał Leśniak
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

69
README.md Normal file
View File

@ -0,0 +1,69 @@
# Kapitanbooru Uploader
Kapitanbooru Uploader is a GUI application for uploading images to KapitanBooru. It provides features such as tag management, automatic tagging using wdtagger, and more.
## Features
- **Image Upload**: Easily upload images to KapitanBooru.
- **Tag Management**: Manage tags for images, including automatic tagging using wdtagger.
- **Cache Management**: Cache results from wdtagger to speed up processing.
- **Settings**: Configure application settings such as default tags, cache expiry, and more.
## Installation
1. Clone the repository:
```sh
git clone https://git.mlesniak.pl/kapitan/kapitanbooru-uploader.git
cd kapitanbooru-uploader
```
2. Install the required dependencies:
```sh
pip install -r requirements.txt
```
3. Ensure you have the required Python version:
```sh
python --version
# Should be >= 3.13
```
## Usage
1. Run the application:
```sh
python -m kapitanbooru_uploader.main
```
2. Select the folder containing the images you want to upload.
3. Manage tags and upload images using the GUI.
## Configuration
Configuration settings can be found in the `settings.json` file located in the application data directory. You can modify settings such as username, password, base URL, default tags, and more.
## Development
### Running Tests
To run tests, use the following command:
```sh
pytest
```
### Debugging
You can use the provided VSCode launch configurations to debug the application. Open the `.vscode/launch.json` file and select the appropriate configuration.
## Contributing
Contributions are welcome! Please fork the repository and submit a pull request.
## License
This project is licensed under the MIT License. See the `LICENSE` file for details.
## Contact
For any questions or issues, please contact Michał Leśniak at [kapitan@mlesniak.pl](mailto:kapitan@mlesniak.pl).

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,16 @@
class ProgressFile:
    """File-object wrapper that reports cumulative read progress.

    Every ``read()`` forwards to the wrapped file and then invokes
    ``callback(bytes_read_so_far, total_size)``, which lets an upload
    routine display a progress bar while a library streams the file.
    All other attributes are transparently delegated to the wrapped file.
    """

    def __init__(self, f, callback, total_size):
        self.f = f
        self.callback = callback
        self.total_size = total_size
        self.read_bytes = 0  # running total of bytes handed out so far

    def read(self, size=-1):
        """Read up to *size* bytes (all remaining if negative) and report progress."""
        chunk = self.f.read(size)
        self.read_bytes += len(chunk)
        self.callback(self.read_bytes, self.total_size)
        return chunk

    def __getattr__(self, attr):
        # Anything we don't override (close, seek, name, ...) goes to the real file.
        return getattr(self.f, attr)

View File

@ -0,0 +1,427 @@
from collections import deque
import json
import os
import sqlite3
import time
import requests
from pathlib import Path
from .settings import Settings
# CSRF auth_token and session cookies (fill in real values before use)
AUTH_TOKEN = ""  # set the proper value
SHM_SESSION = ""  # set the proper value
SHM_USER = ""  # set the proper value
POST_URL = "http://192.168.1.11:8001/auto_tag/import"
BATCH_SIZE = 1000  # maximum number of rows per batch
def flatten_graph(graph):
    """Compute the transitive closure of every node in *graph*.

    For each key of *graph* the returned dict holds the set of all tags
    reachable by repeatedly following implication edges.  Results are
    memoized so each node is fully expanded at most once.

    Unlike the previous version, an explicit in-progress set guards the
    DFS, so a cycle in the implication graph no longer causes infinite
    recursion; members of a cycle simply stop expanding at the back-edge.

    :param graph: mapping ``tag -> set of directly implied tags``
    :return: mapping ``tag -> set of all transitively implied tags``
    """
    cache = {}

    def dfs(tag, in_progress):
        if tag in cache:
            return cache[tag]
        if tag in in_progress:
            # Back-edge of a cycle: stop here; callers already on the
            # stack still collect this node as a direct consequence.
            return set()
        in_progress.add(tag)
        reachable = set()
        for nxt in graph.get(tag, set()):
            reachable.add(nxt)
            reachable |= dfs(nxt, in_progress)
        in_progress.discard(tag)
        cache[tag] = reachable
        return reachable

    return {tag: dfs(tag, set()) for tag in graph}
class TagsRepo:
    """SQLite-backed repository of Danbooru tag metadata.

    The database file (``tags.db``) lives next to the application's
    settings file and holds four tables:

    - ``tags``             -- name, category, post count, timestamps
    - ``tag_aliases``      -- alias -> canonical tag
    - ``tag_implications`` -- direct implication edges (antecedent -> consequent)
    - ``tag_closure``      -- precomputed transitive closure of the implications

    If the file is missing, the schema is created and every table is
    populated from the public Danbooru JSON API.
    """

    def __init__(self, settings: Settings):
        """Locate the database next to the settings file and build it if absent."""
        self.settings = settings
        self.db_path = os.path.join(
            os.path.dirname(settings.get_settings_path()), "tags.db"
        )
        regenerate = False
        if not Path(self.db_path).is_file():
            regenerate = True
            print(f"Database file not found: {self.db_path}, will regenerate DB")
        self.init_tags_db()
        if regenerate:
            self.regenerate_db()

    def get_conn(self):
        """Return a fresh sqlite3 connection to the tag database."""
        return sqlite3.connect(self.db_path)

    # --- Tag database initialization ---
    def init_tags_db(self):
        """Create all tables and their indexes if they do not exist (idempotent)."""
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            tables = {
                "tags": """
                    CREATE TABLE IF NOT EXISTS "tags" (
                        "index" INTEGER,
                        "id" INTEGER,
                        "name" TEXT,
                        "post_count" INTEGER,
                        "category" INTEGER,
                        "created_at" TIMESTAMP,
                        "updated_at" TIMESTAMP,
                        "is_deprecated" INTEGER,
                        "words" TEXT
                    )
                """,
                "tag_aliases": """
                    CREATE TABLE IF NOT EXISTS "tag_aliases" (
                        "index" INTEGER,
                        "alias" TEXT,
                        "tag" TEXT
                    )
                """,
                "tag_closure": """
                    CREATE TABLE IF NOT EXISTS "tag_closure" (
                        antecedent TEXT NOT NULL,
                        consequent TEXT NOT NULL,
                        depth INTEGER NOT NULL,
                        PRIMARY KEY (antecedent, consequent)
                    )
                """,
                "tag_implications": """
                    CREATE TABLE IF NOT EXISTS "tag_implications" (
                        antecedent TEXT NOT NULL,
                        consequent TEXT NOT NULL,
                        PRIMARY KEY (antecedent, consequent)
                    )
                """,
            }
            indexes = {
                "tags": [
                    """CREATE INDEX IF NOT EXISTS ix_tags_index ON tags ("index")""",
                    "CREATE INDEX IF NOT EXISTS tags_index_category ON tags (category)",
                    "CREATE INDEX IF NOT EXISTS tags_index_created_at ON tags (created_at)",
                    "CREATE INDEX IF NOT EXISTS tags_index_id ON tags (id)",
                    "CREATE INDEX IF NOT EXISTS tags_index_is_deprecated ON tags (is_deprecated)",
                    "CREATE INDEX IF NOT EXISTS tags_index_name ON tags (name)",
                    "CREATE INDEX IF NOT EXISTS tags_index_post_count ON tags (post_count)",
                    "CREATE INDEX IF NOT EXISTS tags_index_updated_at ON tags (updated_at)",
                ],
                "tag_aliases": [
                    """CREATE INDEX IF NOT EXISTS ix_tag_aliases_index ON tag_aliases ("index")""",
                    "CREATE INDEX IF NOT EXISTS tag_aliases_index_alias ON tag_aliases (alias)",
                    "CREATE INDEX IF NOT EXISTS tag_aliases_index_tag ON tag_aliases (tag)",
                ],
                "tag_closure": [
                    "CREATE INDEX IF NOT EXISTS idx_closure_antecedent ON tag_closure (antecedent)"
                ],
                "tag_implications": [
                    "CREATE INDEX IF NOT EXISTS idx_implications_antecedent ON tag_implications (antecedent)"
                ],
            }
            for create_stmt in tables.values():
                cursor.execute(create_stmt)
            for index_list in indexes.values():
                for index_stmt in index_list:
                    cursor.execute(index_stmt)
            conn.commit()
            conn.close()
        except Exception as e:
            print("Błąd przy inicjalizacji bazy tagów:", e)

    def regenerate_db(self):
        """(Re)download all tag data from Danbooru and rebuild every table.

        Three paginated downloads run (tags, active aliases, active
        implications), each limited to at most ``rate_limit`` requests per
        second.  Pagination uses Danbooru's ``page=a<id>`` ("after id")
        cursor, taken from the first row of each response.  Finally the
        implication graph is expanded into ``tag_closure`` via BFS.
        """
        # Consistency fix: use the shared connection helper instead of a
        # direct sqlite3.connect() call.
        conn = self.get_conn()
        cursor = conn.cursor()
        cursor.execute("DELETE FROM tags")
        cursor.execute("DELETE FROM tag_aliases")
        conn.commit()
        rate_limit = 10  # requests per second
        min_interval = 1.0 / rate_limit  # minimum seconds between requests (0.1 sec)

        # --- 1. Tags ---
        data_list = []
        page = 0
        while True:
            print(f"Tagi - Pobieranie od id {page}...")
            start_time = time.monotonic()
            url = f"https://danbooru.donmai.us/tags.json?limit=1000&page=a{page}"
            response = requests.get(url)
            if response.status_code != 200:
                print(
                    f"Błąd przy pobieraniu strony {page}: HTTP {response.status_code}"
                )
                break
            data = response.json()
            if not data:
                break
            page = None
            for item in data:
                item_id = item.get("id")
                if not page:
                    # The first row of each response carries the next pagination cursor.
                    page = item_id
                data_list.append(
                    (
                        item_id,
                        item.get("name"),
                        item.get("post_count"),
                        item.get("category"),
                        item.get("created_at"),
                        item.get("updated_at"),
                        item.get("is_deprecated"),
                        json.dumps(item.get("words")),
                    )
                )
            if len(data) < 1000:
                break  # a short page is the last page
            # Sleep if necessary to enforce the rate limit.
            elapsed = time.monotonic() - start_time
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
        print(f"Tagi - Pobrano {len(data_list)} tagów...")
        data_list = sorted(data_list, key=lambda x: x[0])
        data_list = [(idx,) + row for idx, row in enumerate(data_list)]
        cursor.executemany(
            """
            INSERT INTO tags ("index", id, name, post_count, category, created_at, updated_at, is_deprecated, words)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            data_list,
        )
        conn.commit()

        # --- 2. Tag aliases ---
        data_list = []
        page = 0
        while True:
            print(f"Aliasy tagów - Pobieranie od id {page}...")
            start_time = time.monotonic()
            url = f"https://danbooru.donmai.us/tag_aliases.json?limit=1000&only=id,antecedent_name,consequent_name&search[status]=active&page=a{page}"
            response = requests.get(url)
            if response.status_code != 200:
                print(
                    f"Błąd przy pobieraniu strony {page}: HTTP {response.status_code}"
                )
                break
            data = response.json()
            if not data:
                break
            page = None
            for item in data:
                item_id = item.get("id")
                if not page:
                    page = item_id
                data_list.append(
                    (item.get("antecedent_name"), item.get("consequent_name"))
                )
            if len(data) < 1000:
                break
            # Sleep if necessary to enforce the rate limit.
            elapsed = time.monotonic() - start_time
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
        print(f"Aliasy tagów - Pobrano {len(data_list)} aliasów tagów...")
        data_list = sorted(data_list, key=lambda x: x[0])
        data_list = [(idx,) + row for idx, row in enumerate(data_list)]
        cursor.executemany(
            """
            INSERT INTO tag_aliases ("index", alias, tag)
            VALUES (?, ?, ?)
            """,
            data_list,
        )
        conn.commit()

        # Category name sets, used to namespace implication tags below.
        cursor.execute("SELECT name FROM tags WHERE category = 4")
        character_tags = {row[0] for row in cursor.fetchall()}
        cursor.execute("SELECT name FROM tags WHERE category = 3")
        copyright_tags = {row[0] for row in cursor.fetchall()}
        cursor.execute("SELECT name FROM tags WHERE category = 5")
        meta_tags = {row[0] for row in cursor.fetchall()}
        cursor.execute("SELECT name FROM tags WHERE category = 1")
        artist_tags = {row[0] for row in cursor.fetchall()}
        cursor.execute("DELETE FROM tag_implications")  # Optional: reset table
        cursor.execute("DELETE FROM tag_closure")  # Optional: reset table
        conn.commit()

        def qualify(name):
            """Prefix *name* with its category namespace, if it has one."""
            if name in character_tags:
                return f"character:{name}"
            if name in copyright_tags:
                return f"copyright:{name}"
            if name in meta_tags:
                return f"meta:{name}"
            if name in artist_tags:
                return f"artist:{name}"
            return name

        # --- 3. Tag implications ---
        # tag_dict maps antecedent_name -> set of consequent_name.
        tag_dict = {}
        page = 0
        while True:
            print(f"Implikacje tagów - Pobieranie od id {page}...")
            # BUG FIX: start_time was never reset in this loop, so the rate
            # limiter compared against a stale timestamp from the previous
            # download and never slept between requests.
            start_time = time.monotonic()
            url = f"https://danbooru.donmai.us/tag_implications.json?limit=1000&page=a{page}"
            response = requests.get(url)
            if response.status_code != 200:
                print(
                    f"Błąd przy pobieraniu strony {page}: HTTP {response.status_code}"
                )
                break
            data = response.json()
            if not data:
                break
            page = None
            for item in data:
                item_id = item.get("id")
                if not page:
                    page = item_id
                if item.get("status") != "active":
                    continue
                antecedent = qualify(item.get("antecedent_name"))
                consequent = qualify(item.get("consequent_name"))
                tag_dict.setdefault(antecedent, set()).add(consequent)
            if len(data) < 1000:
                break
            # Sleep if necessary to enforce the rate limit.
            elapsed = time.monotonic() - start_time
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
        print(f"Implikacje tagów - Pobrano {len(tag_dict)} implikacji tagów...")
        # Insert all unique, non-self implication pairs in one batch.
        cursor.executemany(
            "INSERT OR IGNORE INTO tag_implications VALUES (?, ?)",
            [
                (antecedent, consequent)
                for antecedent, consequents in tag_dict.items()
                for consequent in consequents
                if antecedent != consequent
            ],
        )
        conn.commit()
        cursor.executemany(
            "INSERT INTO tag_closure VALUES (?, ?, ?)",
            self.build_transitive_closure(tag_dict),
        )
        conn.commit()
        conn.close()

    def build_transitive_closure(self, tag_dict):
        """Return ``{(antecedent, reachable_tag, depth), ...}`` for *tag_dict*.

        *tag_dict* maps a tag to the set of tags it directly implies.  For
        every tag a BFS records each distinct reachable tag with the depth
        at which it was first encountered (BFS => minimal depth).  Self
        references are skipped, and cycles are safe because visited tags
        are never expanded twice.
        """
        closure = set()
        for antecedent in tag_dict:
            visited = set()
            queue = deque([(antecedent, 0)])
            while queue:
                current_tag, depth = queue.popleft()
                if current_tag in visited:
                    continue
                visited.add(current_tag)
                # Add to closure if not a self-reference
                if current_tag != antecedent:
                    closure.add((antecedent, current_tag, depth))
                # Traverse the next level
                for next_tag in tag_dict.get(current_tag, []):
                    queue.append((next_tag, depth + 1))
        return closure
# def garbage():
#
# # Spłaszczenie struktury obliczenie domknięcia przechodniego
# flattened_tag_dict = flatten_graph(tag_dict)
# print("Spłaszczono strukturę implikacji.")
#
# # Przygotowanie listy wierszy do CSV
# # Każdy wiersz: (antecedent, consequents jako space-separated string)
# csv_rows = []
# for antecedent, consequents in flattened_tag_dict.items():
# # Sortujemy, żeby wynik był deterministyczny
# consequents_str = " ".join(sorted(consequents))
# csv_rows.append((antecedent, consequents_str))
#
# print(f"Łącznie wierszy do wysłania: {len(csv_rows)}")
#
# # Konfiguracja ciasteczek do żądania POST
# cookies = {
# 'shm_session': SHM_SESSION,
# 'shm_user': SHM_USER
# }
#
# # Wysyłanie danych w partiach po BATCH_SIZE wierszy
# for i in range(0, len(csv_rows), BATCH_SIZE):
# batch = csv_rows[i:i+BATCH_SIZE]
# # Utworzenie pliku CSV w pamięci
# output = io.StringIO()
# writer = csv.writer(output, quoting=csv.QUOTE_ALL)
# for row in batch:
# writer.writerow(row)
# csv_content = output.getvalue()
# output.close()
#
# # Przygotowanie danych formularza (z auth_token) oraz pliku CSV
# data = {
# 'auth_token': AUTH_TOKEN
# }
# files = {
# 'auto_tag_file': ('batch.csv', csv_content, 'text/csv')
# }
#
# print(f"Wysyłanie batcha wierszy {i+1} - {i+len(batch)}...")
# post_response = requests.post(POST_URL, data=data, files=files, cookies=cookies, allow_redirects=False)
# if post_response.status_code in (200, 302):
# print(f"Batch {i+1}-{i+len(batch)} wysłany pomyślnie.")
# else:
# print(f"Błąd przy wysyłaniu batcha {i+1}-{i+len(batch)}: HTTP {post_response.status_code}")
#
#
# print("Wszystkie dane zostały wysłane.")

View File

View File

@ -0,0 +1,12 @@
"""kapitanbooru_uploader.__main__: executed
when kapitanbooru_uploader directory is called as script."""
from .ImageBrowser import ImageBrowser
def main():
    """Create the ImageBrowser application window and run its Tk event loop."""
    ImageBrowser().mainloop()


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,252 @@
import tkinter as tk
from tkinter import font
from .TagsRepo import TagsRepo
from .common import open_tag_wiki_url
from .tag_processing import process_tag
from .settings import Settings
class AutocompleteEntry(tk.Entry):
    """A tk.Entry with debounced tag autocompletion backed by TagsRepo.

    While the user types, the token under the cursor is looked up in the
    local tag database (after a 200 ms debounce) and up to 10 matches are
    shown in a floating, borderless Toplevel below the entry.  A match
    confirmed with Return/Tab is passed to *callback* and the entry is
    cleared.
    """

    def __init__(self, master, tags_repo: TagsRepo, callback=None, *args, **kwargs):
        super().__init__(master, *args, **kwargs)
        self.tags_repo = tags_repo
        self.callback = callback  # invoked with the confirmed tag text
        self.listbox = None  # suggestion Listbox while the popup is visible
        self.listbox_window = None  # borderless Toplevel hosting the listbox
        self.suggestions = []  # display strings currently offered
        self.suggestion_map = {}  # display text (with post count) -> insertable tag
        self.search_after_id = None  # ID of the pending (debounced) search, if any
        self.selection_index = -1  # highlighted row in the listbox, -1 = none
        self.bind("<KeyRelease>", self.on_keyrelease)
        self.bind("<Down>", self.on_down)
        self.bind("<Up>", self.on_up)
        self.bind("<Return>", self.on_return)
        self.bind("<Tab>", self.on_return)
        self.bind("<FocusOut>", lambda e: self.hide_listbox())

    def on_keyrelease(self, event):
        """Debounce typing: (re)schedule the suggestion lookup 200 ms out."""
        if event.keysym in ("Down", "Up", "Return", "Tab"):
            return
        if event.keysym == "Escape":
            self.hide_listbox()
            return
        if self.search_after_id:
            self.after_cancel(self.search_after_id)
        self.search_after_id = self.after(200, self.update_suggestions)

    def update_suggestions(self):
        """Find suggestions for the token at the cursor; show or hide the popup."""
        self.search_after_id = None
        # Grab the whole text plus the cursor position
        full_text = self.get()
        # For an Entry, self.index(tk.INSERT) is a character offset usable for slicing
        text_before_cursor = full_text[: self.index(tk.INSERT)]
        # A trailing space means the user finished the last tag - nothing to suggest
        if text_before_cursor and text_before_cursor[-1].isspace():
            self.hide_listbox()
            return
        # Split the text before the cursor into space-separated tokens
        tokens = text_before_cursor.split()
        prefix = tokens[-1] if tokens else ""
        if not prefix:
            self.hide_listbox()
            return
        # Fetch suggestions for the current prefix
        self.suggestions = self.get_suggestions(prefix)
        if self.suggestions:
            self.show_listbox()
        else:
            self.hide_listbox()

    def on_return(self, event):
        """Confirm the highlighted suggestion (or raw entry text) via callback."""
        if self.listbox and self.selection_index >= 0:
            selected_display = self.listbox.get(self.selection_index)
            # Map the displayed text back to the insertable value (without the post count)
            suggestion = self.suggestion_map.get(selected_display, selected_display)
            tag = suggestion
        else:
            tag = self.get().strip()
        if tag and self.callback:
            self.callback(tag)
        self.delete(0, tk.END)
        self.hide_listbox()
        return "break"  # stop Tk's default Return/Tab handling

    def get_suggestions(self, prefix):
        """Return up to 10 display strings for tags starting with *prefix*.

        Tags are namespaced by category ("artist:", "copyright:", ...) and
        sorted by descending post count; ``suggestion_map`` is rebuilt so
        each display string maps to its insertable form.
        """
        try:
            conn = self.tags_repo.get_conn()
            cursor = conn.cursor()
            query = """
                SELECT name, category, post_count FROM tags
                WHERE name LIKE ? AND post_count >= 1
                ORDER BY post_count DESC
                LIMIT 10
            """
            cursor.execute(query, (prefix + "%",))
            results = cursor.fetchall()
            conn.close()
            # Map Danbooru category codes to namespace prefixes
            prefix_map = {1: "artist:", 3: "copyright:", 4: "character:", 5: "meta:"}
            suggestions = []
            # Build the dict mapping display text (with count) to insert text (without)
            self.suggestion_map = {}
            for row in results:
                name, category, post_count = row
                tag_insert = prefix_map.get(category, "") + name
                display_text = f"{tag_insert} ({post_count})"
                suggestions.append(display_text)
                self.suggestion_map[display_text] = tag_insert
            return suggestions
        except Exception as e:
            print("Błąd przy pobieraniu sugestii:", e)
            return []

    def show_listbox(self):
        """(Re)create the borderless popup and fill it with the current suggestions."""
        if self.listbox_window:
            self.listbox_window.destroy()
        self.listbox_window = tk.Toplevel(self)
        self.listbox_window.wm_overrideredirect(True)  # no window manager decorations
        self.listbox = tk.Listbox(self.listbox_window, height=6)
        self.listbox.bind("<Button-1>", self.on_listbox_click)
        self.listbox.bind("<Motion>", self.on_listbox_motion)
        for suggestion in self.suggestions:
            self.listbox.insert(tk.END, suggestion)
        self.listbox.pack(fill=tk.BOTH, expand=True)
        # Use the listbox's own font to measure item widths
        list_font = font.Font(font=self.listbox.cget("font"))
        # Compute the maximum width based on the longest item
        max_width = (
            max(list_font.measure(item) for item in self.suggestions) + 20
        )  # +20 for margin
        # Size the window to fit the widest item
        self.listbox_window.geometry(f"{max_width}x200")  # 200 = window height
        # Place the window directly below the entry widget
        x = self.winfo_rootx()
        y = self.winfo_rooty() + self.winfo_height()
        self.listbox_window.geometry("+%d+%d" % (x, y))
        self.listbox_window.deiconify()
        self.selection_index = -1

    def hide_listbox(self):
        """Destroy the suggestion popup (if any) and reset the selection state."""
        if self.listbox_window:
            self.listbox_window.destroy()
            self.listbox_window = None
            self.listbox = None
            self.selection_index = -1

    def on_listbox_click(self, event):
        # NOTE(review): unlike on_return, a mouse click inserts the *display*
        # text (including the " (post_count)" suffix) and replaces the whole
        # entry without invoking the callback - confirm this is intended.
        if self.listbox:
            index = self.listbox.curselection()
            if index:
                value = self.listbox.get(index)
                self.delete(0, tk.END)
                self.insert(tk.END, value)
            self.hide_listbox()
        return "break"

    def on_listbox_motion(self, event):
        """Track the mouse: highlight the row under the pointer."""
        if self.listbox:
            self.listbox.selection_clear(0, tk.END)
            index = self.listbox.nearest(event.y)
            self.listbox.selection_set(first=index)
            self.selection_index = index

    def on_down(self, event):
        """Move the highlight down one row (wrapping around)."""
        if self.listbox:
            self.selection_index = (self.selection_index + 1) % self.listbox.size()
            self.listbox.selection_clear(0, tk.END)
            self.listbox.selection_set(self.selection_index)
            self.listbox.activate(self.selection_index)
        return "break"

    def on_up(self, event):
        """Move the highlight up one row (wrapping around)."""
        if self.listbox:
            self.selection_index = (self.selection_index - 1) % self.listbox.size()
            self.listbox.selection_clear(0, tk.END)
            self.listbox.selection_set(self.selection_index)
            self.listbox.activate(self.selection_index)
        return "break"
class TagManager(tk.Frame):
    """
    This widget holds a tag input entry (with autocompletion) and a display area
    that shows the entered tags. In the display area, left-clicking on a tag removes it,
    and right-clicking opens its wiki URL. Tag appearance is adjusted (color/underline)
    based on custom logic.
    """

    def __init__(self, master, settings: Settings, tags_repo: TagsRepo, *args, **kwargs):
        super().__init__(master, *args, **kwargs)
        self.tags_repo = tags_repo
        self.settings = settings
        self.manual_tags = []  # List to hold manually entered tags
        # Entry for new tags (with autocompletion); confirmed tags land in add_tag
        self.entry = AutocompleteEntry(self, callback=self.add_tag, tags_repo=self.tags_repo)
        self.entry.pack(fill=tk.X, padx=5, pady=5)
        # Text widget for displaying already entered tags (read-only between updates)
        self.tags_display = tk.Text(self, wrap="word", height=4)
        self.tags_display.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.tags_display.config(state="disabled")
        # (Optional: add a scrollbar if needed)

    def add_tag(self, tag):
        """Add a new tag if it is not already present."""
        if tag and tag not in self.manual_tags:
            self.manual_tags.append(tag)
            self.update_tags_display()

    def update_tags_display(self):
        """Refresh the text widget to display all manual tags with styling and event bindings."""
        # The Text widget must be writable while we rebuild it, then locked again.
        self.tags_display.config(state="normal")
        self.tags_display.delete("1.0", tk.END)
        for tag in self.manual_tags:
            # Process tag to decide its style; second element of the result
            # drives the color: True -> red, None -> darkorange, else blue.
            _, deprecated = process_tag(tag, self.tags_repo)
            if deprecated is True:
                color = "red"
                underline = 1
            elif deprecated is None:
                color = "darkorange"
                underline = 1
            else:
                color = "blue"
                underline = 0
            # Remember where this tag's text starts/ends so we can attach a Text tag to it
            start_index = self.tags_display.index(tk.INSERT)
            self.tags_display.insert(tk.INSERT, tag)
            end_index = self.tags_display.index(tk.INSERT)
            tag_name = "manual_" + tag  # Text-widget tag name encodes the tag itself
            self.tags_display.tag_add(tag_name, start_index, end_index)
            self.tags_display.tag_configure(
                tag_name, foreground=color, underline=underline
            )
            # Left-click: remove tag; Right-click: open wiki URL
            self.tags_display.tag_bind(tag_name, "<Button-1>", self.remove_tag)
            self.tags_display.tag_bind(tag_name, "<Button-3>", self.open_tag_wiki_url)
            self.tags_display.insert(tk.INSERT, " ")
        self.tags_display.config(state="disabled")

    def remove_tag(self, event):
        """Remove the clicked tag from the list and update the display."""
        # "@x,y" resolves the mouse position to a text index inside the widget
        index = self.tags_display.index("@%d,%d" % (event.x, event.y))
        for t in self.tags_display.tag_names(index):
            if t.startswith("manual_"):
                tag = t[len("manual_") :]  # strip the namespace to recover the tag text
                if tag in self.manual_tags:
                    self.manual_tags.remove(tag)
                    self.update_tags_display()
                break

    def open_tag_wiki_url(self, event):
        """Open a wiki URL for the clicked tag."""
        index = self.tags_display.index("@%d,%d" % (event.x, event.y))
        for t in self.tags_display.tag_names(index):
            if t.startswith("manual_"):
                tag = t[len("manual_") :]
                open_tag_wiki_url(tag, self.settings)
                break

View File

@ -0,0 +1,124 @@
import subprocess
from bs4 import BeautifulSoup
import requests
from .settings import Settings
def open_tag_wiki_url(tag, settings: Settings):
    """Open the Danbooru wiki page for *tag* in the user's browser.

    Any category namespace prefix (``character:``, ``artist:``, ``meta:``,
    ``copyright:``, ``general:``) is stripped first.  The remaining tag
    text is percent-encoded before being appended to the URL, so tags
    containing characters such as ``/``, ``?`` or ``#`` no longer produce
    a broken or truncated wiki URL.

    :param tag: tag text, optionally with a category prefix
    :param settings: application settings passed through to the browser opener
    """
    from urllib.parse import quote

    # Strip a leading category namespace, if present
    for prefix in (
        "character:",
        "artist:",
        "meta:",
        "copyright:",
        "general:",
    ):
        if tag.startswith(prefix):
            tag = tag[len(prefix) :]
            break
    url = "https://danbooru.donmai.us/wiki_pages/" + quote(tag)
    open_webbrowser(url, settings)
def open_webbrowser(url, settings: Settings):
    """Open *url* in the browser configured in settings, or the system default.

    When ``settings.browser`` names an executable, it is launched with the
    URL as its only argument; on any failure (or when no browser is
    configured) the platform default browser is used instead.
    """
    configured = settings.browser
    if configured:
        try:
            subprocess.run([configured, url], check=True)
            return
        except Exception as e:
            print("Błąd przy otwieraniu przeglądarki:", e)
    # Fallback: lazily import the stdlib opener and use the default browser.
    import webbrowser

    webbrowser.open(url)
def login(settings: Settings):
    """
    Log in to the server using settings and return a session with cookies.

    settings should have:
      - base_url (e.g., "https://example.com")
      - username
      - password

    Returns a requests.Session whose cookie jar contains the server's
    ``shm_user`` and ``shm_session`` cookies.

    Raises a plain Exception when the login response is not a 302 redirect
    or the expected cookies are missing.
    NOTE(review): a dedicated exception class would let callers
    distinguish login failures from other errors - consider introducing one.
    """
    # Construct the full URL for login
    url = settings.base_url.rstrip("/") + "/user_admin/login"
    # Prepare the payload for URL-encoded form data
    payload = {"user": settings.username, "pass": settings.password}
    # Set the proper header
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    # Use a session so that cookies are automatically stored for future requests.
    session = requests.Session()
    # Send the POST request and prevent automatic redirects,
    # so we can capture the 302 response with Set-Cookie headers.
    response = session.post(url, data=payload, headers=headers, allow_redirects=False)
    if response.status_code == 302:
        # The session's cookie jar should now contain the cookies set by the server.
        shm_user = session.cookies.get("shm_user")
        shm_session = session.cookies.get("shm_session")
        if not (shm_user and shm_session):
            raise Exception("Login succeeded, but expected cookies were not set.")
        # NOTE(review): printing session cookies to stdout may leak
        # sensitive data into logs - consider removing or demoting to debug.
        print("Login successful. Cookies stored in session:")
        print(f"shm_user: {shm_user}")
        print(f"shm_session: {shm_session}")
        return session
    else:
        raise Exception(f"Login failed: {response.status_code} - {response.text}")
def get_auth_token(session, settings):
    """
    Fetch the logged-in user's page and extract the auth_token hidden input.

    settings should have:
      - base_url (e.g., "https://example.com")
    The session must already hold the 'shm_user' cookie set by login().
    Raises Exception when the cookie, the page, or the token is missing.
    """
    shm_user = session.cookies.get("shm_user")
    if not shm_user:
        raise Exception("shm_user cookie not found; login might have failed.")
    user_url = f"{settings.base_url.rstrip('/')}/user/{shm_user}"
    # Redirects are followed here; only the final 200 page matters.
    response = session.get(user_url)
    if response.status_code != 200:
        raise Exception(
            f"Failed to load {user_url}, status code: {response.status_code}"
        )
    # Locate the hidden <input name="auth_token"> in the returned HTML.
    auth_input = BeautifulSoup(response.text, "html.parser").find(
        "input", {"name": "auth_token"}
    )
    if not (auth_input and auth_input.has_attr("value")):
        raise Exception("auth_token not found in the HTML page.")
    auth_token = auth_input["value"]
    print(f"Found auth_token: {auth_token}")
    return auth_token

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python3
import os
import time
import requests
import json
import tempfile
from PIL import Image
from wdtagger import Tagger
# Constants (fill in the appropriate values before running)
LOGIN = ""
API_KEY = ""
AUTH_TOKEN = ""
SHM_SESSION = ""
SHM_USER = ""
TAGGER = Tagger()
# Base URL of the Kapitanbooru instance
BASE_URL = "http://192.168.1.11:8001"
def extract_title(file_name):
    """Return *file_name* without its extension (used as the post title)."""
    base, _extension = os.path.splitext(file_name)
    return base
# Updates the rating of a given image/post on the server
def update_post(post, new_rating):
    """
    Update *post* on the server, setting its rating.

    new_rating -- the rating chosen by the tagger (no mapping between
    "general" and "sensitive" is applied). Only the first (lowercase)
    letter is sent; a 302 response is treated as success.

    Returns True when an update request was sent successfully, False when
    the post already had the requested rating or the request failed.
    (The original returned True even on failure, which made the caller
    believe an update happened.)
    """
    post_id = post.get("id")
    title = extract_title(post.get("file_name", ""))
    owner = LOGIN  # constant LOGIN
    source = post.get("source", "")
    rating_param = new_rating[0] if new_rating else "?"
    if rating_param == post.get("rating"):
        print(f"Post {post_id} już ma rating {new_rating}, pomijam.")
        return False
    tags = post.get("tag_string")
    url = BASE_URL.rstrip("/") + "/post/set"
    cookies = {"shm_session": SHM_SESSION, "shm_user": SHM_USER}
    data = {
        "auth_token": AUTH_TOKEN,
        "image_id": post_id,
        "title": title,
        "owner": owner,
        "tags": tags,
        "source": source,
        "rating": rating_param,
    }
    try:
        # allow_redirects=False so that the expected 302 is not an error
        r = requests.post(url, data=data, cookies=cookies, allow_redirects=False)
        if r.status_code in (200, 201, 302):
            print(f"Post {post_id} zaktualizowany, rating: {new_rating}")
            return True
        print(f"Błąd aktualizacji postu {post_id}: {r.status_code} {r.text}")
    except Exception as e:
        print(f"Błąd przy aktualizacji postu {post_id}: {e}")
    return False
def _fetch_all_posts():
    """Fetch all posts tagged rating:s, 100 per page, and return them as a list."""
    posts = []
    page = 1
    while True:
        posts_url = (
            f"{BASE_URL}/posts.json?&tags=rating:s&limit=100&page={page}"
            f"&login={LOGIN}&api_key={API_KEY}"
        )
        try:
            response = requests.get(posts_url)
            if response.status_code != 200:
                print(
                    f"Błąd pobierania posts.json: {response.status_code} {response.text}"
                )
                break
            response_posts = response.json()
        except Exception as e:
            print("Błąd przy pobieraniu JSON:", e)
            break
        if not response_posts:
            print("Brak więcej postów.")
            break
        posts.extend(response_posts)
        print(
            f"Pobrano stronę {page} z {len(response_posts)} postami. Zebrano łącznie {len(posts)} postów."
        )
        # Be gentle with the server between page fetches.
        time.sleep(0.5)
        # Fewer than a full page means this was the last one.
        if len(response_posts) < 100:
            break
        page += 1
    return posts


def _download_to_temp(file_url):
    """Download *file_url* into a temporary .png file; return its path or None."""
    try:
        r = requests.get(file_url, stream=True)
        if r.status_code != 200:
            print(f"Błąd pobierania obrazu: {r.status_code}")
            return None
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp_file:
            for chunk in r.iter_content(chunk_size=8192):
                tmp_file.write(chunk)
            tmp_file_path = tmp_file.name
        print(f"Obrazek zapisany tymczasowo: {tmp_file_path}")
        return tmp_file_path
    except Exception as e:
        print("Błąd przy pobieraniu obrazu:", e)
        return None


def _rate_with_tagger(tmp_file_path):
    """Run the tagger on the image at *tmp_file_path*.

    Returns the rating ("" when the tagger's answer is unrecognized) or
    None on failure; on failure the temporary file is removed first.
    """
    try:
        img = Image.open(tmp_file_path)
    except Exception as e:
        print("Błąd przy otwieraniu obrazu:", e)
        os.remove(tmp_file_path)
        return None
    try:
        result = TAGGER.tag(img)
        new_rating = (
            result.rating
            if result.rating in ["general", "sensitive", "questionable", "explicit"]
            else ""
        )
        print(f"Tagger: rating = {result.rating}")
        return new_rating
    except Exception as e:
        print("Błąd Taggera:", e)
        os.remove(tmp_file_path)
        return None
    finally:
        img.close()


def main():
    """Re-rate every rating:s post on the booru using the WD tagger."""
    posts = _fetch_all_posts()
    total_processed = 0
    for post in posts:
        total_processed += 1
        print(f"\nPrzetwarzam post {post.get('id')} ({total_processed})...")
        file_url = post.get("file_url")
        if not file_url:
            print("Brak file_url, pomijam.")
            continue
        tmp_file_path = _download_to_temp(file_url)
        if tmp_file_path is None:
            continue
        new_rating = _rate_with_tagger(tmp_file_path)
        if new_rating is None:
            continue  # temp file was already cleaned up on failure
        # Update the post on the server.
        updated = update_post(post, new_rating)
        # Remove the temporary file.
        try:
            os.remove(tmp_file_path)
            print(f"Tymczasowy plik {tmp_file_path} usunięty.")
        except Exception as e:
            print("Błąd przy usuwaniu tymczasowego pliku:", e)
        # Pause between posts to avoid hammering the server, but only when
        # an update request was actually sent.
        if updated:
            time.sleep(0.5)
    print(f"\nZakończono przetwarzanie. Łącznie przetworzono {total_processed} postów.")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,6 @@
networkx==3.4.2
Pillow==11.1.0
pywin32==308; sys_platform == "win32"
Requests==2.32.3
wdtagger==0.13.2
bs4==0.0.2

View File

@ -0,0 +1,179 @@
import base64
import importlib
import json
import os
import sqlite3
import subprocess
import sys
# On Windows, DPAPI (via pywin32) is used to protect the stored password
if sys.platform.startswith("win"):
    try:
        import win32crypt
        import winreg
    except ImportError:
        win32crypt = None  # Make sure pywin32 is installed
        winreg = None  # Make sure pywin32 is installed
def get_browser_paths_windows():
    """Returns a dictionary of browsers and their executable paths from Windows registry and Start Menu."""
    browsers = {"Default": None}  # "Default" stands for the system default browser
    # Scan both the 64-bit registry view and the 32-bit (WOW6432Node) one.
    registry_paths = [
        r"SOFTWARE\Clients\StartMenuInternet",  # 64-bit Windows
        r"SOFTWARE\WOW6432Node\Clients\StartMenuInternet",  # 32-bit applications
    ]
    for reg_path in registry_paths:
        try:
            with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path) as root:
                # Walk every registered browser subkey under this view.
                for index in range(winreg.QueryInfoKey(root)[0]):
                    browser_name = winreg.EnumKey(root, index)
                    command_key_path = os.path.join(
                        reg_path, browser_name, r"shell\open\command"
                    )
                    try:
                        with winreg.OpenKey(
                            winreg.HKEY_LOCAL_MACHINE, command_key_path
                        ) as command_key:
                            command, _ = winreg.QueryValueEx(command_key, None)
                            # Drop surrounding quotes from the command string.
                            browsers[browser_name] = command.strip('"')
                    except FileNotFoundError:
                        pass  # browser entry without an open command
        except FileNotFoundError:
            pass  # this registry view is absent; try the next one
    return browsers
def get_browsers_linux():
    """Detects installed browsers on Linux by checking available executables.

    Returns a dict mapping browser name -> command; the "Default" entry
    (mapped to None) stands for the system default browser.
    """
    import shutil  # local import: only needed on the Linux code path

    browsers = {"Default": None}
    candidate_names = [
        "firefox",
        "google-chrome",
        "chromium",
        "opera",
        "brave",
        "vivaldi",
    ]
    for browser in candidate_names:
        # shutil.which does the PATH lookup in-process instead of spawning
        # a `which` subprocess per candidate.
        if shutil.which(browser):
            browsers[browser] = browser
    return browsers
def detect_installed_browsers():
    """Detects available browsers depending on the OS.

    Returns a dict mapping browser name -> command/path, where the "Default"
    entry (mapped to None) stands for the system default browser. On
    unsupported platforms (or when winreg failed to import) a fallback with
    only the default entry is returned — previously `browsers` was left
    unbound there, raising UnboundLocalError.
    """
    if sys.platform == "win32" and winreg:  # Windows with winreg available
        return get_browser_paths_windows()
    if sys.platform.startswith("linux"):  # Linux
        return get_browsers_linux()
    # Fallback: unknown platform — offer only the system default browser.
    return {"Default": None}
# --- Helpers for encrypting/decrypting the stored password on Windows ---
def encrypt_password(password):
    """Encrypt *password* with DPAPI and return it base64-encoded.

    Off Windows (or when pywin32 is missing) the password is returned
    unchanged, i.e. stored in plain text.
    """
    # Check the platform first: win32crypt is only ever defined on Windows,
    # so referencing it elsewhere would raise NameError.
    if not sys.platform.startswith("win") or win32crypt is None:
        return password
    blob = win32crypt.CryptProtectData(
        password.encode("utf-8"), None, None, None, None, 0
    )
    return base64.b64encode(blob).decode("utf-8")
def decrypt_password(enc_password):
    """Decrypt a base64, DPAPI-protected password written by encrypt_password.

    Off Windows (or when pywin32 is missing) the stored value is assumed to
    be plain text and returned unchanged. The platform is checked first
    because win32crypt is only defined on Windows (NameError otherwise).
    """
    if not sys.platform.startswith("win") or win32crypt is None:
        return enc_password
    enc_data = base64.b64decode(enc_password)
    data = win32crypt.CryptUnprotectData(enc_data, None, None, None, 0)
    return data[1].decode("utf-8")
class Settings:
    """Application settings persisted as JSON in a per-user directory.

    On Windows the password is stored DPAPI-encrypted; elsewhere it is kept
    in plain text. Loading is best-effort: any error leaves the defaults.
    """

    def __init__(self):
        self._apply_defaults()
        self.installed_browsers = detect_installed_browsers()
        self.load_settings()
        # Reverse map: command/path -> browser display name.
        self.installed_browsers_reverse = {
            v: k for k, v in self.installed_browsers.items()
        }

    def _apply_defaults(self):
        """Reset every setting to its built-in default value."""
        self.username = ""
        self.password = ""
        self.base_url = "http://192.168.1.11:8001"
        self.default_tags = "artist:kapitan meta:ai-generated"
        self.cache_expiry = 604800  # 7 days in seconds
        self.browser = ""

    def get_settings_path(self):
        """Return the settings file path (OS-dependent), creating its directory."""
        if sys.platform.startswith("win"):
            base_dir = os.path.join(os.environ.get("APPDATA", ""), "Kapitanbooru")
        else:
            base_dir = os.path.expanduser("~/.kapitanbooru")
        # exist_ok avoids the check-then-create race of the original code.
        os.makedirs(base_dir, exist_ok=True)
        return os.path.join(base_dir, "settings.json")

    def load_settings(self):
        """Load settings from the file, if it exists; errors keep defaults."""
        # Start from defaults so repeated loads are idempotent.
        self._apply_defaults()
        try:
            path = self.get_settings_path()
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                self.username = data.get("username", self.username)
                # On Windows the stored password is DPAPI-encrypted.
                if sys.platform.startswith("win") and "password" in data:
                    self.password = decrypt_password(data["password"])
                else:
                    self.password = data.get("password", self.password)
                self.base_url = data.get("base_url", self.base_url)
                self.default_tags = data.get("default_tags", self.default_tags)
                self.cache_expiry = data.get("cache_expiry", self.cache_expiry)
                self.browser = data.get("browser", self.browser)
                # Drop a configured browser that is no longer installed.
                if self.browser not in self.installed_browsers:
                    self.browser = ""
        except Exception as e:
            print("Błąd podczas ładowania ustawień:", e)

    def save_settings(self):
        """Persist the current settings to the settings file."""
        data = {
            "username": self.username,
            "base_url": self.base_url,
            "default_tags": self.default_tags,
            "cache_expiry": self.cache_expiry,
            "browser": self.browser,
        }
        # On Windows the password is encrypted before being written out.
        if sys.platform.startswith("win"):
            data["password"] = encrypt_password(self.password)
        else:
            data["password"] = self.password
        try:
            with open(self.get_settings_path(), "w", encoding="utf-8") as f:
                json.dump(data, f, indent=4)
        except Exception as e:
            print("Błąd podczas zapisywania ustawień:", e)

View File

@ -0,0 +1,181 @@
from functools import lru_cache
import re
from .TagsRepo import TagsRepo
@lru_cache(maxsize=1)
def get_character_tags(tags_repo: TagsRepo):
    """Return the set of tag names in the Character category (category = 4).

    The result is memoized per repo. Note that a failed DB read yields (and
    caches) an empty set — the cache is best-effort.
    """
    try:
        conn = tags_repo.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM tags WHERE category = 4")
            return {row[0] for row in cursor.fetchall()}
        finally:
            conn.close()  # close even when the query raises (was leaked before)
    except Exception as e:
        print("Błąd przy pobieraniu tagów postaci:", e)
        return set()
@lru_cache(maxsize=1)
def get_copyright_tags(tags_repo: TagsRepo):
    """Return the set of tag names in the Copyright category (category = 3).

    The result is memoized per repo. Note that a failed DB read yields (and
    caches) an empty set — the cache is best-effort.
    """
    try:
        conn = tags_repo.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM tags WHERE category = 3")
            return {row[0] for row in cursor.fetchall()}
        finally:
            conn.close()  # close even when the query raises (was leaked before)
    except Exception as e:
        print("Błąd przy pobieraniu tagów copyright:", e)
        return set()
# Tag-related patterns and settings
# Matches an ENTIRE token that ends with a prompt-weight suffix (":2" or
# "1.5"); substituting "" therefore removes such tokens wholesale.
COEFFICIENT_PATTERN = re.compile(r"^.*?(:\d+|\d+\.\d+)$")
# Runs of (, ), [, ] not preceded by a backslash (prompt emphasis syntax).
UNESCAPED_PATTERN = re.compile(r"(?<!\\)[\(\)\[\]]+")
# Backslashes escaping a parenthesis, unless the parenthesis is followed by "^".
SLASH_PATTERN = re.compile(r"\\+(?=[\(\)](?!\^))")
# Runs of whitespace or underscores, normalized to a single "_".
WHITESPACE_PATTERN = re.compile(r"\s+|_+")
# Metatags added automatically (by Kapitanbooru / scoring); stripped from input.
AUTO_METATAGS = [
    "absurdres",
    "high_score",
    "great_score",
    "masterpiece",
    "general",
    "sensitive",
    "questionable",
    "explicit",
    "nsfw",
    "safe",
    "dynamic_angle",
    "soft_lighting",
    "vibrant_colors",
    "cinematic_lighting",
    "detailed_background",
    "ultra-detailed",
    "wallpaper",
]
# Single invalid/misspelled tags mapped to their canonical form.
TAG_FIXES = {
    "2boy": "2boys",
    "2girl": "2girls",
    "exercise": "exercising",
    "keyboard_(computer)": "computer_keyboard",
}
# Compound tags expanded into several canonical tags.
MULTI_TAG_FIXES = {
    "black_choker_with_heart_charm": ["black_choker", "heart_choker"],
    "orange_slit_pupils": ["orange_eyes", "slit_pupils"],
    "oversized_black_print_shirt_with_single_bare_shoulder": [
        "oversized_shirt",
        "black_shirt",
        "print_shirt",
        "single_bare_shoulder",
    ],
    "two-tone_hair_black_with_red_streaks": [
        "two-tone_hair",
        "black_hair",
        "red_streaks",
    ],
    "very_short_messy_hair": ["very_short_hair", "messy_hair"],
}
def parse_parameters(param_str, tags_repo: TagsRepo):
    """
    Parse the 'parameters' field of a PNG into a normalized tag string.

    The positive-prompt part (everything before the "Negative" section) is
    split on commas, whitespace is collapsed to underscores, prompt-weight
    and emphasis syntax is stripped, known-invalid tags are fixed, metatags
    and "text:" tags are dropped, Character/Copyright tags get their prefix,
    and the result is returned sorted and space-separated.
    """
    raw_parts = (
        param_str.split("\nNegative", 1)[0]
        .removesuffix(",")
        .replace("\n", " ")
        .split(",")
    )
    tags = {WHITESPACE_PATTERN.sub("_", part.strip()) for part in raw_parts}
    # Strip unescaped brackets, escaping backslashes and weight suffixes.
    tags = {
        COEFFICIENT_PATTERN.sub(
            "", SLASH_PATTERN.sub("", UNESCAPED_PATTERN.sub("", tag))
        )
        for tag in tags
    }
    # Replace known-invalid tags with their canonical counterparts.
    for wrong_tag, fixed_tag in TAG_FIXES.items():
        if wrong_tag in tags:
            tags.discard(wrong_tag)
            tags.add(fixed_tag)
    for wrong_tag, fixed_tags in MULTI_TAG_FIXES.items():
        if wrong_tag in tags:
            tags.discard(wrong_tag)
            tags.update(fixed_tags)
    # Remove metatags that Kapitanbooru adds automatically.
    tags.difference_update(AUTO_METATAGS)
    # Remove textual tags.
    tags = {tag for tag in tags if not tag.startswith("text:")}
    # Fetch category sets from the database.
    character_tags = get_character_tags(tags_repo)
    copyright_tags = get_copyright_tags(tags_repo)
    # Prefix tags belonging to the Character category.
    for tag in list(tags):  # iterate over a copy of the set
        if tag in character_tags:
            tags.discard(tag)
            tags.add("character:" + tag)
    # Prefix tags belonging to the Copyright category.
    for tag in list(tags):
        if tag in copyright_tags:
            tags.discard(tag)
            tags.add("copyright:" + tag)
    return " ".join(sorted(tags))
def process_tag(tag, tags_repo: TagsRepo):
    """
    Resolve *tag* against the tag database.

    Strips a known category prefix, follows tag_aliases, then looks the tag
    up in the tags table. Returns (processed_tag, status) where status is:
      - True: the tag exists and is deprecated,
      - False: the tag exists and is not deprecated,
      - None: the tag does not exist in the database.
    """
    # Strip a category prefix, if present.
    for prefix in ["character:", "artist:", "meta:", "copyright:", "general:"]:
        if tag.startswith(prefix):
            tag_lookup = tag[len(prefix) :]
            break
    else:
        tag_lookup = tag
    # Follow an alias, if one exists.
    try:
        conn = tags_repo.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT tag FROM tag_aliases WHERE alias = ?", (tag_lookup,))
            row = cursor.fetchone()
            if row:
                tag_lookup = row[0]
        finally:
            conn.close()  # close even when the query raises (was leaked before)
    except Exception as e:
        print("Błąd podczas odczytu tag_aliases:", e)
    # Look up the canonical name — the tags table stores names without prefixes.
    try:
        conn = tags_repo.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT is_deprecated FROM tags WHERE name = ?", (tag_lookup,)
            )
            row = cursor.fetchone()
        finally:
            conn.close()
        if row is not None:
            return tag_lookup, bool(row[0])
        # The tag does not exist.
        return tag_lookup, None
    except Exception as e:
        print("Błąd podczas odczytu tags:", e)
        return tag_lookup, None

View File

@ -0,0 +1,121 @@
import os
import pickle
import sqlite3
import time
from .settings import Settings
class TaggerCache:
    """SQLite-backed, expiring cache of tagger results keyed by file MD5.

    Entries remember which tagger (name/version) produced them and when;
    entries older than Settings.cache_expiry seconds are purged lazily. All
    errors are reported to stdout and otherwise swallowed — the cache is
    strictly best-effort.
    """

    def __init__(self, settings: Settings, tagger_name: str, tagger_version: str):
        self.tagger_name = tagger_name
        self.tagger_version = tagger_version
        self.settings = settings
        # The cache DB lives next to the settings file.
        self.cache_db_path = os.path.join(
            os.path.dirname(settings.get_settings_path()), "tagger_cache.db"
        )
        self.init_cache_db()
        self.clear_expired_cache()

    def _connect(self):
        """Open a connection to the cache database."""
        return sqlite3.connect(self.cache_db_path)

    # --- Cache database initialization ---
    def init_cache_db(self):
        """Create the cache table if it does not exist yet."""
        try:
            conn = self._connect()
            try:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS tagger_cache (
                        file_md5 TEXT PRIMARY KEY,
                        tagger_name TEXT,
                        tagger_version TEXT,
                        created_at INTEGER,
                        result BLOB
                    )
                    """
                )
                conn.commit()
            finally:
                conn.close()  # always release the handle (was leaked on error)
        except Exception as e:
            print("Błąd przy inicjalizacji bazy cache:", e)

    def __getitem__(self, file_md5):
        """Return the cache entry dict for *file_md5*, or None.

        Expired entries are deleted on access and treated as missing.
        """
        try:
            conn = self._connect()
            try:
                cursor = conn.execute(
                    "SELECT tagger_name, tagger_version, created_at, result FROM tagger_cache WHERE file_md5 = ?",
                    (file_md5,),
                )
                row = cursor.fetchone()
            finally:
                conn.close()
            if row:
                created_at = row[2]
                if time.time() - created_at < self.settings.cache_expiry:
                    return {
                        "tagger_name": row[0],
                        "tagger_version": row[1],
                        "created_at": created_at,
                        "result": pickle.loads(row[3]),
                    }
                # Entry is stale — drop it lazily.
                self.delete_cache_entry(file_md5)
        except Exception as e:
            print("Błąd przy odczycie cache dla", file_md5, ":", e)
        return None

    def __setitem__(self, file_md5, result):
        """Insert or replace the cache entry for *file_md5* with *result*."""
        try:
            conn = self._connect()
            try:
                conn.execute(
                    """
                    INSERT OR REPLACE INTO tagger_cache (file_md5, tagger_name, tagger_version, created_at, result)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (
                        file_md5,
                        self.tagger_name,
                        self.tagger_version,
                        int(time.time()),
                        sqlite3.Binary(pickle.dumps(result)),
                    ),
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            print("Błąd przy zapisie cache dla", file_md5, ":", e)

    def delete_cache_entry(self, file_md5):
        """Remove a single entry (no error if it does not exist)."""
        try:
            conn = self._connect()
            try:
                conn.execute("DELETE FROM tagger_cache WHERE file_md5 = ?", (file_md5,))
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            print("Błąd przy usuwaniu cache dla", file_md5, ":", e)

    def clear_expired_cache(self):
        """Delete every entry older than the configured expiry window."""
        try:
            conn = self._connect()
            try:
                expiry_threshold = int(time.time() - self.settings.cache_expiry)
                conn.execute(
                    "DELETE FROM tagger_cache WHERE created_at < ?", (expiry_threshold,)
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            print("Błąd przy czyszczeniu przeterminowanego cache:", e)

    def clear_cache(self):
        """Delete all entries; return (True, None) on success, (False, error) otherwise."""
        try:
            conn = self._connect()
            try:
                conn.execute("DELETE FROM tagger_cache")
                conn.commit()
            finally:
                conn.close()
            return True, None
        except Exception as e:
            return False, e

28
pyproject.toml Normal file
View File

@ -0,0 +1,28 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "kapitanbooru-uploader"
version = "0.1.0"
description = "A GUI application for uploading images to KapitanBooru"
authors = [
{name = "Michał Leśniak", email = "kapitan@mlesniak.pl"}
]
dependencies = [
"networkx==3.4.2",
"Pillow==11.1.0",
    "pywin32==308; sys_platform == 'win32'",
"requests==2.32.3",
"wdtagger==0.13.2",
"bs4==0.0.2"
]
requires-python = ">=3.13"
readme = "README.md"
license = {file = "LICENSE"}
[project.scripts]
kapitanbooru-uploader = "kapitanbooru_uploader.__main__:main"
[tool.setuptools]
packages = ["kapitanbooru_uploader"]