zhitou_trade/trade_port_trade.py
2025-07-08 15:38:17 +08:00

303 lines
9.9 KiB
Python

import trade_logic
import pandas as pd
from sqlalchemy import create_engine
from tools import *
import datetime
import logging
import sys
import time
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import logging
from pathlib import Path
from config import ssid,password
from sqlalchemy.exc import OperationalError, TimeoutError
# 获取根路径
root_path = Path(__file__).parent
def setup_logging():
"""统一配置日志系统"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
# 移除所有现有处理器(避免重复)
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO) # 控制台只显示INFO及以上级别
# 文件处理器
file_handler = logging.FileHandler('Auto_Trade_Log.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG) # 文件记录所有DEBUG及以上级别
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
# 封装下read_sql函数
def robust_read_sql(sql,engine_type, max_retries=3, retry_delay=5) -> pd.DataFrame:
"""真正的网络中断容错查询"""
for attempt in range(max_retries + 1): # +1 确保至少尝试一次
try:
# 每次尝试都创建新引擎(解决连接池失效问题)
result = pd.read_sql(sql, engine_type)
return result
except (OperationalError, TimeoutError) as e:
# 捕获特定的网络/数据库连接错误
logger.warning(f"数据库连接异常 (尝试 {attempt+1}/{max_retries+1}): {str(e)}")
if attempt < max_retries:
# 执行网络恢复操作
connect_wifi(ssid,password)
logger.info(f"等待 {retry_delay * (attempt + 1)} 秒后重试...")
time.sleep(retry_delay * (attempt + 1)) # 指数退避策略
else:
logger.error("达到最大重试次数,放弃查询")
raise
except Exception as e:
# 非网络错误则直接抛出
logger.error(f"非网络数据库错误: {str(e)}")
raise
class auto_trade:
def __init__(self, users, op,order_id):
self.op = op
self.users = users
self.order_id = order_id
def trade_main(self,user):
user_main = trade_logic.Trade(user, self.op,self.order_id)
print('3333')
user_main.trade_by_op()
def trades(self):
for user in self.users:
logger.info(user)
try:
self.trade_main(user)
writer = ExcelDataWriter()
writer.update_latest_record(
status='成功',
remarks='实盘数据',
risk_response=''
)
logger.info(f'{user}交易已经完成')
except Exception as er:
ip_address = get_host_ip()
try:
send_email('auto trade error',(ip_address+' '+user + ' ' + str(er)))
except Exception as e:
logger.error('----邮箱发送报错----')
logger.error(e)
if "UnboundLocalError: local variable 'user' referenced before assignment" in str(sys.exc_info):
pass
elif user=='15033920398':
continue
else:
logger.error('交易错误')
logger.error(er)
#
writer = ExcelDataWriter()
writer.update_latest_record(
status='失败',
remarks='实盘数据',
risk_response=str(er)
)
continue
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
def get_userlist(order_id):
today = str(datetime.date.today())
Intranat_ip = get_host_ip()
# Intranat_ip = '10.0.0.15'
engine_user_center = create_engine(
'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
'mysql.rds.aliyuncs.com:3306/user_center', encoding="utf-8", echo=False)
logger.info('----正在获取用户列表----')
logger.info(f'{order_id}')
logger.info(Intranat_ip)
sql = f'select Distinct Account_name from trade_instruction where ID_Order = "{order_id}" and Intranet_IP ="{Intranat_ip}"'
userlist = (robust_read_sql(sql,engine_user_center))['Account_name'].tolist()
logger.info('----成功获取用户列表----')
return userlist
def send_email(subject: str, message: str, receive_users: list = ['1055017575@qq.com','2293908092@qq.com','728892034@qq.com'], file_path: str = 'no_path'):
'''
Parameters
----------
subject : str
title.
message : str
message.
receive_users : list, optional
receive. The default is ['1055017575@qq.com'].
file_path : str, optional
path. The default is 'no_path'.
Returns
-------
None.
'''
# send_user = 'it_ainvest@zohomail.cn' # zoho
# send_pwd = '2022@AIWANTE' # 密码
# receive_user = receive_users
# # receive_user = ['1055017575@qq.com','yi.zhou@ainvest.io','2293908092@qq.com']
#
# msg = MIMEMultipart()
# msg['subject'] = subject
# msg['from'] = send_user
# msg['to'] = ','.join(receive_user)
# msg.attach(MIMEText(message, 'html', 'utf-8'))
#
# if file_path != 'no_path':
# part = MIMEApplication(open(file_path, 'rb').read())
# file_name = file_path.split('/')[-1]
# part.add_header('Content-Disposition', 'attachment', filename=file_name)
# msg.attach(part)
# else:
# pass
#
# server = smtplib.SMTP_SSL("smtp.zoho.com.cn", 465) # smtp.163.com
# server.login(send_user, send_pwd)
# server.sendmail(send_user, receive_user, msg.as_string())
# server.quit()
# print('邮件发送成功')
send_user = 'ainvest_family@ainvest.io' # zoho
send_pwd = '2016@aiwante' # 密码
receive_user = receive_users
# receive_user = ['1055017575@qq.com','yi.zhou@ainvest.io','2293908092@qq.com']
msg = MIMEMultipart()
msg['subject'] = subject
msg['from'] = send_user
msg['to'] = ','.join(receive_user)
msg.attach(MIMEText(message, 'html', 'utf-8'))
if file_path != 'no_path':
part = MIMEApplication(open(file_path, 'rb').read())
file_name = file_path.split('/')[-1]
part.add_header('Content-Disposition', 'attachment', filename=file_name)
msg.attach(part)
else:
pass
server = smtplib.SMTP_SSL("smtp.zoho.com", 465) # smtp.163.com
server.login(send_user, send_pwd)
server.sendmail(send_user, receive_user, msg.as_string())
server.quit()
print('邮件发送成功')
def trade_instruction_monitor(last_order: int = 0):
'''
交易监控
Returns
-------
None.
'''
engine_user_center = create_engine(
'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
'mysql.rds.aliyuncs.com:3306/user_center', encoding="utf-8", echo=False)
if last_order == 0:
last_order_id = str(
int(robust_read_sql(f'select max(ID_Order) from trade_instruction', engine_user_center).values.tolist()[0][0]))
elif last_order == 1:
last_order_id = '-1'
else:
last_order_id = last_order
while True:
order_id = str(
int(robust_read_sql(f'select max(ID_Order) from trade_instruction', engine_user_center).values.tolist()[0][0]))
if order_id == last_order_id:
time.sleep(1)
pass
elif int(order_id) - int(last_order_id) > 1:
order_id = str(int(last_order_id)+1)
# ip = get_host_ip()
# # message = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' '+ str(ip) +' ' + order_id + ' begin trade'
# # # send_email('自动交易提醒', message)
time.sleep(1)
op = str(robust_read_sql(f'select Trade_Type from trade_instruction where ID_Order="{order_id}"', engine_user_center).values.tolist()[0][0])
userlist = get_userlist(order_id)
logger.info(userlist)
logger.info('----开始自动交易----')
trade_main = auto_trade(userlist,op,order_id)
trade_main.trades()
logger.info('----结束自动交易')
last_order_id = order_id
elif int(order_id) - int(last_order_id) == 1:
# ip = get_host_ip()
# message = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + str(
# ip) + ' ' + order_id + ' begin trade'
# # send_email('自动交易提醒', message)
time.sleep(1)
op = str(robust_read_sql(f'select Trade_Type from trade_instruction where ID_Order="{order_id}"',
engine_user_center).values.tolist()[0][0])
userlist = get_userlist(order_id)
print(userlist)
trade_main = auto_trade(userlist, op,order_id)
trade_main.trades()
last_order_id = order_id
else:
time.sleep(0.5)
pass
if __name__ == '__main__':
setup_logging()
# 获取当前模块的logger
logger = logging.getLogger(__name__)
logger.info("程序启动")
connect_wifi(ssid,password)
logger.info('初始化全局代理成功')
trade_instruction_monitor(0)
# trade_instruction_monitor(117775)