This commit is contained in:
maxinxin 2025-06-11 10:29:27 +08:00
parent 69725bb5cf
commit de432dfdab
26 changed files with 2790 additions and 133387 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

60
aaa.py
View File

@ -1,60 +0,0 @@
#!/usr/bin/env python3
# main.py - 主程序入口,初始化日志配置
import logging
from bbb import process_data
def setup_logging():
"""统一配置日志系统"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
# 移除所有现有处理器(避免重复)
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO) # 控制台只显示INFO及以上级别
# 文件处理器
file_handler = logging.FileHandler('app.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG) # 文件记录所有DEBUG及以上级别
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
def main():
"""主函数"""
# 初始化日志配置
setup_logging()
# 获取当前模块的logger
logger = logging.getLogger(__name__)
logger.info("程序启动")
try:
# 调用工具模块的函数
result = process_data([1, 2, 3])
logger.debug(f"处理结果: {result}")
# 模拟一个警告情况
if len(result) > 2:
logger.warning("结果数据量较大")
logger.info("程序正常结束")
except Exception as e:
logger.error(f"程序出错: {str(e)}", exc_info=True)
if __name__ == "__main__":
main()

View File

@ -1,34 +0,0 @@
import pandas as pd
import time
from datetime import datetime
# 模拟你的数据as_of_date 是未来的时间 2025-05-16 19:19:29.767913
data = {
"as_of_date": ["2025-05-16 19:19:29.767913"]
}
df = pd.DataFrame(data)
# 获取当前时间戳(用于对比)
current_time = time.time()
print(f"当前时间戳: {current_time}")
# 模拟你的计算逻辑
last_idx = 0 # 假设 last_idx 是 0
if pd.notna(df.at[last_idx, 'as_of_date']):
# 解析时间并转成时间戳
start_time = datetime.strptime(df.at[last_idx, 'as_of_date'], "%Y-%m-%d %H:%M:%S.%f").timestamp()
start_time1 = pd.to_datetime(df.at[last_idx, 'as_of_date']).timestamp()
print(start_time - start_time1)
print(f"as_of_date 时间戳: {start_time}")
# 计算时间差(当前时间 - 未来时间,应该是负数)
time_diff = time.time() - start_time
print(f"原始时间差(秒): {time_diff}")
# 四舍五入到 2 位小数
rounded_diff = round(time_diff, 2)
print(f"四舍五入后: {rounded_diff}")
# 计算绝对值(如果你想要正数)
abs_diff = round(abs(time_diff), 2)
print(f"绝对值(正数): {abs_diff}")

View File

@ -1,26 +0,0 @@
import requests
import logging
def get_ip_data():
"""从API获取代理IP和端口"""
url = "http://api.tianqiip.com/getip"
params = {
"secret": "d8wqfdf0qhrnxgne", # 替换为你的提取秘钥
"num": 1,
"yys": "电信",
"type": "json",
"lb": "\n",
# "region": "1,2,3",
"port": 2,
"time": 3,
"ts": 1,
"ys": 1,
"cs": 1,
"sign": "386ff88188185bc6070ec011266745b3", # 用户签名
"mr": 1
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
if data.get("code") == 1000:
logging.info("获取代理IP成功")
return data.get("data")[0]

39
app.log

File diff suppressed because one or more lines are too long

40
bbb.py
View File

@ -1,40 +0,0 @@
# utils.py - 工具模块,使用日志功能
import logging
import logging
# 获取子模块logger
logger = logging.getLogger(__name__)
# 添加专属文件处理器(只添加一次)
if not logger.handlers:
# 专属文件处理器
module_file_handler = logging.FileHandler('utils.log')
module_file_handler.setLevel(logging.DEBUG)
# 设置专属格式(可选)
module_formatter = logging.Formatter(
'[%(asctime)s] %(levelname)s @ %(funcName)s: %(message)s'
)
module_file_handler.setFormatter(module_formatter)
logger.addHandler(module_file_handler)
logger.propagate = True # 仍然传播到root logger主日志文件
def process_data(data):
"""数据处理函数"""
logger.debug(f"开始处理数据: {data}")
try:
# 模拟数据处理
result = [x * 2 for x in data]
# 模拟一个调试信息
logger.info(f"中间结果: {result}")
return result
except Exception as e:
logger.error(f"数据处理失败: {str(e)}", exc_info=True)
raise

View File

@ -1,154 +0,0 @@
import os
import time
import subprocess
import requests
import ctypes
import sys
from typing import Tuple
import logging
logger = logging.getLogger(__name__)
def is_admin() -> bool:
"""检查管理员权限"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def check_internet_connection(test_urls=None, timeout=5) -> bool:
"""检测网络连接"""
if test_urls is None:
test_urls = [
"https://www.baidu.com",
"https://www.qq.com",
"https://www.cloudflare.com"
]
for url in test_urls:
try:
response = requests.get(url, timeout=timeout)
if response.status_code == 200:
return True
except:
continue
return False
def run_netsh_command(command: list, timeout: int = 10) -> Tuple[bool, str]:
"""安全执行命令"""
try:
result = subprocess.run(
command,
check=True,
shell=True,
timeout=timeout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False # 捕获字节输出
)
# 解码输出,忽略错误
stdout = result.stdout.decode('utf-8', errors='ignore').strip()
stderr = result.stderr.decode('utf-8', errors='ignore').strip()
output = stdout if stdout else stderr
return True, output
except subprocess.CalledProcessError as e:
# 解码异常中的输出
stdout = e.stdout.decode('utf-8', errors='ignore').strip()
stderr = e.stderr.decode('utf-8', errors='ignore').strip()
output = stderr if stderr else stdout or "Unknown error"
return False, f"{output} (exit code {e.returncode})"
except subprocess.TimeoutExpired as e:
# 处理超时输出
stdout = e.stdout.decode('utf-8', errors='ignore').strip() if e.stdout else ""
stderr = e.stderr.decode('utf-8', errors='ignore').strip() if e.stderr else ""
output = f"{stdout} {stderr}".strip()
return False, f"Timeout after {timeout}s: {output}"
except Exception as e:
return False, f"Unexpected error: {str(e)}"
def connect_wifi(ssid: str = 'MaxEntropy', password: str = 'cskj12345678') -> bool:
# 权限检查
if not is_admin():
print("请以管理员身份运行!")
return False
# 网络检测
if check_internet_connection():
print("当前网络正常")
return True
# 生成配置文件
profile_path = f"wifi_temp_{ssid}.xml"
try:
# 生成配置文件内容
wifi_config = f"""<?xml version="1.0"?>
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
<name>{ssid}</name>
<SSIDConfig>
<SSID><name>{ssid}</name></SSID>
</SSIDConfig>
<connectionType>ESS</connectionType>
<connectionMode>auto</connectionMode>
<MSM>
<security>
<authEncryption>
<authentication>WPA2PSK</authentication>
<encryption>AES</encryption>
<useOneX>false</useOneX>
</authEncryption>
<sharedKey>
<keyType>passPhrase</keyType>
<protected>false</protected>
<keyMaterial>{password}</keyMaterial>
</sharedKey>
</security>
</MSM>
</WLANProfile>"""
# 写入临时文件使用UTF-8编码
with open(profile_path, "w", encoding='utf-8') as f:
f.write(wifi_config.strip())
# 删除旧配置文件(如果有)
run_netsh_command(["netsh", "wlan", "delete", "profile", ssid])
# 添加配置文件
success, msg = run_netsh_command([
"netsh", "wlan", "add", "profile",
f"filename={profile_path}"
])
if not success:
print(f"[添加配置文件失败] {msg}")
return False
# 执行连接
success, msg = run_netsh_command([
"netsh", "wlan", "connect",
f"name={ssid}",
"interface=Wi-Fi"
])
if not success:
print(f"[连接失败] {msg}")
return False
# 等待连接
for i in range(5):
time.sleep(2)
if check_internet_connection():
print(f"成功连接至 {ssid}")
return True
print(f"等待连接中...({i + 1}/5)")
print("连接超时")
return False
finally:
# 清理临时文件
if os.path.exists(profile_path):
try:
os.remove(profile_path)
except Exception as e:
print(f"清理临时文件失败: {str(e)}")
if __name__ == "__main__":
connect_wifi(ssid="ZTE_A5DA76", password="1234567890")

