# zhitou_trade/trade_port_trade.py
# 2025-05-26 09:19:39 +08:00
#
# 313 lines
# 10 KiB
# Python

import uuid
import trade_logic
import pandas as pd
from sqlalchemy import create_engine
# from autobasic import mts
import socket
from tools import *
import datetime
import logging
import sys
import time
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
# from voice_notification import call_user
import getpass
import uuid
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from connect_wifi import connect_wifi
import logging
import os
from logging.handlers import RotatingFileHandler
def setup_logging() -> None:
    """Configure the root logger with one console and one file handler.

    Handlers already attached to the root logger are removed *and closed*
    first, so calling this function more than once neither duplicates
    output nor leaks the previously opened log file.

    The console handler emits INFO and above; the file handler writes DEBUG
    and above to ``Auto_Trade_Log1.log`` (UTF-8) in the current working
    directory.  Both share the same record format.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Iterate over a copy: removeHandler() mutates root_logger.handlers.
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        # Release the handler's resources (e.g. the open log file).
        handler.close()

    # Console: INFO and above only.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # File: everything from DEBUG upwards.
    file_handler = logging.FileHandler('Auto_Trade_Log1.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    for handler in (console_handler, file_handler):
        handler.setFormatter(formatter)
        root_logger.addHandler(handler)
def create_conn():
    """Build a SQLAlchemy engine for ``ticker_daily_original_database``.

    This function only constructs the engine via ``create_engine`` and
    returns it; it runs no query itself.
    """
    # NOTE(review): credentials are embedded in the URL; consider loading
    # them from configuration instead.
    db_url = 'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.mysql.rds.aliyuncs.com:3306/ticker_daily_original_database'
    engine = create_engine(db_url, encoding="utf-8", echo=False)
    return engine
class auto_trade:
    """Execute one trade instruction for a list of accounts, one at a time.

    Parameters
    ----------
    users : iterable
        Account identifiers to trade for.
    op : str
        Operation passed through to ``trade_logic.Trade``.
    order_id : str
        Order identifier passed through to ``trade_logic.Trade``.
    """

    def __init__(self, users, op, order_id):
        self.op = op
        self.users = users
        self.order_id = order_id

    def trade_main(self, user):
        """Build a ``trade_logic.Trade`` for *user* and run ``trade_by_op``."""
        user_main = trade_logic.Trade(user, self.op, self.order_id)
        user_main.trade_by_op()

    @staticmethod
    def _restore_network():
        """Reset the proxy and reconnect Wi-Fi, logging the outcome."""
        reset_proxy_to_default()
        if not connect_wifi('ZTE_A5DA76', '1234567890'):
            logger.info("无法建立网络连接,请检查配置")
        else:
            logger.info("网络已就绪")

    @staticmethod
    def _record_result(status):
        """Write *status* into the latest record of the Excel sheet."""
        writer = ExcelDataWriter()
        writer.update_latest_record(
            status=status,
            remarks='测试数据',
            risk_response=''
        )

    @staticmethod
    def _is_known_user_unbound(error):
        """Return True for the ``UnboundLocalError`` about local ``user``.

        The check uses the caught exception directly; the message wording
        differs between Python versions, so only the quoted variable name
        is matched.
        """
        return isinstance(error, UnboundLocalError) and "'user'" in str(error)

    def trades(self):
        """Trade for every account in ``self.users``; never raises for one account.

        After each attempt the network is restored and the result is
        written to the Excel sheet.  On failure an e-mail is sent (best
        effort) and processing moves on to the next account.
        """
        for user in self.users:
            logger.info(user)
            try:
                self.trade_main(user)
                self._restore_network()
                self._record_result('成功')
                logger.info(f'{user}交易已经完成')
            except Exception as er:
                self._restore_network()
                try:
                    # The IP lookup is inside the try so that a failure to
                    # resolve it cannot abort the remaining accounts.
                    ip_address = get_host_ip()
                    send_email('auto trade error', (ip_address + ' ' + user))
                except Exception as e:
                    logger.error('----邮箱发送报错----')
                    logger.error(e)
                if self._is_known_user_unbound(er):
                    # Known error: logged at a lower level, still recorded below.
                    logger.warning('known UnboundLocalError for "user": %s', er)
                elif user == '15033920398':
                    # Failures of this account are neither logged nor recorded.
                    continue
                else:
                    # call_user(server_describe='trade error')
                    # call_user(phonenum = '13436825841',server_describe='trade error')
                    logger.error('交易錯誤')
                    logger.error(er, exc_info=er)
                self._record_result('失败')
                continue
            # NOTE(review): the flattened source does not show whether this
            # timestamp belongs inside the loop; it is kept on the per-account
            # success path here -- confirm against the original layout.
            print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# def get_host_ip():
# """
# 查询本机ip地址
# :return:
# """
# try:
# s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# s.connect(('8.8.8.8',80))
# ip=s.getsockname()[0]
# finally:
# s.close()
# return ip
def get_userlist(order_id):
    """Return the distinct account names assigned to this host for an order.

    Reads ``trade_instruction`` in the ``user_center`` database and selects
    the rows whose ``ID_Order`` equals *order_id* and whose ``Intranet_IP``
    equals the address returned by ``get_host_ip()``.

    Parameters
    ----------
    order_id : str
        Order identifier to look up.

    Returns
    -------
    list
        Values of the ``Account_name`` column, without duplicates.
    """
    intranet_ip = get_host_ip()
    # intranet_ip = '10.0.0.15'
    engine_user_center = create_engine(
        'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
        'mysql.rds.aliyuncs.com:3306/user_center', encoding="utf-8", echo=False)
    logger.info('----正在获取用户列表----')
    logger.info(f'{order_id}')
    logger.info(intranet_ip)
    # Bound parameters instead of string interpolation; values are sent as
    # strings, matching the quoted literals the query used before.
    query = text(
        'select Distinct Account_name from trade_instruction '
        'where ID_Order = :order_id and Intranet_IP = :intranet_ip'
    )
    try:
        accounts = pd.read_sql(
            query,
            engine_user_center,
            params={'order_id': str(order_id), 'intranet_ip': str(intranet_ip)},
        )
    finally:
        # A fresh engine is created on every call; release its pool so
        # connections do not accumulate.
        engine_user_center.dispose()
    userlist = accounts['Account_name'].tolist()
    logger.info('----成功获取用户列表----')
    return userlist
