zhitou_trade/autobasic/jy_clienttrader.py
2025-05-22 16:47:45 +08:00

637 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Created on Fri Aug 5 15:29:12 2022
@author: MINGL
"""
import pandas as pd
import pywinauto
from PIL import ImageGrab
from pywinauto import clipboard
from pywinauto import keyboard
from skimage import io
import time
import io as mio
import pytesseract
import warnings
import datetime
from tools import *
from connect_wifi import connect_wifi
from iptest import get_proxy_ip,set_global_proxy,get_ip_data
warnings.filterwarnings('ignore')
scripts_path = os.path.dirname(os.path.realpath(__file__))
root_path = scripts_path[:scripts_path.find(project_name)+len(project_name)]
path = r'C:\trade_software1\trade_software\金元证券同花顺网上交易新一代\xiadan.exe'
class JYClienttrader:
"""
基于同花顺委托下单程序的"长城证券客户端"自动交易程序
"""
def __init__(self, account_name, exe_path=path):
self.path = exe_path
self.account_name = account_name
self.log = Logger(f'{root_path}/logs',self.account_name)
# 用户券商信息
sql = f"select * from ainvest_usercount where username='{account_name}'"
df_count = download_data_from_db(sql, 'ai_strategy_update_iddb')
self.securities_name = str(df_count['securities_username'].tolist()[0])
self.securities_password = str(df_count['securities_password'].tolist()[0])
self.communication_password = str(df_count['communication_password'].tolist()[0])
def exit(self):
'''
结束进程
Returns
-------
None.
'''
self._app.kill()
def _close_prompt_windows(self):
time.sleep(0.1)
for window in self._app.windows(class_name="#32770"):
if window.window_text() != '网上股票交易系统*':
window.close()
time.sleep(0.1)
def login(self):
'''
启动软件并登录
Returns
-------
None.
'''
#软件启动判断,若有进行中进程,先关闭再重启
try:
self._app = pywinauto.Application().connect(
path=self.path, timeout=1
)
self.exit()
self.log.warn('There are running programs, and there are processes in retreat')
except Exception:
pass
#启动软件进程,输入账号密码
self.log.info('Start login')
start_time = time.time()
reset_proxy_to_default() # 初始化proxy信息
# 重置mac
set_mac = SetMac()
set_mac.run()
if not connect_wifi('ZTE_A5DB9D', '1234567890'): # 'Redmi K40','123456789'
self.log.error("无法建立网络连接,请检查配置")
else:
self.log.info("网络已就绪")
get_ip_times = 0
writer = ExcelDataWriter() # 初始化ip_records 表格
used_ip = writer.get_unavailable_ips(broker, account_name)
while get_ip_times < 3:
try:
item = get_ip_data() # 获取动态IP
if item['ip'] in used_ip:
get_ip_times += 1
else:
self.log.info(f'代理IP已经获取{item}')
break
except Exception as e:
self.log.error(f'获取IP失败请检查网络连接{e}')
time.sleep(5)
# raise ConnectionError(f"获取IP失败请检查网络连接。错误详情: {e}")
# 设置全局代理
proxy_ip = item['ip']
proxy_port = item['port']
set_global_proxy(proxy_ip, proxy_port, enable=True)
# exit_ip = get_proxy_ip(proxy_ip,proxy_port)
time_with_change_proxy = time.time() - start_time
self.log.info('全局代理设置成功')
insert_data = {'as_of_date': datetime.datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S.%f'),
'broker': broker,
'account': account_name,
'proxy_ip': proxy_ip,
'proxy_port': proxy_port,
'ip_location': item['prov'] + ':' + item['city'],
# 'exit_ip': exit_ip,
'ip_cross_check_result': '一致',
'ip_switch_time': time_with_change_proxy,
# 'ip_expire_time':item['expire']
'ip_survival_time': '3min'
}
# 写入数据表
writer = ExcelDataWriter() # 初始化ip_records 表格
writer.write_data(insert_data)
self._app = pywinauto.Application().start(self.path)
while True:
try:
tryrun(self._app.window(title_re='用户登录.*').wait("ready"))
break
except RuntimeError:
pass
time.sleep(0.5)
self._app.window(title_re='用户登录.*').Edit1.type_keys(self.securities_name)
time.sleep(0.1)
self._app.window(title_re='用户登录.*').Edit2.type_keys(self.securities_password)
time.sleep(0.1)
# 验证码登录
while True:
# 获取id_code
id_code = self.verify_code(0x5DB)
time.sleep(0.2)
self._app.window(title_re='用户登录.*').Edit3.type_keys('{BACKSPACE}' * 6)
time.sleep(0.2)
self._app.window(title_re='用户登录.*').Edit3.type_keys(id_code)
time.sleep(0.2)
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.5)
try:
self._app.top_window().set_focus()
result_idcode = self._app.top_window().window(control_id=0x3EC, class_name='Static').window_text()
self.log.softInfo(result_idcode)
if '验证码错误' in result_idcode:
self.log.warn('Verification code error')
self._app.top_window().window(control_id=0x2, class_name="Button").click()
time.sleep(0.5)
else:
break
except Exception:
break
time.sleep(2)
#关闭弹窗
# for i in range(3):
# self._turn_off_advertising()
self._close_prompt_windows()
#设置主窗口及菜单窗口
self.main_wnd = self._app.window(title='网上股票交易系统5.0')
self.left_wnd = self.main_wnd.window(class_name='AfxWnd42s', control_id=0xC8,found_index=0).window(class_name='SysTreeView32',control_id=0x81)
# print(self.left_wnd.print_items())
tryrun(self.left_wnd.wait("ready"))
def _turn_off_advertising(self):
'''
关闭弹窗
Returns
-------
None.
'''
try:
self._app.top_window().set_focus()
time.sleep(0.5)
pywinauto.keyboard.send_keys("{ESC}")
pywinauto.keyboard.send_keys("{ENTER}")
pywinauto.keyboard.send_keys("{ESC}")
except Exception as e:
pass
def verify_code(self,control_id):
'''
获取登录验证码
Returns
-------
id_code : TYPE
DESCRIPTION.
'''
time.sleep(0.5)
self._app.top_window().Static.click()
time.sleep(0.2)
self._app.top_window().window(control_id=control_id, class_name='Static').capture_as_image().save(
f'{root_path}/imgs/temp.png')
time.sleep(0.2)
im = io.imread(f'{root_path}/imgs/temp.png')
id_code = pytesseract.image_to_string(im, lang='eng',
config='--psm 6 --oem 3 -c tessedit_char_whitelist=0123456789').strip()
id_code = id_code.replace(' ', '')
return id_code
def verify_code_position(self,control_id):
'''
获取登录验证码
Returns
-------
id_code : TYPE
DESCRIPTION.
'''
time.sleep(0.5)
self._app.top_window().Static.click()
time.sleep(0.2)
# print(self._app.top_window().children())
# self._app.top_window().window(control_id=control_id, class_name='Static').print_control_identifiers()
# print(self._app.top_window().window(control_id=control_id, class_name='Static'))
data = \
GetControlCoord(self._app.top_window().window(control_id=control_id, class_name='Static')).get_coord('Static')[
0]
x_1 = int(data[0])
x_2 = int(data[2]) + 12
y_1 = int(data[1])
y_2 = int(data[3])
ImageGrab.grab(bbox=(x_1, y_1, x_2, y_2)).save(f'{root_path}/imgs/temp.png')
im = io.imread(f'{root_path}/imgs/temp.png')
id_code = pytesseract.image_to_string(im, lang='eng',
config='--psm 6 --oem 3 -c tessedit_char_whitelist=0123456789').strip()
id_code = id_code.replace(' ', '')
return id_code
def buy(self, stock_no, price, num):
'''
买入股票
Parameters
----------
stock_no : TYPE
DESCRIPTION.
price : TYPE
DESCRIPTION.
num : TYPE
DESCRIPTION.
Returns
-------
None.
'''
self.left_wnd.select(['买入[F1]'])
self.__trade(stock_no, price, num)
def sell(self, stock_no, price, num):
'''
卖出股票
Parameters
----------
stock_no : TYPE
DESCRIPTION.
price : TYPE
DESCRIPTION.
num : TYPE
DESCRIPTION.
Returns
-------
None.
'''
self.left_wnd.select(['卖出[F2]'])
self.__trade(stock_no, price, num)
def get_cancel_entrust(self, ticker):
'''
按股票撤单
Parameters
----------
ticker : TYPE
DESCRIPTION.
Returns
-------
None.
'''
self.left_wnd.select(['撤单[F3]'])
time.sleep(1)
# 删除框内ticker
self.main_wnd.window(control_id=0xD14, class_name='Edit').click()
for i in range(7):
pywinauto.keyboard.send_keys("{DELETE}")
# 输入ticker
self.main_wnd.window(control_id=0xD14, class_name='Edit').type_keys(ticker)
time.sleep(0.2)
# 选中ticker
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.1)
# 点击撤单
self.main_wnd.window(control_id=0x44B, class_name='Button').click()
time.sleep(0.1)
# 确认撤单
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.2)
#获取弹窗信息
result = self._app.top_window().window(control_id=0x3EC, class_name='Static').window_text()
self.log.info(result)
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.2)
pywinauto.keyboard.send_keys("{ESC}")
def cancel_entrust(self):
'''
全撤
Returns
-------
None.
'''
# # 进入撤单页面
self.left_wnd.select(['撤单[F3]'])
time.sleep(1)
# 一键全撤
self.main_wnd.window(control_id=0x7531, class_name='Button').click()
self._app.top_window().set_focus()
self.main_wnd.window(control_id=0x44B, class_name='Button').click()
time.sleep(1)
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.5)
pywinauto.keyboard.send_keys("{ENTER}")
def get_unsettled_trades(self):
'''
获取未成交单
Returns
-------
one_df : TYPE
DESCRIPTION.
'''
self.left_wnd.select(['撤单[F3]'])
self._copy_data()
one_df =self._data_to_df()
if len(one_df) == 0:
pass
else:
one_df['证券代码'] = one_df['证券代码'].apply(lambda x: tranTicker(x.strip('=').strip('"')))
return one_df
def get_balance(self):
'''
获取资金情况
Returns
-------
record : TYPE
DESCRIPTION.
'''
self.left_wnd.select(['查询[F4]','资金股票'])
self.main_wnd.window(control_id=0x3f7, class_name="Static").click()
balance_dict_id = {
"总资产": 0x3f7,
"可用金额": 0x3f8,
"股票市值": 0x3f6,
}
result = {}
for key, control_id in balance_dict_id.items():
result[key] = float(self.main_wnd.window(control_id=control_id, class_name="Static").window_text())
record = pd.DataFrame.from_dict(result, orient='index').T
record = record[['可用金额', '股票市值', '总资产']]
record.rename(columns={'可用金额': 'cash', '股票市值': 'Position_amount', '总资产': 'Total_account'}, inplace=True)
record['As_Of_Date'] = datetime.date.today()
record['Account_Name'] = self.account_name
return record
def get_position(self):
'''
获取持仓情况
Returns
-------
df_position : TYPE
DESCRIPTION.
'''
self.left_wnd.select(['查询[F4]','资金股票'])
# 复制表格数据
time.sleep(1)
self.main_wnd.window(control_id=0x417, class_name="CVirtualGridCtrl").click()
self._copy_data()
df_position = self._data_to_df()
if len(df_position) == 0:
self.log.info('--------------------------------------无当日持仓--------------------------------------')
return pd.DataFrame()
df_position['证券代码'] = df_position['证券代码'].apply(lambda x: tranTicker(str(x).replace('="', '').replace('"', '')))
df_position = df_position[['证券代码','证券名称','股票余额','可用余额','成本价','市价','市值','浮动盈亏','盈亏比例(%)']]
df_position.rename(columns={"证券代码": "Ticker",
"证券名称": "Ticker_name",
"股票余额": "Number_transactions",
"可用余额": "cash",
"成本价": "cost_price",
"市价": "market_price",
"市值": "market_value",
"浮动盈亏": "profit_and_loss",
"盈亏比例(%)": "profit_loss_ratio",
}, inplace=True)
df_position['frozen_quantity'] = '0'
df_position['As_Of_Date'] = datetime.date.today()
df_position['Account_Name'] = self.account_name
return df_position
def get_order(self):
'''
获取委托情况
Returns
-------
TYPE
DESCRIPTION.
'''
self.left_wnd.select(['查询[F4]','当日委托'])
self._copy_data()
return self._data_to_df()
def get_today_trades(self):
'''
获取当日成交
Returns
-------
TYPE
DESCRIPTION.
'''
self.left_wnd.select(['查询[F4]','当日成交'])
time.sleep(1)
self.main_wnd.window(control_id=0x417, class_name="CVirtualGridCtrl").click()
self._copy_data()
df = self._data_to_df()
if len(df) == 0:
self.log.info('--------------------------------------无当日成交--------------------------------------')
return pd.DataFrame()
else:
df = df[['成交时间','证券代码','证券名称','操作','成交数量','成交均价']]
df['Account_Name'] = self.account_name
df['As_Of_Date'] = datetime.date.today()
df.rename(columns={'成交时间': 'Trade_time', '证券代码': 'Ticker', '证券名称': 'Ticker_name', '操作': 'Operate',
'成交数量': 'Number_transactions',
'成交均价': 'Average_price'}, inplace=True)
df['Operate'] = df['Operate'].apply(lambda x: '' if '' in x else '')
df['Ticker'] = df['Ticker'].apply(lambda x: tranTicker(str(x).replace('="', '').replace('"', '')))
return df
def __trade(self, stock_no, price, amount):
'''
交易输入函数
Parameters
----------
stock_no : TYPE
DESCRIPTION.
price : TYPE
DESCRIPTION.
amount : TYPE
DESCRIPTION.
Returns
-------
None.
'''
time.sleep(0.2)
# 设置股票代码
self.main_wnd.window(control_id=0x408, class_name="Edit").click()
self.main_wnd.window(control_id=0x408, class_name="Edit").set_text(str(stock_no))
time.sleep(0.5)
# 设置价格
self.main_wnd.window(control_id=0x409, class_name="Edit").set_text(str(price))
time.sleep(0.5)
# 设置股数目
self.main_wnd.window(control_id=0x40A, class_name="Edit").set_text(str(amount))
time.sleep(0.5)
# 点击卖出or买入
self.main_wnd.window(control_id=0x3EE, class_name="Button").click()
time.sleep(1)
#获取弹窗返回的信息
self._app.top_window().set_focus()
one_text = self._app.top_window().window(control_id=0x410, class_name='Static').window_text()
if '委托价格的小数部分应该为2位' in one_text:
# 点击“否”
self.log.warn(one_text.replace('/n',' '))
self._app.top_window().window(control_id=0x7, class_name='Button').click()
# 保留两位小数截断
price = str(price)[:-1]
# 重新设置价格
self.main_wnd.window(control_id=0x409, class_name="Edit").set_text(str(price))
time.sleep(0.5)
# 点击卖出or买入
self.main_wnd.window(control_id=0x3EE, class_name="Button").click()
time.sleep(1)
self._app.top_window().set_focus()
one_text = self._app.top_window().window(control_id=0x410, class_name='Static').window_text()
else:
# 摘取合同信息
self.log.info(one_text.replace('/n',' '))
time.sleep(0.2)
# 确认买入
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.5)
# 摘取合同信息2
one_text2 = self._app.top_window().window(control_id=0x3EC, class_name='Static').window_text()
self.log.info(one_text2.replace('/n', ' '))
# 确认合同
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.5)
def _copy_data(self):
'''
拷贝数据函数
Returns
-------
None.
'''
while True:
keyboard.send_keys('^C')
# 获取验证码
id_code = self.verify_code_position(0x965)
time.sleep(0.2)
self._app.top_window().Edit3.type_keys('{BACKSPACE}' * 6)
time.sleep(0.2)
self._app.top_window().Edit3.type_keys(id_code)
time.sleep(0.2)
pywinauto.keyboard.send_keys("{ENTER}")
time.sleep(0.5)
try:
self._app.top_window().set_focus()
result_idcode = self._app.top_window().window(control_id=0x966, class_name='Static').window_text()
self.log.softInfo(result_idcode)
if '验证码错误' in result_idcode:
self.log.warn('Verification code error')
self._app.top_window().window(control_id=0x2, class_name="Button").click()
time.sleep(0.5)
else:
break
except Exception:
break
def _data_to_df(self):
'''
剪切板转dataframe
Returns
-------
TYPE
DESCRIPTION.
'''
data_table = clipboard.GetData()
df_table = pd.read_csv(mio.StringIO(data_table), delimiter='\t', na_filter=False)
table_dict = df_table.to_dict('records')
if len(table_dict) == 0:
return pd.DataFrame()
else:
data = []
for i in table_dict:
data.append(pd.DataFrame.from_dict(i, orient='index').T)
return pd.concat(data, ignore_index=True)
if __name__ == '__main__':
account_name = str('15166056356')
a = JYClienttrader(account_name)
a.login()
# 买入
# a.buy('600000', '7','100')
# 卖出
# a.sell('600835', '12.45','900')
# 获取当日实际持仓
# print(a.get_position())
# 获取资金情况
# print(a.get_balance())
# 获取当日成交
# print(a.get_today_trades())