zhitou_trade/tools.py
2025-06-30 17:02:50 +08:00

1183 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import warnings
from hashlib import md5
import pywinauto
import pyautogui
from sqlalchemy import create_engine
import psutil
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import uuid
import redis
import ctypes
import platform
import re
import winreg
import random
import sys
import datetime
import pyperclip
import pandas as pd
import os
import requests
import subprocess
import time
import logging
from lxml import etree
from config import proxifier_path,ppx_path,wlan_name
from pathlib import Path
root_path = Path(__file__).parent
# Module-level logger
logger = logging.getLogger(__name__)
def is_admin() -> bool:
    """Return True when the current process has Windows administrator rights.

    Calls ``shell32.IsUserAnAdmin`` through ``ctypes.windll``. Any failure
    (for example ``ctypes.windll`` being absent on a non-Windows platform)
    is reported as "not an administrator".
    """
    try:
        # IsUserAnAdmin returns a C BOOL (an int); normalise it so the
        # annotated return type actually holds.
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        return False
def check_internet_connection(test_urls=None, timeout=5) -> bool:
    """Return True as soon as one probe URL answers with HTTP 200.

    Args:
        test_urls: URLs to probe in order. ``None`` selects a built-in list
            of three public sites.
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        True if any URL responded with status 200, otherwise False.
    """
    if test_urls is None:
        test_urls = [
            "https://www.baidu.com",
            "https://www.qq.com",
            "https://www.cloudflare.com"
        ]
    for url in test_urls:
        try:
            response = requests.get(url, timeout=timeout)
        except requests.RequestException:
            # Only network/HTTP level failures mean "try the next URL";
            # KeyboardInterrupt and programming errors must still surface.
            continue
        if response.status_code == 200:
            return True
    return False
def run_netsh_command(command: list, timeout: int = 10) -> tuple:
    """Run a command given as an argument list and capture its output.

    The command is executed directly (no shell), so arguments such as an
    SSID are never interpreted by ``cmd.exe``.

    Args:
        command: Program and arguments, e.g. ``["netsh", "wlan", ...]``.
        timeout: Seconds to wait before giving up.

    Returns:
        ``(True, output)`` on exit code 0, where ``output`` is stdout or,
        if stdout is empty, stderr. ``(False, message)`` on a non-zero exit
        code, a timeout, or any other error.
    """

    def _decode(raw) -> str:
        # Output is captured as bytes; undecodable bytes are dropped.
        return raw.decode('utf-8', errors='ignore').strip() if raw else ""

    try:
        result = subprocess.run(
            command,
            check=True,
            shell=False,
            timeout=timeout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=False  # capture bytes, decode below
        )
        stdout = _decode(result.stdout)
        stderr = _decode(result.stderr)
        return True, stdout if stdout else stderr
    except subprocess.CalledProcessError as e:
        stdout = _decode(e.stdout)
        stderr = _decode(e.stderr)
        output = stderr if stderr else stdout or "Unknown error"
        return False, f"{output} (exit code {e.returncode})"
    except subprocess.TimeoutExpired as e:
        # Partial output may be missing entirely on timeout.
        output = f"{_decode(e.stdout)} {_decode(e.stderr)}".strip()
        return False, f"Timeout after {timeout}s: {output}"
    except Exception as e:
        return False, f"Unexpected error: {str(e)}"
def connect_wifi(ssid: str = 'MaxEntropy', password: str = 'cskj12345678') -> bool:
    """Connect to a WPA2-PSK Wi-Fi network through ``netsh``.

    Steps: require administrator rights, return early if the internet is
    already reachable, write a temporary WLAN profile XML, replace any
    existing profile of the same name, connect, then poll connectivity up
    to five times at two-second intervals. The temporary profile file is
    always removed afterwards.

    Args:
        ssid: Wi-Fi network name.
        password: Wi-Fi passphrase.

    Returns:
        True if the internet is reachable (already, or after connecting),
        otherwise False.

    NOTE(review): the default credentials are hard-coded in the signature,
    and the adapter name is fixed to "Wi-Fi" below; confirm both are
    intended.
    """
    from xml.sax.saxutils import escape

    # Permission check
    if not is_admin():
        print("请以管理员身份运行!")
        return False
    # Nothing to do when the network already works
    if check_internet_connection():
        print("当前网络正常")
        return True
    profile_path = f"wifi_temp_{ssid}.xml"
    # SSID and passphrase are XML-escaped so characters such as '&' or '<'
    # cannot produce a malformed profile.
    xml_ssid = escape(ssid)
    xml_password = escape(password)
    try:
        wifi_config = f"""<?xml version="1.0"?>
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
<name>{xml_ssid}</name>
<SSIDConfig>
<SSID><name>{xml_ssid}</name></SSID>
</SSIDConfig>
<connectionType>ESS</connectionType>
<connectionMode>auto</connectionMode>
<MSM>
<security>
<authEncryption>
<authentication>WPA2PSK</authentication>
<encryption>AES</encryption>
<useOneX>false</useOneX>
</authEncryption>
<sharedKey>
<keyType>passPhrase</keyType>
<protected>false</protected>
<keyMaterial>{xml_password}</keyMaterial>
</sharedKey>
</security>
</MSM>
</WLANProfile>"""
        # Write the temporary profile as UTF-8
        with open(profile_path, "w", encoding='utf-8') as f:
            f.write(wifi_config.strip())
        # Drop a stale profile of the same name, if any (result ignored)
        run_netsh_command(["netsh", "wlan", "delete", "profile", ssid])
        # Register the new profile
        success, msg = run_netsh_command([
            "netsh", "wlan", "add", "profile",
            f"filename={profile_path}"
        ])
        if not success:
            print(f"[添加配置文件失败] {msg}")
            return False
        # Connect
        success, msg = run_netsh_command([
            "netsh", "wlan", "connect",
            f"name={ssid}",
            "interface=Wi-Fi"
        ])
        if not success:
            print(f"[连接失败] {msg}")
            return False
        # Wait for connectivity
        for i in range(5):
            time.sleep(2)
            if check_internet_connection():
                print(f"成功连接至 {ssid}")
                return True
            print(f"等待连接中...({i + 1}/5)")
        print("连接超时")
        return False
    finally:
        # The profile file contains the passphrase in clear text; remove it.
        if os.path.exists(profile_path):
            try:
                os.remove(profile_path)
            except Exception as e:
                print(f"清理wifi连接的临时文件失败: {str(e)}")
