2025-05-12 13:50:21 +08:00

340 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
from datetime import datetime
from datetime import timedelta
import pymysql
from sqlalchemy import create_engine
from dateutil.relativedelta import relativedelta
import logging
import time
import os
total_start = time.time()
# 在文件开头添加日志配置
def setup_logging():
log_dir = os.path.join(os.path.dirname(__file__), 'logs')
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(log_dir, 'api.log')),
logging.StreamHandler()
]
)
setup_logging()
logger = logging.getLogger(__name__)
# 在API.py开头添加
import sys
# 替换原来的get_user_date函数
def get_user_date():
if len(sys.argv) > 1:
datetime_input = sys.argv[1]
try:
return datetime.strptime(datetime_input, '%Y-%m-%d %H:%M:%S')
except ValueError:
print("日期格式不正确请使用YYYY-MM-DD HH:MM:SS格式", file=sys.stderr)
sys.exit(1)
else:
print("没有传入日期时间参数", file=sys.stderr)
sys.exit(1)
# 导入自定义卦象计算模块
from guaCalc_huangjijingshi import guaCalc_huangjijingshi
from luckCalc_huangjijingshi import luckCalc_huangjijingshi
# ========== 数据库连接配置 ==========
# 你的账号、密码、主机、端口
username = 'cn_ainvest_db'
password = 'cn_ainvest_sd3a1'
host = 'rm-2zewagytttzk6f24xno.mysql.rds.aliyuncs.com'
port = 3306
database = 'ai_strategy' # 这里改成你要的数据库
# 创建 SQLAlchemy engine
engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')
# ========== 文件路径配置 ==========
configPath = r'C:\AI trading\config\Rey\test_reinforcement'
matlabPath = r'D:\Dropbox\Matlab\Rey\MATLAB'
# 修改为 Excel 文件路径
tempPath = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\皇极经世.xlsx'
# ========== 加载 64 卦映射表 ==========
Map64Gua = pd.read_excel(tempPath, sheet_name="bagua")
# 转换爻数据为整数
for col in ['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']:
Map64Gua[col] = Map64Gua[col].apply(lambda x: int(x) if str(x).isdigit() else x)
Map64GuaOmit = Map64Gua[Map64Gua['change_omit'] == 0]
# ========== 从数据库加载 24 节气数据 ==========
def load_solar_terms(conn_params):
"""通过 SQLAlchemy 连接读取 solar_terms 表"""
sqlquery = "SELECT * FROM solar_terms WHERE As_Of_Date > '1744-01-01'"
df = pd.read_sql(sqlquery, engine)
# 尝试将 As_Of_Date 转为 datetime如果失败就变成 NaT
df['As_Of_Date'] = pd.to_datetime(df['As_Of_Date'], errors='coerce')
# 过滤掉早于 1900-01-01 或转换失败的日期
df = df[df['As_Of_Date'] >= pd.Timestamp('1780-01-01')]
df = df.dropna(subset=['As_Of_Date'])
# df['As_Of_Date'] = df['As_Of_Date']
df = df.sort_values('As_Of_Date')
return df
# ========== 用户输入日期 ==========
# def get_user_date():
# if len(sys.argv) > 1: # 命令行参数方式
# date_input = sys.argv[1]
# else: # 标准输入方式
# date_input = sys.stdin.read().strip()
# try:
# year, month, day = map(int, date_input.split('-'))
# return datetime(year, month, day, 0, 0, 0)
# except ValueError:
# print("日期格式不正确请使用YYYY-MM-DD格式", file=sys.stderr)
# sys.exit(1)
# 获取用户输入
def get_user_date():
# date_input = sys.stdin.read().strip() if len(sys.argv) == 1 else sys.argv[1]
if len(sys.argv) > 1:
datetime_input = sys.argv[1]
else:
print("没有传入日期时间参数", file=sys.stderr)
sys.exit(1)
try:
# 解析包含时间的日期
return datetime.strptime(datetime_input, '%Y-%m-%d %H:%M:%S')
# # 确保日期格式为 YYYY-MM-DD
# return datetime.strptime(date_input, '%Y-%m-%d')
except ValueError:
print("日期格式不正确请使用YYYY-MM-DD格式", file=sys.stderr)
sys.exit(1)
# 加载数据并标记重要节气
load_start = time.time()
solar_terms = load_solar_terms(engine)
logger.info(f"节气加载完成,耗时: {time.time() - load_start:.2f}")
important_terms = ['冬至', '雨水', '谷雨', '夏至', '处暑', '霜降']
solar_terms['isImportant'] = solar_terms['Solar_Terms'].isin(important_terms).astype(int)
Map24Jieqi = solar_terms.copy()
# ========== 读取CSV文件并处理每个日期的卦象 ==========
read_start = time.time()
csv_path = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\python1\2020 - 2030年每天卦.csv'
table = pd.read_csv(csv_path)
logger.info(f"读取csv文件完成耗时: {time.time() - read_start:.2f}")
try:
piwen_path = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\python1\批文.xlsx'
piwen_data = pd.read_excel(piwen_path)
# 打印列名用于调试
print("批文文件列名:", piwen_data.columns.tolist(), file=sys.stderr)
# 检查必要的列是否存在
if not all(col in piwen_data.columns for col in ['卦名', '命卦批文', '八个字基本盘']):
print("批文文件缺少必要列请检查Excel文件列名", file=sys.stderr)
piwen_data = pd.DataFrame(columns=['卦名', '命卦批文', '八个字基本盘'])
except Exception as e:
print(f"读取批文文件失败: {str(e)}", file=sys.stderr)
piwen_data = pd.DataFrame(columns=['卦名', '命卦批文', '八个字基本盘'])
def get_piwen_info(trigram_name):
"""根据卦名获取批文和基本盘信息"""
if not isinstance(trigram_name, str):
trigram_name = str(trigram_name)
try:
match = piwen_data[piwen_data['卦名'] == trigram_name]
if not match.empty:
return {
'piwen': match.iloc[0]['命卦批文'],
'basic_info': match.iloc[0]['八个字基本盘']
}
else:
print(f"未找到卦名 {trigram_name} 的批文信息", file=sys.stderr)
return {'piwen': '暂无批文信息', 'basic_info': '暂无基本盘信息'}
except Exception as e:
print(f"获取批文信息出错: {str(e)}", file=sys.stderr)
return {'piwen': '暂无批文信息', 'basic_info': '暂无基本盘信息'}
# 获取用户输入
kDate = get_user_date()
#print("\n计算日期:", kDate.strftime('%Y-%m-%d'))
# 计算卦象
Gua1Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck = guaCalc_huangjijingshi(
Map64Gua, Map24Jieqi, kDate
)
# print("日卦:", Gua1Day)
import json # 导入 JSON 库
# ========== 计算并返回 JSON 格式的结果 ==========
# ========== 计算并返回 JSON 格式的结果 ==========
if __name__ == "__main__":
print("=== 调试开始 ===", file=sys.stderr) # 打印到 stderr 不会干扰 stdout 的 JSON
try:
input_start = time.time()
kDate = get_user_date()
logger.info(f"获取用户输入完成,耗时: {time.time() - input_start:.2f}")
# 计算卦象
calc_start = time.time()
Gua1Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck = guaCalc_huangjijingshi(
Map64Gua, Map24Jieqi, kDate
)
logger.info(f"卦象计算完成,耗时: {time.time() - calc_start:.2f}")
# # 计算10年前当前日期减10年
# Yearpre10 = kDate.replace(year=kDate.year - 10)
# # 计算10年后当前日期加10年
# Yearpast10 = kDate.replace(year=kDate.year + 10)
# 或者使用relativedelta更精确处理闰年等情况
# 计算年份卦象
year_start = time.time()
Yearpre10 = datetime(2010, 1, 1)
Yearpast10 = datetime(2030, 1, 1,)
# 生成日期范围修正end参数
date_range = pd.date_range(
start=Yearpre10.replace(month=1, day=1), # 确保从1月1日开始
end=Yearpast10.replace(month=1, day=1), # 确保到1月1日结束
freq='YS' # 每年第一天
)
#date_range = pd.date_range(start=Yearpre10, end=Yearpast10, freq='YS') # 每年第一天
year_gua_list = []
for date in date_range:
_, _, _, Gua1Year, _ = guaCalc_huangjijingshi(Map64Gua, Map24Jieqi, date)
year_gua_list.append({
'Year': date.year,
'Trigram': Gua1Year.trigram if hasattr(Gua1Year, 'trigram') else None,
})
yearGuaMap = pd.DataFrame(year_gua_list)
logger.info(f"年份卦象计算完成,耗时: {time.time() - year_start:.2f}")
#========== 计算年份吉凶 ==========
# 计算年份吉凶
luck_start = time.time()
LuckYear = luckCalc_huangjijingshi(Map64Gua, yearGuaMap, GuaLuck)
logger.info(f"吉凶计算完成,耗时: {time.time() - luck_start:.2f}")
# # 检查 Gua1Day 的类型并正确处理
# if isinstance(Gua1Day, pd.DataFrame):
# # 如果是 DataFrame提取第一行
# day_data = Gua1Day.iloc[0]
# elif isinstance(Gua1Day, pd.Series):
# # 如果是 Series直接使用
# day_data = Gua1Day
# elif isinstance(Gua1Day, dict):
# # 如果是字典,直接使用
# day_data = Gua1Day
# else:
# raise ValueError("Gua1Day 的类型不支持,必须是 DataFrame、Series 或 dict")
def format_gua_data(gua_data):
"""通用格式化卦象数据的函数"""
if isinstance(gua_data, pd.DataFrame):
# DataFrame类型 - 取第一行转为字典
data = gua_data.iloc[0].to_dict()
elif isinstance(gua_data, pd.Series):
# Series类型 - 直接转为字典
data = gua_data.to_dict()
elif isinstance(gua_data, dict):
# 已经是字典类型
data = gua_data
else:
# 未知类型返回空字典
return {}
# 统一处理字典数据
return {
'id': int(data.get('id', 0)),
'trigram': str(data.get('trigram', '')),
'yaoAll': str(data.get('yaoAll', '')),
'yao1': int(data.get('yao1', 0)),
'yao2': int(data.get('yao2', 0)),
'yao3': int(data.get('yao3', 0)),
'yao4': int(data.get('yao4', 0)),
'yao5': int(data.get('yao5', 0)),
'yao6': int(data.get('yao6', 0)),
'value_2binary': int(data.get('value_2binary', 0)),
'change_omit': int(data.get('change_omit', 0)),
'type': str(data.get('Type', 'unknown'))
}
# 构建结果字典,确保所有值是 Python 原生类型
# result = {
# 'date': kDate.strftime('%Y-%m-%d'),
# 'day_gua': {
# 'id': int(day_data.get('id', 0)), # 如果没有值,使用默认值 0
# 'trigram': str(day_data.get('trigram', '')), # 如果没有值,使用空字符串
# 'yaoAll': str(day_data.get('yaoAll', '')), # 如果没有值,使用空字符串
# 'yao1': int(day_data.get('yao1', 0)), # 如果没有值,使用默认值 0
# 'yao2': int(day_data.get('yao2', 0)), # 如果没有值,使用默认值 0
# 'yao3': int(day_data.get('yao3', 0)), # 如果没有值,使用默认值 0
# 'yao4': int(day_data.get('yao4', 0)), # 如果没有值,使用默认值 0
# 'yao5': int(day_data.get('yao5', 0)), # 如果没有值,使用默认值 0
# 'yao6': int(day_data.get('yao6', 0)), # 如果没有值,使用默认值 0
# 'value_2binary': int(day_data.get('value_2binary', 0)), # 如果没有值,使用默认值 0
# 'change_omit': int(day_data.get('change_omit', 0)), # 如果没有值,使用默认值 0
# 'type': str(day_data.get('Type', 'day')) # 如果没有值,使用默认值 'day'
# }
# }
# 构建完整结果
build_start = time.time()
result = {
'date': kDate.strftime('%Y-%m-%d'),
'year_gua': {
**format_gua_data(Gua1Year),
**get_piwen_info(format_gua_data(Gua1Year).get('trigram', ''))
},
'month_gua': {
**format_gua_data(Gua1Month),
**get_piwen_info(format_gua_data(Gua1Month).get('trigram', ''))
},
'day_gua': {
**format_gua_data(Gua1Day),
**get_piwen_info(format_gua_data(Gua1Day).get('trigram', ''))
},
'hour_gua': {
**format_gua_data(Gua1Hour),
**get_piwen_info(format_gua_data(Gua1Hour).get('trigram', ''))
},
'luck_gua': {
**format_gua_data(GuaLuck),
**get_piwen_info(format_gua_data(GuaLuck).get('trigram', ''))
},
'luck_years': [
{
'year': row['Year'],
'trigram': row['trigram'],
'yaoAll': row['yaoAll'],
**get_piwen_info(row['trigram'])
} for _, row in LuckYear.iterrows()
] if isinstance(LuckYear, pd.DataFrame) else []
}
logger.info(f"结果构建完成,耗时: {time.time() - build_start:.2f}")
# 在打印 JSON 前检查内容
print("=== 要输出的 JSON 内容 ===", file=sys.stderr)
print(result, file=sys.stderr)
# 输出 JSON 格式的结果
# print(json.dumps(result, ensure_ascii=False, indent=4)) # 添加 indent 参数更好查看格式
# 输出结果
output_start = time.time()
print(json.dumps(result, ensure_ascii=False, indent=2))
logger.info(f"结果输出完成,耗时: {time.time() - output_start:.2f}")
total_time = time.time() - total_start
logger.info(f"=== API 执行完成,总耗时: {total_time:.2f}秒 ===")
except Exception as e:
import traceback
traceback.print_exc(file=sys.stderr) # 打印错误堆栈,帮助调试
print(f"计算错误: {str(e)}", file=sys.stderr)
sys.exit(1)