def send_email(subject: str, message: str, receive_users: list = None, file_path: str = 'no_path'):
    """Send an HTML e-mail, optionally with one attachment, via Zoho SMTP.

    Parameters
    ----------
    subject : str
        Subject line.
    message : str
        Message body, attached as ``text/html`` (UTF-8).
    receive_users : list, optional
        Recipient addresses.  Defaults to
        ``['1055017575@qq.com', '2293908092@qq.com']`` when omitted.
    file_path : str, optional
        Path of a file to attach.  The sentinel ``'no_path'`` (default)
        means no attachment.

    Returns
    -------
    None.

    Raises
    ------
    OSError
        If *file_path* cannot be read.
    smtplib.SMTPException
        If connecting, logging in or sending fails.
    """
    if receive_users is None:
        receive_users = ['1055017575@qq.com', '2293908092@qq.com']

    # NOTE(review): account credentials are hard-coded; consider moving
    # them to configuration.
    send_user = 'ainvest_family@ainvest.io'  # zoho
    send_pwd = '2016@aiwante'

    msg = MIMEMultipart()
    msg['subject'] = subject
    msg['from'] = send_user
    msg['to'] = ','.join(receive_users)
    msg.attach(MIMEText(message, 'html', 'utf-8'))

    if file_path != 'no_path':
        # Context manager guarantees the attachment file is closed.
        with open(file_path, 'rb') as attachment:
            part = MIMEApplication(attachment.read())
        # basename() also copes with Windows-style separators on Windows.
        file_name = os.path.basename(file_path)
        part.add_header('Content-Disposition', 'attachment', filename=file_name)
        msg.attach(part)

    # The with-block closes the connection even if login or send raises.
    with smtplib.SMTP_SSL("smtp.zoho.com", 465) as server:
        server.login(send_user, send_pwd)
        server.sendmail(send_user, receive_users, msg.as_string())
    print('邮件发送成功')
