上传文件至 /

This commit is contained in:
wangzhiming 2025-05-06 17:57:02 +08:00
commit 53407664e6
5 changed files with 534 additions and 0 deletions

240
API.py Normal file
View File

@ -0,0 +1,240 @@
import pandas as pd
import numpy as np
from datetime import datetime
from datetime import timedelta
import pymysql
from sqlalchemy import create_engine
from dateutil.relativedelta import relativedelta
# 在API.py开头添加
import sys
# 替换原来的get_user_date函数
def get_user_date():
if len(sys.argv) > 1:
datetime_input = sys.argv[1]
try:
return datetime.strptime(datetime_input, '%Y-%m-%d %H:%M:%S')
except ValueError:
print("日期格式不正确请使用YYYY-MM-DD HH:MM:SS格式", file=sys.stderr)
sys.exit(1)
else:
print("没有传入日期时间参数", file=sys.stderr)
sys.exit(1)
# 导入自定义卦象计算模块
from guaCalc_huangjijingshi import guaCalc_huangjijingshi
from luckCalc_huangjijingshi import luckCalc_huangjijingshi
# ========== 数据库连接配置 ==========
# 你的账号、密码、主机、端口
username = 'cn_ainvest_db'
password = 'cn_ainvest_sd3a1'
host = 'rm-2zewagytttzk6f24xno.mysql.rds.aliyuncs.com'
port = 3306
database = 'ai_strategy' # 这里改成你要的数据库
# 创建 SQLAlchemy engine
engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')
# ========== 文件路径配置 ==========
configPath = r'C:\AI trading\config\Rey\test_reinforcement'
matlabPath = r'D:\Dropbox\Matlab\Rey\MATLAB'
# 修改为你的 Excel 文件路径
tempPath = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\皇极经世.xlsx'
# ========== 加载 64 卦映射表 ==========
Map64Gua = pd.read_excel(tempPath, sheet_name="bagua")
# 转换爻数据为整数
for col in ['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']:
Map64Gua[col] = Map64Gua[col].apply(lambda x: int(x) if str(x).isdigit() else x)
Map64GuaOmit = Map64Gua[Map64Gua['change_omit'] == 0]
# ========== 从数据库加载 24 节气数据 ==========
def load_solar_terms(conn_params):
"""通过 SQLAlchemy 连接读取 solar_terms 表"""
sqlquery = 'SELECT * FROM solar_terms'
df = pd.read_sql(sqlquery, engine)
# 尝试将 As_Of_Date 转为 datetime如果失败就变成 NaT
df['As_Of_Date'] = pd.to_datetime(df['As_Of_Date'], errors='coerce')
# 过滤掉早于 1900-01-01 或转换失败的日期
df = df[df['As_Of_Date'] >= pd.Timestamp('1900-01-01')]
df = df.dropna(subset=['As_Of_Date'])
# df['As_Of_Date'] = df['As_Of_Date']
df = df.sort_values('As_Of_Date')
return df
# ========== 用户输入日期 ==========
# def get_user_date():
# if len(sys.argv) > 1: # 命令行参数方式
# date_input = sys.argv[1]
# else: # 标准输入方式
# date_input = sys.stdin.read().strip()
# try:
# year, month, day = map(int, date_input.split('-'))
# return datetime(year, month, day, 0, 0, 0)
# except ValueError:
# print("日期格式不正确请使用YYYY-MM-DD格式", file=sys.stderr)
# sys.exit(1)
# 获取用户输入
def get_user_date():
# date_input = sys.stdin.read().strip() if len(sys.argv) == 1 else sys.argv[1]
if len(sys.argv) > 1:
datetime_input = sys.argv[1]
else:
print("没有传入日期时间参数", file=sys.stderr)
sys.exit(1)
try:
# 解析包含时间的日期
return datetime.strptime(datetime_input, '%Y-%m-%d %H:%M:%S')
# # 确保日期格式为 YYYY-MM-DD
# return datetime.strptime(date_input, '%Y-%m-%d')
except ValueError:
print("日期格式不正确请使用YYYY-MM-DD格式", file=sys.stderr)
sys.exit(1)
# 加载数据并标记重要节气
solar_terms = load_solar_terms(engine)
important_terms = ['冬至', '雨水', '谷雨', '夏至', '处暑', '霜降']
solar_terms['isImportant'] = solar_terms['Solar_Terms'].isin(important_terms).astype(int)
Map24Jieqi = solar_terms.copy()
# ========== 读取CSV文件并处理每个日期的卦象 ==========
csv_path = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\python1\2020 - 2030年每天卦.csv'
table = pd.read_csv(csv_path)
# 获取用户输入
kDate = get_user_date()
#print("\n计算日期:", kDate.strftime('%Y-%m-%d'))
# 计算卦象
Gua1Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck = guaCalc_huangjijingshi(
Map64Gua, Map24Jieqi, kDate
)
# print("日卦:", Gua1Day)
import json # 导入 JSON 库
# ========== 计算并返回 JSON 格式的结果 ==========
# ========== 计算并返回 JSON 格式的结果 ==========
if __name__ == "__main__":
print("=== 调试开始 ===", file=sys.stderr) # 打印到 stderr 不会干扰 stdout 的 JSON
kDate = get_user_date()
try:
Gua1Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck = guaCalc_huangjijingshi(
Map64Gua, Map24Jieqi, kDate
)
# # 计算10年前当前日期减10年
# Yearpre10 = kDate.replace(year=kDate.year - 10)
# # 计算10年后当前日期加10年
# Yearpast10 = kDate.replace(year=kDate.year + 10)
# 或者使用relativedelta更精确处理闰年等情况
Yearpre10 = datetime(2010, 1, 1)
Yearpast10 = datetime(2030, 1, 1,)
# 生成日期范围修正end参数
date_range = pd.date_range(
start=Yearpre10.replace(month=1, day=1), # 确保从1月1日开始
end=Yearpast10.replace(month=1, day=1), # 确保到1月1日结束
freq='YS' # 每年第一天
)
#date_range = pd.date_range(start=Yearpre10, end=Yearpast10, freq='YS') # 每年第一天
year_gua_list = []
for date in date_range:
_, _, _, Gua1Year, _ = guaCalc_huangjijingshi(Map64Gua, Map24Jieqi, date)
year_gua_list.append({
'Year': date.year,
'Trigram': Gua1Year.trigram if hasattr(Gua1Year, 'trigram') else None,
})
yearGuaMap = pd.DataFrame(year_gua_list)
#========== 计算年份吉凶 ==========
LuckYear = luckCalc_huangjijingshi(Map64Gua, yearGuaMap, GuaLuck)
# # 检查 Gua1Day 的类型并正确处理
# if isinstance(Gua1Day, pd.DataFrame):
# # 如果是 DataFrame提取第一行
# day_data = Gua1Day.iloc[0]
# elif isinstance(Gua1Day, pd.Series):
# # 如果是 Series直接使用
# day_data = Gua1Day
# elif isinstance(Gua1Day, dict):
# # 如果是字典,直接使用
# day_data = Gua1Day
# else:
# raise ValueError("Gua1Day 的类型不支持,必须是 DataFrame、Series 或 dict")
def format_gua_data(gua_data):
"""通用格式化卦象数据的函数"""
if isinstance(gua_data, (pd.DataFrame, pd.Series)):
data = gua_data.iloc[0] if isinstance(gua_data, pd.DataFrame) else gua_data
return {
'id': int(data.get('id', 0)),
'trigram': str(data.get('trigram', '')),
'yaoAll': str(data.get('yaoAll', '')),
'yao1': int(data.get('yao1', 0)),
'yao2': int(data.get('yao2', 0)),
'yao3': int(data.get('yao3', 0)),
'yao4': int(data.get('yao4', 0)),
'yao5': int(data.get('yao5', 0)),
'yao6': int(data.get('yao6', 0)),
'value_2binary': int(data.get('value_2binary', 0)),
'change_omit': int(data.get('change_omit', 0)),
'type': str(data.get('Type', 'unknown'))
}
elif isinstance(gua_data, dict):
return gua_data
else:
return {}
# 构建结果字典,确保所有值是 Python 原生类型
# result = {
# 'date': kDate.strftime('%Y-%m-%d'),
# 'day_gua': {
# 'id': int(day_data.get('id', 0)), # 如果没有值,使用默认值 0
# 'trigram': str(day_data.get('trigram', '')), # 如果没有值,使用空字符串
# 'yaoAll': str(day_data.get('yaoAll', '')), # 如果没有值,使用空字符串
# 'yao1': int(day_data.get('yao1', 0)), # 如果没有值,使用默认值 0
# 'yao2': int(day_data.get('yao2', 0)), # 如果没有值,使用默认值 0
# 'yao3': int(day_data.get('yao3', 0)), # 如果没有值,使用默认值 0
# 'yao4': int(day_data.get('yao4', 0)), # 如果没有值,使用默认值 0
# 'yao5': int(day_data.get('yao5', 0)), # 如果没有值,使用默认值 0
# 'yao6': int(day_data.get('yao6', 0)), # 如果没有值,使用默认值 0
# 'value_2binary': int(day_data.get('value_2binary', 0)), # 如果没有值,使用默认值 0
# 'change_omit': int(day_data.get('change_omit', 0)), # 如果没有值,使用默认值 0
# 'type': str(day_data.get('Type', 'day')) # 如果没有值,使用默认值 'day'
# }
# }
# 构建完整结果
result = {
'date': kDate.strftime('%Y-%m-%d'),
'year_gua': format_gua_data(Gua1Year),
'month_gua': format_gua_data(Gua1Month),
'day_gua': format_gua_data(Gua1Day),
'hour_gua': format_gua_data(Gua1Hour),
'luck_gua': format_gua_data(GuaLuck),
'luck_years': [
{
'year': row['Year'],
'trigram': row['trigram'],
'yaoAll': row['yaoAll']
} for _, row in LuckYear.iterrows()
] if isinstance(LuckYear, pd.DataFrame) else []
}
# 在打印 JSON 前检查内容
print("=== 要输出的 JSON 内容 ===", file=sys.stderr)
print(result, file=sys.stderr)
# 输出 JSON 格式的结果
# print(json.dumps(result, ensure_ascii=False, indent=4)) # 添加 indent 参数更好查看格式
print(json.dumps(result, ensure_ascii=False, indent=2))
except Exception as e:
import traceback
traceback.print_exc(file=sys.stderr) # 打印错误堆栈,帮助调试
print(f"计算错误: {str(e)}", file=sys.stderr)
sys.exit(1)