# Proxy switching
def set_proxy(proxy_ip: str, proxy_port: str):
    """Switch Proxifier to a new proxy and verify the switch.

    The function does everything in one go:

    1. Derive a new profile ``temp<N>.ppx`` in ``ppx_path`` from the current
       one (the file with the smallest sequence number), with the address
       and port of the ``<Proxy id="100">`` node replaced.
    2. Attach to a running Proxifier, or start it with the new profile.
    3. Drive the UI with keystrokes to reload the profile.
    4. Read address and port back from the UI through the clipboard and
       compare them with the requested values.

    Args:
        proxy_ip: Proxy address to configure.
        proxy_port: Proxy port to configure (compared as a string).

    Raises:
        ValueError: The current profile has no ``Proxy`` node with id 100.
        Exception: Proxifier could not be started, or the values read back
            from the UI differ from the requested ones.

    NOTE(review): the keystroke sequences depend on Proxifier's menu and
    dialog layout; they are reproduced unchanged and must be confirmed
    against the installed version.
    """
    # ========== Part 1: write the new profile ==========
    import glob
    import re
    # Collect the sequence numbers of all existing temp<N>.ppx files
    current_temp_files = glob.glob(os.path.join(ppx_path, "temp*.ppx"))
    file_nums = []
    for file_path in current_temp_files:
        filename = os.path.basename(file_path)
        match = re.match(r'temp(\d+)\.ppx$', filename)
        if match:
            file_nums.append(int(match.group(1)))
    # Next number: one below the smallest existing one, else start at 1000000
    if file_nums:
        next_num = min(file_nums) - 1
    else:
        next_num = 1000000
    new_filename = f"temp{next_num}.ppx"
    new_ppx_path = os.path.join(ppx_path, new_filename)
    logger.info(f"创建新的配置文件: {new_filename}")
    # The current profile is the one with the smallest sequence number
    if file_nums:
        current_num = min(file_nums)
        current_file = os.path.join(ppx_path, f"temp{current_num}.ppx")
    else:
        # First run: create a minimal base profile
        current_file = os.path.join(ppx_path, "temp1000000.ppx")
        if not os.path.exists(current_file):
            tree = etree.ElementTree(etree.Element('ProxifierProfile'))
            root = tree.getroot()
            proxy_node = etree.SubElement(root, 'Proxy', id="100")
            etree.SubElement(proxy_node, 'Address').text = ""
            etree.SubElement(proxy_node, 'Port').text = ""
            with open(current_file, 'wb') as f:
                f.write(b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n')
                tree.write(f, encoding='utf-8', xml_declaration=False, pretty_print=True)
            logger.info(f"已创建基础配置文件: {current_file}")
    # Load the current profile
    with open(current_file, 'rb') as f:
        tree = etree.parse(f)
    # Update the proxy with id="100"
    proxy_node = tree.find('.//Proxy[@id="100"]')
    if proxy_node is None:
        raise ValueError("未找到id=100的代理节点")
    address = proxy_node.find('./Address')
    if address is not None:
        address.text = proxy_ip
    port_node = proxy_node.find('./Port')
    if port_node is not None:
        port_node.text = str(proxy_port)
    # Save under the new name
    with open(new_ppx_path, 'wb') as f:
        f.write(b'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n')
        tree.write(f, encoding='utf-8', xml_declaration=False, pretty_print=True)
    logger.info(f"新配置文件已保存: {new_ppx_path}")
    # Remove the previous profile. On the very first run the base profile
    # and the new profile are the same file (temp1000000.ppx); deleting it
    # would remove the profile that was just written, so skip that case.
    if current_file != new_ppx_path and os.path.exists(current_file):
        try:
            os.remove(current_file)
            logger.info(f"已删除原文件: {current_file}")
        except Exception as e:
            logger.warning(f"删除原文件失败: {e}")
    # Profile name without extension (used in log messages)
    config_name = f"temp{next_num}"
    # ========== Part 2: load the profile in Proxifier ==========
    try:
        app = pywinauto.Application().connect(path=proxifier_path, timeout=2)
        window_title_pattern = r'temp\d+ - Proxifier'
        main_window = app.window(title_re=window_title_pattern).wait("ready", timeout=20)
        main_window.set_focus()
        logger.info(f"已连接到运行中的Proxifier配置: {config_name}")
    except Exception as e:
        logger.info(f"Proxifier未运行或连接失败: {e}")
        try:
            app = pywinauto.Application().start(f'"{proxifier_path}" "{new_ppx_path}"')
            window_title_pattern = r'temp\d+ - Proxifier'
            main_window = app.window(title_re=window_title_pattern).wait("ready", timeout=20)
            main_window.set_focus()
            logger.info(f"Proxifier已启动并加载配置: {config_name}")
        except Exception as e2:
            logger.error(f"启动Proxifier失败: {e2}")
            raise
    # Reload the profile through the keyboard
    pyautogui.PAUSE = 0.1
    logger.info("正在重新加载配置文件...")
    pyautogui.keyDown('alt')
    pyautogui.keyUp('alt')
    pyautogui.press('enter')
    pyautogui.press('tab')
    pyautogui.press('enter')
    pyautogui.press('tab')
    pyautogui.press('enter')
    # ========== Part 3: read the active settings back ==========
    # Dismiss anything that may be in the way
    pyautogui.press('escape')
    pyautogui.press('escape')
    # Open the proxy settings (same style of key sequence as above)
    pyautogui.keyDown('alt')
    pyautogui.keyUp('alt')
    pyautogui.press('tab')
    pyautogui.press('enter')
    pyautogui.press('enter')
    pyautogui.press('enter')
    # Copy the address field
    pyautogui.hotkey('ctrl', 'a')
    pyautogui.hotkey('ctrl', 'c')
    time.sleep(0.2)  # give the clipboard time to update
    current_ip = pyperclip.paste().strip()
    # Copy the port field
    pyautogui.press('tab')
    pyautogui.hotkey('ctrl', 'a')
    pyautogui.hotkey('ctrl', 'c')
    time.sleep(0.2)
    current_port = pyperclip.paste().strip()
    logger.info(f"当前代理IP: {current_ip}")
    logger.info(f"当前代理端口: {current_port}")
    # Close the dialogs again
    pyautogui.press('escape')
    pyautogui.press('escape')
    pyautogui.press('escape')
    pyautogui.press('escape')
    # ========== Part 4: compare with the requested values ==========
    if current_ip == proxy_ip and current_port == str(proxy_port):
        logger.info("成功!代理切换正确")
    else:
        raise Exception("代理切换错误")
class ExcelDataWriter:
    """Append and update rows of the IP tracking Excel workbook.

    Column order: as_of_date, broker, id, account, proxy_ip, proxy_port,
    ip_location, exit_ip, ip_cross_check_result, usage_result,
    ip_switch_time, transaction_duration, ip_survival_time, remarks,
    platform_risk_response.
    """

    def __init__(self, filename=root_path.joinpath('mysql_table').joinpath('ip_tracking.xlsx'), sheet_name='Sheet1'):
        """Remember the target workbook and create it if it does not exist.

        :param filename: Workbook path; defaults to
            ``<module dir>/mysql_table/ip_tracking.xlsx``.
        :param sheet_name: Worksheet used for every read and write.
        """
        self.filename = filename
        self.sheet_name = sheet_name
        # Exact column names and order of the sheet
        self.columns = [
            'as_of_date',
            'broker',
            'id',
            'account',
            'proxy_ip',
            'proxy_port',
            'ip_location',
            'exit_ip',
            'ip_cross_check_result',
            'usage_result',
            'ip_switch_time',
            'transaction_duration',
            'ip_survival_time',
            'remarks',
            'platform_risk_response'
        ]
        # Create an empty workbook with just the header row when missing
        if not os.path.exists(self.filename):
            # The target directory may not exist yet; to_excel does not create it.
            os.makedirs(os.path.dirname(os.path.abspath(self.filename)), exist_ok=True)
            pd.DataFrame(columns=self.columns).to_excel(self.filename, sheet_name=self.sheet_name, index=False)

    def write_data(self, data_dict):
        """Append one record; columns missing from ``data_dict`` stay empty.

        :param data_dict: Mapping of column name to value.
        :return: True on success, False if anything failed (the error is
            printed, not raised).
        """
        try:
            # 1. Fill in missing columns with None
            full_data = {col: data_dict.get(col) for col in self.columns}
            # 2. Load what is already there (same sheet the other methods use)
            if os.path.exists(self.filename):
                existing_df = pd.read_excel(self.filename, sheet_name=self.sheet_name)
                # Refuse to append to a sheet with a different layout
                if list(existing_df.columns) != self.columns:
                    raise ValueError("现有文件的列名不匹配,请使用新文件")
            else:
                existing_df = pd.DataFrame(columns=self.columns)
            # 3. Build the new row in column order
            new_row = pd.DataFrame([full_data])[self.columns]
            # 4. Append
            updated_df = pd.concat([existing_df, new_row], ignore_index=True)
            # 5. Rewrite the workbook
            updated_df.to_excel(self.filename, sheet_name=self.sheet_name, index=False)
            print('数据一')
            return True
        except Exception as e:
            print(f"Excel 写入失败: {str(e)}")
            return False

    def update_latest_record(self, status='成功', remarks='', risk_response=''):
        """Fill in the outcome fields of the last row of the sheet.

        ``transaction_duration`` is set to the seconds elapsed since the
        row's ``as_of_date`` (0 when that cell is empty).

        :param status: Value for ``usage_result``.
        :param remarks: Value for ``remarks``.
        :param risk_response: Value for ``platform_risk_response``.
        :return: None.
        :raises ValueError: The sheet has no rows.
        """
        df = pd.read_excel(self.filename, sheet_name=self.sheet_name)
        if len(df) == 0:
            raise ValueError("Excel文件中没有记录可更新")
        last_idx = df.index[-1]
        # Elapsed time since the record was created
        started_at = df.at[last_idx, 'as_of_date']
        if pd.notna(started_at):
            if isinstance(started_at, str):
                started = datetime.datetime.strptime(started_at, "%Y-%m-%d %H:%M:%S.%f")
            else:
                # Excel date cells come back as Timestamp/datetime, which
                # strptime cannot take. Convert through a plain datetime so
                # a naive value is interpreted as local time, exactly like
                # the string branch above.
                started = pd.Timestamp(started_at).to_pydatetime()
            transaction_duration = round(time.time() - started.timestamp(), 2)
        else:
            transaction_duration = 0
        # Make the text columns accept strings without turning the empty
        # cells of older rows into the literal text 'nan'.
        for text_col in ('usage_result', 'remarks', 'platform_risk_response'):
            df[text_col] = df[text_col].astype(object)
        df.at[last_idx, 'usage_result'] = status
        df.at[last_idx, 'transaction_duration'] = transaction_duration
        df.at[last_idx, 'remarks'] = remarks
        df.at[last_idx, 'platform_risk_response'] = risk_response
        df.to_excel(self.filename, sheet_name=self.sheet_name, index=False)

    def get_unavailable_ips(self, broker, account):
        """Return proxy IPs this account must not use for the given broker.

        The result is every ``proxy_ip`` recorded for ``broker`` minus the
        ones recorded for ``broker`` together with ``account``, i.e. IPs
        that appear under this broker only for other accounts.

        :param broker: Broker to filter on.
        :param account: Account whose own IPs are excluded from the result.
        :return: List of IPs in arbitrary order; empty when the sheet is empty.
        """
        df = pd.read_excel(self.filename, sheet_name=self.sheet_name)
        if len(df) == 0:
            return []
        same_broker = df['broker'] == broker
        broker_used_ips = df[same_broker]['proxy_ip'].tolist()
        account_used_ips = df[same_broker & (df['account'] == account)]['proxy_ip'].tolist()
        return list(set(broker_used_ips) - set(account_used_ips))
# TempStdoutArea is a stand-in for sys.stdout: everything written to it is
# collected in a list instead of being printed.
class TempStdoutArea:
    """Write-only sink that records each ``write`` call."""

    def __init__(self):
        # One entry per write() call, in call order
        self.buffer = list()

    def write(self, *args, **kwargs):
        """Store the positional arguments of this call as one tuple.

        Keyword arguments are accepted but not stored.
        """
        self.buffer += [args]
# Looks up the coordinates of a named control
class GetControlCoord:
    """Extract control rectangles from pywinauto's control listing."""

    # app is the object whose print_control_identifiers() is called
    # (per the original comment: a pywinauto Application)
    def __init__(self, app):
        self.app = app

    def get_coord(self, control):
        """Return the coordinates of every listed control matching ``control``.

        ``print_control_identifiers()`` only prints, so stdout is redirected
        into a ``TempStdoutArea`` while it runs and the captured text is
        parsed afterwards.

        :param control: Substring to look for in each printed chunk.
        :return: A list with one entry per matching chunk. Each entry is the
            list of comma-separated items between the first ``(`` and the
            first ``)``, with the first character of each item removed
            (presumably the L/T/R/B prefix of pywinauto's rectangle output;
            confirm against real output). Values are strings.
        """
        saved_stdout = sys.stdout
        capture = TempStdoutArea()
        sys.stdout = capture
        try:
            self.app.print_control_identifiers()
        finally:
            # Always give stdout back, even if the listing raised; otherwise
            # every later print in the process would vanish into the buffer.
            sys.stdout = saved_stdout
        all_coord = []
        for written in capture.buffer:
            # Each buffer entry is the args tuple of one write() call
            text = written[0]
            if text.find(control) > -1:
                inside = text[text.find('(') + 1: text.find(')')]
                all_coord.append([item.strip()[1:] for item in inside.split(',')])
        return all_coord
def send_email(subject: str, message: str, receive_users: list = ['1055017575@qq.com', '2293908092@qq.com'],
               file_path: str = 'no_path'):
    """Send an HTML e-mail, optionally with one attachment, via Zoho SMTP.

    Parameters
    ----------
    subject : str
        Subject line.
    message : str
        Message body, sent as HTML in UTF-8.
    receive_users : list, optional
        Recipient addresses. The default list is never modified here.
    file_path : str, optional
        Path of a file to attach. The default ``'no_path'`` means no
        attachment.

    Returns
    -------
    None.

    NOTE(review): the sender account and password are hard-coded below;
    they should come from configuration or the environment.
    """
    send_user = 'it_ainvest@zohomail.cn'  # zoho
    send_pwd = '2022@AIWANTE'  # password
    receive_user = receive_users
    msg = MIMEMultipart()
    msg['subject'] = subject
    msg['from'] = send_user
    msg['to'] = ','.join(receive_user)
    msg.attach(MIMEText(message, 'html', 'utf-8'))
    if file_path != 'no_path':
        # Close the file deterministically instead of leaking the handle
        with open(file_path, 'rb') as attachment:
            part = MIMEApplication(attachment.read())
        # basename copes with the platform's path separators, not just '/'
        file_name = os.path.basename(file_path)
        part.add_header('Content-Disposition', 'attachment', filename=file_name)
        msg.attach(part)
    # The context manager sends QUIT and closes the socket even when
    # login or sendmail raises.
    with smtplib.SMTP_SSL("smtp.zoho.com.cn", 465) as server:
        server.login(send_user, send_pwd)
        server.sendmail(send_user, receive_user, msg.as_string())
    print('邮件发送成功')
def download_data_from_db(sql, schema_name):
    """Run a query against the Aliyun RDS MySQL instance.

    :param sql: Query to execute.
    :param schema_name: Database (schema) name placed in the connection URL.
    :return: Query result as a pandas DataFrame.

    NOTE(review): the database credentials are hard-coded in the URL below;
    they should come from configuration.
    """
    # The ``encoding`` keyword was dropped: SQLAlchemy 2.0 removed it and
    # rejects it with a TypeError; the charset is already set in the URL.
    cnx = create_engine(
        'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
        'mysql.rds.aliyuncs.com:3306/%s?charset=utf8' %
        schema_name, echo=False)
    try:
        df = pd.read_sql(sql, con=cnx)
    finally:
        # A new engine is created per call; release its pooled connections.
        cnx.dispose()
    return df
# Convert a stock code to its exchange-prefixed form
def tranTicker(tick):
    """Return ``tick`` with an ``SH`` or ``SZ`` prefix chosen from its leading digits.

    * 8 characters: the characters S/H/Z are stripped from both ends first,
      then the result is classified like a 6-character code.
    * 6 characters: codes starting with 6, 11, 10 or 51 get ``SH``; codes
      starting with 0, 3, 15, 16 or 12 get ``SZ``; anything else is
      returned without a prefix.
    * Any other length: the code is left-padded with zeros to six
      characters (no padding if it is already longer) and prefixed ``SZ``.
    """
    sh_heads = ('6', '11', '10', '51')
    sz_heads = ('0', '3', '15', '16', '12')
    code = str(tick)
    size = len(code)
    if size not in (6, 8):
        # A negative repeat count yields an empty pad
        return 'SZ' + '0' * (6 - size) + code
    if size == 8:
        code = code.strip('SH').strip('SZ')
    if code.startswith(sh_heads):
        return 'SH' + code
    if code.startswith(sz_heads):
        return 'SZ' + code
    return code
def get_price(tick, level='1', op="buy", timeout=5):
    """Fetch one price of a quote from ``qt.gtimg.cn``.

    The first ``;``-terminated record of the response is split on ``~``.
    Fields 9, 11, 13, 15, 17 are used as buy levels 1-5 and fields 19, 21,
    23, 25, 27 as sell levels 1-5; field 3 is the fallback price.

    :param tick: Ticker including exchange prefix; lower-cased before use.
    :param level: Order-book level, 1-5 (int or numeric string).
    :param op: ``"buy"`` or ``"sell"``.
    :param timeout: Seconds to wait for the HTTP response, so a stalled
        quote server cannot block the caller forever.
    :return: The requested price as float; field 3 as float when ``op`` or
        ``level`` is outside the supported values; None when the record is
        shorter than 49 characters.
    """
    level = int(level)
    str_ticker = tick.lower()
    rep_data = requests.get("http://qt.gtimg.cn/q=" + str_ticker, timeout=timeout).text
    stock_detail = rep_data.split(";")[0]
    if len(stock_detail) < 49:
        return None
    stock = stock_detail.split("~")
    # Index of level 1 for each side; each further level is two fields on
    first_level_field = {"buy": 9, "sell": 19}
    if op in first_level_field and 1 <= level <= 5:
        # Only the requested field is parsed, so a malformed value in an
        # unrelated level no longer breaks the call.
        return float(stock[first_level_field[op] + 2 * (level - 1)])
    return float(stock[3])
def get_host_ip():
    """Return the 12 lowercase hex digits of ``uuid.getnode()``.

    Despite the name, no IP address is looked up: the value is the node
    (hardware address) number reported by the ``uuid`` module.

    :return: 12-character hex string.
    """
    node = uuid.getnode()
    node_hex = uuid.UUID(int=node).hex
    return node_hex[-12:]
def _retry_on_runtime_error(func, max_retries, delay):
    """Wrap ``func`` so that a RuntimeError triggers a delayed retry."""
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        # The counter lives outside the loop so the limit can be reached.
        failures = 0
        while True:
            try:
                # Return immediately on success instead of looping again
                return func(*args, **kwargs)
            except RuntimeError:
                failures += 1
                if failures > max_retries:
                    # Out of retries: surface the last error rather than
                    # returning an undefined result.
                    raise
                time.sleep(delay)
    return wrapper


def tryrun(func):
    """Decorator: retry ``func`` on RuntimeError, once per second.

    The call is retried up to 120 times; after that the last RuntimeError
    is re-raised. Other exceptions propagate immediately.
    """
    return _retry_on_runtime_error(func, max_retries=120, delay=1)


def tryrun2(func):
    """Decorator: retry ``func`` on RuntimeError, every ten seconds.

    The call is retried up to 90 times; after that the last RuntimeError
    is re-raised. Other exceptions propagate immediately.
    """
    return _retry_on_runtime_error(func, max_retries=90, delay=10)
def output(func):
    """Decorator: after calling ``func``, print its second positional argument.

    Presumably meant for methods, where the second positional argument is
    the first real argument after ``self`` (confirm against callers). When
    fewer than two positional arguments were passed nothing is printed.
    The wrapped function's return value is passed through.
    """
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        res = func(*args, **kwargs)
        # Guard: the call has already happened, so a missing argument must
        # not turn a successful call into an IndexError.
        if len(args) > 1:
            print(args[1])
        return res
    return wrapper
class Logger():
    """Thin wrapper around a ``logging.Logger`` that writes to ``<path>/<name>.log``."""

    def __init__(self, path, name, Flevel=logging.DEBUG):
        """Create the log directory and a logger writing to the log file.

        :param path: Directory for the log file; created if missing.
        :param name: Log file name without the ``.log`` extension.
        :param Flevel: Level applied to the logger and its file handler.
        """
        import os
        # Make sure the log directory exists
        os.makedirs(path, exist_ok=True)
        self.time = time.strftime("%Y-%m-%d")
        self.fileName = os.path.join(path, name + ".log")
        # One logger name per instance so instances do not share handlers
        logger_name = f"{name}_{id(self)}"
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(Flevel)
        # id() values can be reused, so a logger of this name may already
        # carry handlers; remove and close them to avoid duplicate lines
        # and leaked file handles.
        for stale in list(self.logger.handlers):
            self.logger.removeHandler(stale)
            stale.close()
        fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
        # Attach the file handler; without it the formatter and file name
        # above were never used and nothing reached the log file.
        file_handler = logging.FileHandler(self.fileName, encoding='utf-8')
        file_handler.setLevel(Flevel)
        file_handler.setFormatter(fmt)
        self.logger.addHandler(file_handler)

    def debug(self, message):
        """Log ``message`` at DEBUG level."""
        self.logger.debug(message)

    def info(self, message):
        """Log ``message`` at INFO level."""
        self.logger.info(message)

    def warn(self, message):
        """Log ``message`` at WARNING level."""
        self.logger.warning(message)

    def error(self, message):
        """Log ``message`` at ERROR level."""
        self.logger.error(message)

    def critical(self, message):
        """Log ``message`` at CRITICAL level."""
        self.logger.critical(message)

    def softInfo(self, message):
        """Log ``message`` at INFO level (same as ``info``)."""
        self.logger.info(message)
def init_redis(ip, port1, db1, psw=''):
    '''
    Create the module-wide Redis client ``r`` and check the connection.

    Parameters
    ----------
    ip : str
        Redis host.
    port1 : int
        Redis port.
    db1 : int
        Database index to select.
    psw : str, optional
        Password. The default is ''.

    Returns
    -------
    bool
        Result of ``PING`` on the new client.

    Notes
    -----
    Every call waits five seconds first and replaces the global ``r`` with
    a client on a fresh connection pool. NOTE(review): the reason for the
    delay is not visible here; confirm it is still needed.
    '''
    global r
    time.sleep(5)
    # db1 is now passed on; it used to be ignored, so database 0 was
    # always selected regardless of the argument.
    pool = redis.ConnectionPool(host=ip, port=port1, db=db1, password=psw)
    r = redis.Redis(connection_pool=pool)
    return r.ping()
def redis_lget(key, num):
    '''
    Return the first ``num`` elements of the Redis list ``key``.

    Parameters
    ----------
    key : str
        Name of the list.
    num : int
        Number of elements to return from the head of the list. Zero or a
        negative value returns the whole list.

    Returns
    -------
    vec : list
        The elements as returned by ``LRANGE``.

    Notes
    -----
    The connection is re-initialised through ``init_redis`` on every call
    (which includes its five-second delay). NOTE(review): host and password
    are hard-coded below; they should come from configuration.
    '''
    init_redis('82.156.3.152', 6379, 0, '1508181008')
    # LRANGE's end index is inclusive and negative values count from the
    # tail, so "num - 1" with num = -1 would become -2 and silently drop
    # the last element. Map every "no limit" request to -1 instead.
    end = -1 if num <= 0 else num - 1
    vec = r.lrange(key, 0, end)
    return vec
class SetMac(object):
    """Change the MAC address of the "WLAN 2" network adapter on Windows.

    The new address is written to the adapter's ``NetworkAddress`` registry
    value and the adapter is then disabled and re-enabled.
    """

    def __init__(self):
        # Matches a MAC address like 00-00-00-00-00-00, 00:00:00:00:00:00
        # or 000000000000 (case-insensitive; verbose layout for readability)
        self.MAC_ADDRESS_RE = re.compile(r"""
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})[:-]?
            ([0-9A-F]{1,2})
            """, re.I | re.VERBOSE)
        # Registry class key that holds one numbered subkey per network adapter
        self.WIN_REGISTRY_PATH = r"SYSTEM\CurrentControlSet\Control\Class\{4D36E972-E325-11CE-BFC1-08002BE10318}"
        self.logger = logging.getLogger(__name__)

    def is_admin(self):
        """Exit the process unless it runs with administrator rights."""
        if ctypes.windll.shell32.IsUserAnAdmin() == 0:
            self.logger.error(
                'Sorry! You should run this with administrative privileges if you want to change your MAC address.')
            sys.exit()
        else:
            self.logger.info('admin')

    def get_macinfos(self):
        """Log the full ``GETMAC`` listing and return the MAC of "WLAN 2".

        :return: Physical address as printed by GETMAC.
        :raises RuntimeError: The listing has no entry for "WLAN 2".
        """
        self.logger.info('=' * 50)
        mac_info = subprocess.check_output('GETMAC /v /FO list', stderr=subprocess.STDOUT)
        mac_info = mac_info.decode('gbk')
        self.logger.info('Your MAC address:\n'+mac_info)
        # Connection whose address is wanted
        target_connection_name = "WLAN 2"
        pattern = re.compile(
            r"连接名:\s+" + re.escape(target_connection_name) + r".*?物理地址:\s+([0-9A-Fa-f-]+)",
            re.DOTALL
        )
        found = pattern.search(mac_info)
        if found is None:
            # Fail with a clear message instead of "'NoneType' has no
            # attribute 'group'".
            raise RuntimeError(
                f"GETMAC output has no entry for connection '{target_connection_name}'")
        return found.group(1)

    def get_target_device(self):
        """Return the adapter description GETMAC lists for "WLAN 2".

        Exits the process when the adapter cannot be found.
        """
        mac_info = subprocess.check_output('GETMAC /v /FO list', stderr=subprocess.STDOUT)
        mac_info = mac_info.decode('gbk')
        search = re.search(r'(WLAN 2)\s+网络适配器: (.+)\s+物理地址:', mac_info)
        self.logger.info(search)
        target_name, target_device = (search.group(1), search.group(2).strip()) if search else ('', '')
        if not all([target_name, target_device]):
            self.logger.error('Cannot find the target device')
            sys.exit()
        self.logger.info(target_name+target_device)
        return target_device

    def get_network_adapter_info(self):
        """Map adapter index to adapter name using ``wmic``.

        :return: e.g. ``{'16': 'Realtek RTL8821CE 802.11ac PCIe Adapter #2'}``;
            an empty dict when the command fails (the error is logged).
        """
        try:
            cmd = "wmic path win32_networkadapter get index, name"
            output = subprocess.check_output(cmd, universal_newlines=True)
            # Drop surrounding whitespace and empty lines
            lines = [line.strip() for line in output.strip().split('\n')]
            non_empty_lines = [line for line in lines if line]
            result = {}
            # The first line is the column header
            for line in non_empty_lines[1:]:
                # Index and name are separated by the first space
                first_space_index = line.find(' ')
                index = line[:first_space_index].strip()
                name = line[first_space_index:].strip()
                result[index] = name
            return result
        except subprocess.CalledProcessError as e:
            self.logger.error(f"命令执行失败,错误代码: {e.returncode}")
            self.logger.error(e.output)
        except Exception as e:
            self.logger.error(f"发生异常: {e}")
        # Same type as the success path, so callers can keep using .get()
        return {}

    def set_mac_address(self, target_device, new_mac):
        """Write ``new_mac`` to the adapter's registry entry and restart it.

        The adapter is located by walking the numbered subkeys of the
        network class key and matching the ``wmic`` adapter name with the
        same index against ``target_device``.

        :param target_device: Adapter name as returned by ``get_target_device``.
        :param new_mac: New MAC address.
        :return: None. Logs an error and returns when the MAC is malformed
            or the adapter is not found.
        """
        if not self.MAC_ADDRESS_RE.match(new_mac):
            self.logger.error('Please input a correct MAC address')
            return
        adapter_dict = self.get_network_adapter_info()
        adapter_path = None
        target_index = -1
        reg_hdl = winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE)
        try:
            key = winreg.OpenKey(reg_hdl, self.WIN_REGISTRY_PATH)
            try:
                info = winreg.QueryInfoKey(key)
                for index in range(info[0]):
                    subkey = winreg.EnumKey(key, index)
                    if subkey == 'Properties':
                        break
                    if not subkey.isdigit():
                        # Non-adapter subkeys (e.g. "Configuration") have
                        # no numeric index; skip them instead of crashing
                        # in int().
                        continue
                    path = self.WIN_REGISTRY_PATH + "\\" + subkey
                    new_key = winreg.OpenKey(reg_hdl, path)
                    try:
                        adapterIndex = str(int(subkey))
                        adapterName = adapter_dict.get(adapterIndex)
                        # Entries without a DriverDesc value are skipped
                        # (errno 2 below); the value itself is not needed.
                        winreg.QueryValueEx(new_key, "DriverDesc")
                        if adapterName == target_device:
                            adapter_path = path
                            target_index = adapterIndex
                            break
                    except OSError as err:
                        if err.errno != 2:  # 2 = registry value not found, fine to ignore
                            raise
                    finally:
                        winreg.CloseKey(new_key)
                if adapter_path is None:
                    self.logger.error('Device not found.')
                    return
                # Adapter found: store the new address
                adapter_key = winreg.OpenKey(reg_hdl, adapter_path, 0, winreg.KEY_WRITE)
                try:
                    winreg.SetValueEx(adapter_key, "NetworkAddress", 0, winreg.REG_SZ, new_mac)
                finally:
                    winreg.CloseKey(adapter_key)
            finally:
                winreg.CloseKey(key)
        finally:
            winreg.CloseKey(reg_hdl)
        self.restart_adapter(target_index, target_device)

    def restart_adapter(self, target_index, target_device):
        """Disable and then re-enable the adapter so the new MAC takes effect."""
        if platform.release() == 'XP':
            # Windows XP: find the PCI hardware id via devcon and restart it
            cmd = "devcon hwids =net"
            result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            query = r'(' + target_device + r'\r\n\s*.*:\r\n\s*)PCI\\(([A-Z]|[0-9]|_|&)*)'
            query = query.encode('ascii')
            match = re.search(query, result)
            cmd = 'devcon restart "PCI\\' + str(match.group(2).decode('ascii')) + '"'
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        else:
            cmd = "wmic path win32_networkadapter where index=" + str(target_index) + " call disable"
            subprocess.check_output(cmd)
            cmd = "wmic path win32_networkadapter where index=" + str(target_index) + " call enable"
            subprocess.check_output(cmd)

    def run(self):
        """Replace the adapter's MAC with a fresh random one, logging before and after."""
        self.is_admin()
        mac = self.get_macinfos()
        warnings.filterwarnings('ignore')
        log = Logger(f'{root_path}/logs', '18242094506')
        log.warn('MAC 重置之前:' + mac)
        target_device = self.get_target_device()
        self.set_mac_address(target_device, self.randomMAC())
        mac_new = self.get_macinfos()
        warnings.filterwarnings('ignore')
        log = Logger(f'{root_path}/logs', '18242094506')
        log.warn('MAC 重置之后:' + mac_new)

    def randomMAC(self):
        """Return a random MAC not yet recorded in the Redis list ``Mac``.

        The first byte is fixed to 0x02; the other five bytes are taken
        from a UUIDv4. The chosen address is appended to the Redis list.

        :return: 12 uppercase hex digits without separators.
        """
        while True:
            first_byte = 0x02
            # Bytes 1-5 of a UUIDv4 are fully random
            random_bytes = uuid.uuid4().bytes
            mac_bytes = [first_byte] + list(random_bytes[1:6])
            one_mac = ''.join(f'{b:02X}' for b in mac_bytes)
            # 0 requests the whole list (LRANGE 0 -1); the former -1 became
            # LRANGE 0 -2 and skipped the most recently stored address.
            used_mac = {i.decode() for i in redis_lget('Mac', 0)}
            if one_mac not in used_mac:
                r.rpush('Mac', one_mac)
                break
        self.logger.info('random mac:'+one_mac)
        return one_mac