def trade_instruction_monitor(last_order: int = 0):
    """Poll ``trade_instruction`` forever and execute new orders in sequence.

    Orders are handled one per loop iteration in ascending ``ID_Order``
    order, starting at the id after the last handled one, until the
    largest id in the table has been reached.

    Parameters
    ----------
    last_order : int, optional
        Starting point.  ``0`` (default) starts after the largest
        ``ID_Order`` currently in the table, so only later orders are
        executed.  ``1`` starts from ``-1``, so every id from 0 upwards is
        attempted.  Any other value is taken as the id of the last order
        already handled.

    Returns
    -------
    None.  The loop does not terminate on its own.
    """
    engine_user_center = create_engine(
        'mysql+pymysql://cn_ainvest_db:cn_ainvest_sd3a1@rm-2zewagytttzk6f24xno.'
        'mysql.rds.aliyuncs.com:3306/user_center', encoding="utf-8", echo=False)

    def newest_order_id() -> int:
        """Return the largest ``ID_Order`` currently in the table."""
        frame = pd.read_sql('select max(ID_Order) from trade_instruction', engine_user_center)
        return int(frame.values.tolist()[0][0])

    trade_type_query = text(
        'select Trade_Type from trade_instruction where ID_Order = :order_id'
    )

    # Ids are kept as ints so that every comparison below is numeric,
    # whichever way the starting point was supplied.
    if last_order == 0:
        last_id = newest_order_id()
    elif last_order == 1:
        last_id = -1
    else:
        last_id = int(last_order)

    while True:
        newest_id = newest_order_id()
        if newest_id == last_id:
            # Nothing new yet.
            time.sleep(1)
            continue
        if newest_id < last_id:
            # Table is behind the starting point; wait for it to catch up.
            time.sleep(0.5)
            continue

        # At least one unhandled order: take the next id in sequence.
        order_id = str(last_id + 1)
        # NOTE(review): presumably gives the producer time to finish
        # inserting all rows of the order -- confirm.
        time.sleep(1)
        rows = pd.read_sql(
            trade_type_query, engine_user_center, params={'order_id': order_id}
        ).values.tolist()
        if not rows:
            # Gap in the id sequence: there is nothing to trade for this id.
            logger.warning('order %s has no trade_instruction row; skipping', order_id)
            last_id += 1
            continue

        op = str(rows[0][0])
        userlist = get_userlist(order_id)
        logger.info(userlist)
        logger.info('----开始自动交易----')
        trader = auto_trade(userlist, op, order_id)
        trader.trades()
        logger.info('----结束自动交易')
        last_id += 1
if __name__ == '__main__':
    # Script entry point: configure logging, bring the network up, then
    # start watching for new trade instructions.
    # call_user()
    # test
    setup_logging()
    # Module-level logger; the functions in this file look it up by name.
    logger = logging.getLogger(__name__)
    logger.info("程序启动")
    reset_proxy_to_default()
    # Other networks noted here: "4G-UFI-18C8"/"1234567890", 'Redmi K40'/'123456789'
    if connect_wifi('ZTE_A5DA76', '1234567890'):
        logger.info("网络已就绪")
    else:
        logger.info("无法建立网络连接,请检查配置")
    logger.info('初始化全局代理成功')
    # 0 -> begin after the largest order id currently in the table.
    trade_instruction_monitor(0)
    # trade_instruction_monitor(115138)