56
bianyao_huangjijingshi.py Normal file
View File

@ -0,0 +1,56 @@
import pandas as pd
def bianyao_huangjijingshi(Map64Gua: pd.DataFrame, yaoOrig: str) -> pd.DataFrame:
"""
皇极经世变爻计算
输入原爻 '000100'输出所有变爻之后的卦象合集
"""
#print("Map64Gua:",Map64Gua['yaoAll'].apply(type).value_counts())
# 找到原始卦所在的行
if 'yaoAll' not in Map64Gua.columns:
raise ValueError("'yaoAll' 列在 Map64Gua 中不存在,请检查数据")
Map64Gua = Map64Gua.copy()
Map64Gua['yaoAll'] = Map64Gua['yaoAll'].astype(str).str.zfill(6).str.strip()
yaoOrig = str(yaoOrig).zfill(6).strip()
# 找到原始卦所在的行
matches = Map64Gua[Map64Gua['yaoAll'] == yaoOrig]
if matches.empty:
raise ValueError(f"未找到 yaoAll 为 {yaoOrig} 的卦象")
k2 = matches.iloc[0] # 取第一行
# 计算原卦的二进制值
a0 = (k2['yao1'] + k2['yao2']*2 + k2['yao3']*4 +
k2['yao4']*8 + k2['yao5']*16 + k2['yao6']*32)
# 依次翻转每一爻,得到新卦的二进制值
a1 = (k2['yao1'] + k2['yao2']*2 + k2['yao3']*4 +
k2['yao4']*8 + k2['yao5']*16 + (1 - k2['yao6'])*32)
a2 = (k2['yao1'] + k2['yao2']*2 + k2['yao3']*4 +
k2['yao4']*8 + (1 - k2['yao5'])*16 + k2['yao6']*32)
a3 = (k2['yao1'] + k2['yao2']*2 + k2['yao3']*4 +
(1 - k2['yao4'])*8 + k2['yao5']*16 + k2['yao6']*32)
a4 = (k2['yao1'] + k2['yao2']*2 + (1 - k2['yao3'])*4 +
k2['yao4']*8 + k2['yao5']*16 + k2['yao6']*32)
a5 = (k2['yao1'] + (1 - k2['yao2'])*2 + k2['yao3']*4 +
k2['yao4']*8 + k2['yao5']*16 + k2['yao6']*32)
a6 = ((1 - k2['yao1']) + k2['yao2']*2 + k2['yao3']*4 +
k2['yao4']*8 + k2['yao5']*16 + k2['yao6']*32)
# 结果列表
result = pd.DataFrame()
# 查找对应的卦象
for a in [a0, a1, a2, a3, a4, a5, a6]:
row = Map64Gua[Map64Gua['value_2binary'] == a]
if not row.empty:
result = pd.concat([result, row], ignore_index=True)
return result

