Allineato il target di rendimento minimo all'indice di Hurst

This commit is contained in:
fredmaloggia
2025-11-16 17:36:36 +01:00
committed by GitHub
parent e6e9d51f89
commit 9d5cc56653

View File

@@ -21,9 +21,8 @@ import sqlalchemy as sa
from sqlalchemy import text
import pyodbc
import matplotlib.pyplot as plt
from math import isfinite
#from math import isfinite
import time
# =============================
# Plot saving helper (non-recursive)
@@ -76,7 +75,7 @@ PTF_CURR = "EUR"
WP = 60 # pattern window length (bars)
HA = 10 # outcome horizon (bars)
KNN_K = 25 # number of neighbours
THETA = 0.00005 # threshold on the outcome to generate a signal (overwritten by the next line)
THETA = 0.005 # threshold on the outcome to generate a signal; effective value, also the fallback theta_entry when an ISIN has no Hurst value
EMBARGO = WP + HA # pattern window plus outcome horizon (bars)
# Tagging rule-based (soglie)
@@ -86,6 +85,9 @@ STD_COMP_PCT = 0.15
DAYS_PER_YEAR = 252
TOP_N_MAX = 15 # maximum number of admitted assets
RP_MAX_WEIGHT = 2 / TOP_N_MAX # 2 x 1/15 ≈ 0.1333 = 13.33%; passed as max_weight to inverse_vol_weights
# =========================================
# UTILS GENERALI
# =========================================
@@ -591,6 +593,12 @@ hurst_df = pd.DataFrame(hurst_rows) if hurst_rows else pd.DataFrame(
meta_df["ISIN"] = meta_df["ISIN"].astype(str).str.strip()
hurst_df["ISIN"] = hurst_df["ISIN"].astype(str).str.strip()
# ISIN -> Hurst lookup (H is later used as theta_entry in the backtest).
# Missing Hurst values are stored as NaN so the lookup side can fall back.
hurst_map = {
    str(isin_val).strip(): (np.nan if pd.isna(hurst_val) else float(hurst_val))
    for isin_val, hurst_val in zip(hurst_df["ISIN"], hurst_df["Hurst"])
}
summary_hurst = meta_df.merge(hurst_df, on="ISIN", how="left")
cols_hurst = ["ISIN", "Nome", "Categoria", "Asset Class", "Hurst", "Regime"]
summary_hurst = summary_hurst[[c for c in cols_hurst if c in summary_hurst.columns]]
@@ -831,7 +839,12 @@ def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret
bt_signals = []
bt_summary = []
total_t = 0.0
start_all = time.perf_counter()
for i, isin in enumerate(isins, 1):
t0 = time.perf_counter() # ---- INIZIO TIMER SINGOLO CICLO ----
try:
df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR})
if df_isin.empty:
@@ -856,10 +869,26 @@ for i, isin in enumerate(isins, 1):
errors.append({"ISIN": isin, "Errore": f"Serie troppo corta (BT) ({df_isin[col_ret].dropna().shape[0]} punti)"})
continue
# ============================
# THETA = HURST AS A PERCENTAGE
# H = 0.50 -> theta_entry = 0.005 (0.5%)
# ============================
isin_str = str(isin).strip()
H_val = hurst_map.get(isin_str, np.nan)
# Fall back to the global THETA when this ISIN has no usable Hurst value.
theta_entry = THETA if (H_val is None or pd.isna(H_val)) else float(H_val) / 100.0
# Single argument list: the per-ISIN theta_entry replaces the former global
# THETA, and each keyword is passed exactly once.
sig_df, stats = knn_forward_backtest_one_asset(
    df_isin=df_isin,
    col_date=(col_date if col_date else df_isin.index.name or "idx"),
    col_ret=col_ret,
    Wp=WP,
    Ha=HA,
    k=KNN_K,
    theta_entry=theta_entry,
    fee_bps=10,
)
name = meta_df.loc[meta_df["ISIN"]==isin, "Nome"].iloc[0] if (meta_df["ISIN"]==isin).any() else None
@@ -871,19 +900,31 @@ for i, isin in enumerate(isins, 1):
tmp.insert(1, "Nome", name)
tmp.insert(2, "Categoria", cat)
tmp.insert(3, "Asset Class", ac)
tmp["Wp"] = WP; tmp["Ha"] = HA; tmp["k"] = KNN_K; tmp["Theta"] = THETA
tmp["Wp"] = WP; tmp["Ha"] = HA; tmp["k"] = KNN_K; tmp["Theta"] = theta_entry
bt_signals.append(tmp)
stats_row = {"ISIN": isin, "Nome": name, "Categoria": cat, "Asset Class": ac}
stats_row.update(stats)
bt_summary.append(stats_row)
if i % 10 == 0:
print(f"… backtest {i}/{len(isins)} completati")
except Exception as e:
errors.append({"ISIN": isin, "Errore": f"Backtest: {str(e)}"})
# ---- FINE TIMER SINGOLO CICLO ----
dt = time.perf_counter() - t0
total_t += dt
avg_t = total_t / i
eta = avg_t * (len(isins) - i)
print(f"… backtest {i}/{len(isins)} completati — {dt:.2f} sec (avg {avg_t:.2f}s, ETA {eta:.1f}s)")
# ---- FINAL TIMER ----
end_all = time.perf_counter()
print(f"⏱️ Tempo totale: {end_all - start_all:.2f} sec")
# With no ISINs the loop above never ran and len(isins) is 0: skip the
# per-asset average instead of dividing by zero.
if len(isins) > 0:
    print(f"⏱️ Tempo medio per asset: {(end_all - start_all)/len(isins):.2f} sec")
