"""Windows-only helper utilities for the auto-trade project.

Contents: WiFi (re)connection through ``netsh``, an Excel log of proxy-IP
usage, e-mail notification, a MySQL query helper, ticker/quote helpers,
retry/logging decorators, Redis bookkeeping of used MAC/IP values, MAC address
spoofing through the registry, and router-driven WAN IP reset.
"""
import ctypes
import datetime
import functools
import getpass
import logging
import os
import platform
import random
import re
import smtplib
import socket
import subprocess
import sys
import time
import uuid
import warnings
import winreg
from email.header import Header
from email.mime.application import MIMEApplication
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from hashlib import md5
from xml.sax.saxutils import escape as _xml_escape

import pandas as pd
import redis
import requests
from openpyxl import load_workbook
from sqlalchemy import create_engine

project_name = 'auto_trade_20230130'

# NOTE(review): the Alibaba Cloud OCR SDK imports are disabled, so ocr_image()
# raises NameError until they are re-enabled.
# from alibabacloud_tea_openapi import models as open_api_models
# from alibabacloud_credentials.client import Client as CredentialClient
# from alibabacloud_credentials.models import Config as CredentialConfig
# from alibabacloud_ocr_api20210707.client import Client as OcrClient
# from alibabacloud_ocr_api20210707 import models as ocr_models
# from alibabacloud_tea_util import models as util_models
# from alibabacloud_tea_util.client import Client as StreamClient

# WLAN profile consumed by ``netsh wlan add profile`` (WPA2-PSK / AES).
_WLAN_PROFILE_TEMPLATE = """<?xml version="1.0"?>
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
    <name>{ssid}</name>
    <SSIDConfig>
        <SSID>
            <name>{ssid}</name>
        </SSID>
    </SSIDConfig>
    <connectionType>ESS</connectionType>
    <connectionMode>auto</connectionMode>
    <MSM>
        <security>
            <authEncryption>
                <authentication>WPA2PSK</authentication>
                <encryption>AES</encryption>
                <useOneX>false</useOneX>
            </authEncryption>
            <sharedKey>
                <keyType>passPhrase</keyType>
                <protected>false</protected>
                <keyMaterial>{password}</keyMaterial>
            </sharedKey>
        </security>
    </MSM>
</WLANProfile>
"""


def _network_is_up(timeout=5):
    """Return True when https://www.baidu.com answers with HTTP 200."""
    try:
        return requests.get("https://www.baidu.com", timeout=timeout).status_code == 200
    except requests.RequestException:
        return False


def _project_logger():
    """Return the project file logger stored under ``<project root>/logs``.

    The project root is the part of this file's directory path up to and
    including ``project_name``. As the original call sites did, this also
    silences all warnings process-wide.
    """
    warnings.filterwarnings('ignore')
    scripts_path = os.path.dirname(os.path.realpath(__file__))
    # NOTE(review): if project_name is not part of the path, find() returns -1
    # and the slice below yields a meaningless prefix.
    root_path = scripts_path[:scripts_path.find(project_name) + len(project_name)]
    return Logger(f'{root_path}/logs', '18242094506')


def connect_to_wifi(ssid='MaxEntropy', password='cskj12345678'):
    """Connect to a WPA2-PSK WiFi network unless the network already works.

    :param ssid: network name to join.
    :param password: network pass phrase.
    :return: True when the network is reachable (already, or after joining),
        False when the profile could not be applied or no connectivity was
        observed after 5 probes.
    """
    # SECURITY: the default SSID/pass phrase are hard-coded in source.
    if _network_is_up():
        print("当前网络连接正常,无需重连WiFi")
        return True
    print("当前网络不可用,将尝试连接WiFi...")

    profile_path = "wifi_profile.xml"
    # Escape XML metacharacters so an SSID/password containing & or < stays valid.
    wifi_config = _WLAN_PROFILE_TEMPLATE.format(
        ssid=_xml_escape(ssid), password=_xml_escape(password))

    try:
        with open(profile_path, "w", encoding="utf-8") as f:
            f.write(wifi_config.strip())

        # Register the profile, then ask Windows to connect with it.
        subprocess.run(
            ["netsh", "wlan", "add", "profile", "filename=" + profile_path],
            check=True,
        )
        subprocess.run(
            ["netsh", "wlan", "connect", "name=" + ssid],
            check=True,
        )

        # Poll until the connection is usable.
        max_retries = 5
        for attempt in range(1, max_retries + 1):
            if _network_is_up():
                print(f"成功连接到 WiFi: {ssid}")
                return True
            if attempt < max_retries:
                print(f"等待网络连接...({attempt}/{max_retries})")
                time.sleep(2)

        print("WiFi连接失败: 超过最大重试次数")
        return False

    except subprocess.CalledProcessError as e:
        print(f"WiFi配置失败: {e}")
        return False
    except Exception as e:
        print(f"发生未知错误: {e}")
        return False
    finally:
        # The profile file holds the pass phrase in clear text; never leave it behind.
        if os.path.exists(profile_path):
            os.remove(profile_path)