def get_host_ip():
    """Return the 12 lowercase hex digits of ``uuid.getnode()``.

    Despite the name, no IP address is looked up: the value is the node
    (hardware address) number reported by the ``uuid`` module.

    NOTE(review): this module defines ``get_host_ip`` twice with the same
    body; this later definition is the one in effect after import.

    :return: 12-character hex string.
    """
    node = uuid.getnode()
    node_hex = uuid.UUID(int=node).hex
    return node_hex[-12:]
def get_auth_type(url, timeout=10):
    """Request ``url`` and return its ``WWW-Authenticate`` challenge.

    A millisecond timestamp is appended as the ``_`` query parameter.

    :param url: Login URL to probe. It is now actually used; it used to be
        overwritten by a hard-coded ``http://192.168.100.1/login.cgi``.
    :param timeout: Seconds to wait for the response.
    :return: The ``WWW-Authenticate`` header value, or None when the
        response has no such header.
    """
    time1 = int(time.time() * 1000)
    response = requests.get(url, params={'_': time1}, timeout=timeout)
    return response.headers.get('WWW-Authenticate')
def parse_auth_params(auth_header):
    """Split a ``WWW-Authenticate`` header value into a dict.

    The header is split on commas that are outside double quotes; each part
    is split at its first ``=`` and surrounding double quotes are removed
    from the value. Parts without ``=`` are ignored. The scheme is not
    separated from the first key, so a Digest challenge yields the key
    ``'Digest realm'``.

    :param auth_header: Header value, e.g. ``'Digest realm="r", nonce="n"'``.
    :return: Dict of parameter name to unquoted value.
    """
    params = {}
    # A comma only separates parameters when an even number of quotes
    # follows it; this keeps values such as qop="auth,auth-int" intact.
    for param in re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', auth_header):
        key, sep, value = param.strip().partition('=')
        if not sep:
            continue
        params[key] = value.strip('"')
    return params