bt_signals_df = pd.concat(bt_signals, ignore_index=True) if bt_signals else pd.DataFrame(
columns=["ISIN","Nome","Categoria","Asset Class","Date","Signal","EstOutcome","AvgDist","Ret+1","PnL","Wp","Ha","k","Theta"]
)
@@ -967,11 +1008,16 @@ def plot_heatmap_monthly(r: pd.Series, title: str, save_path: str = None):
savefig_safe(save_path, dpi=150)
plt.show()
def inverse_vol_weights(df: pd.DataFrame, window: int = 60, max_weight=None) -> pd.DataFrame:
    """
    Compute row-wise inverse-volatility weights, optionally capped per column.

    Parameters
    ----------
    df : pd.DataFrame
        One column per asset; volatility is the rolling standard deviation
        of each column.
    window : int
        Rolling window length passed to ``df.rolling``.
    max_weight : float or None
        When given, every weight is clipped to at most this value.

    Returns
    -------
    pd.DataFrame
        Weights with the same index and columns as ``df``.

    Notes
    -----
    * A zero volatility is treated as missing, so it never produces an
      infinite weight.
    * Missing weights are forward-filled, then set to ``1 / n_columns``.
    * The cap is applied after normalisation and the row is not re-scaled,
      so a capped row can sum to less than 1.
      NOTE(review): confirm the residual is meant to stay un-invested.
    """
    vol = df.rolling(window).std()
    inv = 1 / vol.replace(0, np.nan)
    w = inv.div(inv.sum(axis=1), axis=0)
    # DataFrame.ffill() replaces the deprecated fillna(method="ffill").
    w = w.ffill().fillna(1 / max(1, df.shape[1]))
    if max_weight is not None:
        w = w.clip(upper=max_weight)
    return w
def portfolio_metrics(r: pd.Series):
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
@@ -1175,33 +1221,6 @@ def calibrate_score_weights(
"X_ranked": X
}
# ----------------------------
# ESEMPIO DI USO
# ----------------------------
# 1) SUPERVISIONATO (se hai una colonna target OOS, p.es. 'FWD_CAGR_%')
# res = calibrate_score_weights(
# df_sum,
# metrics_map=[("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)],
# target_col="FWD_CAGR_%", # <-- metti il tuo target futuro
# k_folds=5,
# shrink_equal=0.25,
# corr_shrink=0.10
# )
#
# 2) NON SUPERVISIONATO (se non hai ancora un target OOS)
# res = calibrate_score_weights(
# df_sum,
# metrics_map=[("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)],
# target_col=None
# )
# Applica i pesi stimati per generare lo Score
# X_ranked = res["X_ranked"]; w = res["weights"]
# df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
# df_sum["Score_mode"] = res["mode"]
# print("Pesi stimati:\n", w)
# --- PRE-FLIGHT METRICS GUARD ---------------------------------------------
def _coerce_num(s: pd.Series) -> pd.Series:
    """Convert *s* to numeric values, mapping unparseable entries and +/-inf to NaN."""
    # errors="coerce" turns anything that cannot be parsed into NaN.
    as_numeric = pd.to_numeric(s, errors="coerce")
    # Infinite values are treated as missing as well.
    non_finite = [np.inf, -np.inf]
    return as_numeric.replace(non_finite, np.nan)