class ExcelDataWriter:
    """Append-only Excel log of proxy-IP usage with a fixed column layout.

    Column order: as_of_date, broker, id, account, proxy_ip, proxy_port,
    ip_location, exit_ip, ip_cross_check_result, usage_result, ip_switch_time,
    transaction_duration, ip_survival_time, remarks, platform_risk_response.
    """

    def __init__(self, filename=r'C:\auto_trade_20230130\mysql_table\ip_tracking.xlsx', sheet_name='Sheet1'):
        """Remember the target workbook and create it (header only) if missing.

        :param filename: path of the Excel workbook.
        :param sheet_name: worksheet that holds the records.
        """
        self.filename = filename
        self.sheet_name = sheet_name
        # Exact column names and order expected in the sheet.
        self.columns = [
            'as_of_date',
            'broker',
            'id',
            'account',
            'proxy_ip',
            'proxy_port',
            'ip_location',
            'exit_ip',
            'ip_cross_check_result',
            'usage_result',
            'ip_switch_time',
            'transaction_duration',
            'ip_survival_time',
            'remarks',
            'platform_risk_response',
        ]

        if not os.path.exists(self.filename):
            pd.DataFrame(columns=self.columns).to_excel(
                self.filename, sheet_name=self.sheet_name, index=False)

    def write_data(self, data_dict):
        """Append one record; keys missing from *data_dict* become empty cells.

        :param data_dict: mapping of column name to value.
        :return: True on success, False when reading/writing failed or the
            existing sheet's columns differ from ``self.columns``.
        """
        try:
            full_data = {col: data_dict.get(col) for col in self.columns}

            if os.path.exists(self.filename):
                # Read the same sheet that update_latest_record() uses.
                existing_df = pd.read_excel(self.filename, sheet_name=self.sheet_name)
                if list(existing_df.columns) != self.columns:
                    raise ValueError("现有文件的列名不匹配,请使用新文件")
            else:
                existing_df = pd.DataFrame(columns=self.columns)

            new_row = pd.DataFrame([full_data])[self.columns]
            updated_df = pd.concat([existing_df, new_row], ignore_index=True)
            updated_df.to_excel(self.filename, sheet_name=self.sheet_name, index=False)
            print('数据一')
            return True

        except Exception as e:
            print(f"Excel 写入失败: {str(e)}")
            return False

    def update_latest_record(self, status='成功', remarks='', risk_response=''):
        """Fill in the outcome fields of the last row of the sheet.

        ``transaction_duration`` is set to the seconds elapsed since the row's
        ``as_of_date`` (0 when that cell is empty).

        :param status: usage result text.
        :param remarks: free-form remark.
        :param risk_response: platform risk response text.
        :return: True once the workbook has been rewritten.
        :raises ValueError: when the sheet contains no rows.
        """
        df = pd.read_excel(self.filename, sheet_name=self.sheet_name)

        if df.empty:
            raise ValueError("Excel文件中没有记录可更新")

        last_idx = df.index[-1]

        started = df.at[last_idx, 'as_of_date']
        if pd.notna(started):
            if isinstance(started, str):
                started_at = datetime.datetime.strptime(started, "%Y-%m-%d %H:%M:%S.%f")
            else:
                # Excel may hand the cell back as a Timestamp instead of text.
                started_at = pd.Timestamp(started).to_pydatetime()
            transaction_duration = round(time.time() - started_at.timestamp(), 2)
        else:
            transaction_duration = 0

        # Use object dtype so text can be stored without turning the empty
        # cells of other rows into the literal string 'nan'.
        for text_column in ('usage_result', 'remarks', 'platform_risk_response'):
            df[text_column] = df[text_column].astype(object)

        df.at[last_idx, 'usage_result'] = status
        df.at[last_idx, 'transaction_duration'] = transaction_duration
        df.at[last_idx, 'remarks'] = remarks
        df.at[last_idx, 'platform_risk_response'] = risk_response

        df.to_excel(self.filename, sheet_name=self.sheet_name, index=False)
        return True

    def get_unavailable_ips(self, broker, account):
        """Return proxy IPs logged for *broker* that *account* itself never used.

        :param broker: value matched against the ``broker`` column.
        :param account: value matched against the ``account`` column.
        :return: list of IPs (unordered); empty when the sheet has no rows.
        """
        df = pd.read_excel(self.filename, sheet_name=self.sheet_name)

        if df.empty:
            return []

        same_broker = df['broker'] == broker
        broker_used_ips = df[same_broker]['proxy_ip'].tolist()
        account_used_ips = df[same_broker & (df['account'] == account)]['proxy_ip'].tolist()

        return list(set(broker_used_ips) - set(account_used_ips))


