import tkinter as tk from tkinter import ttk, messagebox import requests import json import urllib3 import threading import time from datetime import datetime from typing import Optional, Dict, Any urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # ==================== 配置 ==================== DEFAULT_DOMAIN = "888.hdna57.com" APP_KEY = "NqB93x7ohAQt" DEFAULT_UDID = "33fa7a23-794e-4075-b663-da6a2d9d5cff" QUERY_URL = "https://yiqifa.ccwu.cc/api/records/recent?limit=3" BALANCE_QUERY = """ query getUserBalance($product_id: ProductEnum) { User { id user_balance(product_id: $product_id) } } """ DOMAIN_LIST_QUERY = """ { DomainListV2( domain_device: [DomainDeviceAll, DomainDevicePc] domain_status: [DomainStatusUsing, DomainStatusBackup] ) { id domain_name domain_status location is_use_for_patch __typename } } """ HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Edg/147.0.0.0", "Accept": "application/json, text/plain, */*", "Content-Type": "application/json", "Origin": "https://888.mtvip07.com", "Referer": "https://888.mtvip07.com/user/login-form/login", } # 团队成员验证配置 MEMBERS_API_URL = "https://women.yiqifa.ccwu.cc/list" # 你的 Workers API 地址 REJECT_MESSAGE = "非团队成员,无法使用" # 拒绝提示语,可按需修改 # ============================================= class AutoBetApp: def __init__(self, root): self.root = root self.root.title("万瑞团队 辉达平台自动化投注系统 v1.0") self.root.geometry("1000x680") self.root.configure(bg='#f0f2f5') self.root.minsize(900, 600) style = ttk.Style() style.theme_use('clam') style.configure('TButton', font=('微软雅黑', 9), padding=2) style.configure('TLabel', font=('微软雅黑', 9), background='#f0f2f5') style.configure('TEntry', font=('微软雅黑', 9), padding=2) main_frame = ttk.Frame(root, padding="10") main_frame.pack(fill=tk.BOTH, expand=True) control_bar = ttk.Frame(main_frame) control_bar.pack(fill=tk.X, pady=(0, 10)) ttk.Label(control_bar, text="账号:").pack(side=tk.LEFT, padx=2) self.username_entry = ttk.Entry(control_bar, width=12, state=tk.DISABLED) self.username_entry.pack(side=tk.LEFT, padx=2) self._setup_placeholder(self.username_entry, "请输入用户名") ttk.Label(control_bar, text="密码:").pack(side=tk.LEFT, padx=2) self.password_entry = ttk.Entry(control_bar, show="*", width=12, state=tk.DISABLED) self.password_entry.pack(side=tk.LEFT, padx=2) self.login_btn = ttk.Button(control_bar, text="登录", width=6, command=self.login, state=tk.DISABLED) self.login_btn.pack(side=tk.LEFT, padx=5) self.balance_label = ttk.Label(control_bar, text="余额:--", font=('微软雅黑',9,'bold'), foreground='#e67e22', width=12) self.balance_label.pack(side=tk.LEFT, padx=10) self.refresh_balance_btn = ttk.Button(control_bar, text="刷新余额", width=8, command=self.refresh_balance, state=tk.DISABLED) self.refresh_balance_btn.pack(side=tk.LEFT, padx=2) ttk.Label(control_bar, text="基数:").pack(side=tk.LEFT, padx=(10,2)) self.base_var = tk.StringVar(value="1") self.base_entry = ttk.Entry(control_bar, textvariable=self.base_var, width=4, state=tk.DISABLED) self.base_entry.pack(side=tk.LEFT, padx=2) ttk.Label(control_bar, text="延迟(秒):").pack(side=tk.LEFT, padx=(10,2)) self.delay_var = tk.StringVar(value="5") self.delay_entry = ttk.Entry(control_bar, textvariable=self.delay_var, width=4, state=tk.DISABLED) self.delay_entry.pack(side=tk.LEFT, padx=2) self.start_btn = ttk.Button(control_bar, text="开始", width=6, command=self.start_task, state=tk.DISABLED) self.start_btn.pack(side=tk.LEFT, padx=5) self.stop_btn = ttk.Button(control_bar, text="停止", width=6, command=self.stop_task, state=tk.DISABLED) self.stop_btn.pack(side=tk.LEFT, padx=2) record_frame = ttk.LabelFrame(main_frame, text="投注记录", padding="4") record_frame.pack(fill=tk.BOTH, expand=True) columns = ("时间", "推荐期号", "位置", "号码", "倍数", "投注金额", "盈亏", "累计", "结果") col_widths = {"时间":60, "推荐期号":75, "位置":35, "号码":90, "倍数":35, "投注金额":50, "盈亏":50, "累计":60, "结果":40} self.tree = ttk.Treeview(record_frame, columns=columns, show="headings", height=18) for col in columns: self.tree.heading(col, text=col) self.tree.column(col, width=col_widths.get(col, 70), anchor=tk.CENTER) self.tree.pack(fill=tk.BOTH, expand=True, side=tk.LEFT) scrollbar = ttk.Scrollbar(record_frame, orient=tk.VERTICAL, command=self.tree.yview) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.tree.configure(yscrollcommand=scrollbar.set) self.session = None self.token = None self.username = None self.password = None self.udid = DEFAULT_UDID self.user_id = None self.balance = None self.running = False self.thread = None self.last_period = None self.bet_records = {} self.cumulative_profit = 0.0 self.base_url = None self._start_optimize_domain() print("[启动] 程序初始化完成,正在优选线路...") def _setup_placeholder(self, entry, placeholder, is_password=False): """为普通输入框设置占位符(仅用于非密码框)""" entry.placeholder = placeholder entry.insert(0, placeholder) entry.bind("", lambda e: self._clear_placeholder(entry)) entry.bind("", lambda e: self._restore_placeholder(entry)) def _clear_placeholder(self, entry): if entry.get() == entry.placeholder: entry.delete(0, tk.END) def _restore_placeholder(self, entry): if not entry.get(): entry.insert(0, entry.placeholder) def _http_latency(self, url, timeout=5): try: start = time.time() resp = requests.get(url, timeout=timeout, verify=False, stream=True) resp.close() end = time.time() if resp.status_code == 200: return end - start else: print(f"[HTTP延迟] {url} 返回状态码 {resp.status_code}") return None except Exception as e: print(f"[HTTP延迟] {url} 异常: {e}") return None def _start_optimize_domain(self): self.optimize_window = tk.Toplevel(self.root) self.optimize_window.title("线路优选") self.optimize_window.geometry("280x80") self.optimize_window.resizable(False, False) self.optimize_window.transient(self.root) self.optimize_window.grab_set() ttk.Label(self.optimize_window, text="正在优选线路,请稍候…", font=('微软雅黑', 10)).pack(expand=True) self.optimize_window.protocol("WM_DELETE_WINDOW", lambda: None) self._center_window(self.optimize_window) threading.Thread(target=self._do_optimize, daemon=True).start() def _center_window(self, window): window.update_idletasks() w = window.winfo_width() h = window.winfo_height() root_x = self.root.winfo_x() root_y = self.root.winfo_y() root_w = self.root.winfo_width() root_h = self.root.winfo_height() x = root_x + (root_w - w) // 2 y = root_y + (root_h - h) // 2 window.geometry(f"+{x}+{y}") def _do_optimize(self): try: self.optimize_domain() except Exception as e: print(f"[优选] 后台异常: {e}") finally: self.root.after(0, self._on_optimize_finished) def _on_optimize_finished(self): # ---- 修复焦点问题的关键部分 ---- if hasattr(self, 'optimize_window') and self.optimize_window: self.optimize_window.grab_release() # 解除模态锁定 self.optimize_window.destroy() self.optimize_window = None if not self.base_url: self.base_url = f"https://{DEFAULT_DOMAIN}/APIV2/GraphQL" self.username_entry.config(state=tk.NORMAL) self.password_entry.config(state=tk.NORMAL) self.login_btn.config(state=tk.NORMAL) # 强制主窗口获取焦点,并让光标出现在用户名输入框 self.root.lift() self.root.focus_force() self.username_entry.focus_set() # --------------------------------- print("[线路优选] 完成,可以登录") def optimize_domain(self): print("[线路优选] 正在获取域名列表并测试延迟...") try: temp_session = requests.Session() for k, v in HEADERS.items(): temp_session.headers.update({k: v}) params = {"l": "zh-cn", "pf": "web", "udid": self.udid} url = f"https://{DEFAULT_DOMAIN}/APIV2/GraphQL" payload = {"variables": {}, "query": DOMAIN_LIST_QUERY} resp = temp_session.post(url, params=params, json=payload, timeout=10, verify=False) resp.raise_for_status() data = resp.json() if data and not data.get("errors"): domains = data.get("data", {}).get("DomainListV2", []) using, backup = [], [] for d in domains: host = d["domain_name"] if host.startswith("https://"): host = host[8:] elif host.startswith("http://"): host = host[7:] if d.get("domain_status") == "DomainStatusUsing": using.append(host) elif d.get("domain_status") == "DomainStatusBackup": backup.append(host) candidates = using if using else backup if not candidates: raise Exception("API 未返回任何有效域名") print(f"[线路优选] 候选域名 {len(candidates)} 个,开始 HTTP 延迟测试...") best_host = None best_latency = float('inf') for host in candidates: test_url = f"https://{host}" latency = self._http_latency(test_url, timeout=5) if latency is not None: latency_ms = latency * 1000 print(f" {host} -> {latency_ms:.0f} ms") if latency < best_latency: best_latency = latency best_host = host if latency_ms < 1000: best_host = host best_latency = latency print(f"[线路优选] 找到延迟 {latency_ms:.0f} ms < 1000 ms,立即选用 {host}") break else: print(f" {host} -> 连接失败") if best_host: self.base_url = f"https://{best_host}/APIV2/GraphQL" print(f"[线路优选] 最优域名: {best_host} (延迟 {best_latency*1000:.0f} ms)") else: raise Exception("所有候选域名 HTTP 均不通") else: raise Exception("查询域名列表失败") except Exception as e: self.base_url = f"https://{DEFAULT_DOMAIN}/APIV2/GraphQL" print(f"[线路优选] 异常: {e},回退至默认域名") def graphql_request(self, payload: Dict[str, Any], need_auth: bool = False) -> Optional[Dict[str, Any]]: if not self.session: self.session = requests.Session() for k, v in HEADERS.items(): self.session.headers.update({k: v}) params = {"l": "zh-cn", "pf": "web", "udid": self.udid} if need_auth and self.username: params["ac"] = self.username if need_auth and self.token: self.session.headers.update({"authorization": self.token}) try: resp = self.session.post(self.base_url, params=params, json=payload, timeout=10, verify=False) resp.raise_for_status() return resp.json() except Exception as e: print(f"[GraphQL请求异常] {e}") return None # ================== 团队成员验证 ================== def check_team_member(self, username): """通过 Cloudflare Workers API 检查用户名是否为团队成员""" try: print(f"[验证] 正在从云端获取成员列表...") resp = requests.get(MEMBERS_API_URL, timeout=5, verify=False) if resp.status_code != 200: print(f"[验证] 云端响应异常,状态码: {resp.status_code}") return False members = [line.strip() for line in resp.text.splitlines() if line.strip()] print(f"[验证] 获取到 {len(members)} 个成员,正在检查 '{username}'...") return username in members except Exception as e: print(f"[验证] 验证请求出错: {e}") return False # ================== 登录(含验证) ================== def login(self): username = self.username_entry.get().strip() password = self.password_entry.get().strip() if username == "请输入用户名": messagebox.showwarning("输入错误", "请输入正确的用户名和密码") return if not username or not password: messagebox.showwarning("输入错误", "请输入正确的用户名和密码") return # -------- 团队成员验证 -------- self.login_btn.config(state=tk.DISABLED, text="验证中...") if not self.check_team_member(username): messagebox.showwarning("权限不足", REJECT_MESSAGE) self.login_btn.config(state=tk.NORMAL, text="登录") return # ----------------------------- self.password = password self.login_btn.config(state=tk.DISABLED, text="登录中...") print("[登录] 开始登录流程...") captcha_payload = { "variables": {}, "query": "{\n CaptchaData {\n captcha_id\n captcha_base64_string\n need_verify\n __typename\n }\n}\n" } captcha_resp = self.graphql_request(captcha_payload, need_auth=False) if not captcha_resp or captcha_resp.get("errors"): messagebox.showerror("错误", "获取验证码失败") self.login_btn.config(state=tk.NORMAL, text="登录") return captcha_id = captcha_resp["data"]["CaptchaData"]["captcha_id"] print(f"[登录] 获取到 captcha_id: {captcha_id}") login_payload = { "operationName": "Login", "variables": { "app_key": APP_KEY, "account": username, "password": password, "captcha_id": captcha_id, "captcha_code": "", "google_code": "", "bank_card_real_name": None, "two_step_token": None }, "query": """mutation Login($app_key: String!, $account: String!, $password: String!, $captcha_id: String, $captcha_code: String, $google_code: String, $bank_card_real_name: String, $two_step_token: String) { info: Login( app_key: $app_key account: $account password: $password captcha_id: $captcha_id captcha_code: $captcha_code google_code: $google_code bank_card_real_name: $bank_card_real_name two_step_token: $two_step_token ) { token user_id user_info { id user_account user_name } } }""" } login_resp = self.graphql_request(login_payload, need_auth=False) if not login_resp or login_resp.get("errors"): msg = login_resp.get("errors", [{"message":"登录失败"}])[0]["message"] if login_resp else "无响应" messagebox.showerror("登录失败", msg) self.login_btn.config(state=tk.NORMAL, text="登录") return try: info = login_resp["data"]["info"] self.token = info["token"] self.username = username self.user_id = info["user_id"] self.login_btn.config(state=tk.DISABLED, text="已登录") self.start_btn.config(state=tk.NORMAL) self.refresh_balance_btn.config(state=tk.NORMAL) self.base_entry.config(state=tk.NORMAL) self.delay_entry.config(state=tk.NORMAL) self.username_entry.config(state=tk.DISABLED) self.password_entry.config(state=tk.DISABLED) self.refresh_balance() print("[登录] 登录成功,token已保存") except Exception as e: messagebox.showerror("登录失败", f"解析响应失败: {e}") self.login_btn.config(state=tk.NORMAL, text="登录") def refresh_balance(self): print("[余额] 刷新余额...") payload = { "operationName": "getUserBalance", "variables": {"product_id": "Enum1"}, "query": BALANCE_QUERY } resp = self.graphql_request(payload, need_auth=True) if resp and not resp.get("errors"): try: self.balance = resp["data"]["User"]["user_balance"] self.balance_label.config(text=f"余额:{self.balance:.2f}") print(f"[余额] 当前余额: {self.balance:.2f}") except: self.balance_label.config(text="余额:错误") print("[余额] 解析失败") else: self.balance_label.config(text="余额:失败") print("[余额] 请求失败") def get_yiqifa_records(self): try: url = QUERY_URL + "&_=" + str(int(time.time() * 1000)) print(f"[监控] 请求 yiqifa 数据: {url}") resp = requests.get(url, timeout=5, verify=False) resp.raise_for_status() data = resp.json() print(f"[监控] 获取到 {len(data)} 条记录") if len(data) < 2: print("[监控] 记录数不足2条,无法获取上一期") return None, None curr = data[0] prev = data[1] print(f"[监控] 上一期: {prev['period']} {prev['is_win']}, 当前期: {curr['period']} 票数={curr['tickets']} 票价={curr['price']}") return prev, curr except Exception as e: print(f"[监控] 获取 yiqifa 记录失败: {e}") return None, None def get_lottery_cycle(self): payload = { "operationName": "GetLotteryCycle", "variables": {"game_id": 370}, "query": "query GetLotteryCycle($game_id: Int!) {\n LotteryGame(game_id: $game_id) {\n lottery_cycle_now {\n now_cycle_id\n }\n }\n}" } resp = self.graphql_request(payload, need_auth=True) if not resp or resp.get("errors"): print("[彩票期号] 获取失败") return None try: cycle_id = resp["data"]["LotteryGame"]["lottery_cycle_now"]["now_cycle_id"] print(f"[彩票期号] 当前期号ID: {cycle_id}") return cycle_id except: print("[彩票期号] 解析失败") return None def place_bet(self, cycle_id, station_code, numbers, tickets, price): code_map = {"1万":1, "2千":2, "3百":3, "4十":4, "5个":5} pos = code_map.get(station_code, 0) if pos == 0: return False, f"无效编号:{station_code}" num_list = numbers.split() bet_info = [[], [], [], [], []] bet_info[pos-1] = num_list bet_info_str = json.dumps(bet_info, ensure_ascii=False).replace(" ", "") try: base = int(self.base_var.get()) except: base = 1 bet_multiple = int(tickets * base) bet_amount = price * base print(f"[投注] 倍数={bet_multiple}, 金额={bet_amount}, 位置={pos}, 号码={num_list}") if bet_multiple <= 0 or bet_amount <= 0: return False, f"票数或票价为0无法投注 (票数={tickets},票价={price})" payload = { "operationName": "AddLotteryOrders", "variables": { "input": [{ "game_id": 370, "game_type_id": 65, "game_cycle_id": cycle_id, "bet_info": bet_info_str, "bet_mode": "OneLi", "bet_multiple": bet_multiple, "bet_percent_type": "AdjustPercentType", "bet_percent": 0, "is_follow": False, "follow_commission_percent": None }] }, "query": "mutation AddLotteryOrders($input: [AddLotteryOrderInputObj]!) {\n AddLotteryOrders(orders: $input) {\n message\n order_ids\n __typename\n }\n}" } resp = self.graphql_request(payload, need_auth=True) if not resp: return False, "请求失败" if resp.get("errors"): return False, resp["errors"][0].get("message", "未知错误") try: msg = resp["data"]["AddLotteryOrders"]["message"] return True, "成功" except: return False, "解析失败" def add_bet_record(self, period, station_code, numbers, bet_multiple, bet_amount, pre_balance, result_text): values = ( datetime.now().strftime('%H:%M:%S'), period, station_code, numbers, bet_multiple, f"{bet_amount:.2f}", "--", "--", result_text ) row_id = self.tree.insert("", tk.END, values=values) self.bet_records[period] = { 'row_id': row_id, 'numbers': numbers, 'bet_amount': bet_amount, 'pre_balance': pre_balance } self.tree.yview_moveto(1.0) return row_id def update_bet_result(self, period, win, profit, cumulative): if period in self.bet_records: row_id = self.bet_records[period]['row_id'] self.tree.set(row_id, "盈亏", f"{profit:.2f}") self.tree.set(row_id, "累计", f"{cumulative:.2f}") self.tree.set(row_id, "结果", "√" if win else "×") tag = "win" if win else "lose" self.tree.tag_configure(tag, background='#d4edda' if win else '#f8d7da') self.tree.item(row_id, tags=(tag,)) print(f"[盈亏] 期号{period}, 结果={'√' if win else '×'}, 盈亏={profit:.2f}, 累计={cumulative:.2f}") def monitor_loop(self): print("[监控线程] 启动") while self.running: try: prev, curr = self.get_yiqifa_records() if prev and curr: curr_period = curr['period'] print(f"[监控] 当前周期 last_period={self.last_period}, 获取到周期={curr_period}") if curr_period != self.last_period: print(f"[监控] 检测到新期号: {curr_period}") if self.last_period and self.last_period in self.bet_records: win = (prev['is_win'] == '准点') print(f"[监控] 上一期 {self.last_period} 结果: {'中' if win else '挂'}") self.refresh_balance() current_balance = self.balance if self.balance is not None else 0 pre_balance = self.bet_records[self.last_period]['pre_balance'] bet_amount = self.bet_records[self.last_period]['bet_amount'] if win: profit = current_balance - pre_balance else: profit = -bet_amount self.cumulative_profit += profit self.update_bet_result(self.last_period, win, profit, self.cumulative_profit) try: delay_seconds = int(self.delay_var.get()) except: delay_seconds = 5 if delay_seconds > 0: print(f"[监控] 等待 {delay_seconds} 秒后投注...") time.sleep(delay_seconds) cycle_id = self.get_lottery_cycle() if cycle_id: print("[监控] 准备投注...") self.refresh_balance() pre_balance = self.balance station_code = curr['station_code'] numbers = curr['numbers'] tickets = curr['tickets'] price = curr['price'] if tickets <= 0 or price <= 0: print(f"[监控] 跳过投注:票数={tickets},票价={price} 无效") self.last_period = curr_period continue success, msg = self.place_bet(cycle_id, station_code, numbers, tickets, price) try: base = int(self.base_var.get()) except: base = 1 bet_multiple = int(tickets * base) bet_amount = price * base result_text = "成功" if success else f"失败:{msg}" self.add_bet_record(curr_period, station_code, numbers, bet_multiple, bet_amount, pre_balance, result_text) else: print("[监控] 获取彩票期号失败,本次不投注") self.add_bet_record(curr_period, curr['station_code'], curr['numbers'], 0, 0, 0, "获取期号失败") self.last_period = curr_period else: print("[监控] 期号无变化,等待下次轮询") else: print("[监控] 未获取到有效的 prev/curr 数据") except Exception as e: print(f"[监控] 循环异常: {e}") for _ in range(20): if not self.running: break time.sleep(0.1) print("[监控线程] 退出") def start_task(self): if self.running: return if not self.token: messagebox.showwarning("未登录", "请先登录") return _, curr = self.get_yiqifa_records() if curr: self.last_period = curr['period'] print(f"[开始] 设置 last_period = {self.last_period}") else: self.last_period = None print("[开始] 无法获取初始期号,last_period 为 None") self.running = True self.start_btn.config(state=tk.DISABLED) self.stop_btn.config(state=tk.NORMAL) self.base_entry.config(state=tk.DISABLED) self.delay_entry.config(state=tk.DISABLED) self.thread = threading.Thread(target=self.monitor_loop, daemon=True) self.thread.start() print("[开始] 监控已启动") def stop_task(self): self.running = False self.start_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.DISABLED) self.base_entry.config(state=tk.NORMAL) self.delay_entry.config(state=tk.NORMAL) if self.thread: self.thread.join(timeout=1) print("[停止] 监控已停止") if __name__ == "__main__": root = tk.Tk() app = AutoBetApp(root) root.mainloop()