@@ -1292,13 +1311,6 @@ else:
{c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]})
print(w)
#######################
# DA TENERE IN EVIDENZA
#######################
TOP_N = 15
base_isins = (
df_sum
@@ -1310,36 +1322,48 @@ base_isins = (
crypto_isin = None
print(f"🧩 ISIN selezionati dinamicamente ({len(base_isins)}): {base_isins}")
# -----------------------------
# 5.3 Portfolio construction
# (Equal Weight + capped Risk Parity)
# -----------------------------
bt = forward_bt_signals.copy()
bt["Date"] = pd.to_datetime(bt["Date"])
bt["ISIN"] = bt["ISIN"].astype(str).str.strip()
bt = bt.sort_values(["Date", "ISIN"])

# Wide (Date x ISIN) views of the backtest: PnL summed per day, last signal per day.
wide_pnl = (
    bt.pivot_table(index="Date", columns="ISIN", values="PnL", aggfunc="sum")
    .fillna(0.0)
)
wide_sig = (
    bt.pivot_table(index="Date", columns="ISIN", values="Signal", aggfunc="last")
    .fillna(0)
    .astype(int)
)

# ISINs actually available in the backtest output
cols = [c for c in base_isins if c in wide_pnl.columns]

if not cols:
    # No valid ISIN -> portfolios stay in cash (zero returns, no weights)
    idx = wide_pnl.index
    ret_eq = pd.Series(0.0, index=idx, name="Ret_EqW")
    ret_rp = pd.Series(0.0, index=idx, name="Ret_RP")
    weights_rp = pd.DataFrame(0.0, index=idx, columns=[])
else:
    # ---------- Equal Weight ----------
    ret_eq = wide_pnl[cols].mean(axis=1)

    # ---------- Capped Risk Parity ----------
    # NOTE(review): weights at row t come from a rolling window that includes
    # row t and are applied to row t's PnL — confirm whether a one-bar lag is intended.
    weights_rp = inverse_vol_weights(
        wide_pnl[cols],
        window=60,
        max_weight=RP_MAX_WEIGHT  # e.g. 2 / TOP_N_MAX = 0.1333
    )
    ret_rp = (wide_pnl[cols] * weights_rp).sum(axis=1)
def plot_portfolio_composition(weights: pd.DataFrame,
title: str,
save_path: str | None = None,
@@ -1487,7 +1511,7 @@ def make_active_weights(w_base: pd.DataFrame,
# -----------------------------
# 5.4 Equity line + Heatmap (salva PNG)
# -----------------------------
eq_eq, eq_rp, eq_agg = map(equity_from_returns, [ret_eq, ret_rp, ret_agg])
eq_eq, eq_rp = map(equity_from_returns, [ret_eq, ret_rp])
plt.figure(figsize=(10,6))
plt.plot(eq_eq, label="Equal Weight")
@@ -1501,6 +1525,7 @@ for name, r, path in [
("Equal Weight", ret_eq, "heatmap_equal_weight.png"),
("Risk Parity", ret_rp, "heatmap_risk_parity.png"),
]:
m = portfolio_metrics(r)
print(f"{name:22s} → CAGR {m['CAGR']*100:5.2f}% | Vol {m['Vol']*100:5.2f}% | Sharpe {m['Sharpe'] if m['Sharpe']==m['Sharpe'] else float('nan'):4.2f} | MaxDD {m['MaxDD']*100:5.2f}%")
plot_heatmap_monthly(r, f"Heatmap mensile {name}", save_path=path)
@@ -1759,7 +1784,7 @@ import numpy as np
def rebuild_daily_from_trades_dict(trades_dict):
"""
trades_dict: {'Equal_Weight': df, 'Risk_Parity': df}
trades_dict: {'Equal_Weight': df, 'Risk_Parity': df, 'Aggressiva_Crypto': df}
Ogni df deve avere: OpenDate, CloseDate, Size, Duration_bars, PnL_%
Regola: distribuiamo il PnL del trade su ciascun giorno di durata con
un rendimento giornaliero costante r tale che (1+r)^D - 1 = PnL.
@@ -2049,41 +2074,57 @@ def _select_isins_for_topN(df_sum: pd.DataFrame, top_n: int):
crypto_isin_N = None
return base_isins_N, crypto_isin_N
def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl):
    """
    Build the Equal Weight and capped Risk Parity portfolio returns
    for the set of ISINs in base_isins_N.

    Parameters
    ----------
    base_isins_N : iterable
        Candidate ISINs; only those present as columns of ``wide_pnl`` are used.
    wide_pnl : pd.DataFrame
        PnL table with one column per ISIN.

    Returns
    -------
    ret_eq_N : pd.Series
        Row-wise mean PnL of the selected columns.
    ret_rp_N : pd.Series
        Row-wise PnL weighted by ``inverse_vol_weights`` (window 60, capped
        at ``RP_MAX_WEIGHT``).

    When none of the requested ISINs is available, both series are all zeros
    on ``wide_pnl.index`` (portfolio in cash).
    """
    # Columns actually available
    colsN = [c for c in base_isins_N if c in wide_pnl.columns]

    if not colsN:
        # No valid ISIN -> cash portfolios (flat line)
        idx = wide_pnl.index
        ret_eq_N = pd.Series(0.0, index=idx, name="Ret_EqW_N")
        ret_rp_N = pd.Series(0.0, index=idx, name="Ret_RP_N")
        return ret_eq_N, ret_rp_N

    # -------- Equal Weight --------
    ret_eq_N = wide_pnl[colsN].mean(axis=1)

    # -------- Capped Risk Parity --------
    # NOTE(review): weights at row t use a rolling window that includes row t
    # and multiply row t's PnL — confirm whether a one-bar lag is intended.
    weights_rp_N = inverse_vol_weights(
        wide_pnl[colsN],
        window=60,
        max_weight=RP_MAX_WEIGHT  # e.g. RP_MAX_WEIGHT = 2 / TOP_N_MAX = 0.1333
    )
    ret_rp_N = (wide_pnl[colsN] * weights_rp_N).sum(axis=1)

    return ret_eq_N, ret_rp_N
# --- calcolo metriche per TopN 6..20 ---
# --- calcolo metriche per TopN 8..15 ---
rows_byN = []
for top_n in range(8, 16):
# Selezione ISIN per questo TopN
base_isins_N, crypto_isin_N = _select_isins_for_topN(df_sum, top_n)
ret_eq_N, ret_rp_N, ret_agg_N = _build_portfolio_returns_for_isins(base_isins_N, crypto_isin_N, wide_pnl)
# Costruisce i rendimenti di portafoglio EqW + RP per questo N
# Nota: la nuova _build_portfolio_returns_for_isins accetta (base_isins_N, wide_pnl)
ret_eq_N, ret_rp_N = _build_portfolio_returns_for_isins(base_isins_N, wide_pnl)
# (OPZIONALE) se vuoi anche salvare equity/heatmap per ciascun N:
# _save_equity_plot_byN(ret_eq_N, ret_rp_N, top_n)
# _save_heatmaps_byN(ret_eq_N, ret_rp_N, top_n)
# Calcola le metriche (come nell'ottimizzatore)
for strategy_name, rser in [
("Equal_Weight", ret_eq_N),
("Risk_Parity", ret_rp_N),
#("Aggressiva_Crypto", ret_agg_N),
("Equal_Weight", ret_eq_N),
("Risk_Parity", ret_rp_N),
]:
m = _calc_all_metrics_from_returns(rser)
m["TopN"] = top_n
@@ -2093,19 +2134,17 @@ for top_n in range(8, 16):
# Final DataFrame, including the TopN column
final_byN_df = pd.DataFrame(rows_byN)[[
    "TopN", "Strategy",
    "Rendimento_Ann", "Volatilita_Ann", "CAGR", "R2_Equity",
    "MaxDD", "DD_Duration_Max", "TTR_from_MDD",
    "AAW", "AUW", "Heal_Index", "H_min_100m_5Y"
]].sort_values(["TopN","Strategy"]).reset_index(drop=True)

# Save: adds/overwrites the sheet inside final_metrics.xlsx
# - other sheets already in the workbook (e.g. "Portfolio_Metrics") are kept in append mode
try:
    # if final_metrics.xlsx was already created above, reopen it with mode='a'
    with pd.ExcelWriter("final_metrics.xlsx", engine="openpyxl", mode="a", if_sheet_exists="replace") as xw:
        # sheet_name is passed by keyword: positional use is deprecated in pandas 2.x
        final_byN_df.to_excel(xw, sheet_name="Portfolio_Metrics_By_N", index=False)
except Exception:
    # first save or openpyxl not available -> create the workbook from scratch
    with pd.ExcelWriter("final_metrics.xlsx") as xw:
        final_byN_df.to_excel(xw, sheet_name="Portfolio_Metrics_By_N", index=False)
@@ -2135,7 +2174,6 @@ def _save_equity_plot_byN(ret_eq, ret_rp, top_n: int):
eq_eq = equity_from_returns(ret_eq)
eq_rp = equity_from_returns(ret_rp)
# Se gli indici sono vuoti, crea un dummy per salvare comunque il file (debug friendly)
if eq_eq.empty and eq_rp.empty:
eq_eq = pd.Series([100.0], index=[pd.Timestamp("2000-01-01")])
@@ -2149,9 +2187,7 @@ def _save_equity_plot_byN(ret_eq, ret_rp, top_n: int):
savefig_safe(os.path.join(OUT_DIR, f"equity_topN_{top_n}.png"), dpi=150)
plt.close(fig)
def _save_heatmaps_byN(ret_eq, ret_rp, top_n: int):
# Le heatmap usano già plot_heatmap_monthly che fa savefig se 'save_path' è passato
ret_eq = _safe_series(ret_eq)
ret_rp = _safe_series(ret_rp)
@@ -2166,27 +2202,10 @@ def _save_heatmaps_byN(ret_eq, ret_rp, top_n: int):
save_path=os.path.join(OUT_DIR, f"heatmap_rp_topN_{top_n}.png")
)
plot_heatmap_monthly(
ret_eq,
f"Heatmap mensile Equal Weight (TopN={top_n})",
save_path=os.path.join(OUT_DIR, f"heatmap_equal_topN_{top_n}.png")
)
plot_heatmap_monthly(
ret_rp,
f"Heatmap mensile Risk Parity (TopN={top_n})",
save_path=os.path.join(OUT_DIR, f"heatmap_rp_topN_{top_n}.png")
)
plot_heatmap_monthly(
ret_agg,
f"Heatmap mensile Aggressiva + Crypto (TopN={top_n})",
save_path=os.path.join(OUT_DIR, f"heatmap_aggcrypto_topN_{top_n}.png")
)
# Loop over TopN = 8..15, producing the equity and heatmap plots for each selection
for top_n in range(8, 16):
    # The crypto slot returned by the selector is not used by the EqW / RP portfolios.
    base_isins_N, crypto_isin_N = _select_isins_for_topN(df_sum, top_n)
    # The builder takes (base_isins_N, wide_pnl) and returns exactly two series.
    ret_eq_N, ret_rp_N = _build_portfolio_returns_for_isins(base_isins_N, wide_pnl)
    _save_equity_plot_byN(ret_eq_N, ret_rp_N, top_n)
    _save_heatmaps_byN(ret_eq_N, ret_rp_N, top_n)
@@ -2286,11 +2305,16 @@ def _build_weights_for_isins(base_isins_N, crypto_isin_N, wide_pnl):
w_eq_N = pd.DataFrame(1/len(colsN), index=idx, columns=colsN)
else:
w_eq_N = pd.DataFrame(index=idx, columns=[])
# Risk Parity
# Risk Parity con cap
if len(colsN) > 0:
w_rp_N = inverse_vol_weights(wide_pnl[colsN], window=60)
w_rp_N = inverse_vol_weights(
wide_pnl[colsN],
window=60,
max_weight=RP_MAX_WEIGHT
)
else:
w_rp_N = pd.DataFrame(index=idx, columns=[])
# Aggressiva + Crypto
if (len(colsN) > 0) and (crypto_isin_N is not None) and (crypto_isin_N in wide_pnl.columns):
cols_agg = colsN + [crypto_isin_N]