def hex_md5(data):
    """Return the MD5 digest of the string ``data`` as lowercase hex.

    ``data`` is encoded with the default ``str.encode()`` encoding first.
    """
    digest = md5(data.encode())
    return digest.hexdigest()
def generate_cnonce():
    """Return a fresh 16-character lowercase hex client nonce.

    The value comes from the ``secrets`` module, so it is unpredictable;
    the former construction (MD5 of a small random number plus the clock)
    could be guessed. Length and alphabet are unchanged.
    """
    import secrets
    return secrets.token_hex(8)
def compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc):
    """Compute the ``response`` value of an HTTP Digest request.

    The result is ``MD5(HA1:nonce:nc:cnonce:qop:HA2)`` with
    ``HA1 = MD5(username:realm:password)`` and ``HA2 = MD5(GET:uri)``;
    the HTTP method is always ``GET``.

    :return: The digest as a hex string.
    """
    credentials_hash = hex_md5(":".join((username, realm, password)))
    request_hash = hex_md5("GET:" + uri)
    fields = (credentials_hash, nonce, nc, cnonce, qop, request_hash)
    return hex_md5(":".join(fields))
def do_login(username, password, base_url):
    """Log in to the device at ``base_url`` with HTTP Digest and call its reset action.

    Steps: fetch the Digest challenge from ``<base_url>/login.cgi``, send
    the login request, then request
    ``<base_url>/xml_action.cgi?method=get&module=duster&file=reset`` with
    a second Digest header.

    :param username: Device account name.
    :param password: Device account password.
    :param base_url: Scheme and host of the device, without trailing slash.
        It is now used for the reset request too, which used to go to a
        hard-coded ``http://192.168.100.1``.
    :return: True when the reset request answered with HTTP 200, False when
        the challenge could not be obtained or parsed, or on another status.

    NOTE(review): the result of the login request itself is not checked,
    and both requests reuse nc=00000001; confirm the device accepts that.
    """
    url = f"{base_url}/login.cgi"
    auth_header = get_auth_type(url)
    if not auth_header:
        print("Failed to get authentication parameters")
        return False
    auth_params = parse_auth_params(auth_header)
    realm = auth_params.get('Digest realm')
    nonce = auth_params.get('nonce')
    qop = auth_params.get('qop')
    if not all([realm, nonce, qop]):
        print("Missing required authentication parameters")
        return False
    cnonce = generate_cnonce()
    nc = '00000001'
    # Step 1: login request
    uri = '/cgi/protected.cgi'
    digest = compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc)
    headers = {
        'Authorization': f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="{uri}", response="{digest}", qop={qop}, nc={nc}, cnonce="{cnonce}"'
    }
    login_url = f"{base_url}/login.cgi?Action=Digest&username={username}&realm={realm}&nonce={nonce}&response={digest}&qop={qop}&cnonce={cnonce}&nc={nc}&temp=asr"
    # Timeouts keep an unreachable device from blocking forever
    requests.get(login_url, headers=headers, timeout=10)
    # Step 2: reset action, authorised with a digest for its own uri
    uri = "/cgi/xml_action.cgi"
    digest = compute_digest_response(username, password, realm, nonce, uri, qop, cnonce, nc)
    headers = {
        'Authorization': f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="{uri}", response="{digest}", qop={qop}, nc={nc}, cnonce="{cnonce}"'
    }
    reset_url = f"{base_url}/xml_action.cgi?method=get&module=duster&file=reset"
    response = requests.get(reset_url, headers=headers, timeout=10)
    print(response.content)
    if response.status_code == 200:
        print("Login successful")
        return True
    else:
        print(f"Login failed: {response.status_code} {response.reason}")
        return False
