import pandas as pd import numpy as np from datetime import datetime from datetime import timedelta import pymysql from sqlalchemy import create_engine from dateutil.relativedelta import relativedelta import logging import time import os total_start = time.time() # 在文件开头添加日志配置 def setup_logging(): log_dir = os.path.join(os.path.dirname(__file__), 'logs') os.makedirs(log_dir, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(log_dir, 'api.log')), logging.StreamHandler() ] ) setup_logging() logger = logging.getLogger(__name__) # 在API.py开头添加 import sys # 替换原来的get_user_date函数 def get_user_date(): if len(sys.argv) > 1: datetime_input = sys.argv[1] try: return datetime.strptime(datetime_input, '%Y-%m-%d %H:%M:%S') except ValueError: print("日期格式不正确,请使用YYYY-MM-DD HH:MM:SS格式", file=sys.stderr) sys.exit(1) else: print("没有传入日期时间参数", file=sys.stderr) sys.exit(1) # 导入自定义卦象计算模块 from guaCalc_huangjijingshi import guaCalc_huangjijingshi from luckCalc_huangjijingshi import luckCalc_huangjijingshi # ========== 数据库连接配置 ========== # 你的账号、密码、主机、端口 username = 'cn_ainvest_db' password = 'cn_ainvest_sd3a1' host = 'rm-2zewagytttzk6f24xno.mysql.rds.aliyuncs.com' port = 3306 database = 'ai_strategy' # 这里改成你要的数据库 # 创建 SQLAlchemy engine engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}') # ========== 文件路径配置 ========== configPath = r'C:\AI trading\config\Rey\test_reinforcement' matlabPath = r'D:\Dropbox\Matlab\Rey\MATLAB' # 修改为 Excel 文件路径 tempPath = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\皇极经世.xlsx' # ========== 加载 64 卦映射表 ========== Map64Gua = pd.read_excel(tempPath, sheet_name="bagua") # 转换爻数据为整数 for col in ['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']: Map64Gua[col] = Map64Gua[col].apply(lambda x: int(x) if str(x).isdigit() else x) Map64GuaOmit = Map64Gua[Map64Gua['change_omit'] == 0] # ========== 从数据库加载 24 节气数据 ========== def load_solar_terms(conn_params): """通过 SQLAlchemy 连接读取 solar_terms 表""" sqlquery = "SELECT * FROM solar_terms WHERE As_Of_Date > '1744-01-01'" df = pd.read_sql(sqlquery, engine) # 尝试将 As_Of_Date 转为 datetime,如果失败就变成 NaT df['As_Of_Date'] = pd.to_datetime(df['As_Of_Date'], errors='coerce') # 过滤掉早于 1900-01-01 或转换失败的日期 df = df[df['As_Of_Date'] >= pd.Timestamp('1780-01-01')] df = df.dropna(subset=['As_Of_Date']) # df['As_Of_Date'] = df['As_Of_Date'] df = df.sort_values('As_Of_Date') return df # ========== 用户输入日期 ========== # def get_user_date(): # if len(sys.argv) > 1: # 命令行参数方式 # date_input = sys.argv[1] # else: # 标准输入方式 # date_input = sys.stdin.read().strip() # try: # year, month, day = map(int, date_input.split('-')) # return datetime(year, month, day, 0, 0, 0) # except ValueError: # print("日期格式不正确,请使用YYYY-MM-DD格式", file=sys.stderr) # sys.exit(1) # 获取用户输入 def get_user_date(): # date_input = sys.stdin.read().strip() if len(sys.argv) == 1 else sys.argv[1] if len(sys.argv) > 1: datetime_input = sys.argv[1] else: print("没有传入日期时间参数", file=sys.stderr) sys.exit(1) try: # 解析包含时间的日期 return datetime.strptime(datetime_input, '%Y-%m-%d %H:%M:%S') # # 确保日期格式为 YYYY-MM-DD # return datetime.strptime(date_input, '%Y-%m-%d') except ValueError: print("日期格式不正确,请使用YYYY-MM-DD格式", file=sys.stderr) sys.exit(1) # 加载数据并标记重要节气 load_start = time.time() solar_terms = load_solar_terms(engine) logger.info(f"节气加载完成,耗时: {time.time() - load_start:.2f}秒") important_terms = ['冬至', '雨水', '谷雨', '夏至', '处暑', '霜降'] solar_terms['isImportant'] = solar_terms['Solar_Terms'].isin(important_terms).astype(int) Map24Jieqi = solar_terms.copy() # ========== 读取CSV文件并处理每个日期的卦象 ========== read_start = time.time() csv_path = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\python1\2020 - 2030年每天卦.csv' table = pd.read_csv(csv_path) logger.info(f"读取csv文件完成,耗时: {time.time() - read_start:.2f}秒") try: piwen_path = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\python1\批文.xlsx' piwen_data = pd.read_excel(piwen_path) # 打印列名用于调试 print("批文文件列名:", piwen_data.columns.tolist(), file=sys.stderr) # 检查必要的列是否存在 if not all(col in piwen_data.columns for col in ['卦名', '命卦批文', '八个字基本盘']): print("批文文件缺少必要列,请检查Excel文件列名", file=sys.stderr) piwen_data = pd.DataFrame(columns=['卦名', '命卦批文', '八个字基本盘']) except Exception as e: print(f"读取批文文件失败: {str(e)}", file=sys.stderr) piwen_data = pd.DataFrame(columns=['卦名', '命卦批文', '八个字基本盘']) def get_piwen_info(trigram_name): """根据卦名获取批文和基本盘信息""" if not isinstance(trigram_name, str): trigram_name = str(trigram_name) try: match = piwen_data[piwen_data['卦名'] == trigram_name] if not match.empty: return { 'piwen': match.iloc[0]['命卦批文'], 'basic_info': match.iloc[0]['八个字基本盘'] } else: print(f"未找到卦名 {trigram_name} 的批文信息", file=sys.stderr) return {'piwen': '暂无批文信息', 'basic_info': '暂无基本盘信息'} except Exception as e: print(f"获取批文信息出错: {str(e)}", file=sys.stderr) return {'piwen': '暂无批文信息', 'basic_info': '暂无基本盘信息'} # 获取用户输入 kDate = get_user_date() #print("\n计算日期:", kDate.strftime('%Y-%m-%d')) # 计算卦象 Gua1Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck = guaCalc_huangjijingshi( Map64Gua, Map24Jieqi, kDate ) # print("日卦:", Gua1Day) import json # 导入 JSON 库 # ========== 计算并返回 JSON 格式的结果 ========== # ========== 计算并返回 JSON 格式的结果 ========== if __name__ == "__main__": print("=== 调试开始 ===", file=sys.stderr) # 打印到 stderr 不会干扰 stdout 的 JSON try: input_start = time.time() kDate = get_user_date() logger.info(f"获取用户输入完成,耗时: {time.time() - input_start:.2f}秒") # 计算卦象 calc_start = time.time() Gua1Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck = guaCalc_huangjijingshi( Map64Gua, Map24Jieqi, kDate ) logger.info(f"卦象计算完成,耗时: {time.time() - calc_start:.2f}秒") # # 计算10年前(当前日期减10年) # Yearpre10 = kDate.replace(year=kDate.year - 10) # # 计算10年后(当前日期加10年) # Yearpast10 = kDate.replace(year=kDate.year + 10) # 或者使用relativedelta(更精确处理闰年等情况) # 计算年份卦象 year_start = time.time() Yearpre10 = datetime(2010, 1, 1) Yearpast10 = datetime(2030, 1, 1,) # 生成日期范围(修正end参数) date_range = pd.date_range( start=Yearpre10.replace(month=1, day=1), # 确保从1月1日开始 end=Yearpast10.replace(month=1, day=1), # 确保到1月1日结束 freq='YS' # 每年第一天 ) #date_range = pd.date_range(start=Yearpre10, end=Yearpast10, freq='YS') # 每年第一天 year_gua_list = [] for date in date_range: _, _, _, Gua1Year, _ = guaCalc_huangjijingshi(Map64Gua, Map24Jieqi, date) year_gua_list.append({ 'Year': date.year, 'Trigram': Gua1Year.trigram if hasattr(Gua1Year, 'trigram') else None, }) yearGuaMap = pd.DataFrame(year_gua_list) logger.info(f"年份卦象计算完成,耗时: {time.time() - year_start:.2f}秒") #========== 计算年份吉凶 ========== # 计算年份吉凶 luck_start = time.time() LuckYear = luckCalc_huangjijingshi(Map64Gua, yearGuaMap, GuaLuck) logger.info(f"吉凶计算完成,耗时: {time.time() - luck_start:.2f}秒") # # 检查 Gua1Day 的类型并正确处理 # if isinstance(Gua1Day, pd.DataFrame): # # 如果是 DataFrame,提取第一行 # day_data = Gua1Day.iloc[0] # elif isinstance(Gua1Day, pd.Series): # # 如果是 Series,直接使用 # day_data = Gua1Day # elif isinstance(Gua1Day, dict): # # 如果是字典,直接使用 # day_data = Gua1Day # else: # raise ValueError("Gua1Day 的类型不支持,必须是 DataFrame、Series 或 dict") def format_gua_data(gua_data): """通用格式化卦象数据的函数""" if isinstance(gua_data, pd.DataFrame): # DataFrame类型 - 取第一行转为字典 data = gua_data.iloc[0].to_dict() elif isinstance(gua_data, pd.Series): # Series类型 - 直接转为字典 data = gua_data.to_dict() elif isinstance(gua_data, dict): # 已经是字典类型 data = gua_data else: # 未知类型返回空字典 return {} # 统一处理字典数据 return { 'id': int(data.get('id', 0)), 'trigram': str(data.get('trigram', '')), 'yaoAll': str(data.get('yaoAll', '')), 'yao1': int(data.get('yao1', 0)), 'yao2': int(data.get('yao2', 0)), 'yao3': int(data.get('yao3', 0)), 'yao4': int(data.get('yao4', 0)), 'yao5': int(data.get('yao5', 0)), 'yao6': int(data.get('yao6', 0)), 'value_2binary': int(data.get('value_2binary', 0)), 'change_omit': int(data.get('change_omit', 0)), 'type': str(data.get('Type', 'unknown')) } # 构建结果字典,确保所有值是 Python 原生类型 # result = { # 'date': kDate.strftime('%Y-%m-%d'), # 'day_gua': { # 'id': int(day_data.get('id', 0)), # 如果没有值,使用默认值 0 # 'trigram': str(day_data.get('trigram', '')), # 如果没有值,使用空字符串 # 'yaoAll': str(day_data.get('yaoAll', '')), # 如果没有值,使用空字符串 # 'yao1': int(day_data.get('yao1', 0)), # 如果没有值,使用默认值 0 # 'yao2': int(day_data.get('yao2', 0)), # 如果没有值,使用默认值 0 # 'yao3': int(day_data.get('yao3', 0)), # 如果没有值,使用默认值 0 # 'yao4': int(day_data.get('yao4', 0)), # 如果没有值,使用默认值 0 # 'yao5': int(day_data.get('yao5', 0)), # 如果没有值,使用默认值 0 # 'yao6': int(day_data.get('yao6', 0)), # 如果没有值,使用默认值 0 # 'value_2binary': int(day_data.get('value_2binary', 0)), # 如果没有值,使用默认值 0 # 'change_omit': int(day_data.get('change_omit', 0)), # 如果没有值,使用默认值 0 # 'type': str(day_data.get('Type', 'day')) # 如果没有值,使用默认值 'day' # } # } # 构建完整结果 build_start = time.time() result = { 'date': kDate.strftime('%Y-%m-%d'), 'year_gua': { **format_gua_data(Gua1Year), **get_piwen_info(format_gua_data(Gua1Year).get('trigram', '')) }, 'month_gua': { **format_gua_data(Gua1Month), **get_piwen_info(format_gua_data(Gua1Month).get('trigram', '')) }, 'day_gua': { **format_gua_data(Gua1Day), **get_piwen_info(format_gua_data(Gua1Day).get('trigram', '')) }, 'hour_gua': { **format_gua_data(Gua1Hour), **get_piwen_info(format_gua_data(Gua1Hour).get('trigram', '')) }, 'luck_gua': { **format_gua_data(GuaLuck), **get_piwen_info(format_gua_data(GuaLuck).get('trigram', '')) }, 'luck_years': [ { 'year': row['Year'], 'trigram': row['trigram'], 'yaoAll': row['yaoAll'], **get_piwen_info(row['trigram']) } for _, row in LuckYear.iterrows() ] if isinstance(LuckYear, pd.DataFrame) else [] } logger.info(f"结果构建完成,耗时: {time.time() - build_start:.2f}秒") # 在打印 JSON 前检查内容 print("=== 要输出的 JSON 内容 ===", file=sys.stderr) print(result, file=sys.stderr) # 输出 JSON 格式的结果 # print(json.dumps(result, ensure_ascii=False, indent=4)) # 添加 indent 参数更好查看格式 # 输出结果 output_start = time.time() print(json.dumps(result, ensure_ascii=False, indent=2)) logger.info(f"结果输出完成,耗时: {time.time() - output_start:.2f}秒") total_time = time.time() - total_start logger.info(f"=== API 执行完成,总耗时: {total_time:.2f}秒 ===") except Exception as e: import traceback traceback.print_exc(file=sys.stderr) # 打印错误堆栈,帮助调试 print(f"计算错误: {str(e)}", file=sys.stderr) sys.exit(1)