All checks were successful
Gitea/kapitanbooru-uploader/pipeline/head This commit looks good
120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
from .I18N import _
|
|
|
|
|
|
import inspect
|
|
import queue
|
|
import threading
|
|
import tkinter as tk
|
|
from tkinter import ttk
|
|
|
|
|
|
class ProcessingDialog:
    """Progress window that runs ``target_function`` on a worker thread.

    The dialog shows a label and a progress bar (plus a second bar when the
    target accepts ``secondary_progress_queue``).  The worker talks to the
    GUI only through two ``queue.Queue`` objects that are polled from the Tk
    event loop, so every widget call happens on the thread that owns Tk.

    Messages are tuples whose first item is a tag:

    * ``("mode", "determinate" | "indeterminate")`` - switch the bar mode
      and reset its value to 0,
    * ``("max", n)`` - set the bar's ``maximum``,
    * ``("progress", n)`` - set the bar's ``value``,
    * ``("label", text)`` - replace the label text (main queue only).

    Unknown tags are ignored.
    """

    # Delay between two polls of the message queues, in milliseconds.
    POLL_INTERVAL_MS = 100

    # Private marker the worker posts when the target function has returned
    # (or raised).  An object() cannot collide with any user-defined tag.
    _DONE = object()

    def __init__(self, root, target_function, *args):
        """Build the window and start ``target_function`` in a new thread.

        Args:
            root: Parent Tk widget; also used to schedule the thread join.
            target_function: Callable to run in the background.  It is
                called with ``*args`` plus whichever of the keyword
                arguments ``progress_queue``, ``secondary_progress_queue``
                and ``cancel_event`` appear in its signature.
            *args: Positional arguments forwarded to ``target_function``.
        """
        self.root = root
        self.top = tk.Toplevel(root)
        self.top.title(_("Processing..."))
        self.top.geometry("300x150")
        self.top.protocol("WM_DELETE_WINDOW", self.on_close)

        self.label = tk.Label(self.top, text=_("Processing, please wait..."))
        self.label.pack(pady=10)

        # Start with indeterminate progress bar
        self.progress = ttk.Progressbar(self.top, mode="indeterminate")
        self.progress.pack(pady=10, fill="x")
        self.progress.start(10)

        # The secondary bar is only shown when the target can feed it; the
        # attribute always exists so later code can test it against None.
        self.sub_progress = None
        params = inspect.signature(target_function).parameters
        if "secondary_progress_queue" in params:
            self.sub_progress = ttk.Progressbar(self.top, mode="indeterminate")
            self.sub_progress.pack(pady=10, fill="x")
            self.sub_progress.start(10)

        # Setup communication queues and periodic checker
        self.queue = queue.Queue()
        self.sub_queue = queue.Queue()
        self.running = True
        self.cancel_event = threading.Event()  # Cancellation flag
        self.thread = threading.Thread(
            target=self.run_task, args=(target_function, *args)
        )
        self.thread.start()
        self.top.after(self.POLL_INTERVAL_MS, self.process_queue)

    def process_queue(self):
        """Apply pending messages from both queues, then re-arm the poll.

        Each queue is drained independently, so secondary-bar updates are
        applied even when the main queue is empty.  When the worker's
        completion marker is found the dialog is closed from here, i.e. on
        the GUI thread.
        """
        if not self.running:
            return

        finished = self._drain(self.queue, self.progress, self.label)
        if finished:
            self.close_dialog()
            return

        # Secondary messages never carry a label, hence label=None.
        self._drain(self.sub_queue, self.sub_progress, None)

        if self.running:
            self.top.after(self.POLL_INTERVAL_MS, self.process_queue)

    def _drain(self, source, bar, label):
        """Empty ``source`` and apply every message to ``bar`` / ``label``.

        Returns:
            True when the completion marker was read, False otherwise.
        """
        applied = False
        while True:
            try:
                msg = source.get_nowait()
            except queue.Empty:
                break
            if msg[0] is self._DONE:
                return True
            self._apply(msg, bar, label)
            applied = True
        if applied:
            # Let Tk redraw so the new state is visible right away.
            self.top.update_idletasks()
        return False

    @staticmethod
    def _apply(msg, bar, label):
        """Apply one ``(tag, value)`` message to a progress bar or label."""
        tag = msg[0]
        if tag == "label":
            if label is not None:
                label.config(text=msg[1])
            return
        if bar is None:
            return
        if tag == "mode":
            bar.config(mode=msg[1])
            if msg[1] == "determinate":
                bar["value"] = 0
                bar.stop()
            elif msg[1] == "indeterminate":
                bar["value"] = 0
                bar.start()
        elif tag == "max":
            bar["maximum"] = msg[1]
        elif tag == "progress":
            bar["value"] = msg[1]

    def run_task(self, target_function, *args):
        """Execute target function with the helper objects it supports.

        Runs on the worker thread.  Only keyword arguments named in the
        target's signature are passed.  Whatever happens, a completion
        marker is queued afterwards; the worker itself never touches a
        widget.
        """
        try:
            params = inspect.signature(target_function).parameters
            kwargs = {}
            if "progress_queue" in params:
                kwargs["progress_queue"] = self.queue
            if "secondary_progress_queue" in params:
                kwargs["secondary_progress_queue"] = self.sub_queue
            if "cancel_event" in params:
                kwargs["cancel_event"] = self.cancel_event
            target_function(*args, **kwargs)
        finally:
            # Hand the shutdown over to process_queue() on the GUI thread.
            self.queue.put((self._DONE,))

    def close_dialog(self):
        """Safely close the dialog; does nothing if it is already closed."""
        if not self.running:
            return
        self.running = False
        self.progress.stop()
        if self.sub_progress is not None:
            self.sub_progress.stop()
        self.top.after(0, self.top.destroy)
        self.root.after(100, self.thread.join)

    def on_close(self):
        """Handle manual window closure"""
        self.running = False
        self.cancel_event.set()  # Notify target function that cancellation is requested
        self.top.destroy()
|