zhitou_trade/tools.py

1062 lines
33 KiB
Python
Raw Normal View History

2025-05-22 16:47:45 +08:00
import socket
import warnings
from hashlib import md5
import requests
from sqlalchemy import create_engine
import pandas as pd
import logging
import time
import sys
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import getpass
import os
import uuid
import redis
import ctypes
import platform
import re
import subprocess
import winreg
import random
2025-05-26 09:19:39 +08:00
project_name = 'zhitou_trade'
2025-05-22 16:47:45 +08:00
from hashlib import md5
import sys
import datetime
import pandas as pd
import os
from openpyxl import load_workbook
import subprocess
import time
import requests
import requests
import subprocess
import time
# from alibabacloud_tea_openapi import models as open_api_models
# from alibabacloud_credentials.client import Client as CredentialClient
# from alibabacloud_credentials.models import Config as CredentialConfig
# from alibabacloud_ocr_api20210707.client import Client as OcrClient
# from alibabacloud_ocr_api20210707 import models as ocr_models
# from alibabacloud_tea_util import models as util_models
# from alibabacloud_tea_util.client import Client as StreamClient
2025-05-26 09:19:39 +08:00
from pathlib import Path
root_path = Path(__file__).parent
2025-05-22 16:47:45 +08:00
def connect_to_wifi(ssid='MaxEntropy', password='cskj12345678'):
# 先检测当前网络是否正常
try:
response = requests.get("https://www.baidu.com", timeout=5)
if response.status_code == 200:
print("当前网络连接正常无需重连WiFi")
return True
except:
print("当前网络不可用将尝试连接WiFi...")
# 生成 WiFi 配置文件XML格式
wifi_config = f"""<?xml version="1.0"?>
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
<name>{ssid}</name>
<SSIDConfig>
<SSID>
<name>{ssid}</name>
</SSID>
</SSIDConfig>
<connectionType>ESS</connectionType>
<connectionMode>auto</connectionMode>
<MSM>
<security>
<authEncryption>
<authentication>WPA2PSK</authentication>
<encryption>AES</encryption>
<useOneX>false</useOneX>
</authEncryption>
<sharedKey>
<keyType>passPhrase</keyType>
<protected>false</protected>
<keyMaterial>{password}</keyMaterial>
</sharedKey>
</security>
</MSM>
</WLANProfile>"""
try:
# 保存配置文件
with open("wifi_profile.xml", "w") as f:
f.write(wifi_config.strip())
# 添加 WiFi 配置文件
subprocess.run(
["netsh", "wlan", "add", "profile", "filename=wifi_profile.xml"],
check=True,
shell=True,
)
# 连接 WiFi
subprocess.run(
["netsh", "wlan", "connect", "name=" + ssid],
check=True,
shell=True,
)
# 等待连接成功
max_retries = 5
for i in range(max_retries):
try:
response = requests.get("https://www.baidu.com", timeout=5)
if response.status_code == 200:
print(f"成功连接到 WiFi: {ssid}")
return True
except:
if i < max_retries - 1:
print(f"等待网络连接...({i + 1}/{max_retries})")
time.sleep(2)
print("WiFi连接失败: 超过最大重试次数")
return False
except subprocess.CalledProcessError as e:
print(f"WiFi配置失败: {e}")
return False
except Exception as e:
print(f"发生未知错误: {e}")
if os.path.exists("wifi_profile.xml"):
os.remove("wifi_profile.xml")
return False
class ExcelDataWriter:
"""
Excel数据写入工具类定制列名版
列名顺序as_of_date, id, account, proxy_ip, ip_location, exit_ip,
ip_check_result, usage_result, ip_switch_time,
transaction_duration, ip_survival_time, remarks,
platform_risk_response
"""
2025-05-26 09:19:39 +08:00
def __init__(self, filename=root_path.joinpath('mysql_table').joinpath('ip_tracking.xlsx'),sheet_name = 'Sheet1'):
2025-05-22 16:47:45 +08:00
"""
初始化Excel写入器
:param filename: Excel文件名默认ip_tracking.xlsx
"""
self.filename = filename
self.sheet_name = sheet_name
# 严格按照要求的列名和顺序
self.columns = [
'as_of_date',
'broker',
'id',
'account',
'proxy_ip',
'proxy_port',
'ip_location',
'exit_ip',
'ip_cross_check_result',
'usage_result',
'ip_switch_time',
'transaction_duration',
'ip_survival_time',
'remarks',
'platform_risk_response'
]
# 如果文件不存在创建带指定列名的空Excel文件
if not os.path.exists(self.filename):
pd.DataFrame(columns=self.columns).to_excel(self.filename, sheet_name=self.sheet_name,index=False)
def write_data(self, data_dict):
"""追加单条数据到 Excel自动补全缺失列"""
try:
# 1. 补全缺失列
full_data = {col: data_dict.get(col) for col in self.columns}
# 2. 读取现有数据
if os.path.exists(self.filename):
existing_df = pd.read_excel(self.filename)
# 验证现有列是否匹配
if list(existing_df.columns) != self.columns:
raise ValueError("现有文件的列名不匹配,请使用新文件")
else:
existing_df = pd.DataFrame(columns=self.columns)
# 3. 创建新数据行
new_row = pd.DataFrame([full_data])[self.columns]
# 4. 合并数据
updated_df = pd.concat([existing_df, new_row], ignore_index=True)
# 5. 写入文件
updated_df.to_excel(self.filename, index=False)
print('数据一')
return True
except Exception as e:
print(f"Excel 写入失败: {str(e)}")
return False
def update_latest_record(self, status='成功', remarks='', risk_response=''):
"""
更新最新记录的状态信息
:param status: 使用结果成功/失败
:param remarks: 备注信息
:param risk_response: 平台风险响应
:return: 成功返回True失败返回False
"""
# 读取整个Excel文件
df = pd.read_excel(self.filename, sheet_name=self.sheet_name)
if len(df) == 0:
raise ValueError("Excel文件中没有记录可更新")
# 获取最新记录索引
last_idx = df.index[-1]
# 计算交易持续时间
if pd.notna(df.at[last_idx, 'as_of_date']):
start_time = datetime.datetime.strptime(df.at[last_idx, 'as_of_date'], "%Y-%m-%d %H:%M:%S.%f").timestamp()
transaction_duration = round(time.time() - start_time, 2)
else:
transaction_duration = 0
# 更新字段
df['usage_result'] = df['usage_result'].astype(str)
df['remarks'] = df['remarks'].astype(str)
df['platform_risk_response'] = df['platform_risk_response'].astype(str)
df.at[last_idx, 'usage_result'] = status
df.at[last_idx, 'transaction_duration'] = transaction_duration
df.at[last_idx, 'remarks'] = remarks
df.at[last_idx, 'platform_risk_response'] = risk_response
# 保存回文件
df.to_excel(self.filename, sheet_name=self.sheet_name, index=False)
def get_unavailable_ips(self, broker, account):
"""目标返回指定券商brokerhao指定账户account不能使用过的代理 IP 列表"""
# 读取整个Excel文件
df = pd.read_excel(self.filename, sheet_name=self.sheet_name)
# 如果数据为空,返回空列表
if len(df) == 0:
return []
# 获取该券商下所有已使用的IP
broker_used_ips = df[df['broker'] == broker]['proxy_ip'].tolist()
# 获取该券商下该账户已使用的IP
account_used_ips = df[(df['broker'] == broker) & (df['account'] == account)]['proxy_ip'].tolist()
# 计算不能使用的IP = 该券商所有已用IP - 该账户已使用的IP
unused_ips = list(set(broker_used_ips) - set(account_used_ips))
return unused_ips
# TempStdoutArea对象为为重定向标准输出准备的空间以列表形式存在
class TempStdoutArea:
def __init__(self):
self.buffer = []
def write(self, *args, **kwargs):
self.buffer.append(args)
# 实现获取指定控件坐标的对象
class GetControlCoord:
# app为 pywinauto框架实例化的Application对象
def __init__(self, app):
self.app = app
# 获取指定控件坐标的方法control参数传入指定控件
def get_coord(self, control):
# 重定向标准输出,将原本的标准输出信息写入自定义的空间内,以获取控件信息
stdout = sys.stdout
sys.stdout = TempStdoutArea()
# pywinauto框架提供的print_control_identifiers()方法,可以打印出应用程序的控件信息,其中就包含控件坐标
self.app.print_control_identifiers()
# 将控件信息转移到control_identifiers并还原标准输出
control_identifiers, sys.stdout = sys.stdout, stdout
# print(control_identifiers.buffer)
all_coord = []
# 遍历控件信息,获取指定控件的坐标
for a_control_identifier in control_identifiers.buffer:
# print(a_control_identifier)
# print(a_control_identifier[0])
a_control_identifier = a_control_identifier[0]
if a_control_identifier.find(control) > -1:
# print(a_control_identifier)
a_coord = []
str_coord = a_control_identifier[
a_control_identifier.find('(') + 1: a_control_identifier.find(')')].split(',')
# print(str_coord)
for i in str_coord:
num = i.strip()[1:]
a_coord.append(num)
all_coord.append(a_coord)
return all_coord
def send_email(subject: str, message: str, receive_users: list = ['1055017575@qq.com', '2293908092@qq.com'],
file_path: str = 'no_path'):
'''
Parameters
----------
subject : str
title.
message : str
message.
receive_users : list, optional
receive. The default is ['1055017575@qq.com'].
file_path : str, optional
path. The default is 'no_path'.
Returns
-------
None.
'''
send_user = 'it_ainvest@zohomail.cn' # zoho
send_pwd = '2022@AIWANTE' # 密码
receive_user = receive_users
msg = MIMEMultipart()
msg['subject'] = subject
msg['from'] = send_user
msg['to'] = ','.join(receive_user)
msg.attach(MIMEText(message, 'html', 'utf-8'))
if file_path != 'no_path':
part = MIMEApplication(open(file_path, 'rb').read())
file_name = file_path.split('/')[-1]
part.add_header('Content-Disposition', 'attachment', filename=file_name)
msg.attach(part)
else:
pass
server = smtplib.SMTP_SSL("smtp.zoho.com.cn", 465)
server.login(send_user, send_pwd)
server.sendmail(send_user, receive_user, msg.as_string())
server.quit()
print('邮件发送成功')
def download_data_from_db(sql, schema_name):
"""
create connection with DB
阿里云数据库连接
:param sql: 查询语句
:param schema_name: 数据库名称
:return: 查询结果数据
"""
cnx = create_engine(
'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
'mysql.rds.aliyuncs.com:3306/%s?charset=utf8' %
schema_name, encoding="utf-8", echo=False)
df = pd.read_sql(sql, con=cnx)
return df
# 转换股票代码
def tranTicker(tick):
tick = str(tick)
if len(tick) == 8:
tick = tick.strip('SH').strip('SZ')
if tick.startswith('6') or tick.startswith('11') or tick.startswith('10') or tick.startswith('51'):
tick = 'SH' + tick
elif tick.startswith('0') or tick.startswith('3') or tick.startswith('15') or tick.startswith(
'16') or tick.startswith(
'12'):
tick = 'SZ' + tick
else:
pass
elif len(tick) == 6:
if tick.startswith('6') or tick.startswith('11') or tick.startswith('10') or tick.startswith('51'):
tick = 'SH' + tick
elif tick.startswith('0') or tick.startswith('15') or tick.startswith('12') or tick.startswith(
'16') or tick.startswith('3'):
tick = 'SZ' + tick
else:
pass
else:
num = 6 - len(tick)
tick = 'SZ' + num * '0' + tick
return tick
def ocr_image(image_path):
"""
传入图片路径返回识别到的文字内容
"""
access_key_id = 'LTAI5tA4dy591hCgEBfsxZei'
access_key_secret = 'edYHXSVYda8HomlEsnsyQzw1QGu1EN'
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片文件不存在: {image_path}")
# 第一步:用新版 credential 初始化
credential_config = CredentialConfig(
type='access_key',
access_key_id=access_key_id,
access_key_secret=access_key_secret
)
credential = CredentialClient(credential_config)
# 第二步openapi config
config = open_api_models.Config(
credential=credential,
endpoint='ocr-api.cn-hangzhou.aliyuncs.com'
)
# 第三步:初始化 OCR client
client = OcrClient(config)
# 第四步:准备请求参数
body_stream = StreamClient.read_from_file_path(image_path)
request = ocr_models.RecognizeAllTextRequest(
body=body_stream,
type='Advanced'
)
runtime = util_models.RuntimeOptions()
# 第五步:发起请求
response = client.recognize_all_text_with_options(request, runtime)
return response.body.data.content
def get_price(tick, level='1', op="buy"):
level = int(level)
str_ticker = tick.lower()
rep_data = requests.get("http://qt.gtimg.cn/q=" + str_ticker).text
stocks_detail = "".join(rep_data)
stock_detail = stocks_detail.split(";")[0]
if len(stock_detail) < 49:
return
stock = stock_detail.split("~")
buy1 = float(stock[9])
buy2 = float(stock[11])
buy3 = float(stock[13])
buy4 = float(stock[15])
buy5 = float(stock[17])
sell1 = float(stock[19])
sell2 = float(stock[21])
sell3 = float(stock[23])
sell4 = float(stock[25])
sell5 = float(stock[27])
if op == "buy" and level == 1:
return buy1
elif op == "buy" and level == 2:
return buy2
elif op == "buy" and level == 3:
return buy3
elif op == "buy" and level == 4:
return buy4
elif op == "buy" and level == 5:
return buy5
elif op == "sell" and level == 1:
return sell1
elif op == "sell" and level == 2:
return sell2
elif op == "sell" and level == 3:
return sell3
elif op == "sell" and level == 4:
return sell4
elif op == "sell" and level == 5:
return sell5
else:
return float(stock[3])
def get_host_ip():
"""
查询本机ip地址
:return:
"""
s = uuid.UUID(int=uuid.getnode()).hex[-12:]
return s
def tryrun(func):
def wrapper(*args, **kwargs):
while True:
countnum = 0
try:
res = func(*args, **kwargs)
except RuntimeError:
countnum += 1
time.sleep(1)
if countnum > 120:
break
return res
return wrapper
def tryrun2(func):
def wrapper(*args, **kwargs):
while True:
countnum = 0
try:
res = func(*args, **kwargs)
except RuntimeError:
countnum += 1
time.sleep(10)
if countnum > 90:
break
return res
return wrapper
def output(func):
def wrapper(*args, **kwargs):
res = func(*args, **kwargs)
print(args[1])
return res
return wrapper
class Logger():
"""
This is get log class
"""
def __init__(self, path, name, Flevel=logging.DEBUG):
self.time = time.strftime("%Y-%m-%d")
self.fileName = path + '/' + name + ".log"
self.logger = logging.getLogger(self.fileName)
self.logger.setLevel(Flevel)
if not self.logger.handlers:
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
fh = logging.FileHandler(self.fileName, encoding='utf-8')
fh.setFormatter(fmt)
fh.setLevel(Flevel)
self.logger.addHandler(fh)
@output
def debug(self, message):
"""
This is debug
"""
self.logger.debug(message)
@output
def info(self, message):
"""
This is info
"""
self.logger.info(message)
@output
def warn(self, message):
"""
This is warn
"""
self.logger.warn(message)
@output
def error(self, message):
"""
This is error
"""
self.logger.error(message)
@output
def critical(self, message):
"""
This is critical
"""
self.logger.critical(message)
def softInfo(self, message):
"""
This is output
"""
self.logger.info(message)
def init_redis(ip, port1, db1, psw=''):
'''
redis初始化(待修改为密码版本)
Parameters
----------
ip : TYPE
DESCRIPTION.
port1 : TYPE
DESCRIPTION.
db1 : TYPE
DESCRIPTION.
psw : TYPE, optional
DESCRIPTION. The default is ''.
Returns
-------
TYPE
DESCRIPTION.
'''
global r
time.sleep(5)
# r = redis.Redis(host=ip, port=port1, password=psw)
pool = redis.ConnectionPool(host=ip, port=port1, password=psw)
r = redis.Redis(connection_pool=pool)
return r.ping()
def redis_lget(key,num):
'''
redis获取指定key的值
Parameters
----------
key : TYPE
DESCRIPTION.
num : TYPE
DESCRIPTION.
Returns
-------
vec : TYPE
DESCRIPTION.
'''
print('1')
init_redis('82.156.3.152', 6379, 0, '1508181008')
print('2')
vec = r.lrange(key,0,num-1)
print('3')
if len(vec)>0:
print(vec)
return vec
class SetMac(object):
"""
修改 本地连接 mac地址
"""
def __init__(self):
# regex to MAC address like 00-00-00-00-00-00 or 00:00:00:00:00:00 or
# 000000000000
self.MAC_ADDRESS_RE = re.compile(r"""
([0-9A-F]{1,2})[:-]?
([0-9A-F]{1,2})[:-]?
([0-9A-F]{1,2})[:-]?
([0-9A-F]{1,2})[:-]?
([0-9A-F]{1,2})[:-]?
([0-9A-F]{1,2})
""", re.I | re.VERBOSE) # re.I: case-insensitive matching. re.VERBOSE: just look nicer.
self.WIN_REGISTRY_PATH = "SYSTEM\CurrentControlSet\Control\Class\{4D36E972-E325-11CE-BFC1-08002BE10318}"
def is_admin(self):
"""
is user an admin?
:return:
"""
if ctypes.windll.shell32.IsUserAnAdmin() == 0:
print('Sorry! You should run this with administrative privileges if you want to change your MAC address.')
sys.exit()
else:
print('admin')
def get_macinfos(self):
"""
查看所有mac信息
:return:
"""
print('=' * 50)
mac_info = subprocess.check_output('GETMAC /v /FO list', stderr=subprocess.STDOUT)
mac_info = mac_info.decode('gbk')
print('Your MAC address:\n', mac_info)
# 想要匹配的连接名
2025-05-26 09:19:39 +08:00
target_connection_name = "WLAN"
2025-05-22 16:47:45 +08:00
# 构建正则表达式模式
pattern = re.compile(
r"连接名:\s+" + re.escape(target_connection_name) + r".*?物理地址:\s+([0-9A-Fa-f-]+)",
re.DOTALL
)
# 执行搜索
match = pattern.search(mac_info).group(1)
return match
def get_target_device(self):
"""
返回 本地连接 网络适配器
:return:
"""
mac_info = subprocess.check_output('GETMAC /v /FO list', stderr=subprocess.STDOUT)
mac_info = mac_info.decode('gbk')
print(mac_info)
2025-05-26 09:19:39 +08:00
search = re.search(r'(WLAN)\s+网络适配器: (.+)\s+物理地址:', mac_info)
2025-05-22 16:47:45 +08:00
print(search)
target_name, target_device = (search.group(1), search.group(2).strip()) if search else ('', '')
if not all([target_name, target_device]):
print('Cannot find the target device')
sys.exit()
print(target_name, target_device)
return target_device
def get_network_adapter_info(self):
"""
获取网络适配器信息
re:
{'16': 'Realtek RTL8821CE 802.11ac PCIe Adapter #2'}
"""
try:
# 执行 wmic 命令并获取输出
cmd = "wmic path win32_networkadapter get index, name"
output = subprocess.check_output(cmd, universal_newlines=True)
# 按换行符分割输出内容
lines = output.strip().split('\n')
# 去除每行前后的空白字符
lines = [line.strip() for line in lines]
# 过滤掉空行
non_empty_lines = [line for line in lines if line]
result = {}
# 从第二行开始遍历
for line in non_empty_lines[1:]:
# 查找第一个空白字符的位置
first_space_index = line.find(' ')
# 提取 index
index = line[:first_space_index].strip()
# 提取 name
name = line[first_space_index:].strip()
result[index] = name
return result
except subprocess.CalledProcessError as e:
print(f"命令执行失败,错误代码: {e.returncode}")
print(e.output)
except Exception as e:
print(f"发生异常: {e}")
return []
def set_mac_address(self, target_device, new_mac):
"""
设置新mac地址
:param target_device: 本地连接 网络适配器
:param new_mac: 新mac地址
:return:
"""
if not self.MAC_ADDRESS_RE.match(new_mac):
print('Please input a correct MAC address')
return
# Locate adapter's registry and update network address (mac)
reg_hdl = winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE)
key = winreg.OpenKey(reg_hdl, self.WIN_REGISTRY_PATH)
info = winreg.QueryInfoKey(key)
# Find adapter key based on sub keys
adapter_key = None
adapter_path = None
target_index = -1
adapter_dict = self.get_network_adapter_info()
for index in range(info[0]):
subkey = winreg.EnumKey(key, index)
path = self.WIN_REGISTRY_PATH + "\\" + subkey
if subkey == 'Properties':
break
# Check for adapter match for appropriate interface
new_key = winreg.OpenKey(reg_hdl, path)
try:
adapterIndex = str(int(subkey))
adapterName = adapter_dict.get(adapterIndex)
adapterDesc = winreg.QueryValueEx(new_key, "DriverDesc")
if adapterName == target_device:
adapter_path = path
target_index = adapterIndex
break
else:
winreg.CloseKey(new_key)
except (WindowsError) as err:
if err.errno == 2: # register value not found, ok to ignore
pass
else:
raise err
if adapter_path is None:
print('Device not found.')
winreg.CloseKey(key)
winreg.CloseKey(reg_hdl)
return
# Registry path found update mac addr
adapter_key = winreg.OpenKey(reg_hdl, adapter_path, 0, winreg.KEY_WRITE)
# winreg.SetValueEx(adapter_key, "NetworkAddress", 0, winreg.REG_SZ, -----new_mac)
winreg.SetValueEx(adapter_key, "NetworkAddress", 0, winreg.REG_SZ, new_mac)
winreg.CloseKey(adapter_key)
winreg.CloseKey(key)
winreg.CloseKey(reg_hdl)
# Adapter must be restarted in order for change to take affect
# print 'Now you should restart your netsh'
self.restart_adapter(target_index, target_device)
def restart_adapter(self, target_index, target_device):
"""
Disables and then re-enables device interface
"""
if platform.release() == 'XP':
# description, adapter_name, address, current_address = find_interface(device)
cmd = "devcon hwids =net"
try:
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except FileNotFoundError:
raise
query = '(' + target_device + '\r\n\s*.*:\r\n\s*)PCI\\\\(([A-Z]|[0-9]|_|&)*)'
query = query.encode('ascii')
match = re.search(query, result)
cmd = 'devcon restart "PCI\\' + str(match.group(2).decode('ascii')) + '"'
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
else:
cmd = "wmic path win32_networkadapter where index=" + str(target_index) + " call disable"
subprocess.check_output(cmd)
cmd = "wmic path win32_networkadapter where index=" + str(target_index) + " call enable"
subprocess.check_output(cmd)
def run(self):
self.is_admin()
mac = self.get_macinfos()
warnings.filterwarnings('ignore')
log = Logger(f'{root_path}/logs', '18242094506')
log.warn('MAC 重置之前:'+mac)
target_device = self.get_target_device()
# self.set_mac_address(target_device, '82E82CF2158F')
# self.set_mac_address(target_device, '28CDC4CBFDCF')
self.set_mac_address(target_device, self.randomMAC())
mac_new = self.get_macinfos()
warnings.filterwarnings('ignore')
log = Logger(f'{root_path}/logs', '18242094506')
log.warn('MAC 重置之后:'+mac_new)
# 28-CD-C4-CB-FD-CF
def randomMAC(self):
# mac = [ 0x52, 0x54, 0x00,
# random.randint(0x00, 0x7f),
# random.randint(0x00, 0xff),
# random.randint(0x00, 0xff) ]
#
# while True:
# one_mac = ''.join(map(lambda x: "%02x" % x, mac)).upper()
# used_mac = redis_lget('Mac', -1)
# used_mac = [i.decode() for i in used_mac]
#
# if one_mac not in used_mac:
# r.rpush('Mac',one_mac)
# break
while True:
first_byte = 0x02
# 从 UUIDv4 获取后 5 个字节,随机性极高
random_bytes = uuid.uuid4().bytes
mac_bytes = [first_byte] + list(random_bytes[1:6])
one_mac = ''.join(f'{b:02X}' for b in mac_bytes)
used_mac = redis_lget('Mac', -1)
used_mac = [i.decode() for i in used_mac]
if one_mac not in used_mac:
r.rpush('Mac', one_mac)
break
print('1111111', one_mac)
return one_mac
def get_host_ip():
"""
查询本机ip地址
:return:
"""
s = uuid.UUID(int=uuid.getnode()).hex[-12:]
return s
def get_auth_type(url):
time1 = int(time.time() * 1000)
url = f'http://192.168.100.1/login.cgi?_={time1}'
response = requests.get(url)
if 'WWW-Authenticate' in response.headers:
return response.headers['WWW-Authenticate']
return None
def parse_auth_params(auth_header):
params = {}
for param in auth_header.split(','):
key, value = param.strip().split('=', 1)
params[key] = value.strip('"')
return params
def hex_md5(data):
return md5(data.encode()).hexdigest()
def generate_cnonce():
rand = random.randint(0, 100000)
date = int(time.time() * 1000)
salt = f"{rand}{date}"
tmp = hex_md5(salt)
return tmp[:16]
def compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc):
ha1 = hex_md5(f"{username}:{realm}:{password}")
ha2 = hex_md5(f"GET:{uri}")
response = hex_md5(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}")
return response
def do_login(username, password, base_url):
url = f"{base_url}/login.cgi"
auth_header = get_auth_type(url)
if not auth_header:
print("Failed to get authentication parameters")
return False
auth_params = parse_auth_params(auth_header)
realm = auth_params.get('Digest realm')
nonce = auth_params.get('nonce')
qop = auth_params.get('qop')
if not all([realm, nonce, qop]):
print("Missing required authentication parameters")
return False
cnonce = generate_cnonce()
nc = '00000001'
uri = '/cgi/protected.cgi'
response = compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc)
headers = {
'Authorization': f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="{uri}", response="{response}", qop={qop}, nc={nc}, cnonce="{cnonce}"'
}
login_url = f"{base_url}/login.cgi?Action=Digest&username={username}&realm={realm}&nonce={nonce}&response={response}&qop={qop}&cnonce={cnonce}&nc={nc}&temp=asr"
response = requests.get(login_url, headers=headers)
uri = "/cgi/xml_action.cgi"
response = compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc)
headers = {
'Authorization': f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="{uri}", response="{response}", qop={qop}, nc={nc}, cnonce="{cnonce}"'
}
reset_url = 'http://192.168.100.1/xml_action.cgi?method=get&module=duster&file=reset'
response = requests.get(reset_url, headers=headers)
print(response.content)
if response.status_code == 200:
print("Login successful")
return True
else:
print(f"Login failed: {response.status_code} {response.reason}")
return False
def get_ip():
"""
查询本机ip地址
:return:
"""
try:
requests.get('https://checkip.amazonaws.com').text.strip()
except:
return '60.162.69.53'
def reset_proxy_to_default():
"""恢复系统代理为默认(关闭)"""
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings",
0, winreg.KEY_WRITE
)
try:
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, 0)
winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, "")
winreg.SetValueEx(key, "ProxyOverride", 0, winreg.REG_SZ, "<local>")
finally:
winreg.CloseKey(key)
internet_set_option = ctypes.windll.Wininet.InternetSetOptionW
internet_set_option(0, 37, 0, 0)
internet_set_option(0, 39, 0, 0)
def reset_wlan():
warnings.filterwarnings('ignore')
log = Logger(f'{root_path}/logs', '18242094506')
log.warn('===================')
log.warn('ip 重置前:'+get_ip())
count = 0
while True:
base_url = "http://192.168.100.1"
username = "admin"
password = "admin"
do_login(username, password, base_url)
time.sleep(1)
t1 = time.time()
while True:
try:
requests.get('https://www.baidu.com')
print('网络重连成功,共耗时',time.time()-t1,'s')
time.sleep(3)
print('变更后IP',get_ip())
warnings.filterwarnings('ignore')
log = Logger(f'{root_path}/logs', '18242094506')
log.warn('ip 重置后:'+ get_ip())
break
except:
pass
one_ip = get_ip()
used_IP = redis_lget('IP', -1)
used_IP = [i.decode() for i in used_IP]
print('!!!!!!one_ip:', one_ip)
print('!!!!!!used_IP:',used_IP)
if one_ip not in used_IP:
r.rpush('IP',one_ip)
print('!!!!入库了 成功rpush')
break
else:
#IP 重复 记录log
warnings.filterwarnings('ignore')
log = Logger(f'{root_path}/logs', '18242094506')
log.warn('IP not reset')
if count ==3:
send_email('重启IP出现问题', one_ip)
exit()
if __name__ == '__main__':
print(get_host_ip())