class TempStdoutArea:
    """Minimal stdout replacement that records every ``write`` call in a list."""

    def __init__(self):
        self.buffer = []

    def write(self, *args, **kwargs):
        # Each entry is the positional-argument tuple of one write() call.
        self.buffer.append(args)

    def flush(self):
        """No-op so the object can stand in wherever a stream is flushed."""


class GetControlCoord:
    """Extract control coordinates from pywinauto's control-identifier dump."""

    def __init__(self, app):
        """:param app: object exposing ``print_control_identifiers()``."""
        self.app = app

    def get_coord(self, control):
        """Return the coordinates printed for every line mentioning *control*.

        The identifier dump is captured by temporarily replacing
        ``sys.stdout``. For each captured line containing *control*, the text
        between the first ``(`` and the first ``)`` is split on commas and the
        first character of each piece (the side letter) is dropped.

        :param control: substring identifying the wanted control.
        :return: list of lists of coordinate strings.
        """
        original_stdout = sys.stdout
        captured = TempStdoutArea()
        sys.stdout = captured
        try:
            self.app.print_control_identifiers()
        finally:
            # Always restore stdout, even if the dump raises.
            sys.stdout = original_stdout

        all_coord = []
        for write_args in captured.buffer:
            text = write_args[0]
            if control not in text:
                continue
            pieces = text[text.find('(') + 1: text.find(')')].split(',')
            all_coord.append([piece.strip()[1:] for piece in pieces])
        return all_coord


def send_email(subject: str, message: str,
               receive_users: list = None,
               file_path: str = 'no_path'):
    """Send an HTML e-mail, optionally with one attachment, via Zoho SMTP.

    Parameters
    ----------
    subject : str
        Mail subject.
    message : str
        HTML body.
    receive_users : list, optional
        Recipient addresses. Defaults to
        ['1055017575@qq.com', '2293908092@qq.com'].
    file_path : str, optional
        Path of a file to attach; 'no_path' (default) means no attachment.

    Returns
    -------
    None.
    """
    if receive_users is None:
        receive_users = ['1055017575@qq.com', '2293908092@qq.com']

    # SECURITY: account credentials are hard-coded in source.
    send_user = 'it_ainvest@zohomail.cn'
    send_pwd = '2022@AIWANTE'

    msg = MIMEMultipart()
    msg['subject'] = subject
    msg['from'] = send_user
    msg['to'] = ','.join(receive_users)
    msg.attach(MIMEText(message, 'html', 'utf-8'))

    if file_path != 'no_path':
        with open(file_path, 'rb') as attachment:
            part = MIMEApplication(attachment.read())
        # basename copes with both / and \ separators on Windows.
        part.add_header('Content-Disposition', 'attachment',
                        filename=os.path.basename(file_path))
        msg.attach(part)

    # The context manager issues QUIT even when login/sendmail fails.
    with smtplib.SMTP_SSL("smtp.zoho.com.cn", 465) as server:
        server.login(send_user, send_pwd)
        server.sendmail(send_user, receive_users, msg.as_string())
    print('邮件发送成功')


def download_data_from_db(sql, schema_name):
    """Run *sql* against the Aliyun MySQL instance and return a DataFrame.

    :param sql: query text.
    :param schema_name: database (schema) name.
    :return: query result as a pandas DataFrame.
    """
    # SECURITY: database credentials are hard-coded in source.
    # The legacy ``encoding`` argument is omitted: SQLAlchemy 2.0 rejects it
    # and utf-8 was already the default in 1.x.
    cnx = create_engine(
        'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
        'mysql.rds.aliyuncs.com:3306/%s?charset=utf8' % schema_name,
        echo=False)
    try:
        return pd.read_sql(sql, con=cnx)
    finally:
        cnx.dispose()