def get_ip(timeout=10):
    """Return the public IP address as seen by ``checkip.amazonaws.com``.

    The local address is of no use here; what matters is the address an
    outside party (the broker) sees.

    :param timeout: Seconds to wait for the response, so a stalled lookup
        cannot block the caller forever.
    :return: The response body with surrounding whitespace removed.
    """
    return requests.get('https://checkip.amazonaws.com', timeout=timeout).text.strip()
def get_ip_data(timeout=10):
    """Fetch one proxy entry from the tianqiip API.

    :param timeout: Seconds to wait for the response.
    :return: The first element of the response's ``data`` list when the
        request returned HTTP 200 and ``code`` 1000; otherwise None.

    NOTE(review): the API secret and signature are hard-coded below; they
    should come from configuration.
    """
    url = "http://api.tianqiip.com/getip"
    params = {
        "secret": "d8wqfdf0qhrnxgne",  # extraction secret
        "num": 1,
        "yys": "电信",
        "type": "json",
        "lb": "\n",
        # "region": "1,2,3",
        "port": 3,
        "time": 5,
        "ts": 1,
        "ys": 1,
        "cs": 1,
        "sign": "386ff88188185bc6070ec011266745b3",  # user signature
        "mr": 1
    }
    response = requests.get(url, params, timeout=timeout)
    if response.status_code == 200:
        data = response.json()
        if data.get("code") == 1000:
            entries = data.get("data")
            # A success code with an empty or missing list must not crash
            if entries:
                logging.info("获取代理IP成功")
                return entries[0]
    return None
# Manual smoke test when the file is run as a script: print the node id and
# switch Proxifier to a fixed sample proxy (this drives the GUI).
if __name__ == '__main__':
    print(get_host_ip())
    set_proxy('36.27.103.251', '40033')