zhitou_trade/trade_port_trade.py
2025-06-12 11:05:53 +08:00

390 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid
import trade_logic
import pandas as pd
from sqlalchemy import create_engine
# from autobasic import mts
import socket
from tools import *
import datetime
import logging
import sys
import time
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
# from voice_notification import call_user
import uuid
import logging
import os
from pathlib import Path
# 获取根路径
root_path = Path(__file__).parent
def setup_logging():
"""统一配置日志系统"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
# 移除所有现有处理器(避免重复)
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO) # 控制台只显示INFO及以上级别
# 文件处理器
file_handler = logging.FileHandler('Auto_Trade_Log1.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG) # 文件记录所有DEBUG及以上级别
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
def create_conn():
"""
Check if the database connection is valid before afternoon trading session
Only called once at the beginning of the afternoon session
"""
return create_engine('mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.mysql.rds.aliyuncs.com:3306/ticker_daily_original_database',
encoding="utf-8", echo=False)
class auto_trade:
def __init__(self, users, op,order_id):
self.op = op
self.users = users
self.order_id = order_id
# 加载WiFi配置文件
self.wifi_configs = self._load_wifi_config()
def _load_wifi_config(self):
"""加载WiFi配置文件"""
config_path = os.path.join(root_path, 'wifi_config.txt')
try:
with open(config_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
wifi_configs = []
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
parts = line.split(',')
if len(parts) == 2:
wifi_configs.append({'ssid': parts[0].strip(), 'password': parts[1].strip()})
logger.info(f"WiFi配置加载成功{len(wifi_configs)}个配置")
return wifi_configs
except FileNotFoundError:
logger.warning(f"WiFi配置文件 {config_path} 不存在,使用默认配置")
return [{'ssid': 'ZTE_A5DA76', 'password': '1234567890'}]
except Exception as e:
logger.error(f"WiFi配置文件读取错误: {e}")
return [{'ssid': 'ZTE_A5DA76', 'password': '1234567890'}]
def _connect_wifi(self):
"""尝试连接WiFi"""
connected = False
for i, wifi_config in enumerate(self.wifi_configs):
wifi_ssid = wifi_config['ssid']
wifi_password = wifi_config['password']
if i == 0:
logger.info(f"尝试连接主WiFi: {wifi_ssid}")
else:
logger.info(f"尝试连接备用WiFi: {wifi_ssid}")
if connect_wifi(wifi_ssid, wifi_password):
logger.info(f"网络已就绪WiFi: {wifi_ssid}")
connected = True
break
if not connected:
logger.error("无法建立网络连接,请检查配置")
def trade_main(self,user):
user_main = trade_logic.Trade(user, self.op,self.order_id)
print('3333')
user_main.trade_by_op()
def trades(self):
for user in self.users:
logger.info(user)
try:
self.trade_main(user)
reset_proxy_to_default()
self._connect_wifi()
writer = ExcelDataWriter()
writer.update_latest_record(
status='成功',
remarks='测试数据',
risk_response=''
)
logger.info(f'{user}交易已经完成')
except Exception as er:
reset_proxy_to_default()
self._connect_wifi()
ip_address = get_host_ip()
try:
send_email('auto trade error',(ip_address+' '+user))
except Exception as e:
logger.error('----邮箱发送报错----')
logger.error(e)
if "UnboundLocalError: local variable 'user' referenced before assignment" in str(sys.exc_info):
pass
elif user=='15033920398':
continue
else:
# call_user(server_describe='trade error')
# call_user(phonenum = '13436825841',server_describe='trade error')
logger.error('交易錯誤')
logger.error(er)
#
writer = ExcelDataWriter()
writer.update_latest_record(
status='失败',
remarks='测试数据',
risk_response=''
)
continue
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# def get_host_ip():
# """
# 查询本机ip地址
# :return:
# """
# try:
# s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# s.connect(('8.8.8.8',80))
# ip=s.getsockname()[0]
# finally:
# s.close()
# return ip
def load_wifi_config():
"""加载WiFi配置文件"""
config_path = os.path.join(root_path, 'wifi_config.txt')
try:
with open(config_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
wifi_configs = []
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
parts = line.split(',')
if len(parts) == 2:
wifi_configs.append({'ssid': parts[0].strip(), 'password': parts[1].strip()})
logger.info(f"WiFi配置加载成功{len(wifi_configs)}个配置")
return wifi_configs
except FileNotFoundError:
logger.warning(f"WiFi配置文件 {config_path} 不存在,使用默认配置")
return [{'ssid': 'ZTE_A5DA76', 'password': '1234567890'}]
except Exception as e:
logger.error(f"WiFi配置文件读取错误: {e}")
return [{'ssid': 'ZTE_A5DA76', 'password': '1234567890'}]
def connect_wifi_from_config():
"""从配置文件连接WiFi"""
wifi_configs = load_wifi_config()
connected = False
for i, wifi_config in enumerate(wifi_configs):
wifi_ssid = wifi_config['ssid']
wifi_password = wifi_config['password']
if i == 0:
logger.info(f"尝试连接主WiFi: {wifi_ssid}")
else:
logger.info(f"尝试连接备用WiFi: {wifi_ssid}")
if connect_wifi(wifi_ssid, wifi_password):
logger.info(f"网络已就绪WiFi: {wifi_ssid}")
connected = True
break
if not connected:
logger.error("无法建立网络连接,请检查配置")
def get_userlist(order_id):
today = str(datetime.date.today())
Intranat_ip = get_host_ip()
# Intranat_ip = '10.0.0.15'
engine_user_center = create_engine(
'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
'mysql.rds.aliyuncs.com:3306/user_center', encoding="utf-8", echo=False)
logger.info('----正在获取用户列表----')
logger.info(f'{order_id}')
logger.info(Intranat_ip)
userlist = (pd.read_sql(f'select Distinct Account_name from trade_instruction where ID_Order = "{order_id}" and Intranet_IP ="{Intranat_ip}"',engine_user_center))['Account_name'].tolist()
logger.info('----成功获取用户列表----')
return userlist
def send_email(subject: str, message: str, receive_users: list = ['1055017575@qq.com','2293908092@qq.com'], file_path: str = 'no_path'):
'''
Parameters
----------
subject : str
title.
message : str
message.
receive_users : list, optional
receive. The default is ['1055017575@qq.com'].
file_path : str, optional
path. The default is 'no_path'.
Returns
-------
None.
'''
# send_user = 'it_ainvest@zohomail.cn' # zoho
# send_pwd = '2022@AIWANTE' # 密码
# receive_user = receive_users
# # receive_user = ['1055017575@qq.com','yi.zhou@ainvest.io','2293908092@qq.com']
#
# msg = MIMEMultipart()
# msg['subject'] = subject
# msg['from'] = send_user
# msg['to'] = ','.join(receive_user)
# msg.attach(MIMEText(message, 'html', 'utf-8'))
#
# if file_path != 'no_path':
# part = MIMEApplication(open(file_path, 'rb').read())
# file_name = file_path.split('/')[-1]
# part.add_header('Content-Disposition', 'attachment', filename=file_name)
# msg.attach(part)
# else:
# pass
#
# server = smtplib.SMTP_SSL("smtp.zoho.com.cn", 465) # smtp.163.com
# server.login(send_user, send_pwd)
# server.sendmail(send_user, receive_user, msg.as_string())
# server.quit()
# print('邮件发送成功')
send_user = 'ainvest_family@ainvest.io' # zoho
send_pwd = '2016@aiwante' # 密码
receive_user = receive_users
# receive_user = ['1055017575@qq.com','yi.zhou@ainvest.io','2293908092@qq.com']
msg = MIMEMultipart()
msg['subject'] = subject
msg['from'] = send_user
msg['to'] = ','.join(receive_user)
msg.attach(MIMEText(message, 'html', 'utf-8'))
if file_path != 'no_path':
part = MIMEApplication(open(file_path, 'rb').read())
file_name = file_path.split('/')[-1]
part.add_header('Content-Disposition', 'attachment', filename=file_name)
msg.attach(part)
else:
pass
server = smtplib.SMTP_SSL("smtp.zoho.com", 465) # smtp.163.com
server.login(send_user, send_pwd)
server.sendmail(send_user, receive_user, msg.as_string())
server.quit()
print('邮件发送成功')
def trade_instruction_monitor(last_order: int = 0):
'''
交易监控
Returns
-------
None.
'''
engine_user_center = create_engine(
'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
'mysql.rds.aliyuncs.com:3306/user_center', encoding="utf-8", echo=False)
if last_order == 0:
last_order_id = str(
int(pd.read_sql(f'select max(ID_Order) from trade_instruction', engine_user_center).values.tolist()[0][0]))
elif last_order == 1:
last_order_id = '-1'
else:
last_order_id = last_order
while True:
order_id = str(
int(pd.read_sql(f'select max(ID_Order) from trade_instruction', engine_user_center).values.tolist()[0][0]))
if order_id == last_order_id:
time.sleep(1)
pass
elif int(order_id) - int(last_order_id) > 1:
order_id = str(int(last_order_id)+1)
ip = get_host_ip()
# message = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' '+ str(ip) +' ' + order_id + ' begin trade'
# # send_email('自动交易提醒', message)
time.sleep(1)
op = str(pd.read_sql(f'select Trade_Type from trade_instruction where ID_Order="{order_id}"', engine_user_center).values.tolist()[0][0])
userlist = get_userlist(order_id)
logger.info(userlist)
logger.info('----开始自动交易----')
trade_main = auto_trade(userlist,op,order_id)
trade_main.trades()
logger.info('----结束自动交易')
last_order_id = order_id
elif int(order_id) - int(last_order_id) == 1:
ip = get_host_ip()
message = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + str(
ip) + ' ' + order_id + ' begin trade'
# send_email('自动交易提醒', message)
time.sleep(1)
op = str(pd.read_sql(f'select Trade_Type from trade_instruction where ID_Order="{order_id}"',
engine_user_center).values.tolist()[0][0])
userlist = get_userlist(order_id)
print(userlist)
trade_main = auto_trade(userlist, op,order_id)
trade_main.trades()
last_order_id = order_id
else:
time.sleep(0.5)
pass
if __name__ == '__main__':
# call_user()
# 测试
setup_logging()
# 获取当前模块的logger
logger = logging.getLogger(__name__)
logger.info("程序启动")
reset_proxy_to_default()
connect_wifi_from_config()
logger.info('初始化全局代理成功')
trade_instruction_monitor(0)
# trade_instruction_monitor(115138)