# Leading digits that select the exchange prefix in tranTicker().
_SH_PREFIXES = ('6', '11', '10', '51')
_SZ_PREFIXES = ('0', '3', '15', '16', '12')


def tranTicker(tick):
    """Normalise a stock code to ``SH``/``SZ`` + code.

    * 8 characters: leading/trailing ``S``/``H``/``Z`` characters are stripped,
      then the prefix is chosen from the leading digits.
    * 6 characters: the prefix is chosen from the leading digits.
    * any other length: left-padded with zeros to 6 and prefixed with ``SZ``.

    Codes whose leading digits match neither prefix table are returned
    without a prefix.
    """
    tick = str(tick)
    if len(tick) == 8:
        tick = tick.strip('SH').strip('SZ')
    elif len(tick) != 6:
        return 'SZ' + (6 - len(tick)) * '0' + tick

    if tick.startswith(_SH_PREFIXES):
        return 'SH' + tick
    if tick.startswith(_SZ_PREFIXES):
        return 'SZ' + tick
    return tick


def ocr_image(image_path):
    """Send the image at *image_path* to Alibaba Cloud OCR and return the text.

    :raises FileNotFoundError: when *image_path* does not exist.

    NOTE(review): the SDK names used here are only imported in commented-out
    lines at the top of the file, so this currently raises NameError.
    """
    # SECURITY: access keys are hard-coded in source.
    access_key_id = 'LTAI5tA4dy591hCgEBfsxZei'
    access_key_secret = 'edYHXSVYda8HomlEsnsyQzw1QGu1EN'

    if not os.path.exists(image_path):
        raise FileNotFoundError(f"图片文件不存在: {image_path}")

    # Step 1: credential client.
    credential_config = CredentialConfig(
        type='access_key',
        access_key_id=access_key_id,
        access_key_secret=access_key_secret
    )
    credential = CredentialClient(credential_config)

    # Step 2: OpenAPI configuration.
    config = open_api_models.Config(
        credential=credential,
        endpoint='ocr-api.cn-hangzhou.aliyuncs.com'
    )

    # Step 3: OCR client.
    client = OcrClient(config)

    # Step 4: request parameters.
    body_stream = StreamClient.read_from_file_path(image_path)
    request = ocr_models.RecognizeAllTextRequest(
        body=body_stream,
        type='Advanced'
    )
    runtime = util_models.RuntimeOptions()

    # Step 5: call the service.
    response = client.recognize_all_text_with_options(request, runtime)
    return response.body.data.content


def get_price(tick, level='1', op="buy"):
    """Fetch a quote field for *tick* from qt.gtimg.cn.

    :param tick: ticker such as ``SH600000`` (lower-cased before the request).
    :param level: order-book level 1-5 (int or numeric string).
    :param op: ``"buy"`` or ``"sell"``.
    :return: field ``9 + 2*(level-1)`` for buy, ``19 + 2*(level-1)`` for sell
        of the ``~``-separated reply; field 3 for any other op/level; ``None``
        when the reply is shorter than 49 characters.
    """
    level = int(level)
    rep_data = requests.get("http://qt.gtimg.cn/q=" + tick.lower(), timeout=5).text
    stock_detail = rep_data.split(";")[0]
    if len(stock_detail) < 49:
        return None
    stock = stock_detail.split("~")

    first_level_index = {"buy": 9, "sell": 19}.get(op)
    if first_level_index is not None and 1 <= level <= 5:
        return float(stock[first_level_index + 2 * (level - 1)])
    return float(stock[3])


def get_host_ip():
    """Return the last 12 hex digits of ``uuid.getnode()``.

    Despite the name this is the host's hardware (MAC-style) node id,
    not an IP address.
    """
    return uuid.UUID(int=uuid.getnode()).hex[-12:]


