IPTV-CHECK/iptv_check.py
peterpt 2a0be5f65f
Version 3.0 - Python Gui and CLI
A full rework on the app , user can now use cli mode or gui mode , multiple new functionalities integrated
2025-11-06 12:53:51 -07:00

1664 lines
104 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import os
import re
import subprocess
import tempfile
import threading
import signal
import shutil
from urllib.parse import urlparse, urljoin
import argparse
import logging
import configparser
from pathlib import Path
import time
import json
# --- Third-party libraries ---
import requests
try:
from PIL import Image
except ImportError:
sys.exit("Pillow library not found. Please install it: pip install Pillow")
try:
import pytesseract
except ImportError:
sys.exit("pytesseract library not found. Please install it: pip install pytesseract")
try:
from colorama import init, Fore, Style
init(autoreset=True)
except ImportError:
class Fore: RED = ''; GREEN = ''; YELLOW = ''; BLUE = ''; CYAN = ''; RESET = ''
class Style: BRIGHT = ''; RESET_ALL = ''
# --- GUI Components ---
try:
import tkinter as tk
from tkinter import ttk, filedialog, scrolledtext, messagebox, simpledialog
GUI_AVAILABLE = True
except ImportError:
GUI_AVAILABLE = False
# --- Configuration & Constants ---
VERSION = "3.0" # Add granular UI lock during processing
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
AUDIO_USER_AGENT = "iTunes/9.1.1"
MIN_FILE_SIZE_BYTES = 100
DEFAULT_CAPTURE_DURATION = 4
OCR_FRAME_EXTRACTION_TIMEOUT = 20
WORKER_PROCESS_TIMEOUT_SECONDS = 30
UNCHECKABLE_URL_LENGTH_THRESHOLD = 250
UNCHECKABLE_KEYWORDS = ['token', 'auth', 'login', 'key', 'signature']
TEMP_FILE_PREFIX = "iptv-check-temp-"
YT_DLP_TIMEOUT_SECONDS = 20
# --- Portable File Paths ---
APP_DIR = Path(__file__).resolve().parent
CONFIG_FILE_PATH = APP_DIR / "iptv_checker_config.ini"
LINKS_DB_PATH = APP_DIR / "iptv_checker_links.ini"
DEBUG_LOG_PATH = APP_DIR / "debug.log"
# --- Language/Internationalization ---
LANGUAGES = {}
DEFAULT_LANG = "en"
def load_languages():
global LANGUAGES
LANGUAGES = {
"en": {
"title": "IPTV-Check", "file": "File", "recheck_output": "Recheck Output File", "exit": "Exit",
"tools": "Tools", "website_finder": "Website M3U Finder...", "database": "Database",
"manage_links": "Manage Default Links...", "check_all_links": "Check All Default Links", "settings": "Settings",
"configure": "Configure...", "language": "Language", "debug": "Debug", "view_log": "View Debug Log",
"about_menu": "About", "creators": "Creators...", "update": "Update...", "m3u_input_label": "M3U File or URL:",
"output_file_label": "Output File:", "browse": "Browse...", "start_check": "Start Check",
"stop_save": "Stop & Save", "timeout_label": "Timeout (s):", "workers_label": "Workers:",
"remaining_label": "Remaining:", "log_display_label": "Log Display", "channel_name_radio": "Channel Name",
"channel_url_radio": "Channel URL", "options_label": "Options", "use_ocr_check": "Use OCR",
"skip_known_check": "Skip Known Good URLs", "search_label": "Search in Output File...",
"no_internet": "No Internet Connection. Waiting for connection...",
"tooltip_ocr": "If checked, uses OCR (Tesseract) to analyze a frame from video streams.\nThis helps detect error screens (e.g., 'Login required', geo-blocked).\nThis check is slower.",
"tooltip_timeout": "Network timeout in seconds for each stream check.", "tooltip_workers": "Number of parallel checks to run at once.",
"tooltip_log_display": "Changes the display format in the log view during a check.",
"tooltip_skip_known": "If checked, the app will read your output file first.\nAny stream URL from the new source that already exists in the output file will be skipped.\nThis prevents duplicates and speeds up checking large lists.",
"tooltip_channel_name": "In the log window, display the channel's name from the M3U file.",
"tooltip_channel_url": "In the log window, display the full URL of the channel's stream."
},
"pt": {
"title": "IPTV-Check", "file": "Arquivo", "recheck_output": "Verificar Novamente Arquivo de Saída", "exit": "Sair",
"tools": "Ferramentas", "website_finder": "Localizador de M3U em Websites...", "database": "Base de Dados",
"manage_links": "Gerir Links Padrão...", "check_all_links": "Verificar Todos os Links Padrão", "settings": "Definições",
"configure": "Configurar...", "language": "Idioma", "debug": "Depuração", "view_log": "Ver Registo de Depuração",
"about_menu": "Sobre", "creators": "Criadores...", "update": "Atualizar...", "m3u_input_label": "Arquivo M3U ou URL:",
"output_file_label": "Arquivo de Saída:", "browse": "Procurar...", "start_check": "Iniciar Verificação",
"stop_save": "Parar e Salvar", "timeout_label": "Timeout (s):", "workers_label": "Workers:",
"remaining_label": "Restantes:", "log_display_label": "Exibição do Registo", "channel_name_radio": "Nome do Canal",
"channel_url_radio": "URL do Canal", "options_label": "Opções", "use_ocr_check": "Usar OCR",
"skip_known_check": "Ignorar URLs já existentes", "search_label": "Pesquisar no Arquivo de Saída...",
"no_internet": "Sem Conexão à Internet. Aguardando conexão...",
"tooltip_ocr": "Se marcado, usa OCR (Tesseract) para analisar um frame dos streams de vídeo.\nAjuda a detetar ecrãs de erro (ex: 'Login necessário', bloqueio geográfico).\nEsta verificação é mais lenta.",
"tooltip_timeout": "Tempo limite de rede em segundos para cada verificação de stream.", "tooltip_workers": "Número de verificações paralelas a serem executadas de uma vez.",
"tooltip_log_display": "Altera o formato de exibição na vista de registo durante uma verificação.",
"tooltip_skip_known": "Se marcado, a aplicação lerá primeiro o seu arquivo de saída.\nQualquer URL de stream da nova fonte que já exista no arquivo de saída será ignorado.\nIsto evita duplicados e acelera a verificação de listas grandes.",
"tooltip_channel_name": "Na janela de registo, exibir o nome do canal do arquivo M3U.",
"tooltip_channel_url": "Na janela de registo, exibir o URL completo do stream do canal."
},
"es": {
"title": "IPTV-Check", "file": "Archivo", "recheck_output": "Revisar Archivo de Salida", "exit": "Salir",
"tools": "Herramientas", "website_finder": "Buscador de M3U en Sitios Web...", "database": "Base de Datos",
"manage_links": "Gestionar Enlaces Predeterminados...", "check_all_links": "Revisar Todos los Enlaces Predeterminados", "settings": "Ajustes",
"configure": "Configurar...", "language": "Idioma", "debug": "Depuración", "view_log": "Ver Registro de Depuración",
"about_menu": "Acerca de", "creators": "Creadores...", "update": "Actualizar...", "m3u_input_label": "Archivo M3U o URL:",
"output_file_label": "Archivo de Salida:", "browse": "Explorar...", "start_check": "Iniciar Revisión",
"stop_save": "Detener y Guardar", "timeout_label": "Timeout (s):", "workers_label": "Workers:",
"remaining_label": "Restantes:", "log_display_label": "Visualización del Registro", "channel_name_radio": "Nombre del Canal",
"channel_url_radio": "URL del Canal", "options_label": "Opciones", "use_ocr_check": "Usar OCR",
"skip_known_check": "Omitir URLs conocidas", "search_label": "Buscar en Archivo de Salida...",
"no_internet": "Sin Conexión a Internet. Esperando conexión...",
"tooltip_ocr": "Si está marcado, usa OCR (Tesseract) para analizar un fotograma de los flujos de video.\nAyuda a detectar pantallas de error (ej: 'Se requiere inicio de sesión', geobloqueado).\nEsta revisión es más lenta.",
"tooltip_timeout": "Tiempo de espera de red en segundos para cada revisión de flujo.", "tooltip_workers": "Número de revisiones paralelas para ejecutar a la vez.",
"tooltip_log_display": "Cambia el formato de visualización en la vista de registro durante una revisión.",
"tooltip_skip_known": "Si está marcado, la aplicación leerá primero su archivo de salida.\nCualquier URL de flujo de la nueva fuente que ya exista en el archivo de salida será omitida.\nEsto evita duplicados y acelera la revisión de listas grandes.",
"tooltip_channel_name": "En la ventana de registro, mostrar el nombre del canal del archivo M3U.",
"tooltip_channel_url": "En la ventana de registro, mostrar la URL completa del flujo del canal."
},
"fr": {
"title": "IPTV-Check", "file": "Fichier", "recheck_output": "Revérifier le Fichier de Sortie", "exit": "Quitter",
"tools": "Outils", "website_finder": "Chercheur de M3U sur Site Web...", "database": "Base de Données",
"manage_links": "Gérer les Liens par Défaut...", "check_all_links": "Vérifier Tous les Liens par Défaut", "settings": "Paramètres",
"configure": "Configurer...", "language": "Langue", "debug": "Débogage", "view_log": "Voir le Journal de Débogage",
"about_menu": "À propos", "creators": "Créateurs...", "update": "Mettre à jour...", "m3u_input_label": "Fichier M3U ou URL :",
"output_file_label": "Fichier de Sortie :", "browse": "Parcourir...", "start_check": "Démarrer la Vérification",
"stop_save": "Arrêter et Enregistrer", "timeout_label": "Timeout (s) :", "workers_label": "Workers :",
"remaining_label": "Restants :", "log_display_label": "Affichage du Journal", "channel_name_radio": "Nom de la Chaîne",
"channel_url_radio": "URL de la Chaîne", "options_label": "Options", "use_ocr_check": "Utiliser l'OCR",
"skip_known_check": "Ignorer les URL connues", "search_label": "Rechercher dans le Fichier de Sortie...",
"no_internet": "Pas de Connexion Internet. En attente de connexion...",
"tooltip_ocr": "Si coché, utilise l'OCR (Tesseract) pour analyser une image des flux vidéo.\nCela aide à détecter les écrans d'erreur (ex: 'Connexion requise', géo-bloqué).\nCette vérification est plus lente.",
"tooltip_timeout": "Délai d'attente réseau en secondes pour chaque vérification de flux.", "tooltip_workers": "Nombre de vérifications parallèles à exécuter en même temps.",
"tooltip_log_display": "Change le format d'affichage dans la vue du journal pendant une vérification.",
"tooltip_skip_known": "Si coché, l'application lira d'abord votre fichier de sortie.\nToute URL de flux de la nouvelle source qui existe déjà dans le fichier de sortie sera ignorée.\nCela évite les doublons et accélère la vérification des grandes listes.",
"tooltip_channel_name": "Dans la fenêtre du journal, afficher le nom de la chaîne du fichier M3U.",
"tooltip_channel_url": "Dans la fenêtre du journal, afficher l'URL complète du flux de la chaîne."
},
"it": {
"title": "IPTV-Check", "file": "File", "recheck_output": "Ricontrolla File di Output", "exit": "Esci",
"tools": "Strumenti", "website_finder": "Trova M3U su Sito Web...", "database": "Database",
"manage_links": "Gestisci Link Predefiniti...", "check_all_links": "Controlla Tutti i Link Predefiniti", "settings": "Impostazioni",
"configure": "Configura...", "language": "Lingua", "debug": "Debug", "view_log": "Visualizza Log di Debug",
"about_menu": "Informazioni", "creators": "Creatori...", "update": "Aggiorna...", "m3u_input_label": "File M3U o URL:",
"output_file_label": "File di Output:", "browse": "Sfoglia...", "start_check": "Avvia Controllo",
"stop_save": "Ferma e Salva", "timeout_label": "Timeout (s):", "workers_label": "Workers:",
"remaining_label": "Rimanenti:", "log_display_label": "Visualizzazione Log", "channel_name_radio": "Nome Canale",
"channel_url_radio": "URL Canale", "options_label": "Opzioni", "use_ocr_check": "Usa OCR",
"skip_known_check": "Salta URL conosciuti", "search_label": "Cerca nel File di Output...",
"no_internet": "Nessuna Connessione Internet. In attesa di connessione...",
"tooltip_ocr": "Se selezionato, utilizza l'OCR (Tesseract) per analizzare un frame dai flussi video.\nAiuta a rilevare schermate di errore (es. 'Login richiesto', geo-bloccato).\nQuesto controllo è più lento.",
"tooltip_timeout": "Timeout di rete in secondi per ogni controllo del flusso.", "tooltip_workers": "Numero di controlli paralleli da eseguire contemporaneamente.",
"tooltip_log_display": "Modifica il formato di visualizzazione nel log durante un controllo.",
"tooltip_skip_known": "Se selezionato, l'app leggerà prima il tuo file di output.\nQualsiasi URL di flusso dalla nuova fonte che esiste già nel file di output verrà saltato.\nCiò previene i duplicati e accelera il controllo di grandi liste.",
"tooltip_channel_name": "Nella finestra di log, visualizza il nome del canale dal file M3U.",
"tooltip_channel_url": "Nella finestra di log, visualizza l'URL completo del flusso del canale."
},
"de": {
"title": "IPTV-Check", "file": "Datei", "recheck_output": "Ausgabedatei erneut prüfen", "exit": "Beenden",
"tools": "Werkzeuge", "website_finder": "Website-M3U-Finder...", "database": "Datenbank",
"manage_links": "Standard-Links verwalten...", "check_all_links": "Alle Standard-Links prüfen", "settings": "Einstellungen",
"configure": "Konfigurieren...", "language": "Sprache", "debug": "Debug", "view_log": "Debug-Protokoll anzeigen",
"about_menu": "Über", "creators": "Entwickler...", "update": "Aktualisieren...", "m3u_input_label": "M3U-Datei oder URL:",
"output_file_label": "Ausgabedatei:", "browse": "Durchsuchen...", "start_check": "Prüfung starten",
"stop_save": "Stoppen & Speichern", "timeout_label": "Timeout (s):", "workers_label": "Workers:",
"remaining_label": "Verbleibend:", "log_display_label": "Protokollanzeige", "channel_name_radio": "Kanalname",
"channel_url_radio": "Kanal-URL", "options_label": "Optionen", "use_ocr_check": "OCR verwenden",
"skip_known_check": "Bekannte URLs überspringen", "search_label": "In Ausgabedatei suchen...",
"no_internet": "Keine Internetverbindung. Warte auf Verbindung...",
"tooltip_ocr": "Wenn aktiviert, wird OCR (Tesseract) verwendet, um einen Frame aus Videostreams zu analysieren.\nHilft bei der Erkennung von Fehlerbildschirmen (z.B. 'Login erforderlich', geo-geblockt).\nDiese Prüfung ist langsamer.",
"tooltip_timeout": "Netzwerk-Timeout in Sekunden für jede Stream-Prüfung.", "tooltip_workers": "Anzahl der gleichzeitig auszuführenden parallelen Prüfungen.",
"tooltip_log_display": "Ändert das Anzeigeformat in der Protokollansicht während einer Prüfung.",
"tooltip_skip_known": "Wenn aktiviert, liest die App zuerst Ihre Ausgabedatei.\nJede Stream-URL aus der neuen Quelle, die bereits in der Ausgabedatei vorhanden ist, wird übersprungen.\nDies verhindert Duplikate und beschleunigt die Prüfung großer Listen.",
"tooltip_channel_name": "Zeigt im Protokollfenster den Namen des Kanals aus der M3U-Datei an.",
"tooltip_channel_url": "Zeigt im Protokollfenster die vollständige URL des Kanal-Streams an."
},
"ru": {
"title": "IPTV-Check", "file": "Файл", "recheck_output": "Перепроверить выходной файл", "exit": "Выход",
"tools": "Инструменты", "website_finder": "Поиск M3U на сайтах...", "database": "База данных",
"manage_links": "Управление ссылками по умолчанию...", "check_all_links": "Проверить все ссылки по умолчанию", "settings": "Настройки",
"configure": "Настроить...", "language": "Язык", "debug": "Отладка", "view_log": "Просмотр журнала отладки",
"about_menu": "О программе", "creators": "Создатели...", "update": "Обновить...", "m3u_input_label": "Файл M3U или URL:",
"output_file_label": "Выходной файл:", "browse": "Обзор...", "start_check": "Начать проверку",
"stop_save": "Остановить и сохранить", "timeout_label": "Тайм-аут (с):", "workers_label": "Потоки:",
"remaining_label": "Осталось:", "log_display_label": "Отображение журнала", "channel_name_radio": "Имя канала",
"channel_url_radio": "URL канала", "options_label": "Опции", "use_ocr_check": "Использовать OCR",
"skip_known_check": "Пропускать известные URL", "search_label": "Поиск в выходном файле...",
"no_internet": "Нет подключения к Интернету. Ожидание подключения...",
"tooltip_ocr": "Если отмечено, используется OCR (Tesseract) для анализа кадра из видеопотоков.\nЭто помогает обнаруживать экраны ошибок (например, 'Требуется вход', геоблокировка).\nЭта проверка медленнее.",
"tooltip_timeout": "Тайм-аут сети в секундах для каждой проверки потока.", "tooltip_workers": "Количество одновременных параллельных проверок.",
"tooltip_log_display": "Изменяет формат отображения в журнале во время проверки.",
"tooltip_skip_known": "Если отмечено, приложение сначала прочитает ваш выходной файл.\nЛюбой URL потока из нового источника, который уже существует в выходном файле, будет пропущен.\nЭто предотвращает дубликаты и ускоряет проверку больших списков.",
"tooltip_channel_name": "В окне журнала отображать имя канала из файла M3U.",
"tooltip_channel_url": "В окне журнала отображать полный URL потока канала."
},
"zh": {
"title": "IPTV-Check", "file": "文件", "recheck_output": "重新检查输出文件", "exit": "退出",
"tools": "工具", "website_finder": "网站 M3U 查找器...", "database": "数据库",
"manage_links": "管理默认链接...", "check_all_links": "检查所有默认链接", "settings": "设置",
"configure": "配置...", "language": "语言", "debug": "调试", "view_log": "查看调试日志",
"about_menu": "关于", "creators": "创作者...", "update": "更新...", "m3u_input_label": "M3U 文件或 URL:",
"output_file_label": "输出文件:", "browse": "浏览...", "start_check": "开始检查",
"stop_save": "停止并保存", "timeout_label": "超时 (秒):", "workers_label": "线程数:",
"remaining_label": "剩余:", "log_display_label": "日志显示", "channel_name_radio": "频道名称",
"channel_url_radio": "频道 URL", "options_label": "选项", "use_ocr_check": "使用 OCR",
"skip_known_check": "跳过已知良好的URL", "search_label": "在输出文件中搜索...",
"no_internet": "无网络连接。正在等待连接...",
"tooltip_ocr": "如果选中,将使用 OCR (Tesseract) 分析视频流中的一帧。\n这有助于检测错误屏幕(例如“需要登录”、“地理封锁”)。\n此检查速度较慢。",
"tooltip_timeout": "每次流检查的网络超时时间(秒)。", "tooltip_workers": "一次运行的并行检查数。",
"tooltip_log_display": "在检查期间更改日志视图中的显示格式。",
"tooltip_skip_known": "如果选中,应用程序将首先读取您的输出文件。\n新源中任何已存在于输出文件中的流 URL 都将被跳过。\n这样可以防止重复并加快检查大型列表的速度。",
"tooltip_channel_name": "在日志窗口中,显示 M3U 文件中的频道名称。",
"tooltip_channel_url": "在日志窗口中,显示频道的完整流 URL。"
}
}
# --- Setup Debug Logging ---
def setup_logging():
logging.basicConfig(filename=DEBUG_LOG_PATH, filemode='w', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.info(f"IPTV-Check v{VERSION} started.")
# --- Dependency Checks ---
FFMPEG_PATH = shutil.which('ffmpeg')
FFPROBE_PATH = shutil.which('ffprobe')
YT_DLP_PATH = shutil.which('yt-dlp')
GIT_PATH = shutil.which('git')
def find_and_set_tesseract_path():
path = shutil.which('tesseract');
if path: pytesseract.tesseract_cmd = path; return True
return False
TESSERACT_INSTALLED = find_and_set_tesseract_path()
OCR_FAILED_ONCE = False
def check_dependencies():
essential_deps = {'ffmpeg': FFMPEG_PATH, 'ffprobe': FFPROBE_PATH, 'yt-dlp': YT_DLP_PATH}
optional_deps = {'tesseract': TESSERACT_INSTALLED}
missing_essential = [name for name, path in essential_deps.items() if not path]
missing_optional = [name for name, found in optional_deps.items() if not found]
return missing_essential, missing_optional
# --- "The Janitor": Cleanup on Start ---
def cleanup_stale_temp_files():
try:
temp_dir = tempfile.gettempdir()
for f_name in [f for f in os.listdir(temp_dir) if f.startswith(TEMP_FILE_PREFIX)]:
try: os.remove(os.path.join(temp_dir, f_name))
except OSError as e: logging.warning(f"Could not remove stale temp file: {f_name}. Reason: {e}")
except Exception as e:
logging.error(f"Error during initial temp file cleanup: {e}", exc_info=True)
# --- Configuration & Database Management ---
class IniManager:
def __init__(self, path):
self.path = path
def load(self, defaults_map=None):
config = configparser.ConfigParser()
if self.path.exists():
try:
config.read(self.path, encoding='utf-8')
except configparser.Error as e:
logging.error(f"Error reading config file {self.path}: {e}. A new one will be created.")
config = configparser.ConfigParser()
needs_save = False
if defaults_map:
for section, defaults in defaults_map.items():
if not config.has_section(section):
config.add_section(section)
for key, value in defaults.items():
config.set(section, key, value)
needs_save = True
if needs_save: self.save(config)
return {section.lower(): dict(config.items(section)) for section in config.sections()}
def save(self, config_obj):
try:
with open(self.path, 'w', encoding='utf-8') as configfile:
config_obj.write(configfile)
except Exception as e:
logging.error(f"Failed to save INI file at {self.path}: {e}", exc_info=True)
if GUI_AVAILABLE: messagebox.showerror("File Error", f"Could not save settings to {self.path}.\nPlease check permissions.")
# --- Core Logic ---
def sanitize_url_aggressively(url):
try:
m3u8_query_index = url.find('.m3u8?')
if m3u8_query_index != -1 and 'ads.' in url[m3u8_query_index:]:
sanitized_url = url[:m3u8_query_index + len('.m3u8')]; logging.info(f"Aggressively sanitized URL: '{url}' -> '{sanitized_url}'"); return sanitized_url
except Exception as e: logging.warning(f"Error during aggressive URL sanitization: {e}", exc_info=True)
return url
def parse_m3u(content):
pattern = re.compile(r'#EXTINF:-1.*?group-title="([^"]*)"[^,]*?,([^\n]+)\n(?:#EXT[^\n]+\n)*(https?://[^\s]+)', re.IGNORECASE)
matches = pattern.findall(content)
streams = []
if not matches:
pattern = re.compile(r'#EXTINF:-1.*?,([^\n]+)\n(?:#EXT[^\n]+\n)*(https?://[^\s]+)')
matches = pattern.findall(content)
for title, url in matches:
streams.append({'title': title.strip(), 'url': url.strip(), 'group': 'General'})
else:
for group, title, url in matches:
streams.append({'title': title.strip(), 'url': url.strip(), 'group': group.strip() or "General"})
return streams
def write_m3u_header(file_handle):
if file_handle: file_handle.write("#EXTM3U\n\n"); file_handle.flush()
def write_m3u_entry(file_handle, stream_info):
if file_handle:
group = stream_info.get('group', 'General'); title = stream_info.get('title', 'Unknown'); url = stream_info.get('url', '')
file_handle.write(f"#EXTINF:-1 group-title=\"{group}\",{title}\n{url}\n\n"); file_handle.flush()
def build_timeout_config(network_timeout_s):
return {"network_timeout_us": str(network_timeout_s * 1000000), "ffmpeg_flags": ['-analyzeduration', '5M', '-probesize', '5M'] if network_timeout_s >= 20 else []}
# --- TOOLTIP CLASS (GUI ONLY) ---
if GUI_AVAILABLE:
class ToolTip:
def __init__(self, widget, text):
self.widget = widget
self.text = text
self.tooltip_window = None
self.widget.bind("<Enter>", self.show_tooltip)
self.widget.bind("<Leave>", self.hide_tooltip)
def show_tooltip(self, event=None):
x, y, _, _ = self.widget.bbox("insert")
x += self.widget.winfo_rootx() + 25
y += self.widget.winfo_rooty() + 25
self.tooltip_window = tk.Toplevel(self.widget)
self.tooltip_window.wm_overrideredirect(True)
self.tooltip_window.wm_geometry(f"+{x}+{y}")
label = tk.Label(self.tooltip_window, text=self.text, justify='left',
background="#ffffe0", relief='solid', borderwidth=1,
font=("tahoma", "8", "normal"))
label.pack(ipadx=1)
def hide_tooltip(self, event=None):
if self.tooltip_window:
self.tooltip_window.destroy()
self.tooltip_window = None
# --- GUI APPLICATION CLASSES (GUI ONLY) ---
if GUI_AVAILABLE:
class BaseToplevel(tk.Toplevel):
def __init__(self, parent):
super().__init__(parent); self.transient(parent); self._create_context_menu()
def _create_context_menu(self):
self.entry_context_menu = tk.Menu(self, tearoff=0)
self.entry_context_menu.add_command(label="Cut", command=lambda: self.focus_get().event_generate("<<Cut>>"))
self.entry_context_menu.add_command(label="Copy", command=lambda: self.focus_get().event_generate("<<Copy>>"))
self.entry_context_menu.add_command(label="Paste", command=lambda: self.focus_get().event_generate("<<Paste>>"))
def _show_context_menu(self, event):
widget = event.widget
try:
if self.clipboard_get(): self.entry_context_menu.entryconfig("Paste", state="normal")
else: self.entry_context_menu.entryconfig("Paste", state="disabled")
except tk.TclError: self.entry_context_menu.entryconfig("Paste", state="disabled")
if widget.selection_present(): self.entry_context_menu.entryconfig("Cut", state="normal"); self.entry_context_menu.entryconfig("Copy", state="normal")
else: self.entry_context_menu.entryconfig("Cut", state="disabled"); self.entry_context_menu.entryconfig("Copy", state="disabled")
self.entry_context_menu.tk_popup(event.x_root, event.y_root)
class AddPatternDialog(simpledialog.Dialog):
def body(self, master):
self.title("Add Stream Pattern")
ttk.Label(master, text="URL Pattern (e.g., .m3u8, /stream):").grid(row=0, sticky="w")
self.pattern_entry = ttk.Entry(master, width=30)
self.pattern_entry.grid(row=1, padx=5, pady=5)
ttk.Label(master, text="Stream Type:").grid(row=2, sticky="w")
self.type_var = tk.StringVar(value="video")
self.type_combo = ttk.Combobox(master, textvariable=self.type_var, values=["video", "audio"], state="readonly")
self.type_combo.grid(row=3, padx=5, pady=5)
return self.pattern_entry
def apply(self):
self.result = (self.pattern_entry.get().strip(), self.type_var.get())
class SettingsWindow(BaseToplevel):
def __init__(self, parent):
super().__init__(parent); self.title("Configure Settings"); self.parent = parent; self.config_manager = parent.config_manager
all_configs = self.config_manager.load({})
self.settings = all_configs.get('settings', {})
self.stream_patterns = all_configs.get('streampatterns', {})
self.notebook = ttk.Notebook(self); self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
player_frame = ttk.Frame(self.notebook, padding="10"); self.notebook.add(player_frame, text='Player')
self.media_player_path = tk.StringVar(value=self.settings.get('mediaplayerpath', ''))
mp_frame = ttk.LabelFrame(player_frame, text="Media Player Executable Path", padding="5"); mp_frame.pack(fill=tk.X, expand=True, pady=5)
mp_entry = ttk.Entry(mp_frame, textvariable=self.media_player_path); mp_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
mp_entry.bind("<Button-3>", self._show_context_menu); ttk.Button(mp_frame, text="Browse...", command=self._browse_player).pack(side=tk.LEFT)
types_frame = ttk.Frame(self.notebook, padding="10"); self.notebook.add(types_frame, text='Stream Patterns')
self.listbox = tk.Listbox(types_frame, height=10); self.listbox.pack(fill=tk.BOTH, expand=True, pady=5); self.populate_list()
btn_frame = ttk.Frame(types_frame); btn_frame.pack(fill=tk.X, pady=5)
ttk.Button(btn_frame, text="Add...", command=self.add_type).pack(side=tk.LEFT, expand=True, fill=tk.X, padx=2)
ttk.Button(btn_frame, text="Edit...", command=self.edit_type).pack(side=tk.LEFT, expand=True, fill=tk.X, padx=2)
ttk.Button(btn_frame, text="Remove", command=self.remove_type).pack(side=tk.LEFT, expand=True, fill=tk.X, padx=2)
button_frame = ttk.Frame(self); button_frame.pack(fill=tk.X, expand=True, padx=10, pady=(0, 10))
ttk.Button(button_frame, text="Save & Close", command=self.on_save).pack(side=tk.RIGHT)
self.grab_set()
def _browse_player(self):
path = filedialog.askopenfilename(title="Select Media Player Executable");
if path: self.media_player_path.set(path)
def populate_list(self):
self.listbox.delete(0, tk.END)
for ext, type_ in sorted(self.stream_patterns.items()): self.listbox.insert(tk.END, f"{ext}: {type_}")
def add_type(self):
dialog = AddPatternDialog(self)
if dialog.result:
pattern, type_ = dialog.result
if pattern:
self.stream_patterns[pattern] = type_
self.populate_list()
def edit_type(self):
selected_idx = self.listbox.curselection()
if not selected_idx: return
pattern, type_ = self.listbox.get(selected_idx[0]).split(': ', 1)
new_type = simpledialog.askstring("Edit Type", f"Enter new type for '{pattern}':", initialvalue=type_, parent=self)
if new_type and new_type.lower() in ['video', 'audio']:
self.stream_patterns[pattern] = new_type.lower(); self.populate_list()
else: messagebox.showerror("Invalid Type", "Type must be 'video' or 'audio'.", parent=self)
def remove_type(self):
selected_idx = self.listbox.curselection()
if not selected_idx: return
pattern, _ = self.listbox.get(selected_idx[0]).split(': ', 1)
if messagebox.askyesno("Confirm Remove", f"Remove pattern '{pattern}'?"): del self.stream_patterns[pattern]; self.populate_list()
def on_save(self):
self.settings['mediaplayerpath'] = self.media_player_path.get()
new_config = configparser.ConfigParser()
new_config['Settings'] = self.settings
new_config['StreamPatterns'] = self.stream_patterns
self.config_manager.save(new_config)
self.parent.load_settings(); self.destroy()
class WebsiteFinderWindow(BaseToplevel):
def __init__(self, parent):
super().__init__(parent); self.title("Website M3U Finder"); self.parent = parent; self.geometry("600x400")
body = ttk.Frame(self, padding="10"); body.pack(fill=tk.BOTH, expand=True)
top_frame = ttk.Frame(body); top_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(top_frame, text="URL:").pack(side=tk.LEFT, padx=(0, 5)); self.url_var = tk.StringVar()
url_entry = ttk.Entry(top_frame, textvariable=self.url_var); url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
url_entry.bind("<Button-3>", self._show_context_menu)
self.find_btn = ttk.Button(top_frame, text="Find M3Us", command=self.start_find); self.find_btn.pack(side=tk.LEFT, padx=(5, 0))
self.results_box = tk.Listbox(body); self.results_box.pack(fill=tk.BOTH, expand=True)
bottom_frame = ttk.Frame(body); bottom_frame.pack(fill=tk.X, pady=(10, 0))
self.status_var = tk.StringVar(value="Ready."); ttk.Label(bottom_frame, textvariable=self.status_var, anchor="w").pack(side=tk.LEFT)
self.add_to_db_btn = ttk.Button(bottom_frame, text="Add Selected to DB", state=tk.DISABLED, command=self.add_to_db); self.add_to_db_btn.pack(side=tk.RIGHT)
self.grab_set()
def start_find(self):
url = self.url_var.get().strip();
if not url: return
self.find_btn.config(state=tk.DISABLED); self.add_to_db_btn.config(state=tk.DISABLED)
self.results_box.delete(0, tk.END); self.status_var.set("Searching..."); self.update_idletasks()
threading.Thread(target=self._find_thread, args=(url,), daemon=True).start()
def _find_thread(self, url):
try:
self.parent.log(f"[*] Website Finder: Fetching {url}", "info")
r = requests.get(url, headers={'User-Agent': USER_AGENT}, timeout=15); r.raise_for_status()
valid_links = set()
if r.url.lower().endswith(('.m3u', '.m3u8')):
self.status_var.set("Found direct redirect to M3U file.")
self.results_box.insert(tk.END, r.url); valid_links.add(r.url)
else:
self.status_var.set("Scanning page for M3U links...")
pattern = re.compile(r'["\'](https?://[^\'" >]+?\.(?:m3u8?)|[^\'" >]+?\.(?:m3u8?))["\']'); found_links = pattern.findall(r.text)
for link in found_links:
full_url = urljoin(url, link); self.status_var.set(f"Validating: {full_url[:50]}...")
try:
head_r = requests.head(full_url, headers={'User-Agent': USER_AGENT}, timeout=5, allow_redirects=True)
if head_r.status_code == 200: valid_links.add(full_url); self.results_box.insert(tk.END, full_url)
except requests.RequestException: continue
if valid_links: self.status_var.set(f"Found {len(valid_links)} valid M3U link(s)."); self.add_to_db_btn.config(state=tk.NORMAL)
else: self.status_var.set("No valid M3U links found on page or via redirect.")
except Exception as e:
self.status_var.set(f"Error: {e}"); self.parent.log(f"[!] Website Finder Error: {e}", "red")
finally:
self.find_btn.config(state=tk.NORMAL)
def add_to_db(self):
selected_indices = self.results_box.curselection()
if not selected_indices: messagebox.showwarning("No Selection", "Please select one or more links to add.", parent=self); return
db_config = configparser.ConfigParser(); db_config.read(self.parent.links_db_manager.path)
if not db_config.has_section('DefaultLinks'): db_config.add_section('DefaultLinks')
links = dict(db_config.items('DefaultLinks')); added_count = 0
for idx in selected_indices:
url = self.results_box.get(idx)
try: name = Path(urlparse(url).path).stem.replace('-', ' ').replace('_', ' ').title()
except: name = f"link_{int(time.time())}"
if url not in links.values(): links[name.lower().replace(' ', '_')] = url; added_count += 1
db_config['DefaultLinks'] = links
self.parent.links_db_manager.save(db_config)
messagebox.showinfo("Success", f"Added {added_count} new link(s) to the database.", parent=self)
class LinksManagerWindow(BaseToplevel):
def __init__(self, parent):
super().__init__(parent); self.title("Default Links Database"); self.links_db_manager = parent.links_db_manager
body = ttk.Frame(self, padding="10"); body.pack(fill=tk.BOTH, expand=True)
self.listbox = tk.Listbox(body, height=15); self.listbox.pack(fill=tk.BOTH, expand=True, pady=5); self.populate_list()
btn_frame = ttk.Frame(body); btn_frame.pack(fill=tk.X, pady=5)
ttk.Button(btn_frame, text="Add...", command=self.add_link).pack(side=tk.LEFT, expand=True, fill=tk.X, padx=2)
ttk.Button(btn_frame, text="Edit...", command=self.edit_link).pack(side=tk.LEFT, expand=True, fill=tk.X, padx=2)
ttk.Button(btn_frame, text="Remove", command=self.remove_link).pack(side=tk.LEFT, expand=True, fill=tk.X, padx=2)
self.grab_set()
def populate_list(self):
self.listbox.delete(0, tk.END); self.links = self.links_db_manager.load({'DefaultLinks': {}}).get('defaultlinks', {})
for name, url in sorted(self.links.items()): self.listbox.insert(tk.END, f"{name}: {url}")
def add_link(self):
url = simpledialog.askstring("Add Link", "Enter the M3U URL:", parent=self)
if not url or not url.strip():
return
url = url.strip()
if url in self.links.values():
messagebox.showinfo("Duplicate", "This URL already exists in the database.", parent=self)
return
try:
parsed_url = urlparse(url)
path_stem = Path(parsed_url.path).stem
base_name = path_stem if path_stem and path_stem != '/' else parsed_url.hostname
if not base_name:
base_name = f"link_{int(time.time())}"
except Exception:
base_name = f"link_{int(time.time())}"
key = base_name.strip().lower().replace(' ', '_').replace('-', '_')
key = re.sub(r'\W+', '', key)
final_key = key
counter = 2
while final_key in self.links.keys():
final_key = f"{key}_{counter}"
counter += 1
self.links[final_key] = url
self.save_and_repopulate()
messagebox.showinfo("Success", f"Added link to database with name: {final_key}", parent=self)
def edit_link(self):
selected_idx = self.listbox.curselection();
if not selected_idx: return
name, url = self.listbox.get(selected_idx[0]).split(': ', 1); key = name
if key not in self.links:
key = name.lower().replace(' ', '_')
if key not in self.links:
messagebox.showerror("Error", "Could not find the selected link key in the database.", parent=self)
return
new_name = simpledialog.askstring("Edit Link", "Enter new name:", initialvalue=name, parent=self)
if not new_name or not new_name.strip(): return
new_url = simpledialog.askstring("Edit Link", "Enter new URL:", initialvalue=url, parent=self)
if not new_url or not new_url.strip(): return
del self.links[key]
self.links[new_name.strip().lower().replace(' ', '_')] = new_url.strip()
self.save_and_repopulate()
def remove_link(self):
selected_idx = self.listbox.curselection()
if not selected_idx: return
if not messagebox.askyesno("Confirm Remove", "Are you sure?"): return
name, _ = self.listbox.get(selected_idx[0]).split(': ', 1); key = name
if key not in self.links:
key = name.lower().replace(' ', '_')
if key not in self.links:
messagebox.showerror("Error", "Could not find the selected link key in the database.", parent=self)
return
del self.links[key]; self.save_and_repopulate()
def save_and_repopulate(self):
config = configparser.ConfigParser(); config['DefaultLinks'] = self.links
self.links_db_manager.save(config); self.populate_list()
# --- BASE CLASS FOR CHECKING LOGIC ---
class CheckerBase:
def __init__(self):
self.streams_to_check = []
self.active_processes = {}
self.is_running = False
self.processed_count = 0
self.online_count = 0
self.uncheckable_count = 0
self.journal_file = None
self.uncheckable_file = None
self.current_check_id = None
self.lock = threading.RLock() # FIX: Use a Re-entrant Lock to prevent deadlocks
self.config_manager = IniManager(CONFIG_FILE_PATH)
defaults = {'Settings': {}, 'StreamPatterns': {'.m3u8': 'video', '.m3u': 'video', '.ts': 'video', '.mp3': 'audio', '.aac': 'audio', '/stream': 'audio'}}
all_configs = self.config_manager.load(defaults)
self.stream_patterns = all_configs.get('streampatterns', {})
def _get_stream_type(self, url):
if not FFPROBE_PATH: return 'unknown'
try:
cmd = [FFPROBE_PATH, '-v', 'quiet', '-user_agent', AUDIO_USER_AGENT, '-i', url]
result = subprocess.run(cmd, capture_output=True, text=True, errors='ignore', timeout=10)
output = result.stderr
if "Stream #" in output and "Video:" in output: return 'video'
if "Stream #" in output and "Audio:" in output: return 'audio'
return 'unknown'
except (subprocess.TimeoutExpired, Exception):
return 'unknown'
def _perform_ocr_check(self, video_file_path):
temp_image_file = None
try:
with tempfile.NamedTemporaryFile(suffix=".png", prefix=TEMP_FILE_PREFIX, delete=False) as tf: temp_image_file = tf.name
subprocess.run([FFMPEG_PATH, '-y', '-hide_banner', '-loglevel', 'error', '-ss', '00:00:01', '-i', video_file_path, '-vframes', '1', temp_image_file], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=OCR_FRAME_EXTRACTION_TIMEOUT)
if os.path.exists(temp_image_file) and os.path.getsize(temp_image_file) > 0:
text = pytesseract.image_to_string(Image.open(temp_image_file)).lower()
if any(keyword in text for keyword in ["error", "login", "failed", "access denied", "unavailable", "not found"]): return "OFF (OCR: Error Screen)"
return "ON"
except Exception as ocr_error:
global OCR_FAILED_ONCE; logging.warning(f"OCR check failed: {ocr_error}", exc_info=True)
if not OCR_FAILED_ONCE: self.log(f"[!] OCR check failed. Assuming 'ON'.", "yellow"); OCR_FAILED_ONCE = True
return "ON (OCR Failed)"
finally:
if temp_image_file and os.path.exists(temp_image_file):
try: os.remove(temp_image_file)
except OSError: pass
def _force_kill_worker(self, pid, data):
try:
if os.name == 'posix':
os.killpg(os.getpgid(pid), signal.SIGKILL)
else:
subprocess.run(['taskkill', '/F', '/T', '/PID', str(pid)], capture_output=True, check=False)
data['proc'].wait(timeout=2)
except (ProcessLookupError, OSError, subprocess.TimeoutExpired): pass
finally:
if data and data.get('temp_file') and os.path.exists(data['temp_file']):
try: os.remove(data['temp_file'])
except OSError: pass
def log(self, message, tag=None, stream_id=None):
pass
# --- GUI APPLICATION CLASS ---
if GUI_AVAILABLE:
class AppGUI(tk.Tk, CheckerBase):
def __init__(self, cli_args, missing_optional_deps=None):
tk.Tk.__init__(self)
CheckerBase.__init__(self)
self.cli_args = cli_args
self.links_db_manager = IniManager(LINKS_DB_PATH)
self.lang_code = self.config_manager.load({'Settings': {'language': DEFAULT_LANG}}).get('settings', {}).get('language', DEFAULT_LANG)
if self.lang_code not in LANGUAGES:
self.lang_code = DEFAULT_LANG
self.i18n = LANGUAGES[self.lang_code]
self.title(f"{self.i18n.get('title')} {VERSION}")
self.geometry("900x700")
self.log_url_map = {}
self.is_rechecking = False
self.known_good_urls = set()
self.protocol("WM_DELETE_WINDOW", self.on_closing)
self._create_main_menu()
self._create_context_menu()
self.main_frame = ttk.Frame(self, padding="10")
self.main_frame.pack(fill=tk.BOTH, expand=True)
self._create_widgets()
if missing_optional_deps and 'tesseract' in missing_optional_deps:
messagebox.showwarning("Optional Dependency Missing",
"Tesseract OCR not found.\nThe OCR feature will be disabled.\n\nTo enable it, please install Tesseract for your OS (e.g., 'sudo apt install tesseract-ocr').")
self.ocr_check.config(state=tk.DISABLED)
self.load_settings()
self.after(500, self._periodic_internet_check)
def log(self, message, tag=None, stream_id=None):
# This is the GUI-specific implementation of the log method
if stream_id: self.log_view.insert(tk.END, message + "\n", (tag, stream_id))
else: self.log_view.insert(tk.END, message + "\n", tag)
self.log_view.see(tk.END); self.update_idletasks()
def load_settings(self):
defaults = {'Settings': {'mediaplayerpath': '', 'useocr': 'yes' if TESSERACT_INSTALLED else 'no',
'skipknowngoodurls': 'no', 'language': 'en'},
'StreamPatterns': {'.m3u8': 'video', '.m3u': 'video', '.ts': 'video', '.mp3': 'audio', '.aac': 'audio', '/stream': 'audio'}}
all_configs = self.config_manager.load(defaults)
self.settings = all_configs.get('settings', {})
self.stream_patterns = all_configs.get('streampatterns', {})
if hasattr(self, 'use_ocr_var'):
self.use_ocr_var.set(self.settings.get('useocr') == 'yes')
self.skip_knowngood_urls_var.set(self.settings.get('skipknowngoodurls') == 'yes')
def _check_internet_connection(self):
try:
requests.get("https://www.google.com", timeout=5, headers={'User-Agent': USER_AGENT})
return True
except requests.ConnectionError:
return False
def _periodic_internet_check(self):
has_internet = self._check_internet_connection()
if has_internet:
self.internet_status_label.pack_forget()
if self.start_button['state'] == tk.DISABLED and not self.is_running:
self._set_ui_state(True)
else:
self.internet_status_label.pack(side=tk.TOP, fill=tk.X)
if self.start_button['state'] == tk.NORMAL:
self._set_ui_state(False)
self.after(30000, self._periodic_internet_check)
def _set_ui_state(self, enabled):
state = tk.NORMAL if enabled else tk.DISABLED
self.start_button.config(state=state)
self.browse_button.config(state=state)
self.input_entry.config(state='normal' if enabled else 'disabled')
self.output_entry.config(state='normal' if enabled else 'disabled')
self.timeout_menu.config(state='readonly' if enabled else 'disabled')
self.workers_menu.config(state='readonly' if enabled else 'disabled')
search_file_exists = os.path.exists(self.output_var.get()) and os.path.getsize(self.output_var.get()) > 0
self.search_entry.config(state='normal' if enabled and search_file_exists else 'disabled')
for menu_name in ["File", "Tools", "Database", "Settings", "Debug", "About", "Language"]:
try: self.menu_bar.entryconfig(menu_name, state=state)
except tk.TclError: pass
def on_closing(self):
if self.is_running and messagebox.askyesno("Confirm Exit", "A check is running. Stop and exit?"): self.stop_processing(); self.destroy()
elif not self.is_running: self.destroy()
def _create_main_menu(self):
self.menu_bar = tk.Menu(self)
self.config(menu=self.menu_bar)
self.file_menu = tk.Menu(self.menu_bar, tearoff=0)
self.file_menu.add_command(label=self.i18n.get("recheck_output"), command=self.recheck_output_file)
self.file_menu.add_separator()
self.file_menu.add_command(label=self.i18n.get("exit"), command=self.on_closing)
self.menu_bar.add_cascade(label=self.i18n.get("file"), menu=self.file_menu)
self.tools_menu = tk.Menu(self.menu_bar, tearoff=0)
self.tools_menu.add_command(label=self.i18n.get("website_finder"), command=self.open_website_finder)
self.menu_bar.add_cascade(label=self.i18n.get("tools"), menu=self.tools_menu)
self.db_menu = tk.Menu(self.menu_bar, tearoff=0)
self.db_menu.add_command(label=self.i18n.get("manage_links"), command=self.open_links_manager)
self.db_menu.add_separator()
self.db_menu.add_command(label=self.i18n.get("check_all_links"), command=self.start_database_check)
self.menu_bar.add_cascade(label=self.i18n.get("database"), menu=self.db_menu)
settings_menu = tk.Menu(self.menu_bar, tearoff=0)
settings_menu.add_command(label=self.i18n.get("configure"), command=lambda: SettingsWindow(self))
self.menu_bar.add_cascade(label=self.i18n.get("settings"), menu=settings_menu)
self.lang_menu = tk.Menu(self.menu_bar, tearoff=0)
self.selected_lang = tk.StringVar(value=self.lang_code)
for lang_code in sorted(LANGUAGES.keys()):
self.lang_menu.add_radiobutton(label=lang_code.upper(), variable=self.selected_lang, value=lang_code, command=self.change_language)
self.menu_bar.add_cascade(label=self.i18n.get("language"), menu=self.lang_menu)
debug_menu = tk.Menu(self.menu_bar, tearoff=0)
debug_menu.add_command(label=self.i18n.get("view_log"), command=self._view_debug_log)
self.menu_bar.add_cascade(label=self.i18n.get("debug"), menu=debug_menu)
self.about_menu = tk.Menu(self.menu_bar, tearoff=0)
self.about_menu.add_command(label=self.i18n.get("creators"), command=self.show_creators)
self.about_menu.add_command(label=self.i18n.get("update"), command=self.update_app)
self.menu_bar.add_cascade(label=self.i18n.get("about_menu"), menu=self.about_menu)
def change_language(self):
new_lang = self.selected_lang.get()
if new_lang != self.lang_code:
if messagebox.askyesno("Change Language", "The application needs to restart to apply the language change. Restart now?"):
config = configparser.ConfigParser(); config.read(CONFIG_FILE_PATH, encoding='utf-8')
if not config.has_section('Settings'): config.add_section('Settings')
config.set('Settings', 'language', new_lang)
self.config_manager.save(config)
os.execl(sys.executable, sys.executable, *sys.argv)
def show_creators(self):
messagebox.showinfo("Creators", "Project Leader: peterpt\nCode Assistance by Gemini Pro Model\n\nhttps://github.com/peterpt/")
def update_app(self):
if not GIT_PATH:
messagebox.showerror("Update Error", "'git' command not found. Please update manually.")
return
if messagebox.askyesno("Confirm Update", "This will attempt to update the application from GitHub. Are you sure?"):
try:
self.log(">>> Attempting to update via git...", "info")
result = subprocess.run([GIT_PATH, 'pull'], capture_output=True, text=True, check=True, cwd=APP_DIR)
if "Already up to date." in result.stdout:
messagebox.showinfo("Update", "You are already using the latest version.")
self.log(">>> Already up to date.", "info")
else:
messagebox.showinfo("Update Successful", "Update complete. Please restart the application.")
self.log(">>> Update successful. Please restart.", "green")
except subprocess.CalledProcessError as e:
messagebox.showerror("Update Failed", f"An error occurred during update:\n\n{e.stderr}")
self.log(f"[!] Update failed: {e.stderr}", "red")
def _view_debug_log(self):
try:
if sys.platform == "win32": os.startfile(DEBUG_LOG_PATH)
elif sys.platform == "darwin": subprocess.run(["open", DEBUG_LOG_PATH])
else: subprocess.run(["xdg-open", DEBUG_LOG_PATH])
except Exception as e: self.log(f"[!] Could not open debug log: {e}", "red")
def _create_context_menu(self):
self.entry_context_menu = tk.Menu(self, tearoff=0); self.entry_context_menu.add_command(label="Cut", command=lambda: self.focus_get().event_generate("<<Cut>>")); self.entry_context_menu.add_command(label="Copy", command=lambda: self.focus_get().event_generate("<<Copy>>")); self.entry_context_menu.add_command(label="Paste", command=lambda: self.focus_get().event_generate("<<Paste>>"))
self.log_context_menu = tk.Menu(self, tearoff=0); self.log_context_menu.add_command(label="Open in Player", command=self._open_in_player); self.log_context_menu.add_separator(); self.log_context_menu.add_command(label="Copy URL", command=self._copy_log_url)
def _show_context_menu(self, event):
widget = event.widget
try:
if self.clipboard_get(): self.entry_context_menu.entryconfig("Paste", state="normal")
else: self.entry_context_menu.entryconfig("Paste", state="disabled")
except tk.TclError:
self.entry_context_menu.entryconfig("Paste", state="disabled")
if widget.selection_present():
self.entry_context_menu.entryconfig("Cut", state="normal")
self.entry_context_menu.entryconfig("Copy", state="normal")
else:
self.entry_context_menu.entryconfig("Cut", state="disabled")
self.entry_context_menu.entryconfig("Copy", state="disabled")
self.entry_context_menu.tk_popup(event.x_root, event.y_root)
def _show_log_context_menu(self, event):
self.log_view.tag_remove("sel", "1.0", "end"); clicked_index = self.log_view.index(f"@{event.x},{event.y}")
line_tags = self.log_view.tag_names(clicked_index); stream_tag = next((tag for tag in line_tags if tag.startswith("stream_")), None)
if stream_tag and stream_tag in self.log_url_map:
line_start = self.log_view.index(f"{clicked_index} linestart"); line_end = self.log_view.index(f"{clicked_index} lineend")
self.log_view.tag_add("sel", line_start, line_end); self.log_context_menu.entryconfig("Open in Player", state="normal"); self.log_context_menu.entryconfig("Copy URL", state="normal")
else: self.log_context_menu.entryconfig("Open in Player", state="disabled"); self.log_context_menu.entryconfig("Copy URL", state="disabled")
self.log_context_menu.tk_popup(event.x_root, event.y_root)
def _get_url_from_selected_log(self):
sel_ranges = self.log_view.tag_ranges("sel")
if not sel_ranges: return None
tags = self.log_view.tag_names(sel_ranges[0]); stream_tag = next((tag for tag in tags if tag.startswith("stream_")), None)
return self.log_url_map.get(stream_tag)
def _open_in_player(self):
url = self._get_url_from_selected_log(); player_path = self.settings.get('mediaplayerpath')
if not player_path: self.log("[!] Media player path not configured in Settings.", "red"); return
if url:
try: self.log(f"[*] Opening in external player...", "info"); subprocess.Popen([player_path, url])
except Exception as e: self.log(f"[!] Failed to open media player: {e}", "red")
def _copy_log_url(self):
url = self._get_url_from_selected_log()
if url: self.clipboard_clear(); self.clipboard_append(url); self.log("[*] URL copied to clipboard.", "info")
def _create_widgets(self):
self.internet_status_label = ttk.Label(self, text=self.i18n.get("no_internet"),
foreground="white", background="red", anchor="center", padding=5)
io_frame = ttk.Frame(self.main_frame); io_frame.pack(fill=tk.X, pady=5); io_frame.columnconfigure(1, weight=1)
ttk.Label(io_frame, text=self.i18n.get("m3u_input_label")).grid(row=0, column=0, sticky="w", padx=5)
self.input_var = tk.StringVar(value=self.cli_args.file or ''); self.input_entry = ttk.Entry(io_frame, textvariable=self.input_var)
self.input_entry.grid(row=0, column=1, sticky="we")
self.input_entry.bind("<Button-3>", self._show_context_menu)
self.browse_button = ttk.Button(io_frame, text=self.i18n.get("browse"), command=self.browse_file); self.browse_button.grid(row=0, column=2, padx=5)
ttk.Label(io_frame, text=self.i18n.get("output_file_label")).grid(row=1, column=0, sticky="w", padx=5, pady=5)
self.output_var = tk.StringVar(value=self.cli_args.output); self.output_entry = ttk.Entry(io_frame, textvariable=self.output_var)
self.output_entry.grid(row=1, column=1, sticky="we", pady=5); self.output_entry.bind("<Button-3>", self._show_context_menu)
control_frame = ttk.Frame(self.main_frame); control_frame.pack(fill=tk.X, pady=5)
self.start_button = ttk.Button(control_frame, text=self.i18n.get("start_check"), command=self.start_processing); self.start_button.pack(side="left", padx=5)
self.stop_button = ttk.Button(control_frame, text=self.i18n.get("stop_save"), command=self.stop_processing, state=tk.DISABLED); self.stop_button.pack(side="left", padx=5)
ttk.Label(control_frame, text=self.i18n.get("timeout_label")).pack(side="left", padx=(15, 5))
self.timeout_var = tk.StringVar(value=str(self.cli_args.timeout)); self.timeout_menu = ttk.Combobox(control_frame, textvariable=self.timeout_var, values=[str(i) for i in range(2, 31)], state="readonly", width=4); self.timeout_menu.pack(side="left")
ToolTip(self.timeout_menu, self.i18n.get("tooltip_timeout"))
ttk.Label(control_frame, text=self.i18n.get("workers_label")).pack(side="left", padx=(10, 5))
self.workers_var = tk.StringVar(value=str(self.cli_args.workers)); self.workers_menu = ttk.Combobox(control_frame, textvariable=self.workers_var, values=[str(i) for i in range(1, 21)], state="readonly", width=4); self.workers_menu.pack(side="left")
ToolTip(self.workers_menu, self.i18n.get("tooltip_workers"))
self.search_var = tk.StringVar(); self.search_var.trace_add("write", self._on_search_change)
self.search_entry = ttk.Entry(control_frame, textvariable=self.search_var, width=20)
self.search_entry.pack(side="left", padx=(10,0)); self.search_entry.insert(0, self.i18n.get("search_label")); self.search_entry.config(foreground="grey")
self.search_entry.bind("<FocusIn>", self._on_search_focus_in); self.search_entry.bind("<FocusOut>", self._on_search_focus_out)
self.search_entry.config(state=tk.DISABLED)
self.remaining_label = ttk.Label(control_frame, text=f"{self.i18n.get('remaining_label')} 0"); self.remaining_label.pack(side="right", padx=10)
options_frame = ttk.Frame(self.main_frame); options_frame.pack(fill=tk.X, pady=5)
log_display_frame = ttk.LabelFrame(options_frame, text=self.i18n.get("log_display_label"))
log_display_frame.pack(side="left", fill="x", expand=True, padx=(0,5))
self.log_display_var = tk.StringVar(value="name")
self.channel_name_radio = ttk.Radiobutton(log_display_frame, text=self.i18n.get("channel_name_radio"), variable=self.log_display_var, value="name")
self.channel_name_radio.pack(side="left", padx=10)
ToolTip(self.channel_name_radio, self.i18n.get("tooltip_channel_name"))
self.channel_url_radio = ttk.Radiobutton(log_display_frame, text=self.i18n.get("channel_url_radio"), variable=self.log_display_var, value="url")
self.channel_url_radio.pack(side="left", padx=10)
ToolTip(self.channel_url_radio, self.i18n.get("tooltip_channel_url"))
adv_frame = ttk.LabelFrame(options_frame, text=self.i18n.get("options_label")); adv_frame.pack(side="left", fill="x", expand=True, padx=(5,0))
self.use_ocr_var = tk.BooleanVar(); self.ocr_check = ttk.Checkbutton(adv_frame, text=self.i18n.get("use_ocr_check"), variable=self.use_ocr_var, command=self.save_checkbox_settings)
self.ocr_check.pack(side="left", padx=10)
if not TESSERACT_INSTALLED: self.ocr_check.config(state=tk.DISABLED)
ToolTip(self.ocr_check, self.i18n.get("tooltip_ocr"))
self.skip_knowngood_urls_var = tk.BooleanVar(); self.skip_check = ttk.Checkbutton(adv_frame, text=self.i18n.get("skip_known_check"), variable=self.skip_knowngood_urls_var, command=self.save_checkbox_settings)
self.skip_check.pack(side="left", padx=10)
ToolTip(self.skip_check, self.i18n.get("tooltip_skip_known"))
self.progress = ttk.Progressbar(self.main_frame, orient="horizontal", mode="determinate"); self.progress.pack(fill=tk.X, pady=5)
self.log_view = scrolledtext.ScrolledText(self.main_frame, wrap=tk.WORD, height=12); self.log_view.pack(fill=tk.BOTH, expand=True)
self.log_view.tag_config("green", foreground="#009900"); self.log_view.tag_config("red", foreground="#CC0000"); self.log_view.tag_config("yellow", foreground="#E69B00"); self.log_view.tag_config("info", foreground="#0073B7")
self.log_view.bind("<Button-3>", self._show_log_context_menu)
def _on_search_focus_in(self, event):
if self.search_var.get() == self.i18n.get("search_label"):
self.search_entry.delete(0, "end")
self.search_entry.config(foreground="black")
def _on_search_focus_out(self, event):
if not self.search_var.get():
self.search_entry.insert(0, self.i18n.get("search_label"))
self.search_entry.config(foreground="grey")
def _on_search_change(self, *args):
query = self.search_var.get().lower()
if self.is_running or not query or query == self.i18n.get("search_label").lower(): return
output_file = self.output_var.get()
if not os.path.exists(output_file): return
self.log_view.delete('1.0', tk.END)
try:
with open(output_file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read()
found_streams = parse_m3u(content)
results = [s for s in found_streams if query in s['title'].lower()]
if results:
self.log(f"--- Found {len(results)} matches for '{query}' in {os.path.basename(output_file)} ---", "info")
for stream in results: self.log(f"[FOUND] {stream['title']}", "green")
else:
self.log(f"--- No matches found for '{query}' ---", "yellow")
except Exception as e:
self.log(f"[!] Error reading or parsing output file for search: {e}", "red")
def save_checkbox_settings(self):
config = configparser.ConfigParser(); config.read(self.config_manager.path, encoding='utf-8')
if not config.has_section('Settings'): config.add_section('Settings')
config.set('Settings', 'useocr', 'yes' if self.use_ocr_var.get() else 'no')
config.set('Settings', 'skipknowngoodurls', 'yes' if self.skip_knowngood_urls_var.get() else 'no')
self.config_manager.save(config); self.load_settings()
def browse_file(self):
filename = filedialog.askopenfilename(title="Select M3U File", filetypes=(("M3U files", "*.m3u*"),("All files", "*.*")))
if filename: self.input_var.set(filename)
def recheck_output_file(self):
self.start_processing(is_recheck=True)
def open_website_finder(self): WebsiteFinderWindow(self)
def open_links_manager(self): LinksManagerWindow(self)
def _set_ui_state_for_checking(self, is_starting):
"""Locks or unlocks the UI elements based on whether a check is running."""
# Determine the new state for most widgets
state = tk.DISABLED if is_starting else tk.NORMAL
readonly_state = tk.DISABLED if is_starting else 'readonly'
# --- Control main window widgets ---
self.start_button.config(state=state)
self.browse_button.config(state=state)
self.input_entry.config(state='normal' if not is_starting else 'disabled')
self.output_entry.config(state='normal' if not is_starting else 'disabled')
self.timeout_menu.config(state=readonly_state)
self.workers_menu.config(state=readonly_state)
self.channel_name_radio.config(state=state)
self.channel_url_radio.config(state=state)
self.skip_check.config(state=state)
# Special handling for OCR check to respect Tesseract installation status
if is_starting:
self.ocr_check.config(state=tk.DISABLED)
else:
self.ocr_check.config(state=tk.NORMAL if TESSERACT_INSTALLED else tk.DISABLED)
# Control the Stop button (it has the opposite logic)
self.stop_button.config(state=tk.NORMAL if is_starting else tk.DISABLED)
# --- Control specific, disruptive menu items ---
self.file_menu.entryconfig(self.i18n.get("recheck_output"), state=state)
self.tools_menu.entryconfig(self.i18n.get("website_finder"), state=state)
self.db_menu.entryconfig(self.i18n.get("check_all_links"), state=state)
self.about_menu.entryconfig(self.i18n.get("update"), state=state)
# Control the entire language menu cascade, as changing language restarts the app
self.menu_bar.entryconfig(self.i18n.get("language"), state=state)
def start_database_check(self):
if self.is_running: return
self.log_view.delete('1.0', tk.END)
self._set_ui_state_for_checking(True)
self.log("--- Starting Database Check ---", "info")
def do_db_check():
self.log("[*] Loading all playlists from the database...", "info")
links = self.links_db_manager.load().get('defaultlinks', {})
if not links:
self.log("[!] No links found in the database.", "yellow")
self.after(0, self.stop_processing)
return
all_content = ""
for name, url in links.items():
try:
self.log(f" -> Downloading '{name}'...", "info")
r = requests.get(url, headers={'User-Agent': USER_AGENT}, timeout=20)
r.raise_for_status()
all_content += r.content.decode('utf-8', errors='ignore') + "\n"
except Exception as e:
self.log(f"[!] Failed to download '{name}': {e}", "red")
if all_content:
self.input_var.set("[Database Check]")
all_streams = parse_m3u(all_content)
self.after(0, self._start_check_internal, all_streams, False)
else:
self.log("[!] Database check failed: No valid playlists could be downloaded.", "red")
self.after(0, self.stop_processing)
threading.Thread(target=do_db_check, daemon=True).start()
def start_processing(self, is_recheck=False):
if self.is_running: return
input_path = self.output_var.get() if is_recheck else self.input_var.get()
if not input_path or input_path == "[Database Check]":
self.log("[!] Please provide a valid M3U file or URL.", "red")
return
self.log_view.delete('1.0', tk.END)
self._set_ui_state_for_checking(True)
if is_recheck:
self.log(f"--- Starting Recheck of: {os.path.basename(input_path)} ---", "info")
else:
self.log("--- Starting Manual Check ---", "info")
def do_start_check():
try:
if urlparse(input_path).scheme in ('http', 'https'):
self.log(f"[*] Downloading: {input_path}", "info")
r = requests.get(input_path, headers={'User-Agent': USER_AGENT}, timeout=20)
r.raise_for_status()
content = r.content.decode('utf-8', errors='ignore')
else:
if not os.path.exists(input_path):
self.after(0, lambda: messagebox.showerror("File Not Found", f"The file '{input_path}' does not exist."))
self.after(0, self.stop_processing)
return
self.log(f"[*] Reading local file: {input_path}", "info")
with open(input_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
all_streams = parse_m3u(content)
self.after(0, self._start_check_internal, all_streams, is_recheck)
except requests.RequestException as e:
self.log(f"\n[!] Error downloading playlist: {e}", "red")
self.after(0, self.stop_processing)
except Exception as e:
self.log(f"\n[!] Error loading playlist: {e}", "red")
self.after(0, self.stop_processing)
threading.Thread(target=do_start_check, daemon=True).start()
def _start_check_internal(self, all_streams, is_recheck):
self.is_running = True
self.current_check_id = time.time()
self.is_rechecking = is_recheck
self.search_entry.config(state=tk.DISABLED)
output_path = self.output_var.get().strip()
if not output_path:
self.log("[!] Please provide an Output File name.", "red")
self.stop_processing()
return
self.progress['value'] = 0
self.processed_count = 0
self.uncheckable_count = 0
self.online_count = 0
self.log_url_map.clear()
self.known_good_urls.clear()
global OCR_FAILED_ONCE
OCR_FAILED_ONCE = False
logging.info(f"--- Starting new check with ID: {self.current_check_id} ---")
if self.skip_knowngood_urls_var.get() and not self.is_rechecking:
if os.path.exists(output_path):
output_filename = os.path.basename(output_path)
self.log(f"[*] Comparing against output file ('{output_filename}') to skip duplicates...", "info")
try:
with open(output_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read()
self.known_good_urls = {s['url'] for s in parse_m3u(content)}
self.log(f" -> Found {len(self.known_good_urls)} URLs in the output file to be skipped.", "info")
except Exception as e:
self.log(f"[!] Could not read existing output file to check for duplicates: {e}", "yellow")
self.streams_to_check = [s for s in all_streams if s['url'] not in self.known_good_urls]
if self.known_good_urls:
skipped_count = len(all_streams) - len(self.streams_to_check)
if skipped_count > 0:
self.log(f"[*] Skipped {skipped_count} streams that are already in the output file.", "info")
if not self.streams_to_check:
log_msg = "[!] No new streams to check." if not self.is_rechecking else "[!] No streams found in file to recheck."
self.log(log_msg, "yellow")
self.stop_processing()
return
part_file_path = f"{output_path}.part"; uncheckable_file_path = "uncheckable.m3u"
self.journal_file = open(part_file_path, "w", encoding='utf-8-sig'); self.uncheckable_file = open(uncheckable_file_path, "w", encoding='utf-8-sig')
write_m3u_header(self.journal_file); write_m3u_header(self.uncheckable_file)
self.total_stream_count = len(self.streams_to_check)
self.progress['maximum'] = self.total_stream_count
self.remaining_label.config(text=f"{self.i18n.get('remaining_label')} {self.total_stream_count}");
self.log(f"[*] Found {self.total_stream_count} new streams to check. Starting...", "info")
self._monitor_workers()
def stop_processing(self):
self.is_running = False
self.current_check_id = None
self.log(">>> Sending stop signal and cleaning up...", "yellow")
active_procs = list(self.active_processes.items())
self.active_processes.clear()
for pid, data in active_procs:
self._force_kill_worker(pid, data)
self.log(">>> All workers terminated.", "yellow")
if self.journal_file:
self.journal_file.close()
self.journal_file = None
if self.uncheckable_file:
self.uncheckable_file.close()
self.uncheckable_file = None
output_path = self.output_var.get().strip(); part_file_path = f"{output_path}.part"
if self.is_rechecking:
if os.path.exists(part_file_path) and os.path.getsize(part_file_path) > 15:
shutil.move(part_file_path, output_path)
self.log(f"[+] Playlist cleaned and saved to: {output_path}", "green")
else:
if os.path.exists(output_path): os.remove(output_path)
if os.path.exists(part_file_path): os.remove(part_file_path)
self.log(f"[!] No working streams found. Output file '{output_path}' removed.", "yellow")
else:
if os.path.exists(part_file_path) and os.path.getsize(part_file_path) > 15:
with open(part_file_path, 'r', encoding='utf-8-sig') as part_f:
new_content = part_f.read()
new_streams_count = len(re.findall(r'#EXTINF', new_content))
output_needs_header = not os.path.exists(output_path) or os.path.getsize(output_path) == 0
with open(output_path, 'a', encoding='utf-8-sig') as main_f:
if output_needs_header:
main_f.write("#EXTM3U\n\n")
part_content_no_header = new_content.replace("#EXTM3U\n\n", "", 1)
main_f.write(part_content_no_header)
self.log(f"[+] Appended {new_streams_count} new working streams to: {output_path}", "green")
if os.path.exists(part_file_path):
os.remove(part_file_path)
if self.uncheckable_count > 0: self.log(f"[!] {self.uncheckable_count} streams were saved to 'uncheckable.m3u'", "yellow")
self.remaining_label.config(text=f"{self.i18n.get('remaining_label')} 0")
self._set_ui_state_for_checking(False)
search_file_exists = os.path.exists(self.output_var.get()) and os.path.getsize(self.output_var.get()) > 0
self.search_entry.config(state='normal' if search_file_exists else 'disabled')
if self.input_var.get() == "[Database Check]":
self.input_var.set("")
def _monitor_workers(self):
if not self.is_running:
return
pids_to_remove = []
now = time.time()
for pid, data in self.active_processes.items():
if data['proc'].poll() is not None:
self._process_result(data)
pids_to_remove.append(pid)
elif (now - data['start_time']) > WORKER_PROCESS_TIMEOUT_SECONDS:
logging.warning(f"Worker PID {pid} for URL {data['stream_info']['url']} timed out. Forcing termination.")
self._force_kill_worker(pid, data)
self.after(0, self._handle_worker_failure, data['stream_info'], "(Timeout)", data['check_id'])
pids_to_remove.append(pid)
for pid in pids_to_remove:
if pid in self.active_processes:
del self.active_processes[pid]
max_workers = int(self.workers_var.get())
if len(self.active_processes) < max_workers and self.streams_to_check:
self._launch_worker(self.streams_to_check.pop(0), self.current_check_id)
if not self.streams_to_check and not self.active_processes:
self.log("\n--- Check Complete ---", "info")
self.stop_processing()
return
self.after(100, self._monitor_workers)
def _handle_worker_failure(self, stream_info, reason, check_id):
if check_id != self.current_check_id:
logging.warning(f"Discarding stale worker failure from a previous check ({check_id}).")
return
self.processed_count += 1
self.progress['value'] = self.processed_count
self.remaining_label.config(text=f"{self.i18n.get('remaining_label')} {self.total_stream_count - self.processed_count}")
display_text = stream_info['title'] if self.log_display_var.get() == 'name' else stream_info['url']
stream_id_tag = f"stream_{self.processed_count}"
self.log_url_map[stream_id_tag] = stream_info['url']
self.log(f"[{'OFF ' + reason:^15}] {display_text}", "red", stream_id=stream_id_tag)
logging.info(f"Stream offline: [{reason}] {stream_info['title']} ({stream_info['url']})")
def _launch_worker(self, stream_info, check_id):
def run_in_thread():
try:
if check_id != self.current_check_id: return
final_url = sanitize_url_aggressively(stream_info['url']) if stream_info.get('is_retry') else stream_info['url']
if 'youtube.com/' in final_url or 'youtu.be/' in final_url:
try:
yt_cmd = [YT_DLP_PATH, '--get-url', final_url]
logging.debug(f"Executing yt-dlp command: {' '.join(yt_cmd)}")
result = subprocess.run(yt_cmd, capture_output=True, text=True, timeout=YT_DLP_TIMEOUT_SECONDS)
if result.returncode == 0 and result.stdout.strip():
final_url = result.stdout.strip().splitlines()[0]
logging.debug(f"yt-dlp found direct URL: {final_url}")
else:
raise RuntimeError(f"yt-dlp failed: {result.stderr.strip()}")
except Exception as e:
logging.error(f"yt-dlp failed for {stream_info['url']}: {e}")
self.after(0, self._handle_worker_failure, stream_info, "(YouTube Error)", check_id)
return
check_type = None
for pattern, type_ in self.stream_patterns.items():
if pattern in final_url:
check_type = type_
break
if check_type is None and re.search(r':\d+(?:/[^.]*)?$', final_url):
check_type = 'audio'
if check_type is None:
probe_result = self._get_stream_type(final_url)
check_type = 'video' if probe_result == 'video' else 'audio'
self.after(0, self._schedule_process, stream_info, final_url, check_type, check_id)
except Exception as e:
logging.error(f"Error in worker thread for {stream_info['url']}: {e}", exc_info=True)
self.after(0, self._handle_worker_failure, stream_info, "(Launch Error)", check_id)
threading.Thread(target=run_in_thread, daemon=True).start()
def _schedule_process(self, stream_info, final_url, check_type, check_id):
if check_id != self.current_check_id:
logging.warning(f"Discarding stale process schedule from a previous check ({check_id}).")
return
temp_file = None
try:
proc = None; probe_type = 'ffprobe'
use_ffmpeg = check_type == 'video' and self.use_ocr_var.get()
popen_kwargs = {'stdin': subprocess.DEVNULL, 'stdout': subprocess.PIPE, 'stderr': subprocess.PIPE, 'text': True, 'errors': 'ignore'}
if os.name == 'posix': popen_kwargs['start_new_session'] = True
else: popen_kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
if use_ffmpeg:
probe_type = 'ffmpeg'
with tempfile.NamedTemporaryFile(suffix=".mp4", prefix=TEMP_FILE_PREFIX, delete=False) as tf: temp_file = tf.name
ffmpeg_cmd = [FFMPEG_PATH, '-y', '-hide_banner'] + ['-user_agent', USER_AGENT, '-timeout', str(build_timeout_config(int(self.timeout_var.get()))['network_timeout_us'])] + build_timeout_config(int(self.timeout_var.get()))['ffmpeg_flags'] + ['-t', str(DEFAULT_CAPTURE_DURATION), '-i', final_url] + ['-c', 'copy', '-bsf:a', 'aac_adtstoasc', temp_file]
proc = subprocess.Popen(ffmpeg_cmd, **popen_kwargs)
else:
user_agent = AUDIO_USER_AGENT if check_type == 'audio' else USER_AGENT
ffprobe_cmd = [FFPROBE_PATH, '-v', 'error', '-user_agent', user_agent, '-timeout', str(build_timeout_config(int(self.timeout_var.get()))['network_timeout_us']), '-i', final_url]
proc = subprocess.Popen(ffprobe_cmd, **popen_kwargs)
self.active_processes[proc.pid] = {"proc": proc, "stream_info": stream_info, "temp_file": temp_file, "start_time": time.time(), "check_type": check_type, "probe_type": probe_type, "check_id": check_id}
except Exception as e:
logging.error(f"Error scheduling process for {stream_info['url']}: {e}", exc_info=True)
if temp_file and os.path.exists(temp_file):
try: os.remove(temp_file)
except OSError: pass
self.after(0, self._handle_worker_failure, stream_info, "(Schedule Error)", check_id)
def _process_result(self, data):
check_id = data['check_id']
if check_id != self.current_check_id:
logging.warning(f"Discarding stale process result from a previous check ({check_id}).")
if data['temp_file'] and os.path.exists(data['temp_file']):
try: os.remove(data['temp_file'])
except OSError: pass
return
proc = data['proc']; stream_info = data['stream_info']; temp_file = data['temp_file']
display_text = stream_info['title'] if self.log_display_var.get() == 'name' else stream_info['url']
tag = "red"; detailed_status = "OFF"
try:
stdout, stderr = proc.communicate()
logging.debug(f"[{data['probe_type'].upper()}] stdout for {stream_info['url']}:\n{stdout}")
logging.debug(f"[{data['probe_type'].upper()}] stderr for {stream_info['url']}:\n{stderr}")
if data['probe_type'] == 'ffprobe':
if proc.returncode == 0: detailed_status = "ON"
else: detailed_status = "OFF (Probe Error)"
else: # ffmpeg
if proc.returncode == 0 and os.path.exists(temp_file) and os.path.getsize(temp_file) >= MIN_FILE_SIZE_BYTES:
if data['check_type'] == 'video' and self.use_ocr_var.get():
detailed_status = self._perform_ocr_check(temp_file)
else:
detailed_status = "ON"
else:
detailed_status = "OFF (Capture Error)"
except Exception as e:
detailed_status = "OFF (Processing Error)"
logging.error(f"Error processing result for {stream_info['url']}: {e}", exc_info=True)
self.processed_count += 1
self.progress['value'] = self.processed_count
self.remaining_label.config(text=f"{self.i18n.get('remaining_label')} {self.total_stream_count - self.processed_count}")
gui_status_text = "OFF"
if "ON" in detailed_status:
self.online_count += 1
if data['check_type'] == 'audio': stream_info['group'] = 'Radios'
write_m3u_entry(self.journal_file, stream_info)
tag = "green"; gui_status_text = "ON"
else:
url = stream_info['url']
if len(url) > UNCHECKABLE_URL_LENGTH_THRESHOLD or any(kw in url.lower() for kw in UNCHECKABLE_KEYWORDS):
write_m3u_entry(self.uncheckable_file, stream_info)
self.uncheckable_count += 1
stream_id_tag = f"stream_{self.processed_count}"
self.log_url_map[stream_id_tag] = stream_info['url']
self.log(f"[{gui_status_text:^15}] {display_text}", tag, stream_id=stream_id_tag)
if temp_file and os.path.exists(temp_file):
try: os.remove(temp_file)
except OSError as e: logging.warning(f"Could not remove temp file {temp_file}: {e}")
# --- CLI APPLICATION CLASS ---
class AppCLI(CheckerBase):
def __init__(self, args):
super().__init__()
self.args = args
self.links_db_manager = IniManager(LINKS_DB_PATH)
self.total_streams = 0
def log(self, message, tag=None):
color_map = {"green": Fore.GREEN, "red": Fore.RED, "yellow": Fore.YELLOW, "info": Fore.CYAN}
color = color_map.get(tag, "")
with self.lock:
print(f"{color}{message}{Style.RESET_ALL}")
def run(self):
print(f"{Style.BRIGHT}{Fore.CYAN}--- IPTV-Check v{VERSION} (CLI Mode) ---{Style.RESET_ALL}")
signal.signal(signal.SIGINT, self._signal_handler)
content = self._get_content()
if not content: return
all_streams = parse_m3u(content)
if not all_streams:
self.log("[!] No streams found in the provided source.", "yellow")
return
is_recheck = bool(self.args.recheck)
output_path = self.args.recheck if is_recheck else self.args.output
known_good_urls = set()
if not is_recheck and not self.args.no_skip:
if os.path.exists(output_path):
self.log(f"[*] Comparing against output file ('{os.path.basename(output_path)}') to skip duplicates...", "yellow")
try:
with open(output_path, 'r', encoding='utf-8', errors='ignore') as f: existing_content = f.read()
known_good_urls = {s['url'] for s in parse_m3u(existing_content)}
self.log(f" -> Found {len(known_good_urls)} URLs in the output file to be skipped.", "info")
except Exception as e:
self.log(f"[!] Could not read existing output file: {e}", "red")
self.streams_to_check = [s for s in all_streams if s['url'] not in known_good_urls]
if known_good_urls:
skipped_count = len(all_streams) - len(self.streams_to_check)
if skipped_count > 0:
self.log(f"[*] Skipped {skipped_count} streams that are already in the output file.", "info")
if not self.streams_to_check:
self.log("[!] No new streams to check.", "yellow")
return
self.log(f"[*] Found {len(self.streams_to_check)} new streams to check. Starting...", "green")
part_file_path = f"{output_path}.part"; uncheckable_file_path = "uncheckable.m3u"
self.journal_file = open(part_file_path, "w", encoding='utf-8-sig')
self.uncheckable_file = open(uncheckable_file_path, "w", encoding='utf-8-sig')
write_m3u_header(self.journal_file); write_m3u_header(self.uncheckable_file)
self.total_streams = len(self.streams_to_check)
self.is_running = True
self.current_check_id = time.time()
queue = self.streams_to_check[:]
threads = []
for _ in range(self.args.workers):
thread = threading.Thread(target=self._worker, args=(queue,))
thread.daemon = True
thread.start()
threads.append(thread)
while any(t.is_alive() for t in threads):
time.sleep(0.5)
# This loop allows Ctrl+C to be detected
self.is_running = False # Ensure all threads stop after finishing their current item
for t in threads:
t.join()
self._finalize_check(output_path, is_recheck)
def _signal_handler(self, sig, frame):
if self.is_running:
print(f"\n{Style.BRIGHT}{Fore.YELLOW}>>> Ctrl+C detected. Stopping launch of new checks. Please wait for active checks to finish...{Style.RESET_ALL}")
self.is_running = False
def _get_content(self):
content = ""
input_path = self.args.file or self.args.recheck
try:
if input_path:
if urlparse(input_path).scheme in ('http', 'https'):
print(f"[*] Downloading: {input_path}")
r = requests.get(input_path, headers={'User-Agent': USER_AGENT}, timeout=20); r.raise_for_status()
content = r.content.decode('utf-8', errors='ignore')
else:
operation_message = "Re-checking file" if self.args.recheck else "Reading local file"
print(f"[*] {operation_message}: {input_path}")
with open(input_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read()
elif self.args.database:
print("[*] Loading all playlists from the database...")
links = self.links_db_manager.load().get('defaultlinks', {})
if not links:
print(f"{Fore.RED}[!] No links found in the database.{Style.RESET_ALL}"); return None
for name, url in links.items():
try:
print(f" -> Downloading '{name}'...")
r = requests.get(url, headers={'User-Agent': USER_AGENT}, timeout=20); r.raise_for_status()
content += r.content.decode('utf-8', errors='ignore') + "\n"
except Exception as e:
print(f"{Fore.RED}[!] Failed to download '{name}': {e}{Style.RESET_ALL}")
except Exception as e:
print(f"{Style.BRIGHT}{Fore.RED}\n[!] Error loading playlist: {e}{Style.RESET_ALL}"); return None
return content
def _worker(self, queue):
while self.is_running:
try:
with self.lock:
if not queue: break
stream_info = queue.pop(0)
except IndexError:
break
if not self.is_running: break
final_url, check_type = self._prepare_stream(stream_info)
if final_url is None: continue
self._execute_check(stream_info, final_url, check_type)
def _prepare_stream(self, stream_info):
try:
final_url = sanitize_url_aggressively(stream_info['url'])
if 'youtube.com/' in final_url or 'youtu.be/' in final_url:
yt_cmd = [YT_DLP_PATH, '--get-url', final_url]
result = subprocess.run(yt_cmd, capture_output=True, text=True, timeout=YT_DLP_TIMEOUT_SECONDS)
if result.returncode == 0 and result.stdout.strip():
final_url = result.stdout.strip().splitlines()[0]
else: raise RuntimeError(f"yt-dlp failed: {result.stderr.strip()}")
check_type = None
for pattern, type_ in self.stream_patterns.items():
if pattern in final_url: check_type = type_; break
if check_type is None and re.search(r':\d+(?:/[^.]*)?$', final_url): check_type = 'audio'
if check_type is None:
probe_result = self._get_stream_type(final_url)
check_type = 'video' if probe_result == 'video' else 'audio'
return final_url, check_type
except Exception as e:
logging.error(f"Error preparing stream {stream_info['url']}: {e}")
self._handle_cli_failure(stream_info, "(Prepare Error)")
return None, None
def _execute_check(self, stream_info, final_url, check_type):
temp_file = None; detailed_status = "OFF"
try:
use_ffmpeg = check_type == 'video' and self.args.ocr
popen_kwargs = {'stdin': subprocess.DEVNULL, 'stdout': subprocess.PIPE, 'stderr': subprocess.PIPE, 'text': True, 'errors': 'ignore'}
if os.name == 'posix': popen_kwargs['start_new_session'] = True
else: popen_kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
if use_ffmpeg:
with tempfile.NamedTemporaryFile(suffix=".mp4", prefix=TEMP_FILE_PREFIX, delete=False) as tf: temp_file = tf.name
ffmpeg_cmd = [FFMPEG_PATH, '-y', '-hide_banner'] + ['-user_agent', USER_AGENT, '-timeout', str(build_timeout_config(self.args.timeout)['network_timeout_us'])] + build_timeout_config(self.args.timeout)['ffmpeg_flags'] + ['-t', str(DEFAULT_CAPTURE_DURATION), '-i', final_url] + ['-c', 'copy', '-bsf:a', 'aac_adtstoasc', temp_file]
proc = subprocess.Popen(ffmpeg_cmd, **popen_kwargs)
else:
user_agent = AUDIO_USER_AGENT if check_type == 'audio' else USER_AGENT
ffprobe_cmd = [FFPROBE_PATH, '-v', 'error', '-user_agent', user_agent, '-timeout', str(build_timeout_config(self.args.timeout)['network_timeout_us']), '-i', final_url]
proc = subprocess.Popen(ffprobe_cmd, **popen_kwargs)
stdout, stderr = proc.communicate(timeout=WORKER_PROCESS_TIMEOUT_SECONDS)
if proc.returncode == 0:
if use_ffmpeg and os.path.exists(temp_file) and os.path.getsize(temp_file) >= MIN_FILE_SIZE_BYTES:
detailed_status = self._perform_ocr_check(temp_file)
elif not use_ffmpeg:
detailed_status = "ON"
except subprocess.TimeoutExpired:
proc.kill()
detailed_status = "OFF (Timeout)"
except Exception as e:
logging.error(f"Error executing check for {final_url}: {e}")
detailed_status = "OFF (Execution Error)"
finally:
if temp_file and os.path.exists(temp_file):
try: os.remove(temp_file)
except OSError: pass
self._handle_cli_result(stream_info, detailed_status, check_type)
def _handle_cli_failure(self, stream_info, reason):
with self.lock:
self.processed_count += 1
display_text = stream_info['title'] if self.args.log_format == 'name' else stream_info['url']
self.log(f"[{Fore.RED}{reason:^15}{Fore.RESET}] {display_text}")
def _handle_cli_result(self, stream_info, detailed_status, check_type):
with self.lock:
self.processed_count += 1
display_text = stream_info['title'] if self.args.log_format == 'name' else stream_info['url']
if "ON" in detailed_status:
self.online_count += 1
if check_type == 'audio': stream_info['group'] = 'Radios'
write_m3u_entry(self.journal_file, stream_info)
self.log(f"[{Fore.GREEN}{'ON':^15}{Fore.RESET}] {display_text}", tag="green")
else:
self.log(f"[{Fore.RED}{detailed_status:^15}{Fore.RESET}] {display_text}", tag="red")
url = stream_info['url']
if len(url) > UNCHECKABLE_URL_LENGTH_THRESHOLD or any(kw in url.lower() for kw in UNCHECKABLE_KEYWORDS):
write_m3u_entry(self.uncheckable_file, stream_info)
self.uncheckable_count += 1
def _finalize_check(self, output_path, is_recheck):
print(f"\n{Style.BRIGHT}{Fore.CYAN}--- Check Complete ---{Style.RESET_ALL}")
if self.journal_file: self.journal_file.close()
if self.uncheckable_file: self.uncheckable_file.close()
part_file_path = f"{output_path}.part"
if is_recheck:
if os.path.exists(part_file_path) and os.path.getsize(part_file_path) > 15:
shutil.move(part_file_path, output_path)
print(f"{Fore.GREEN}[+] Playlist cleaned and saved to: {output_path}{Style.RESET_ALL}")
else:
if os.path.exists(output_path): os.remove(output_path)
if os.path.exists(part_file_path): os.remove(part_file_path)
print(f"{Fore.YELLOW}[!] No working streams found. Output file '{output_path}' removed.{Style.RESET_ALL}")
else:
if os.path.exists(part_file_path) and os.path.getsize(part_file_path) > 15:
with open(part_file_path, 'r', encoding='utf-8-sig') as part_f: new_content = part_f.read()
output_needs_header = not os.path.exists(output_path) or os.path.getsize(output_path) == 0
with open(output_path, 'a', encoding='utf-8-sig') as main_f:
if output_needs_header: main_f.write("#EXTM3U\n\n")
part_content_no_header = new_content.replace("#EXTM3U\n\n", "", 1)
main_f.write(part_content_no_header)
print(f"{Fore.GREEN}[+] Appended {self.online_count} new working streams to: {output_path}{Style.RESET_ALL}")
if os.path.exists(part_file_path): os.remove(part_file_path)
if self.uncheckable_count > 0: print(f"{Fore.YELLOW}[!] {self.uncheckable_count} streams were saved to 'uncheckable.m3u'{Style.RESET_ALL}")
print(f"\n{Style.BRIGHT}Summary: {self.online_count} Online, {self.total_streams - self.online_count} Offline, {self.total_streams} Total.{Style.RESET_ALL}")
# --- MAIN APPLICATION ENTRY POINT ---
def main():
setup_logging(); cleanup_stale_temp_files(); load_languages()
missing_essential, missing_optional = check_dependencies()
parser = argparse.ArgumentParser(description=f"IPTV-Check v{VERSION}.", add_help=False)
parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.')
parser.add_argument('-gui', '--gui', action='store_true', help='Launch the graphical user interface.')
input_group = parser.add_argument_group('CLI Input Options (choose one)')
input_group = input_group.add_mutually_exclusive_group()
input_group.add_argument('-f', '--file', help='Path or URL to the M3U playlist file.')
input_group.add_argument('-d', '--database', action='store_true', help='Use the default links database as input.')
input_group.add_argument('-r', '--recheck', help='Re-check an existing output file (e.g., updated.m3u).')
cli_opts = parser.add_argument_group('CLI General Options')
cli_opts.add_argument('-o', '--output', default='updated.m3u', help='Output file for working streams.')
cli_opts.add_argument('-w', '--workers', type=int, default=10, help='Number of parallel workers (1-20).')
cli_opts.add_argument('-t', '--timeout', type=int, default=5, help='Network timeout in seconds for each stream.')
cli_opts.add_argument('--log-format', choices=['name', 'url'], default='name', help='Log output format for the CLI progress bar.')
cli_opts.add_argument('--ocr', action='store_true', help='Enable OCR checking for video streams.')
cli_opts.add_argument('--no-skip', action='store_true', help='Disable skipping of known good URLs found in the output file.')
args = parser.parse_args()
is_cli_mode = args.file or args.database or args.recheck
if args.gui:
if not GUI_AVAILABLE:
print(f"{Fore.RED}{Style.BRIGHT}[!] GUI mode requires Tkinter, which could not be imported.{Style.RESET_ALL}"); sys.exit(1)
if missing_essential:
error_msg = (f"FATAL ERROR: The following required programs are not installed or not in your system's PATH:\n\n"
f"{', '.join(missing_essential)}\n\n"
"Please install them and ensure they are accessible to run the application.")
logging.critical(f"Missing essential dependencies: {', '.join(missing_essential)}")
root = tk.Tk(); root.withdraw(); messagebox.showerror("Fatal Dependency Error", error_msg)
sys.exit(1)
app = AppGUI(args, missing_optional)
app.mainloop()
elif is_cli_mode:
if missing_essential:
error_msg = (f"FATAL ERROR: The following required programs are not installed or not in your system's PATH:\n\n"
f"{', '.join(missing_essential)}\n\n"
"Please install them and ensure they are accessible to run the application.")
logging.critical(f"Missing essential dependencies: {', '.join(missing_essential)}")
print(f"{Fore.RED}{Style.BRIGHT}{error_msg}{Style.RESET_ALL}")
sys.exit(1)
if args.recheck and args.output != 'updated.m3u' and not any(arg in sys.argv for arg in ['-o', '--output']):
args.output = args.recheck
cli_app = AppCLI(args)
cli_app.run()
else:
parser.print_help()
print("\nTo start the graphical interface, run:\npython3 iptv_check.py --gui")
sys.exit(0)
if __name__ == "__main__":
main()