99
guaCalc_huangjijingshi.py Normal file
View File

@ -0,0 +1,99 @@
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def guaCalc_huangjijingshi(Map64Gua, Map24Jieqi, kDate):
"""
Calculate year, month, day, and hour gua based on Huangji Jingshi method
Args:
Map64Gua (pd.DataFrame): 64 gua mapping table
Map24Jieqi (pd.DataFrame): 24 solar terms data
kDate (datetime): Target date for calculation
Returns:
tuple: (Gua4Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck)
"""
# Helper functions that need to be implemented separately
from bianyao_huangjijingshi import bianyao_huangjijingshi
from tuigua_huangjijingshi import tuigua_huangjijingshi
# Bagua for 360 years
yearRef = {'trigram': '', 'value_2binary': '111110', 'start_year': 1744}
Gua360Year = bianyao_huangjijingshi(Map64Gua, yearRef['value_2binary'])
Gua360Year['yearStart'] = [yearRef['start_year'] + i * 60 for i in range(len(Gua360Year))]
Gua360Year['yearEnd'] = [start + 59 for start in Gua360Year['yearStart']]
# Bagua for 60 years
kYear = kDate.year
kYearOrig = kYear
solar_terms = Map24Jieqi
solar_terms1 = solar_terms[solar_terms['Solar_Terms'] == '冬至']
# Find the last winter solstice before our date
k1 = np.where((kDate - solar_terms1['As_Of_Date']) > timedelta(0))[0][-1]
kYear = (solar_terms1['As_Of_Date'].iloc[k1] + timedelta(days=90)).year
# Find which 60-year period we're in
mask = (Gua360Year['yearStart'] <= kYear) & (Gua360Year['yearEnd'] >= kYear)
k1 = np.where(mask)[0][0] # Get first match
a1 = Gua360Year['value_2binary'].iloc[k1]
a2 = kYear - Gua360Year['yearStart'].iloc[k1]
Gua1Year = tuigua_huangjijingshi(Map64Gua, a1, a2 + 1)
Gua1Year['Type'] = 'year'
# print(type(Gua1Year))
# print(Gua1Year)
# Bagua for 360 days (months)
dayRef = {
'trigram': Gua1Year['trigram'],
'yaoAll': Gua1Year['yaoAll'],
'start_day': 1,
'end_day': 360
}
Gua365days = bianyao_huangjijingshi(Map64Gua, Gua1Year['yaoAll'])
#print("Gua365days.columns:",print(Gua365days.columns))
# Find important solar terms
solar_termsSimple = Map24Jieqi[Map24Jieqi['isImportant'] == 1]
# Find the last important solar term before our date
kk = np.where((kDate - solar_termsSimple['As_Of_Date']) > timedelta(0))[0][-1]
# Find winter solstice within these terms
kk2 = np.where(solar_termsSimple['Solar_Terms'].iloc[:kk+1] == '冬至')[0][-1]
a1 = Gua365days['value_2binary'].iloc[kk - kk2 + 1]
Gua1Month = Gua365days.iloc[[kk - kk2 + 1]].copy()
Gua1Month['Type'] = 'month'
# Calculate day gua
kDateMonth = solar_termsSimple['As_Of_Date'].iloc[kk]
kGap = (kDate.date() - kDateMonth.date()).days
a2 = round(kGap)
Gua1Day = tuigua_huangjijingshi(Map64Gua, a1, a2)
Gua1Day['Type'] = 'day'
# Calculate hour gua (4-hour blocks)
Gua24Hours = bianyao_huangjijingshi(Map64Gua, Gua1Day['yaoAll'])
k1 = min(int(np.ceil(kDate.hour / 4)), len(Gua24Hours) - 1)
Gua4Hour = Gua24Hours.iloc[[k1 + 1]].copy()
Gua4Hour['Type'] = 'hour'
# Calculate luck gua
abc = pd.concat([
Gua1Year[['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']],
Gua1Month[['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']],
Gua1Day[['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']],
Gua4Hour[['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']]
])
abc2 = np.mod(np.nansum(abc, axis=0), 2)[:6] # 确保只有6位
abc3 = np.sum(abc2 * [1, 2, 4, 8, 16, 32])
k1 = np.where(Map64Gua['value_2binary'] == abc3)[0][0]
GuaLuck = Map64Gua.iloc[[k1]].copy()
GuaLuck['Type'] = 'luck'
return Gua4Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck

88
huangjijingshi20250217.py Normal file
View File

@ -0,0 +1,88 @@
import pandas as pd
import numpy as np
from datetime import datetime
import pymysql
from sqlalchemy import create_engine
# 导入自定义卦象计算模块
from guaCalc_huangjijingshi import guaCalc_huangjijingshi
from luckCalc_huangjijingshi import luckCalc_huangjijingshi
# ========== 数据库连接配置 ==========
# 你的账号、密码、主机、端口
username = 'cn_ainvest_db'
password = 'cn_ainvest_sd3a1'
host = 'rm-2zewagytttzk6f24xno.mysql.rds.aliyuncs.com'
port = 3306
database = 'ai_strategy' # 这里改成你要的数据库
# 创建 SQLAlchemy engine
engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')
# ========== 文件路径配置 ==========
configPath = r'C:\AI trading\config\Rey\test_reinforcement'
matlabPath = r'D:\Dropbox\Matlab\Rey\MATLAB'
# 修改为你的 Excel 文件路径
tempPath = r'C:\Users\24011\Documents\WeChat Files\wxid_k4ep58f81rx421\FileStorage\File\2025-04\tuigua_huangjijingshi - 副本\tuigua_huangjijingshi - 副本\皇极经世.xlsx'
# ========== 加载 64 卦映射表 ==========
Map64Gua = pd.read_excel(tempPath, sheet_name="bagua")
# 转换爻数据为整数
for col in ['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']:
Map64Gua[col] = Map64Gua[col].apply(lambda x: int(x) if str(x).isdigit() else x)
Map64GuaOmit = Map64Gua[Map64Gua['change_omit'] == 0]
# ========== 从数据库加载 24 节气数据 ==========
def load_solar_terms(conn_params):
"""通过 SQLAlchemy 连接读取 solar_terms 表"""
sqlquery = 'SELECT * FROM solar_terms'
df = pd.read_sql(sqlquery, engine)
df['As_Of_Date'] = pd.to_datetime(df['As_Of_Date'])
df = df.sort_values('As_Of_Date')
return df
# 加载数据并标记重要节气
solar_terms = load_solar_terms(engine)
important_terms = ['冬至', '雨水', '谷雨', '夏至', '处暑', '霜降']
solar_terms['isImportant'] = solar_terms['Solar_Terms'].isin(important_terms).astype(int)
Map24Jieqi = solar_terms.copy()
# ========== 示例:计算某个日期的卦象 ==========
kDate = datetime(1973, 7, 17, 17, 20, 0)
print("计算日期:", kDate)
# 调用卦象计算函数
Gua1Hour, Gua1Day, Gua1Month, Gua1Year, GuaLuck = guaCalc_huangjijingshi(
Map64Gua, Map24Jieqi, kDate
)
print("年卦:", Gua1Year)
print("月卦:", Gua1Month)
print("日卦:", Gua1Day)
print("时卦:", Gua1Hour)
print("吉凶卦:", GuaLuck)
# ========== 计算 1990-2030 每年的年卦 ==========
# 生成日期范围
start_date = datetime(1990, 1, 1)
end_date = datetime(2030, 1, 1)
date_range = pd.date_range(start=start_date, end=end_date, freq='YS') # 每年第一天
year_gua_list = []
for date in date_range:
_, _, _, Gua1Year, _ = guaCalc_huangjijingshi(Map64Gua, Map24Jieqi, date)
year_gua_list.append({
'Year': date.year,
'Trigram': Gua1Year.trigram if hasattr(Gua1Year, 'trigram') else None,
})
yearGuaMap = pd.DataFrame(year_gua_list)
#========== 计算年份吉凶 ==========
LuckYear = luckCalc_huangjijingshi(Map64Gua, yearGuaMap, GuaLuck)
print("\n年份吉凶计算结果:")
print(LuckYear)