View File

@ -1,4 +1,5 @@
# 此脚本调试时非常常用,不可删除
from tools import *
reset_proxy_to_default()

189
iptest.py
View File

@ -1,189 +0,0 @@
import winreg
import ctypes
import re
import sys
import time
import requests
import logging
# 日志配置:同时输出到文件和控制台
logger = logging.getLogger(__name__)
def get_init_ip():
return requests.get('https://checkip.amazonaws.com').text.strip()
def get_proxy_ip(proxy_ip,proxy_port):
"""测试代理ip并返回代理IP的IP值,由于暂时只能使用http连接不支持https连接"""
while True:
try:
url = 'https://checkip.amazonaws.com'
proxyMeta = 'http://45qac9:n2tc94fc@{}:{}'.format(proxy_ip,proxy_port)
proxies = { 'http':proxyMeta,"https": proxyMeta}
return requests.get(url, proxies=proxies).text
except Exception as e:
logger.info(f'出错了{e}')
time.sleep(2)
def set_global_proxy(proxy_ip, proxy_port, enable=True):
"""设置/关闭全局代理"""
key = None
try:
if ctypes.windll.shell32.IsUserAnAdmin() == 0:
logging.error("需要管理员权限来修改系统代理设置")
return False
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings",
0, winreg.KEY_WRITE | winreg.KEY_READ
)
# 备份当前设置
try:
old_proxy = winreg.QueryValueEx(key, "ProxyServer")[0]
old_enable = winreg.QueryValueEx(key, "ProxyEnable")[0]
old_override = winreg.QueryValueEx(key, "ProxyOverride")[0]
except WindowsError:
old_proxy = ""
old_enable = 0
old_override = "<local>"
if enable:
proxy_string = f"{proxy_ip}:{proxy_port}"
winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, proxy_string)
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, 1)
bypass = "<local>;" + ";".join(["/api", "/static"])
winreg.SetValueEx(key, "ProxyOverride", 0, winreg.REG_SZ, bypass)
else:
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, 0)
# 刷新系统设置
internet_set_option = ctypes.windll.Wininet.InternetSetOptionW
internet_set_option(0, 37, 0, 0) # INTERNET_OPTION_REFRESH
internet_set_option(0, 39, 0, 0) # INTERNET_OPTION_SETTINGS_CHANGED
logging.info("代理设置已" + ("启用" if enable else "关闭"))
if enable:
logging.info(f"代理服务器: {proxy_ip}:{proxy_port}")
return True
except Exception as e:
logging.error(f"设置代理时发生错误: {str(e)}")
if key is not None:
try:
winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, old_proxy)
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, old_enable)
winreg.SetValueEx(key, "ProxyOverride", 0, winreg.REG_SZ, old_override)
logging.info("已恢复原始代理设置")
except:
logging.warning("警告:无法恢复原始代理设置")
return False
finally:
if key:
winreg.CloseKey(key)
def get_ip_data():
"""从API获取代理IP和端口"""
url = "http://api.tianqiip.com/getip"
params = {
"secret": "d8wqfdf0qhrnxgne", # 替换为你的提取秘钥
"num": 1,
"yys": "电信",
"type": "json",
"lb": "\n",
# "region": "1,2,3",
"port": 1,
"time": 5,
"ts": 1,
"ys": 1,
"cs": 1,
"sign": "386ff88188185bc6070ec011266745b3", # 用户签名
"mr": 1
}
response = requests.get(url,params)
if response.status_code == 200:
data = response.json()
if data.get("code") == 1000:
logging.info("获取代理IP成功")
return data.get("data")[0]
def reset_proxy_to_default():
"""恢复系统代理为默认(关闭)"""
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings",
0, winreg.KEY_WRITE
)
try:
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, 0)
winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, "")
winreg.SetValueEx(key, "ProxyOverride", 0, winreg.REG_SZ, "<local>")
finally:
winreg.CloseKey(key)
internet_set_option = ctypes.windll.Wininet.InternetSetOptionW
internet_set_option(0, 37, 0, 0)
internet_set_option(0, 39, 0, 0)
def is_admin():
"""检查是否以管理员身份运行"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def fetch_whitelist():
"""
获取当前IP白名单返回IP列表
"""
url = "http://api.tianqiip.com/white/fetch?key=Maxinxin&brand=2&sign=386ff88188185bc6070ec011266745b3"
try:
response = requests.get(url, timeout=10)
data = response.json()
if data.get("code") == 200:
# 返回IP列表
return data.get("data")
else:
print(f"获取白名单失败: {data.get('msg', 'Unknown error')}")
return []
except Exception as e:
print(f"请求异常: {e}")
return []
if __name__ == "__main__":
if not is_admin():
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(sys.argv), None, 1)
sys.exit(0)
ip_fetch_count = 0 # 计数器
while True:
ip_fetch_count += 1
start_time = time.time()
reset_proxy_to_default()
my_ip = get_init_ip()
whitelist = fetch_whitelist()
print(whitelist)
if my_ip in whitelist:
logging.info(f"当前IP {my_ip} 在白名单中")
else:
logging.info(f"当前IP {my_ip} 不在白名单中")
print(1)
item = get_ip_data()
proxy_ip = item['ip']
proxy_port = item['port']
set_global_proxy(proxy_ip, proxy_port, enable=True)
try:
my_ip1 = get_proxy_ip(proxy_ip,proxy_port)
logging.info(f"{ip_fetch_count}次获取IP当前IP: {my_ip1}")
except Exception as e:
logging.error(f"{ip_fetch_count}次获取IP失败: {e}")
logging.info(f"本次代理切换耗时: {time.time() - start_time:.2f}")
time.sleep(300)

Binary file not shown.

View File

File diff suppressed because it is too large Load Diff

50
requirements.txt Normal file
View File

@ -0,0 +1,50 @@
async-timeout==4.0.2
certifi==2024.8.30
cffi==1.15.1
charset-normalizer==2.0.12
comtypes==1.4.3
cryptography==40.0.2
cycler==0.11.0
decorator==4.4.2
et-xmlfile==1.1.0
greenlet==2.0.2
idna==3.10
imageio==2.15.0
importlib-metadata==4.8.3
jmespath==0.10.0
kiwisolver==1.3.1
matplotlib==3.3.4
MouseInfo==0.1.3
networkx==2.5.1
numpy==1.19.5
openpyxl==3.1.3
packaging==21.3
pandas==1.1.5
Pillow==8.4.0
PyAutoGUI==0.9.54
pycparser==2.21
PyGetWindow==0.0.9
PyMsgBox==1.0.9
PyMySQL==1.0.2
pyparsing==3.1.4
pyperclip==1.9.0
PyRect==0.2.0
PyScreeze==1.0.1
pytesseract==0.3.8
python-dateutil==2.9.0.post0
pytweening==1.2.0
pytz==2024.2
PyWavelets==1.1.1
pywin32==305
pywinauto==0.6.8
redis==4.3.6
requests==2.27.1
scikit-image==0.17.2
scipy==1.5.4
six==1.16.0
SQLAlchemy==1.4.54
tifffile==2020.9.3
typing_extensions==4.1.1
urllib3==1.26.20
xlrd==1.2.0
zipp==3.6.0

162
tools.py
View File

@ -53,24 +53,93 @@ root_path = Path(__file__).parent
def connect_to_wifi(ssid='MaxEntropy', password='cskj12345678'):
# 先检测当前网络是否正常
def is_admin() -> bool:
"""检查管理员权限"""
try:
response = requests.get("https://www.baidu.com", timeout=5)
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
def check_internet_connection(test_urls=None, timeout=5) -> bool:
"""检测网络连接"""
if test_urls is None:
test_urls = [
"https://www.baidu.com",
"https://www.qq.com",
"https://www.cloudflare.com"
]
for url in test_urls:
try:
response = requests.get(url, timeout=timeout)
if response.status_code == 200:
print("当前网络连接正常无需重连WiFi")
return True
except:
print("当前网络不可用将尝试连接WiFi...")
continue
return False
# 生成 WiFi 配置文件XML格式
def run_netsh_command(command: list, timeout: int = 10) -> tuple:
"""安全执行netsh命令"""
try:
result = subprocess.run(
command,
check=True,
shell=True,
timeout=timeout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False # 捕获字节输出
)
# 解码输出,忽略错误
stdout = result.stdout.decode('utf-8', errors='ignore').strip()
stderr = result.stderr.decode('utf-8', errors='ignore').strip()
output = stdout if stdout else stderr
return True, output
except subprocess.CalledProcessError as e:
# 解码异常中的输出
stdout = e.stdout.decode('utf-8', errors='ignore').strip()
stderr = e.stderr.decode('utf-8', errors='ignore').strip()
output = stderr if stderr else stdout or "Unknown error"
return False, f"{output} (exit code {e.returncode})"
except subprocess.TimeoutExpired as e:
# 处理超时输出
stdout = e.stdout.decode('utf-8', errors='ignore').strip() if e.stdout else ""
stderr = e.stderr.decode('utf-8', errors='ignore').strip() if e.stderr else ""
output = f"{stdout} {stderr}".strip()
return False, f"Timeout after {timeout}s: {output}"
except Exception as e:
return False, f"Unexpected error: {str(e)}"
def connect_wifi(ssid: str = 'MaxEntropy', password: str = 'cskj12345678') -> bool:
"""
优化的WiFi连接函数
Args:
ssid: WiFi网络名称
password: WiFi密码
Returns:
bool: 连接成功返回True否则返回False
"""
# 权限检查
if not is_admin():
print("请以管理员身份运行!")
return False
# 网络检测
if check_internet_connection():
print("当前网络正常")
return True
# 生成配置文件
profile_path = f"wifi_temp_{ssid}.xml"
try:
# 生成配置文件内容
wifi_config = f"""<?xml version="1.0"?>
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
<name>{ssid}</name>
<SSIDConfig>
<SSID>
<name>{ssid}</name>
</SSID>
<SSID><name>{ssid}</name></SSID>
</SSIDConfig>
<connectionType>ESS</connectionType>
<connectionMode>auto</connectionMode>
@ -90,55 +159,56 @@ def connect_to_wifi(ssid='MaxEntropy', password='cskj12345678'):
</MSM>
</WLANProfile>"""
try:
# 保存配置文件
with open("wifi_profile.xml", "w") as f:
# 写入临时文件使用UTF-8编码
with open(profile_path, "w", encoding='utf-8') as f:
f.write(wifi_config.strip())
# 添加 WiFi 配置文件
subprocess.run(
["netsh", "wlan", "add", "profile", "filename=wifi_profile.xml"],
check=True,
shell=True,
)
# 删除旧配置文件(如果有)
run_netsh_command(["netsh", "wlan", "delete", "profile", ssid])
# 连接 WiFi
subprocess.run(
["netsh", "wlan", "connect", "name=" + ssid],
check=True,
shell=True,
)
# 添加配置文件
success, msg = run_netsh_command([
"netsh", "wlan", "add", "profile",
f"filename={profile_path}"
])
if not success:
print(f"[添加配置文件失败] {msg}")
return False
# 等待连接成功
max_retries = 5
for i in range(max_retries):
try:
response = requests.get("https://www.baidu.com", timeout=5)
if response.status_code == 200:
print(f"成功连接到 WiFi: {ssid}")
return True
except:
if i < max_retries - 1:
print(f"等待网络连接...({i + 1}/{max_retries})")
# 执行连接
success, msg = run_netsh_command([
"netsh", "wlan", "connect",
f"name={ssid}",
"interface=Wi-Fi"
])
if not success:
print(f"[连接失败] {msg}")
return False
# 等待连接
for i in range(5):
time.sleep(2)
if check_internet_connection():
print(f"成功连接至 {ssid}")
return True
print(f"等待连接中...({i + 1}/5)")
print("WiFi连接失败: 超过最大重试次数")
print("连接超时")
return False
except subprocess.CalledProcessError as e:
print(f"WiFi配置失败: {e}")
return False
finally:
# 清理临时文件
if os.path.exists(profile_path):
try:
os.remove(profile_path)
except Exception as e:
print(f"发生未知错误: {e}")
if os.path.exists("wifi_profile.xml"):
os.remove("wifi_profile.xml")
return False
print(f"清理临时文件失败: {str(e)}")
class ExcelDataWriter:
"""
Excel数据写入工具类定制列名版
Excel数据写入工具类
列名顺序as_of_date, id, account, proxy_ip, ip_location, exit_ip,
ip_check_result, usage_result, ip_switch_time,
transaction_duration, ip_survival_time, remarks,
@ -668,7 +738,7 @@ class SetMac(object):
([0-9A-F]{1,2})
""", re.I | re.VERBOSE) # re.I: case-insensitive matching. re.VERBOSE: just look nicer.
self.WIN_REGISTRY_PATH = "SYSTEM\CurrentControlSet\Control\Class\{4D36E972-E325-11CE-BFC1-08002BE10318}"
self.WIN_REGISTRY_PATH = r"SYSTEM\CurrentControlSet\Control\Class\{4D36E972-E325-11CE-BFC1-08002BE10318}"
def is_admin(self):
@ -833,7 +903,7 @@ class SetMac(object):
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except FileNotFoundError:
raise
query = '(' + target_device + '\r\n\s*.*:\r\n\s*)PCI\\\\(([A-Z]|[0-9]|_|&)*)'
query = r'(' + target_device + r'\r\n\s*.*:\r\n\s*)PCI\\(([A-Z]|[0-9]|_|&)*)'
query = query.encode('ascii')
match = re.search(query, result)
cmd = 'devcon restart "PCI\\' + str(match.group(2).decode('ascii')) + '"'

View File

@ -5,7 +5,7 @@
# @File : trade_logic.py
# @Software: PyCharm
from sqlalchemy import create_engine
from sqlalchemy import create_engine, MetaData, Table, insert
import datetime
from tools import *
import re
@ -274,10 +274,39 @@ class Trade:
amount = int(rows['Number_transactions'])
self.log.info('The stocks to '+ operate_name+ ''+ tick+ ' '+ ' price:'+ str(price)+' amount:'+ str(amount))
try:
if operate_name == 'buy':
user_main.buy(tick_code, price, amount)
elif operate_name == 'sell':
user_main.sell(tick_code, price, amount)
except Exception as e:
error_msg = str(e) + ': The stocks to '+ operate_name+ ''+ tick+ ' '+ ' price:'+ str(price)+' amount:'+ str(amount)
self.log.error(error_msg)
send_email('auto trade error',error_msg)
continue
# 交易信息入库
engine = create_engine(
'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.mysql.rds.aliyuncs.com:3306/user_center')
data_dict = {
'as_of_date': datetime.datetime.now(), # 日期时间
'broker': self.broker, # 字符串
'account_number': self.user, # 字符串
'ticker': tick, # 字符串
'sell_or_buy': operate_name, # 字符串
'price': price, # 浮点数
'amount': amount # 整数
}
# 获取表对象
metadata = MetaData()
table = Table('entrust_orders', metadata, autoload_with=engine)
# 插入数据(自动提交事务不需要commit显式提交)
with engine.begin() as conn: # 自动提交事务
stmt = insert(table).values(**data_dict)
conn.execute(stmt)
self.log.info('交易记录入库The stocks to '+ operate_name+ ''+ tick+ ' '+ ' price:'+ str(price)+' amount:'+ str(amount))
user_main.exit()
#一键撤单

View File

@ -1,8 +1,11 @@
import trade_logic
import datetime
from tools import *
scripts_path = os.path.dirname(os.path.realpath(__file__))
root_path = scripts_path[:scripts_path.find(project_name)+len(project_name)]
from pathlib import Path
root_path = Path(__file__).parent
class auto_trade:
@ -13,7 +16,6 @@ class auto_trade:
self.log = Logger(f'{root_path}/logs','trade_port')
def trade_main(self,user):
user_main = trade_logic.Trade(user, self.op,self.order_id)

View File

@ -17,14 +17,8 @@ from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
# from voice_notification import call_user
import getpass
import uuid
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from connect_wifi import connect_wifi
import logging
import os
from logging.handlers import RotatingFileHandler
def setup_logging():

View File

@ -1,4 +0,0 @@
[2025-05-08 09:35:14,482] DEBUG @ process_data: 开始处理数据: [1, 2, 3]
[2025-05-08 09:35:14,482] INFO @ process_data: 中间结果: [2, 4, 6]
[2025-05-08 13:24:04,770] DEBUG @ process_data: 开始处理数据: [1, 2, 3]
[2025-05-08 13:24:04,770] INFO @ process_data: 中间结果: [2, 4, 6]

View File

@ -1,25 +0,0 @@
<?xml version="1.0"?>
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
<name>Redmi K40</name>
<SSIDConfig>
<SSID>
<name>Redmi K40</name>
</SSID>
</SSIDConfig>
<connectionType>ESS</connectionType>
<connectionMode>auto</connectionMode>
<MSM>
<security>
<authEncryption>
<authentication>WPA2PSK</authentication>
<encryption>AES</encryption>
<useOneX>false</useOneX>
</authEncryption>
<sharedKey>
<keyType>passPhrase</keyType>
<protected>false</protected>
<keyMaterial>123456789</keyMaterial>
</sharedKey>
</security>
</MSM>
</WLANProfile>