def _retry_on_runtime_error(func, delay, max_failures):
    """Wrap *func* so RuntimeError is retried after *delay* seconds.

    The last RuntimeError is re-raised once more than *max_failures*
    failures have occurred.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        failures = 0
        while True:
            try:
                return func(*args, **kwargs)
            except RuntimeError:
                failures += 1
                if failures > max_failures:
                    raise
                time.sleep(delay)
    return wrapper


def tryrun(func):
    """Decorator: retry on RuntimeError every 1 s, giving up after 120 failures."""
    return _retry_on_runtime_error(func, delay=1, max_failures=120)


def tryrun2(func):
    """Decorator: retry on RuntimeError every 10 s, giving up after 90 failures."""
    return _retry_on_runtime_error(func, delay=10, max_failures=90)


def output(func):
    """Decorator for Logger methods: echo the logged message to stdout."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        res = func(*args, **kwargs)
        # args[0] is self; the message may also arrive as a keyword argument.
        print(args[1] if len(args) > 1 else kwargs.get('message'))
        return res
    return wrapper


class Logger():
    """File logger writing ``<path>/<name>.log``; most methods also print."""

    def __init__(self, path, name, Flevel=logging.DEBUG):
        """
        :param path: directory of the log file (created when missing).
        :param name: log file name without extension.
        :param Flevel: level for both the logger and its file handler.
        """
        self.time = time.strftime("%Y-%m-%d")
        self.fileName = path + '/' + name + ".log"
        self.logger = logging.getLogger(self.fileName)
        self.logger.setLevel(Flevel)
        # The logger is cached by name; attach the handler only once.
        if not self.logger.handlers:
            if path:
                os.makedirs(path, exist_ok=True)
            fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
            fh = logging.FileHandler(self.fileName, encoding='utf-8')
            fh.setFormatter(fmt)
            fh.setLevel(Flevel)
            self.logger.addHandler(fh)

    @output
    def debug(self, message):
        """Log *message* at DEBUG level and print it."""
        self.logger.debug(message)

    @output
    def info(self, message):
        """Log *message* at INFO level and print it."""
        self.logger.info(message)

    @output
    def warn(self, message):
        """Log *message* at WARNING level and print it."""
        # logging.Logger.warn() was removed in Python 3.13; warning() is the API.
        self.logger.warning(message)

    @output
    def error(self, message):
        """Log *message* at ERROR level and print it."""
        self.logger.error(message)

    @output
    def critical(self, message):
        """Log *message* at CRITICAL level and print it."""
        self.logger.critical(message)

    def softInfo(self, message):
        """Log *message* at INFO level without printing it."""
        self.logger.info(message)


def init_redis(ip, port1, db1, psw=''):
    """(Re)create the module-level Redis client ``r`` and ping the server.

    Parameters
    ----------
    ip : str
        Redis host.
    port1 : int
        Redis port.
    db1 : int
        Database number. NOTE(review): accepted but not passed to the
        connection pool, so the server default database is used.
    psw : str, optional
        Password. The default is ''.

    Returns
    -------
    Result of ``PING``.
    """
    global r
    # NOTE(review): fixed 5 s pause kept from the original; purpose unknown.
    time.sleep(5)
    pool = redis.ConnectionPool(host=ip, port=port1, password=psw)
    r = redis.Redis(connection_pool=pool)
    return r.ping()


def redis_lget(key, num):
    """Return the first *num* elements of the Redis list *key*.

    Parameters
    ----------
    key : str
        List key.
    num : int
        Number of elements wanted; zero or a negative value means the
        whole list.

    Returns
    -------
    vec : list
        Raw values as returned by ``LRANGE``.
    """
    # SECURITY: Redis host and password are hard-coded in source.
    init_redis('82.156.3.152', 6379, 0, '1508181008')
    # LRANGE's end index is inclusive and -1 addresses the last element.
    # The former ``num - 1`` turned num=-1 into -2 and dropped the newest entry.
    end = -1 if num <= 0 else num - 1
    vec = r.lrange(key, 0, end)
    if len(vec) > 0:
        print(vec)
    return vec