View File

@ -0,0 +1,51 @@
import pandas as pd
import numpy as np
def luckCalc_huangjijingshi(Map64Gua, yearGuaMap, GuaLuck):
"""
计算年份吉凶卦象
Args:
Map64Gua (pd.DataFrame): 64卦映射表包含卦名爻信息和二进制值
yearGuaMap (list or pd.DataFrame): 年份卦象映射表格式为 [年份, 卦名]
GuaLuck (pd.DataFrame): 吉凶卦象包含6爻信息
Returns:
pd.DataFrame: 包含年份年卦和吉凶卦的结果表
"""
LuckYear = []
# 确保 yearGuaMap 是 DataFrame如果是列表先转换
if isinstance(yearGuaMap, list):
yearGuaMap = pd.DataFrame(yearGuaMap, columns=['Year', 'Trigram'])
for i in range(len(yearGuaMap)):
kYear = yearGuaMap.iloc[i] # 当前年份和卦名
trigram = kYear['Trigram']
# 找到当前卦名对应的卦象(爻信息)
k1 = Map64Gua[Map64Gua['trigram'] == trigram].index
if len(k1) == 0:
raise ValueError(f"未找到卦名 {trigram} 对应的卦象")
# 提取当前卦的6爻和吉凶卦的6爻合并计算
abc = Map64Gua.loc[k1, ['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']].values
abc_luck = GuaLuck[['yao1', 'yao2', 'yao3', 'yao4', 'yao5', 'yao6']].values
abc_combined = np.vstack([abc, abc_luck])
# 计算新卦的二进制值模2求和后加权
abc2 = np.mod(np.nansum(abc_combined, axis=0), 2)
abc3 = np.sum(abc2 * [1, 2, 4, 8, 16, 32])
# 查找对应的新卦
new_gua = Map64Gua[Map64Gua['value_2binary'] == abc3].iloc[0]
LuckYear.append(new_gua)
# 合并结果
LuckYear = pd.DataFrame(LuckYear)
LuckYear['Year'] = yearGuaMap['Year'].values
LuckYear['YearGua'] = yearGuaMap['Trigram'].values
# 调整列顺序(年份和年卦在前)
cols = ['Year', 'YearGua'] + [c for c in LuckYear.columns if c not in ['Year', 'YearGua']]
LuckYear = LuckYear[cols]
return LuckYear