class SetMac(object):
    """Change the MAC address of the ``WLAN 2`` adapter via the registry."""

    # Connection name looked up in the GETMAC output.
    TARGET_CONNECTION_NAME = "WLAN 2"

    def __init__(self):
        # Matches 00-00-00-00-00-00, 00:00:00:00:00:00 or 000000000000.
        self.MAC_ADDRESS_RE = re.compile(r"""
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})
            """, re.I | re.VERBOSE)
        # Raw string: the path contains backslashes that are not escapes.
        self.WIN_REGISTRY_PATH = r"SYSTEM\CurrentControlSet\Control\Class\{4D36E972-E325-11CE-BFC1-08002BE10318}"

    def is_admin(self):
        """Exit the process unless it runs with administrative privileges."""
        if ctypes.windll.shell32.IsUserAnAdmin() == 0:
            print('Sorry! You should run this with administrative privileges if you want to change your MAC address.')
            sys.exit()
        else:
            print('admin')

    def get_macinfos(self):
        """Print the GETMAC listing and return the target adapter's MAC.

        :return: physical address text of the target connection.
        :raises LookupError: when the connection is not in the listing.
        """
        print('=' * 50)
        mac_info = subprocess.check_output('GETMAC /v /FO list', stderr=subprocess.STDOUT)
        mac_info = mac_info.decode('gbk')
        print('Your MAC address:\n', mac_info)

        pattern = re.compile(
            r"连接名:\s+" + re.escape(self.TARGET_CONNECTION_NAME) + r".*?物理地址:\s+([0-9A-Fa-f-]+)",
            re.DOTALL
        )
        found = pattern.search(mac_info)
        if found is None:
            raise LookupError(
                'Connection %r not found in GETMAC output' % self.TARGET_CONNECTION_NAME)
        return found.group(1)

    def get_target_device(self):
        """Return the adapter description of the target connection.

        Exits the process when the connection is not listed by GETMAC.
        """
        mac_info = subprocess.check_output('GETMAC /v /FO list', stderr=subprocess.STDOUT)
        mac_info = mac_info.decode('gbk')
        print(mac_info)
        search = re.search(
            '(' + re.escape(self.TARGET_CONNECTION_NAME) + r')\s+网络适配器: (.+)\s+物理地址:',
            mac_info)
        print(search)
        target_name, target_device = (search.group(1), search.group(2).strip()) if search else ('', '')
        if not all([target_name, target_device]):
            print('Cannot find the target device')
            sys.exit()

        print(target_name, target_device)
        return target_device

    def get_network_adapter_info(self):
        """Map adapter index to adapter name using ``wmic``.

        :return: e.g. ``{'16': 'Realtek RTL8821CE 802.11ac PCIe Adapter #2'}``;
            an empty dict when the command fails.
        """
        try:
            cmd = "wmic path win32_networkadapter get index, name"
            output = subprocess.check_output(cmd, universal_newlines=True)
            lines = [line.strip() for line in output.strip().split('\n')]
            non_empty_lines = [line for line in lines if line]
            result = {}
            # The first non-empty line is the column header.
            for line in non_empty_lines[1:]:
                first_space_index = line.find(' ')
                index = line[:first_space_index].strip()
                name = line[first_space_index:].strip()
                result[index] = name
            return result
        except subprocess.CalledProcessError as e:
            print(f"命令执行失败,错误代码: {e.returncode}")
            print(e.output)
        except Exception as e:
            print(f"发生异常: {e}")
        # Same mapping type as the success path so callers can always .get().
        return {}

    def set_mac_address(self, target_device, new_mac):
        """Write *new_mac* as ``NetworkAddress`` for *target_device* and restart it.

        :param target_device: adapter name as reported by ``wmic``.
        :param new_mac: new MAC address text.
        :return: None. Prints a message and returns early when the MAC is
            malformed or the adapter is not found.
        """
        if not self.MAC_ADDRESS_RE.match(new_mac):
            print('Please input a correct MAC address')
            return

        adapter_dict = self.get_network_adapter_info()
        adapter_path = None
        target_index = -1

        # Context managers close every registry handle on all exit paths.
        with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as reg_hdl:
            with winreg.OpenKey(reg_hdl, self.WIN_REGISTRY_PATH) as class_key:
                subkey_count = winreg.QueryInfoKey(class_key)[0]
                for position in range(subkey_count):
                    subkey = winreg.EnumKey(class_key, position)
                    if subkey == 'Properties':
                        break
                    if not subkey.isdigit():
                        # Only numbered subkeys can be mapped to an adapter index.
                        continue

                    path = self.WIN_REGISTRY_PATH + "\\" + subkey
                    try:
                        with winreg.OpenKey(reg_hdl, path) as probe:
                            winreg.QueryValueEx(probe, "DriverDesc")
                    except OSError as err:
                        if err.errno == 2:
                            # Value not found: this subkey is skipped.
                            continue
                        raise

                    adapter_index = str(int(subkey))
                    if adapter_dict.get(adapter_index) == target_device:
                        adapter_path = path
                        target_index = adapter_index
                        break

            if adapter_path is None:
                print('Device not found.')
                return

            with winreg.OpenKey(reg_hdl, adapter_path, 0, winreg.KEY_WRITE) as adapter_key:
                winreg.SetValueEx(adapter_key, "NetworkAddress", 0, winreg.REG_SZ, new_mac)

        # The adapter must be restarted for the change to take effect.
        self.restart_adapter(target_index, target_device)

    def restart_adapter(self, target_index, target_device):
        """Disable and re-enable the adapter so the new MAC is applied.

        :param target_index: ``win32_networkadapter`` index (non-XP path).
        :param target_device: adapter name (XP path, matched in devcon output).
        :raises LookupError: on XP when the device is not in the devcon output.
        """
        if platform.release() == 'XP':
            result = subprocess.check_output("devcon hwids =net", stderr=subprocess.STDOUT)
            # Escape the device name: it may contain regex metacharacters.
            query = (b'(' + re.escape(target_device).encode('ascii')
                     + rb'\r\n\s*.*:\r\n\s*)PCI\\(([A-Z]|[0-9]|_|&)*)')
            match = re.search(query, result)
            if match is None:
                raise LookupError('Device %r not found in devcon output' % target_device)
            cmd = 'devcon restart "PCI\\' + match.group(2).decode('ascii') + '"'
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        else:
            for action in ("disable", "enable"):
                cmd = "wmic path win32_networkadapter where index=" + str(target_index) + " call " + action
                subprocess.check_output(cmd)

    def run(self):
        """Replace the target adapter's MAC with a fresh random one, logging both."""
        self.is_admin()
        mac = self.get_macinfos()
        log = _project_logger()
        log.warn('MAC 重置之前:' + mac)

        target_device = self.get_target_device()
        self.set_mac_address(target_device, self.randomMAC())

        mac_new = self.get_macinfos()
        log.warn('MAC 重置之后:' + mac_new)

    def randomMAC(self):
        """Return a random 12-hex-digit MAC not yet in the Redis list ``Mac``.

        The first byte is fixed to 0x02; the other five come from a UUIDv4.
        The chosen value is appended to the Redis list before returning.
        """
        while True:
            random_bytes = uuid.uuid4().bytes
            mac_bytes = [0x02] + list(random_bytes[1:6])
            one_mac = ''.join(f'{b:02X}' for b in mac_bytes)

            used_mac = [i.decode() for i in redis_lget('Mac', -1)]
            if one_mac not in used_mac:
                r.rpush('Mac', one_mac)
                break

        print('1111111', one_mac)
        return one_mac


def get_auth_type(url):
    """GET *url* (with a cache-busting ``_`` parameter) and return the
    ``WWW-Authenticate`` header, or None when the reply carries none."""
    time1 = int(time.time() * 1000)
    # Use the caller's URL instead of overwriting it with a fixed address.
    response = requests.get(f'{url}?_={time1}', timeout=10)
    if 'WWW-Authenticate' in response.headers:
        return response.headers['WWW-Authenticate']
    return None


def parse_auth_params(auth_header):
    """Split a ``WWW-Authenticate`` value into a dict.

    The header is split on commas and each piece on its first ``=``; quotes
    are stripped from values. The scheme stays attached to the first key
    (e.g. ``'Digest realm'``). Pieces without ``=`` are ignored.
    """
    params = {}
    for param in auth_header.split(','):
        key, sep, value = param.strip().partition('=')
        if not sep:
            # e.g. the tail of a quoted value that itself contained a comma.
            continue
        params[key] = value.strip('"')
    return params


def hex_md5(data):
    """Return the hexadecimal MD5 digest of the text *data*."""
    return md5(data.encode()).hexdigest()


def generate_cnonce():
    """Return a 16-hex-character client nonce derived from a random number
    and the current time in milliseconds."""
    rand = random.randint(0, 100000)
    date = int(time.time() * 1000)
    return hex_md5(f"{rand}{date}")[:16]


def compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc):
    """Return the HTTP Digest ``response`` value for a GET of *uri*."""
    ha1 = hex_md5(f"{username}:{realm}:{password}")
    ha2 = hex_md5(f"GET:{uri}")
    return hex_md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}")


def _digest_authorization(username, password, realm, nonce, uri, qop, cnonce, nc):
    """Return ``(response_hash, headers)`` for a Digest-authenticated GET of *uri*."""
    response = compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc)
    headers = {
        'Authorization': f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="{uri}", response="{response}", qop={qop}, nc={nc}, cnonce="{cnonce}"'
    }
    return response, headers


def do_login(username, password, base_url):
    """Log in to the router at *base_url* with HTTP Digest and request a reset.

    :param username: router account name.
    :param password: router account password.
    :param base_url: router address, e.g. ``http://192.168.100.1``.
    :return: True when the reset request returned HTTP 200, otherwise False
        (including when the digest challenge could not be obtained or parsed).
    """
    auth_header = get_auth_type(f"{base_url}/login.cgi")
    if not auth_header:
        print("Failed to get authentication parameters")
        return False

    auth_params = parse_auth_params(auth_header)
    realm = auth_params.get('Digest realm')
    nonce = auth_params.get('nonce')
    qop = auth_params.get('qop')

    if not all([realm, nonce, qop]):
        print("Missing required authentication parameters")
        return False

    cnonce = generate_cnonce()
    nc = '00000001'

    # Step 1: login request.
    response_hash, headers = _digest_authorization(
        username, password, realm, nonce, '/cgi/protected.cgi', qop, cnonce, nc)
    login_url = f"{base_url}/login.cgi?Action=Digest&username={username}&realm={realm}&nonce={nonce}&response={response_hash}&qop={qop}&cnonce={cnonce}&nc={nc}&temp=asr"
    requests.get(login_url, headers=headers, timeout=10)

    # Step 2: reset request, sent to the same router as the login.
    _, headers = _digest_authorization(
        username, password, realm, nonce, '/cgi/xml_action.cgi', qop, cnonce, nc)
    reset_url = f'{base_url}/xml_action.cgi?method=get&module=duster&file=reset'
    response = requests.get(reset_url, headers=headers, timeout=10)
    print(response.content)

    if response.status_code == 200:
        print("Login successful")
        return True
    print(f"Login failed: {response.status_code} {response.reason}")
    return False


def get_ip():
    """Return this host's public IP as reported by checkip.amazonaws.com.

    :return: IP text; the fixed value ``'60.162.69.53'`` when the lookup fails.
    """
    try:
        return requests.get('https://checkip.amazonaws.com', timeout=10).text.strip()
    except requests.RequestException:
        # NOTE(review): hard-coded fallback address kept from the original.
        return '60.162.69.53'


def reset_proxy_to_default():
    """Disable the per-user system proxy and notify WinINet of the change."""
    internet_option_refresh = 37
    internet_option_settings_changed = 39

    with winreg.OpenKey(
        winreg.HKEY_CURRENT_USER,
        r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
        0, winreg.KEY_WRITE
    ) as key:
        winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, 0)
        winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, "")
        winreg.SetValueEx(key, "ProxyOverride", 0, winreg.REG_SZ, "")

    internet_set_option = ctypes.windll.Wininet.InternetSetOptionW
    internet_set_option(0, internet_option_refresh, 0, 0)
    internet_set_option(0, internet_option_settings_changed, 0, 0)


def reset_wlan():
    """Reset the router until the public IP is one not yet in Redis list ``IP``.

    Each round asks the router to reset, waits for connectivity, then checks
    the new IP against the Redis list. A new IP is appended and the function
    returns. After three rounds that produced an already-used IP an e-mail is
    sent and the process exits.
    """
    log = _project_logger()
    log.warn('===================')
    log.warn('ip 重置前:' + get_ip())

    base_url = "http://192.168.100.1"
    username = "admin"
    password = "admin"
    max_repeats = 3
    count = 0

    while True:
        do_login(username, password, base_url)
        time.sleep(1)
        t1 = time.time()

        # Wait for the network to come back; pause between probes instead of
        # spinning, and let KeyboardInterrupt through.
        while True:
            try:
                requests.get('https://www.baidu.com', timeout=5)
            except requests.RequestException:
                time.sleep(0.5)
                continue
            break
        print('网络重连成功,共耗时', time.time() - t1, 's')
        time.sleep(3)

        one_ip = get_ip()
        print('变更后IP', one_ip)
        log.warn('ip 重置后:' + one_ip)

        used_IP = [i.decode() for i in redis_lget('IP', -1)]
        print('!!!!!!one_ip:', one_ip)
        print('!!!!!!used_IP:', used_IP)

        if one_ip not in used_IP:
            r.rpush('IP', one_ip)
            print('!!!!入库了 成功rpush')
            break

        # The IP was seen before: record it and eventually give up.
        log.warn('IP not reset')
        count += 1
        if count >= max_repeats:
            send_email('重启IP出现问题', one_ip)
            sys.exit()


if __name__ == '__main__':
    print(get_host_ip())