diff --git a/Trading Pattern Recon w Hurst.py b/Trading Pattern Recon w Hurst.py index 4eccf55..7d6236c 100644 --- a/Trading Pattern Recon w Hurst.py +++ b/Trading Pattern Recon w Hurst.py @@ -1391,13 +1391,6 @@ for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]: if c in df_sum.columns: df_sum[c] = pd.to_numeric(df_sum[c], errors="coerce") -# def is_crypto(row): -# txt = f"{row.get('Nome','')} {row.get('Categoria','')} {row.get('Asset Class','')}".lower() -# return any(k in txt for k in ["crypto","cripto","bitcoin","btc","ether","eth"]) - -# if "is_crypto" not in df_sum.columns: -# df_sum["is_crypto"] = df_sum.apply(is_crypto, axis=1) - def _safe_rank(s: pd.Series): s = pd.to_numeric(s, errors="coerce") if s.notna().sum() == 0: @@ -1616,15 +1609,6 @@ if need_rebuild: except Exception as e: print(f"[WARN] Ricostruzione metriche fallita: {e}") -# Flag crypto se manca -if "is_crypto" not in df_sum.columns: - def _is_crypto_row(row): - txt = f"{row.get('Nome','')} {row.get('Categoria','')} {row.get('Asset Class','')}".lower() - return any(k in txt for k in ["crypto","cripto","bitcoin","btc","ether","eth"]) - df_sum["is_crypto"] = df_sum.apply(_is_crypto_row, axis=1) - - - df_sum = _apply_score(df_sum) TOP_N = 15 @@ -1635,8 +1619,6 @@ base_isins = ( ) # Nessuna strategia cripto separata: le criptovalute sono trattate come gli altri asset -crypto_isin = None - print(f"[INFO] Ranking full-sample (solo debug, i portafogli usano ranking rolling): {base_isins}") # ----------------------------- @@ -1708,7 +1690,6 @@ def plot_portfolio_composition(weights: pd.DataFrame, Esempio: plot_portfolio_composition(w_eq, "Equal Weight", "composition_equal_weight.png") plot_portfolio_composition(w_rp, "Risk Parity", "composition_risk_parity.png") - plot_portfolio_composition(w_agg, "Aggressiva + Crypto", "composition_agg_crypto.png") """ import os import numpy as np @@ -2061,10 +2042,9 @@ def plot_portfolio_composition_fixed(weights: pd.DataFrame, # Plot salvato senza visualizzazione interattiva -# --- 1) Pesi teorici dei tre portafogli (già costruiti sopra) --- +# --- 1) Pesi teorici dei portafogli (già costruiti sopra) --- # w_eq : equal weight su 'cols' # w_rp : risk parity (weights_rp) -# w_agg: 85% equal + 15% crypto se disponibile def _sanitize_weights(W: pd.DataFrame, index_like: pd.Index) -> pd.DataFrame: if W is None or W.empty: @@ -2079,18 +2059,14 @@ if 'w_eq' not in globals(): w_eq = pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns) if 'w_rp' not in globals(): w_rp = weights_rp.copy() if isinstance(weights_rp, pd.DataFrame) else pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns) -if 'w_agg' not in globals(): - w_agg = w_eq.copy() w_eq = _sanitize_weights(w_eq, wide_pnl.index) w_rp = _sanitize_weights(w_rp, wide_pnl.index) -w_agg = _sanitize_weights(w_agg, wide_pnl.index) # --- 2) Pesi ATTIVI (mascherati con i Signal) --- # renorm_to_1=False → lascia la quota NON investita in 'Cash' w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") -w_agg_act = make_active_weights(w_agg, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") # Export pesi giornalieri (Equal/Risk Parity) con cash normalizzato a 100% def _export_weights_daily(w_eq_act_df: pd.DataFrame, w_rp_act_df: pd.DataFrame, path=WEIGHTS_DAILY_XLSX): @@ -2310,7 +2286,7 @@ import numpy as np def rebuild_daily_from_trades_dict(trades_dict): """ - trades_dict: 
{'Equal_Weight': df, 'Risk_Parity': df, 'Aggressiva_Crypto': df} + trades_dict: {'Equal_Weight': df, 'Risk_Parity': df} Ogni df deve avere: OpenDate, CloseDate, Size, Duration_bars, PnL_% Regola: distribuiamo il PnL del trade su ciascun giorno di durata con un rendimento giornaliero costante r tale che (1+r)^D - 1 = PnL. @@ -2614,16 +2590,14 @@ except NameError: DAYS_PER_YEAR = 252 def _select_isins_for_topN(df_sum: pd.DataFrame, top_n: int): - """Seleziona i migliori 'top_n' ISIN (crypto incluse) in base allo Score.""" + """Seleziona i migliori 'top_n' ISIN in base allo Score.""" df_sum_loc = df_sum.copy() base_isins_N = ( df_sum_loc .sort_values("Score", ascending=False) .head(top_n)["ISIN"].astype(str).str.strip().tolist() ) - # Nessuna crypto separata: tutto trattato allo stesso modo - crypto_isin_N = None - return base_isins_N, crypto_isin_N + return base_isins_N def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl): """ @@ -2658,244 +2632,30 @@ def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl): return ret_eq_N, ret_rp_N -# # --- calcolo metriche per TopN 8..15 --- (DISATTIVATO) -# rows_byN = [] -# for top_n in range(8, 16): -# portN = _get_dynamic_portfolio(top_n) -# ret_eq_N = portN["ret_eq"] -# ret_rp_N = portN["ret_rp"] -# -# # (OPZIONALE) se vuoi anche salvare equity/heatmap per ciascun N: -# # _save_equity_plot_byN(ret_eq_N, ret_rp_N, top_n) -# # _save_heatmaps_byN(ret_eq_N, ret_rp_N, top_n) -# -# # Calcola le metriche (come nell'ottimizzatore) -# for strategy_name, rser in [ -# ("Equal_Weight", ret_eq_N), -# ("Risk_Parity", ret_rp_N), -# ]: -# m = _calc_all_metrics_from_returns(rser) -# m["TopN"] = top_n -# m["Strategy"] = strategy_name -# rows_byN.append(m) -# -# # DataFrame finale con la colonna TopN -# final_byN_df = pd.DataFrame(rows_byN)[[ -# "TopN", "Strategy", -# "Rendimento_Ann", "Volatilita_Ann", "CAGR", "R2_Equity", -# "MaxDD", "DD_Duration_Max", "TTR_from_MDD", -# "AAW", "AUW", "Heal_Index", "H_min_100m_5Y" -# ]].sort_values(["TopN","Strategy"]).reset_index(drop=True) -# -# # Salvataggio: aggiunge/riscrive i fogli in final_metrics.xlsx -# # - mantiene (se vuoi) anche il foglio "Portfolio_Metrics" del caso corrente TOP_N -# try: -# with pd.ExcelWriter(FINAL_METRICS_XLSX, engine="openpyxl", mode="a", if_sheet_exists="replace") as xw: -# final_byN_df.to_excel(xw, "Portfolio_Metrics_By_N", index=False) -# except Exception: -# with pd.ExcelWriter(FINAL_METRICS_XLSX) as xw: -# final_byN_df.to_excel(xw, "Portfolio_Metrics_By_N", index=False) -# -# print(f"✅ Salvato: {FINAL_METRICS_XLSX} (Portfolio_Metrics_By_N) per TopN = 8..15") +# ============================== +# Metriche portafoglio (TOP_N corrente) → Excel +# ============================== +metrics_rows = [] +for strategy_name, rser in [ + ("Equal_Weight", ret_eq), + ("Risk_Parity", ret_rp), +]: + m = _calc_all_metrics_from_returns(rser) + m["TopN"] = TOP_N + m["Strategy"] = strategy_name + metrics_rows.append(m) -# # ====================================================================== -# # 6bis) Plot per ciascun TopN (8..15): Equity + Heatmap per strategia (DISATTIVATO) -# # ====================================================================== -# # import os -# # import numpy as np -# # import matplotlib.pyplot as plt -# # -# # OUT_DIR = PLOT_DIR -# # OUT_DIR.mkdir(parents=True, exist_ok=True) -# # -# # def _safe_series(r: pd.Series) -> pd.Series: -# # """Forza tipo numerico e se tutto NaN, rimpiazza con 0.0 (linea piatta ma plot salvato).""" -# # r = pd.to_numeric(r, errors="coerce") -# # if 
r.notna().sum() == 0: -# # r = pd.Series(0.0, index=r.index) -# # return r.fillna(0.0) -# # -# # def _save_equity_plot_byN(ret_eq, ret_rp, top_n: int): -# # ret_eq = _safe_series(ret_eq) -# # ret_rp = _safe_series(ret_rp) -# # -# # eq_eq = equity_from_returns(ret_eq) -# # eq_rp = equity_from_returns(ret_rp) -# # -# # if eq_eq.empty and eq_rp.empty: -# # eq_eq = pd.Series([100.0], index=[pd.Timestamp("2000-01-01")]) -# # -# # fig, ax = plt.subplots(figsize=(10, 6)) -# # eq_eq.plot(ax=ax, label="Equal Weight") -# # eq_rp.plot(ax=ax, label="Risk Parity") -# # ax.legend() -# # ax.grid(True) -# # ax.set_title(f"Equity line - TopN={top_n}") -# # fig.tight_layout() -# # savefig_safe(str(OUT_DIR / f"equity_topN_{top_n}.png"), dpi=150) -# # plt.close(fig) -# # -# # def _save_heatmaps_byN(ret_eq, ret_rp, top_n: int): -# # ret_eq = _safe_series(ret_eq) -# # ret_rp = _safe_series(ret_rp) -# # -# # plot_heatmap_monthly( -# # ret_eq, -# # f"Heatmap mensile - Equal Weight (TopN={top_n})", -# # save_path=OUT_DIR / f"heatmap_equal_topN_{top_n}.png" -# # ) -# # plot_heatmap_monthly( -# # ret_rp, -# # f"Heatmap mensile - Risk Parity (TopN={top_n})", -# # save_path=OUT_DIR / f"heatmap_rp_topN_{top_n}.png" -# # ) -# # -# # # Loop 8..15 replicando i plot per ciascuna combinazione -# # for top_n in range(8, 16): -# # portN = _get_dynamic_portfolio(top_n) -# # ret_eq_N = portN["ret_eq"] -# # ret_rp_N = portN["ret_rp"] -# # -# # _save_equity_plot_byN(ret_eq_N, ret_rp_N, top_n) -# # _save_heatmaps_byN(ret_eq_N, ret_rp_N, top_n) -# # -# # print(f"✅ Plot salvati in: {OUT_DIR}/") +df_metrics = pd.DataFrame(metrics_rows)[[ + "TopN", "Strategy", + "Rendimento_Ann", "Volatilita_Ann", "CAGR", "R2_Equity", + "MaxDD", "DD_Duration_Max", "TTR_from_MDD", + "AAW", "AUW", "Heal_Index", "H_min_100m_5Y", +]] -# ====================================================================== -# 6ter) Plot composizione (ATTIVI + Cash) per ciascun TopN (8..15) -# ====================================================================== -import os -import numpy as np -import matplotlib.pyplot as plt - -OUT_DIR = PLOT_DIR -OUT_DIR.mkdir(parents=True, exist_ok=True) - -# -- safety: helper per pesi attivi e plotting, se mancassero già nel file -- - -if 'make_active_weights' not in globals(): - def make_active_weights(w_base: pd.DataFrame, - sig: pd.DataFrame, - renorm_to_1: bool = False, - add_cash: bool = True, - cash_label: str = "Cash") -> pd.DataFrame: - import numpy as np, pandas as pd - if w_base is None or w_base.empty: - return pd.DataFrame(index=sig.index, columns=[]) - W = w_base.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0) - S = sig.reindex_like(W).fillna(0).astype(int) - W_active = W * (S > 0) - row_sum = W_active.sum(axis=1) - if renorm_to_1: - W_active = W_active.div(row_sum.replace(0, np.nan), axis=0).fillna(0.0) - if add_cash: - W_active[cash_label] = 0.0 - else: - if add_cash: - cash = (1.0 - row_sum).clip(lower=0.0, upper=1.0) - W_active[cash_label] = cash - keep = [c for c in W_active.columns if float(np.abs(W_active[c]).sum()) > 0.0] - return W_active[keep] - -if 'plot_portfolio_composition_fixed' not in globals(): - def plot_portfolio_composition_fixed(weights: pd.DataFrame, - title: str, - save_path: str | None = None, - max_legend: int = 20): - if weights is None or getattr(weights, "empty", True): - print(f"[SKIP] Nessun peso per: {title}") - return - W = weights.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0) - if W.index.has_duplicates: - W = W[~W.index.duplicated(keep="last")] - W = W.sort_index() - 
keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0] - if not keep_cols or len(W.index) < 2: - print(f"[SKIP] Dati insufficienti per: {title}") - return - W = W[keep_cols] - avg_w = W.mean(0).sort_values(ascending=False) - ordered = avg_w.index.tolist() - if "Cash" in ordered: - ordered = [c for c in ordered if c!="Cash"] + ["Cash"] - if len(ordered) > max_legend: - head = ordered[:max_legend] - if "Cash" not in head and "Cash" in ordered: - head = head[:-1] + ["Cash"] - tail = [c for c in ordered if c not in head] - W_show = W[head].copy() - if tail: - W_show["Altri"] = W[tail].sum(1) - ordered = head + ["Altri"] - else: - ordered = head - else: - W_show = W[ordered].copy() - cmap = plt.colormaps.get_cmap("tab20") - colors = [cmap(i % cmap.N) for i in range(len(ordered))] - fig, ax = plt.subplots(figsize=(11, 6)) - ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors) - ax.set_title(f"Composizione portafoglio nel tempo – {title}") - ymax = float(np.nanmax(W_show.sum(1).values)) - ax.set_ylim(0, max(1.0, ymax if np.isfinite(ymax) else 1.0)) - ax.grid(True, alpha=0.3) - ax.set_ylabel("Peso") - ax.set_yticklabels([f"{y*100:.0f}%" for y in ax.get_yticks()]) - ncol = 2 if len(ordered) > 10 else 1 - ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN") - fig.tight_layout() - if save_path: - folder = os.path.dirname(save_path) or "." - os.makedirs(folder, exist_ok=True) - fig.savefig(save_path, dpi=150, bbox_inches="tight") - print(f"💾 Salvato: {os.path.abspath(save_path)}") - # Nessuna visualizzazione interattiva - -def _build_weights_for_isins(base_isins_N, crypto_isin_N, wide_pnl): - """Costruisce i pesi TEORICI per Equal / Risk Parity / Aggressiva su un dato insieme di ISIN.""" - colsN = [c for c in base_isins_N if c in wide_pnl.columns] - idx = wide_pnl.index - # Equal - if len(colsN) > 0: - w_eq_N = pd.DataFrame(1/len(colsN), index=idx, columns=colsN) - else: - w_eq_N = pd.DataFrame(index=idx, columns=[]) - # Risk Parity con cap - if len(colsN) > 0: - w_rp_N = inverse_vol_weights( - wide_pnl[colsN], - window=60, - max_weight=RP_MAX_WEIGHT - ) - else: - w_rp_N = pd.DataFrame(index=idx, columns=[]) - - # Aggressiva + Crypto - if (len(colsN) > 0) and (crypto_isin_N is not None) and (crypto_isin_N in wide_pnl.columns): - cols_agg = colsN + [crypto_isin_N] - w_agg_N = pd.DataFrame(0.0, index=idx, columns=cols_agg) - w_agg_N[colsN] = 0.85/len(colsN) - w_agg_N[crypto_isin_N] = 0.15 - else: - w_agg_N = w_eq_N.copy() - # normalizza i TEORICI (solo per sicurezza numerica) - def _norm(W): - if W is None or W.empty: - return pd.DataFrame(index=idx, columns=[]) - rs = W.sum(axis=1).replace(0, np.nan) - return W.div(rs, axis=0).fillna(0.0).clip(lower=0.0) - return _norm(w_eq_N), _norm(w_rp_N), _norm(w_agg_N) - -# # === Loop 8..15: crea pesi, attiva coi Signal, plotta e SALVA in OUT_DIR === (DISATTIVATO) -# for top_n in range(8, 16): -# portN = _get_dynamic_portfolio(top_n) -# w_eq_act_N = portN["w_eq_act"] -# w_rp_act_N = portN["w_rp_act"] -# -# # path di salvataggio -# sp_eq = OUT_DIR / f"composition_equal_topN_{top_n}.png" -# sp_rp = OUT_DIR / f"composition_rp_topN_{top_n}.png" -# -# # plot + salvataggio (SOLO Equal e Risk Parity) -# plot_portfolio_composition_fixed(w_eq_act_N, f"Equal Weight (attivi + Cash) – TopN={top_n}", sp_eq) -# plot_portfolio_composition_fixed(w_rp_act_N, f"Risk Parity (attivi + Cash) – TopN={top_n}", sp_rp) +try: + with pd.ExcelWriter(FINAL_METRICS_XLSX, engine="openpyxl", 
mode="a", if_sheet_exists="replace") as xw: + df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False) +except Exception: + with pd.ExcelWriter(FINAL_METRICS_XLSX) as xw: + df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False) +print(f"[INFO] Salvato: {FINAL_METRICS_XLSX} (Portfolio_Metrics)") diff --git a/Trading Pattern Recon w Wavelets.py b/Trading Pattern Recon w Wavelets.py new file mode 100644 index 0000000..5f5092e --- /dev/null +++ b/Trading Pattern Recon w Wavelets.py @@ -0,0 +1,2713 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Oct 27 13:59:10 2025 + +Script end-to-end: +- Carica universo (Excel) e dati (DB) una sola volta +- Calcola Hurst + Pattern signals (solo long per i trade) +- Esegue walk-forward k-NN (solo long) e salva forward_bt_signals/summary +- Fase 5: selezione dinamica portafogli + Equity + Heatmap + Trade report (solo long) +- Fase 6: metriche finali (come "ottimizzatore") + salvataggio grafici su file + +Note: +- Non rilegge file appena salvati; usa i DataFrame in memoria +- Salva: hurst_by_isin.xlsx, pattern_signals.xlsx, forward_bt_*.xlsx, trades_report.xlsx, final_metrics.xlsx +- Salva PNG: equity_line_portafogli.png, heatmap_*.png +""" + +import pandas as pd +import numpy as np +import sqlalchemy as sa +from sqlalchemy import text +import matplotlib.pyplot as plt +from pathlib import Path +import json +import ssl +import re +from urllib.request import urlopen +from urllib.error import URLError, HTTPError + +from shared_utils import ( + build_pattern_library, + characterize_window, + detect_column, + load_config, + predict_from_library, + read_connection_txt, + require_section, + require_value, + wavelet_denoise, + z_norm, +) +#from math import isfinite +import time + +# ============================= +# Plot saving helper (non-recursive) +# ============================= +try: + import os as _os_sf + import matplotlib.pyplot as _plt_sf +except Exception: + _plt_sf = None + +SAVE_PNG = globals().get("SAVE_PNG", True) + +def savefig_safe(path, **kwargs): + """ + Save a matplotlib figure to disk safely, honoring SAVE_PNG. 
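+    No-op when SAVE_PNG is False or matplotlib could not be imported; the
+    parent directory of 'path' is created automatically before saving.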
+ Usage: savefig_safe("plot/myfig.png", dpi=150, bbox_inches="tight") + """ + if not SAVE_PNG or _plt_sf is None: + return + # Ensure directory exists + try: + d = _os_sf.path.dirname(path) + if d and not _os_sf.path.exists(d): + _os_sf.makedirs(d, exist_ok=True) + except Exception as _e: + print(f"[savefig_safe] Directory creation warning: {_e}") + try: + _plt_sf.savefig(path, **kwargs) + except Exception as e: + print(f"[savefig_safe] Warning while saving '{path}': {e}") + + + +# Calcolo Score (riusabile anche rolling) +def _apply_score(df_sum: pd.DataFrame) -> pd.DataFrame: + """Applica la calibrazione dei pesi su df_sum e aggiunge la colonna Score.""" + def _available_cols(df, cols): + return [c for c in cols if (c in df.columns and df[c].notna().sum() > 0)] + + primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)] + alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)] + + mm = [(c,gh) for (c,gh) in primary_cols if c in df_sum.columns and df_sum[c].notna().sum()>0] + if len(mm) < 2: + mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum()>0] + + # Se ancora insufficienti, prova ad allargare al set unito + if len(mm) < 2: + union_candidates = list({x[0] for x in primary_cols+alt_cols}) + mm = [(c, True) for c in _available_cols(df_sum, union_candidates)] + + if len(mm) == 0: + print("[WARN] Nessuna metrica numerica disponibile: uso Score=0 e ordino per ISIN.") + df_sum["Score"] = 0.0 + df_sum["Score_mode"] = "degenerate_equal" + return df_sum + + # Se sono definiti pesi fissi in config, usali; altrimenti calibra automaticamente + use_fixed = False + if SCORE_WEIGHTS: + weights_raw = {k: float(v) for k, v in SCORE_WEIGHTS.items() if k in df_sum.columns} + weights_raw = {k: v for k, v in weights_raw.items() if df_sum[k].notna().sum() > 0} + if weights_raw: + use_fixed = True + w = pd.Series(weights_raw) + w = w / w.sum() + X_ranked = df_sum[w.index].rank(pct=True) + df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1) + df_sum["Score_mode"] = "fixed_weights" + if SCORE_VERBOSE: + print("Pesi fissi (config):", w.to_dict()) + else: + print("[WARN] score_weights in config non compatibili con le metriche disponibili. 
Uso calibrazione automatica.") + + if not use_fixed: + res = calibrate_score_weights( + df_sum, + metrics_map=mm, + target_col=None + ) + X_ranked = res["X_ranked"] + w = res["weights"] + df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1) + df_sum["Score_mode"] = res["mode"] + if SCORE_VERBOSE: + print("Pesi stimati automaticamente (metriche usate):") + print("Disponibilita' metriche (righe non-NaN):", + {c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]}) + print(w) + return df_sum + +# ============================= +# PRICE FETCH (OPEN/CLOSE) - storico +# ============================= +def _build_symbol_euronext(row: pd.Series) -> tuple[str, str]: + isin = str(row.get("ISIN", "")).strip() + venue = str(row.get("Mercato", "")).strip() + tok = str(row.get("TickerOpen", "") or "").strip() + base = OPEN_PRICE_BASE_URL + if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper(): + return base, tok + if isin and venue: + return base, f"{isin}-{venue}" + if isin: + return base, f"{isin}-ETFP" # fallback generico per endpoint history + return base, isin + +def fetch_price_history(isins, universe: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame: + """ + Scarica la serie storica open/close per una lista di ISIN usando l'endpoint storico. + - API chiamata 1 ISIN alla volta: https://fin.scorer.app/finance/etf-inv/history/{ticker}?fromDate=YYYYMMDD&toDate=YYYYMMDD + - Caching locale su CSV per ridurre le richieste; se l'API fallisce, tenta di usare la cache. + - Fallback mercati: ETFP → XPAR → XAMS. Se si estende una serie con un altro mercato, + la giunta avviene solo se il prezzo all'ultimo punto del segmento precedente e al primo del successivo + differisce < 2% (per evitare salti di valuta/quotazione). + Ritorna DataFrame con colonne: Date (datetime), ISIN, Open, Close. + """ + start_dt = pd.to_datetime(start_date).date() + end_dt = pd.to_datetime(end_date).date() + + def _symbol_cache_path(symbol: str) -> Path: + safe = re.sub(r"[^A-Za-z0-9_-]+", "_", str(symbol)) + return OPEN_CACHE_DIR / f"{safe}.csv" + + def _load_cache(path: Path) -> pd.DataFrame | None: + try: + if path.exists(): + dfc = pd.read_csv(path, parse_dates=["Date"]) + dfc["ISIN"] = dfc["ISIN"].astype(str) + return dfc + except Exception as e: + print(f"[WARN] Cache prezzi corrotta {path}: {e}") + return None + + def _normalize_payload_to_df(payload, isin): + # Il nuovo endpoint ritorna [{"ticker": "...", "data": [ {...}, ... 
]}] + data_block = payload + if isinstance(payload, list) and payload: + if isinstance(payload[0], dict) and "data" in payload[0]: + data_block = payload[0].get("data", []) + else: + data_block = payload + if isinstance(payload, dict) and "data" in payload: + data_block = payload.get("data", []) + rows = [] + for d in data_block or []: + dt_raw = d.get("date") or d.get("Date") or d.get("data") or d.get("timestamp") + if dt_raw is None: + continue + try: + if isinstance(dt_raw, (int, float)): + dt_parsed = pd.to_datetime(int(dt_raw), unit="ms").tz_localize(None) + else: + dt_parsed = pd.to_datetime(dt_raw).tz_localize(None) + except Exception: + continue + rows.append({ + "Date": dt_parsed, + "ISIN": str(isin), + "Open": _to_float_safe(d.get("open")), + "Close": _to_float_safe(d.get("close") or d.get("last")) + }) + return pd.DataFrame(rows) if rows else pd.DataFrame(columns=["Date","ISIN","Open","Close"]) + + def _fetch_symbol(symbol: str, isin: str): + url = f"{OPEN_PRICE_BASE_URL}/{symbol}?fromDate={start_dt.strftime('%Y%m%d')}&toDate={end_dt.strftime('%Y%m%d')}" + cache_path = _symbol_cache_path(symbol) + cache_df = _load_cache(cache_path) + + df_api = pd.DataFrame() + ok = False + for attempt in range(1, OPEN_MAX_RETRY + 1): + try: + with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp: + payload = json.loads(resp.read().decode("utf-8")) + df_api = _normalize_payload_to_df(payload, isin) + if df_api.empty: + print(f"[WARN] Nessun dato per {symbol}") + ok = True + break + except (HTTPError, URLError, ssl.SSLError, json.JSONDecodeError) as e: + if attempt < OPEN_MAX_RETRY: + print(f"[WARN] Download {symbol} tentativo {attempt}/{OPEN_MAX_RETRY} fallito: {e}. Retry in {OPEN_SLEEP_SEC}s") + time.sleep(OPEN_SLEEP_SEC) + else: + print(f"[ERROR] Download {symbol} fallito: {e}") + + df_use = pd.DataFrame() + if ok and not df_api.empty: + df_api = df_api.sort_values("Date") + if cache_df is not None and not cache_df.empty: + df_use = ( + pd.concat([cache_df, df_api], ignore_index=True) + .drop_duplicates(subset=["Date"]) + .sort_values("Date") + ) + else: + df_use = df_api + try: + OPEN_CACHE_DIR.mkdir(parents=True, exist_ok=True) + df_use.to_csv(cache_path, index=False) + except Exception as e: + print(f"[WARN] Salvataggio cache prezzi fallito ({cache_path}): {e}") + elif cache_df is not None and not cache_df.empty: + df_use = cache_df + print(f"[INFO] Uso cache prezzi per {symbol} (API indisponibile).") + return df_use + + def _merge_with_check(df_base: pd.DataFrame, df_add: pd.DataFrame, label_prev: str, label_next: str): + """ + Estende df_base aggiungendo il tratto df_add antecedente al primo punto di df_base. + Controlla il salto di prezzo all'incrocio: se > 2%, non fonde e avvisa. 
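+        Example: if the fallback segment's last close before the junction is 100.0
+        and df_base's first close is 101.5, gap = |100.0 - 101.5| / 101.5 ~ 1.5%,
+        so the two series are merged; with a first close of 103.0 the gap is ~2.9%
+        (> 2%) and df_base is returned unchanged.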
+ """ + if df_base is None or df_base.empty: + return df_add, False + if df_add is None or df_add.empty: + return df_base, False + cutoff = df_base["Date"].min() + prev_part = df_add[df_add["Date"] < cutoff] + if prev_part.empty: + return df_base, False + merged = pd.concat([prev_part, df_base], ignore_index=True) + merged = merged.sort_values("Date").drop_duplicates(subset=["Date"], keep="last") + # controllo salto: ultimo prezzo del segmento precedente vs primo del successivo + prev_last = prev_part.sort_values("Date").iloc[-1] + next_first = df_base[df_base["Date"] >= cutoff].sort_values("Date").iloc[0] + def _price(row): + return _to_float_safe(row.get("Close")) if pd.notna(row.get("Close")) else _to_float_safe(row.get("Open")) + p_prev = _price(prev_last) + p_next = _price(next_first) + if p_prev is None or p_next is None or not np.isfinite(p_prev) or not np.isfinite(p_next) or p_next == 0: + return merged, True + gap = abs(p_prev - p_next) / abs(p_next) + if gap > 0.02: + print(f"[WARN] Salto prezzo >2% tra {label_prev} e {label_next} su {prev_last['Date'].date()} -> {next_first['Date'].date()} (gap {gap:.2%}). Fallback non applicato.") + return df_base, False + return merged, True + + records = [] + for i, isin in enumerate(isins, 1): + try: + row = universe.loc[universe["ISIN"] == str(isin)].iloc[0] + except Exception: + print(f"[WARN] ISIN {isin} non trovato nell'universo.") + continue + base, symbol = _build_symbol_euronext(row) + df_primary = _fetch_symbol(symbol, isin) + + # Fallback mercati aggiuntivi (XPAR, poi XAMS) per estendere indietro la serie + fallback_symbols = [] + if "-" in symbol: + root = symbol.rsplit("-", 1)[0] + fallback_symbols.append(f"{root}-XPAR") + fallback_symbols.append(f"{root}-XAMS") + else: + fallback_symbols.append(f"{symbol}-XPAR") + fallback_symbols.append(f"{symbol}-XAMS") + + df_use = df_primary + applied_any = False + for fb_sym in fallback_symbols: + # servono solo se la serie non parte da start_dt + need_fb = df_use.empty or (df_use["Date"].min().date() > start_dt) + if not need_fb: + continue + df_fb = _fetch_symbol(fb_sym, isin) + if df_fb.empty: + print(f"[WARN] Fallback {fb_sym} assente per {isin}") + continue + if df_use.empty: + df_use = df_fb + applied_any = True + print(f"[INFO] Uso fallback {fb_sym} per tutto il periodo.") + else: + merged, merged_ok = _merge_with_check(df_use, df_fb, fb_sym, symbol) + if merged_ok: + df_use = merged + applied_any = True + cutoff = df_use["Date"].min() + print(f"[INFO] Serie estesa con {fb_sym} fino a {cutoff.date()} per {isin}") + else: + print(f"[WARN] Fallback {fb_sym} scartato per gap >2% su {isin}") + + if df_use.empty: + print(f"[WARN] Serie open/close non disponibile per {isin}") + continue + + # Filtro range richiesto + df_use["Date"] = pd.to_datetime(df_use["Date"]) + mask = (df_use["Date"].dt.date >= start_dt) & (df_use["Date"].dt.date <= end_dt) + df_use = df_use.loc[mask] + if df_use.empty: + print(f"[WARN] Nessun dato nel range richiesto per {symbol}") + continue + records.append(df_use) + + if not records: + return pd.DataFrame(columns=["Date","ISIN","Open","Close"]) + df_px = pd.concat(records, ignore_index=True) + df_px = df_px.sort_values(["ISIN","Date"]).reset_index(drop=True) + return df_px + +def save_price_cache_summary(cache_dir: Path, outfile: Path, pattern: str = "*ETFP.csv"): + """ + Salva un riepilogo delle serie prezzi in cache (senza fallback) con min/max date e numero righe. + pattern di default: solo i simboli ETFP. 
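+    Example: with the default '*ETFP.csv', a cache file named '<ISIN>-ETFP.csv'
+    is included, while the '-XPAR' / '-XAMS' fallback caches are skipped
+    ('<ISIN>' is a placeholder, not a real code).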
+ """ + try: + if not cache_dir.exists(): + print(f"[WARN] Cache prezzi non trovata: {cache_dir}") + return + rows = [] + for f in sorted(cache_dir.glob(pattern)): + try: + df = pd.read_csv(f, parse_dates=["Date"]) + except Exception as e: + rows.append({"Symbol": f.stem, "Errore": str(e)}) + continue + if df.empty: + rows.append({"Symbol": f.stem, "Rows": 0}) + continue + rows.append({ + "Symbol": f.stem, + "min_date": df["Date"].min().date(), + "max_date": df["Date"].max().date(), + "rows": len(df) + }) + if not rows: + print(f"[WARN] Nessun file prezzi in cache ({cache_dir}).") + return + out_df = pd.DataFrame(rows).sort_values("Symbol") + outfile.parent.mkdir(parents=True, exist_ok=True) + out_df.to_excel(outfile, index=False) + print(f"[INFO] Salvato riepilogo prezzi (no fallback) in {outfile} ({len(out_df)} righe)") + except Exception as e: + print(f"[WARN] Impossibile salvare riepilogo prezzi: {e}") + +def _to_float_safe(x): + try: + return float(x) + except Exception: + return np.nan + +# LEGACY: blocco originale mantenuto ma non eseguito (usiamo _apply_score sopra) +# ========================================= +# PARAMETRI GLOBALI +# ========================================= +CONFIG = load_config() +DB_CONFIG = require_section(CONFIG, "db") +PATTERN_CONFIG = require_section(CONFIG, "pattern") +TAGGING_CONFIG = require_section(CONFIG, "tagging") +RANKING_CONFIG = require_section(CONFIG, "ranking") +PATHS_CONFIG = require_section(CONFIG, "paths") +HURST_CONFIG = CONFIG.get("hurst", {}) +RUN_CONFIG = CONFIG.get("run", {}) +SIGNALS_CONFIG = CONFIG.get("signals", {}) +PRICES_CONFIG = CONFIG.get("prices", {}) + +OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "output")) +PLOT_DIR = Path(PATHS_CONFIG.get("plot_dir", "plot")) +OUTPUT_DIR.mkdir(parents=True, exist_ok=True) +PLOT_DIR.mkdir(parents=True, exist_ok=True) + +UNIVERSO_XLSX = PATHS_CONFIG.get("input_universe", "Input/Universo per Trading System.xlsx") + +# Export +OUTPUT_HURST_XLSX = OUTPUT_DIR / "hurst_by_isin.xlsx" +OUTPUT_PATTERN_XLSX = OUTPUT_DIR / "pattern_signals.xlsx" +ERROR_LOG_CSV = OUTPUT_DIR / "errori_isin.csv" +FORWARD_BT_SIGNALS_XLSX = OUTPUT_DIR / "forward_bt_signals.xlsx" +FORWARD_BT_SUMMARY_XLSX = OUTPUT_DIR / "forward_bt_summary.xlsx" +TRADES_REPORT_XLSX = OUTPUT_DIR / "trades_report.xlsx" +PERF_ATTRIB_XLSX = OUTPUT_DIR / "performance_attribution.xlsx" +DAILY_FROM_TRADES_CSV = OUTPUT_DIR / "daily_from_trades.csv" +DAILY_FROM_TRADES_XLSX = OUTPUT_DIR / "daily_from_trades.xlsx" +WEIGHTS_DAILY_XLSX = OUTPUT_DIR / "weights_daily.xlsx" +FINAL_METRICS_XLSX = OUTPUT_DIR / "final_metrics.xlsx" + +# Stored Procedure & parametri +STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db")) +N_BARS = int(require_value(DB_CONFIG, "n_bars", "db")) +PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db")) +RANKING_WINDOW_BARS = int(RANKING_CONFIG.get("rolling_window_bars", N_BARS)) +RP_LOOKBACK = int(SIGNALS_CONFIG.get("risk_parity_lookback", 60)) +OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/etf-inv/history")) +OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3)) +OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1)) +OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10)) +OPEN_CACHE_DIR = Path(PRICES_CONFIG.get("cache_dir", OUTPUT_DIR / "price_cache")) +RECOMPUTE_PORTF_FROM_OPEN = bool(PRICES_CONFIG.get("recompute_portfolio_open", False)) + +# Pattern-matching (iper-parametri) +WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) # lunghezza finestra pattern 
(barre) +HA = int(require_value(PATTERN_CONFIG, "ha", "pattern")) # orizzonte outcome (barre) +KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern")) # numero di vicini +THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern")) # soglia su outcome per generare segnale +EMBARGO = require_value(PATTERN_CONFIG, "embargo", "pattern") +WAVELET_CFG = CONFIG.get("wavelet_filter", {}) +WAVELET_ENABLED = bool(WAVELET_CFG.get("enabled", False)) +WAVELET_NAME = str(WAVELET_CFG.get("wavelet", "db3")) +WAVELET_LEVEL = int(WAVELET_CFG.get("level", 3)) +WAVELET_MODE = str(WAVELET_CFG.get("mode", "symmetric")) +WAVELET_THRESHOLD_MODE = str(WAVELET_CFG.get("threshold_mode", "soft")) +MAX_AVG_DIST = PATTERN_CONFIG.get("max_avg_dist", None) +if EMBARGO is None: + EMBARGO = WP + HA +else: + EMBARGO = int(EMBARGO) + +# Tagging rule-based (soglie) +Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging")) +Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging")) +STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging")) + +TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking")) # numero massimo di asset ammessi +RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking") # 2 x 1/15 ≈ 0.1333 = 13,33% +if RP_MAX_WEIGHT is None: + RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1) +else: + RP_MAX_WEIGHT = float(RP_MAX_WEIGHT) +SCORE_VERBOSE = bool(RANKING_CONFIG.get("score_verbose", False)) +SCORE_WEIGHTS = RANKING_CONFIG.get("score_weights") +HURST_MIN_LENGTH = int(HURST_CONFIG.get("min_length", 200)) +HURST_WIN_GRID = HURST_CONFIG.get("win_grid") +HURST_MIN_SEGMENTS = int(HURST_CONFIG.get("min_segments", 1)) + +DAYS_PER_YEAR = int(RUN_CONFIG.get("days_per_year", 252)) +TOP_N = int(RUN_CONFIG.get("top_n_default", TOP_N_MAX)) + +# ========================================= +# UTILS GENERALI +# ========================================= +def clamp01(x): + if not np.isfinite(x): + return np.nan + return float(min(1.0, max(0.0, x))) + + +def format_eta(seconds): + """Format a duration (seconds) as Xm Ys or Xh Ym Ys for readability.""" + if not np.isfinite(seconds): + return "n/a" + seconds = max(0, int(round(seconds))) + minutes, secs = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}h {minutes:02d}m {secs:02d}s" + return f"{minutes}m {secs:02d}s" + +# Timer helper per fasi post-backtest +_post_timer = {"t0": None, "tprev": None, "total": None, "done": 0} +def start_post_timer(total_steps: int): + _post_timer["t0"] = time.perf_counter() + _post_timer["tprev"] = _post_timer["t0"] + _post_timer["total"] = total_steps + _post_timer["done"] = 0 + +def checkpoint_post_timer(label: str): + if _post_timer["t0"] is None or _post_timer["total"] is None: + return + _post_timer["done"] += 1 + now = time.perf_counter() + step_dt = now - _post_timer["tprev"] + total_dt = now - _post_timer["t0"] + avg = total_dt / max(_post_timer["done"], 1) + eta = avg * max(_post_timer["total"] - _post_timer["done"], 0) + print(f"[TIMER] post { _post_timer['done']}/{_post_timer['total']} {label} — step {step_dt:.2f}s, total {total_dt:.2f}s, ETA {format_eta(eta)}") + _post_timer["tprev"] = now + +# ================= HURST (sui RENDIMENTI) ================= +def hurst_rs_returns(r, win_grid=None, min_seg=None): + r = pd.Series(r).dropna().astype("float64").values + n = len(r) + seg_min = HURST_MIN_SEGMENTS if min_seg is None else int(min_seg) + if n < HURST_MIN_LENGTH: + return np.nan + if win_grid is None: + base = HURST_WIN_GRID or 
[16,24,32,48,64,96,128,192,256,384] + base = np.array(base, dtype=int) + win_grid = [w for w in base if w <= n//2] + if len(win_grid) < 4: + max_w = max(16, n//4) + g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)])) + win_grid = [w for w in g if 8 <= w <= max_w] + + RS_vals, sizes = [], [] + for w in win_grid: + if w < 8 or w > n: continue + m = n//w + if m < seg_min: continue + rs_list = [] + for i in range(m): + seg = r[i*w:(i+1)*w] + seg = seg - np.mean(seg) + sd = seg.std(ddof=1) + if sd == 0 or not np.isfinite(sd): continue + y = np.cumsum(seg) + R = np.max(y) - np.min(y) + rs = R/sd + if np.isfinite(rs) and rs > 0: rs_list.append(rs) + if rs_list: + RS_vals.append(np.mean(rs_list)); sizes.append(w) + + if len(RS_vals) < 3: return np.nan + sizes = np.array(sizes, float); RS_vals = np.array(RS_vals, float) + mask = np.isfinite(RS_vals) & (RS_vals > 0) + sizes, RS_vals = sizes[mask], RS_vals[mask] + if sizes.size < 3: return np.nan + slope, _ = np.polyfit(np.log(sizes), np.log(RS_vals), 1) + return clamp01(slope) + +def hurst_dfa_returns(r, win_grid=None): + r = pd.Series(r).dropna().astype("float64").values + n = len(r) + if n < HURST_MIN_LENGTH: + return np.nan + r_dm = r - np.mean(r) + y = np.cumsum(r_dm) + if win_grid is None: + base = HURST_WIN_GRID or [16,24,32,48,64,96,128,192,256] + base = np.array(base, dtype=int) + win_grid = [w for w in base if w <= n//2] + if len(win_grid) < 4: + max_w = max(16, n//4) + g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)])) + win_grid = [w for w in g if 8 <= w <= max_w] + + F_vals, sizes = [], [] + for s in win_grid: + if s < 8: continue + m = n//s + if m < 2: continue + rms_list = [] + for i in range(m): + seg = y[i*s:(i+1)*s] + t = np.arange(s, dtype=float) + A = np.vstack([t, np.ones(s)]).T + coeff, *_ = np.linalg.lstsq(A, seg, rcond=None) + trend = A @ coeff + detr = seg - trend + rms = np.sqrt(np.mean(detr**2)) + if np.isfinite(rms) and rms > 0: rms_list.append(rms) + if rms_list: + F_vals.append(np.mean(rms_list)); sizes.append(s) + + if len(F_vals) < 3: return np.nan + sizes = np.array(sizes, float); F_vals = np.array(F_vals, float) + mask = np.isfinite(F_vals) & (F_vals > 0) + sizes, F_vals = sizes[mask], F_vals[mask] + if sizes.size < 3: return np.nan + slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1) + return clamp01(slope) + +# --------------------------------- +# R^2 su equity line (log-equity vs tempo) +# --------------------------------- +def r2_equity_line(returns: pd.Series) -> float: + r = pd.to_numeric(returns, errors="coerce").fillna(0.0) + eq = (1.0 + r).cumprod().replace(0, np.nan) + y = np.log(eq.dropna()) + if len(y) < 10: + return np.nan + x = np.arange(len(y), dtype=float) + x = (x - x.mean()) / (x.std() + 1e-12) + X = np.vstack([np.ones_like(x), x]).T + beta, *_ = np.linalg.lstsq(X, y.values, rcond=None) + y_hat = X @ beta + ss_res = ((y.values - y_hat) ** 2).sum() + ss_tot = ((y.values - y.values.mean()) ** 2).sum() + return float(1 - ss_res / ss_tot) if ss_tot > 0 else np.nan + +# --------------------------------- +# Drawdown metrics path-based +# --------------------------------- +def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): + r = pd.to_numeric(returns, errors="coerce").fillna(0.0) + eq = (1.0 + r).cumprod() + if eq.empty: + return np.nan, np.nan, np.nan + + roll_max = eq.cummax() + dd = eq / roll_max - 1.0 + maxdd = float(dd.min()) + + episodes = [] + peak_i, peak_val = None, -np.inf + trough_i, trough_val = None, np.inf + in_uw = False + v = 
eq.values + for i in range(len(v)): + if v[i] >= peak_val: + if in_uw: + episodes.append((peak_i, trough_i, i)) + in_uw = False + trough_i, trough_val = None, np.inf + peak_i, peak_val = i, v[i] + else: + if not in_uw: + in_uw = True + trough_i, trough_val = i, v[i] + elif v[i] < trough_val: + trough_i, trough_val = i, v[i] + if in_uw: + episodes.append((peak_i, trough_i, None)) + + dd_dur_max = np.nan + if episodes: + durs = [] + for p, t, rcv in episodes: + if p is None: + continue + end_i = rcv if rcv is not None else len(eq) - 1 + durs.append(end_i - p) + if durs: + dd_dur_max = int(max(durs)) + + ttr = np.nan + if episodes: + mdd_val = 0.0 + mdd_ep = None + for p, t, rcv in episodes: + if p is None or t is None: + continue + dd_here = eq.iloc[t] / eq.iloc[p] - 1.0 + if dd_here < mdd_val: + mdd_val = dd_here + mdd_ep = (p, t, rcv) + if mdd_ep is not None: + p, t, rcv = mdd_ep + if rcv is not None: + ttr = int(rcv - t) + else: + ttr = sentinel_ttr + return maxdd, dd_dur_max, ttr + +# --------------------------------- +# Utility per AAW, AUW e Heal Index (come nell'ottimizzatore) +# --------------------------------- +def heal_index_metrics(returns: pd.Series): + """ + Calcola: + - AAW: area sopra acqua (run-up vs minimo cumulato) + - AUW: area sotto acqua (drawdown vs massimo cumulato) + - Heal Index: (AAW - AUW) / AUW + """ + s = returns.fillna(0.0).astype(float) + if s.size == 0: + return np.nan, np.nan, np.nan + + equity = (1.0 + s).cumprod() + if equity.size == 0: + return np.nan, np.nan, np.nan + + run_max = equity.cummax() + dd = equity / run_max - 1.0 + AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan + + run_min = equity.cummin() + ru = equity / run_min - 1.0 + AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan + + heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan + return AAW, AUW, heal + +# --------------------------------- +# Utility per H_min (100% finestre positive) — come nell'ottimizzatore +# --------------------------------- +def h_min_100(returns: pd.Series, month_len: int = 21): + """ + Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days + hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)). 
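+    Example: if the shortest window whose rolling cumulative returns are all >= 0
+    spans 63 bars, the function returns (63, 3), since ceil(63 / 21) = 3.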
+ """ + s = returns.dropna().astype(float) + n = s.size + if n == 0: + return np.nan, np.nan + + log1p = np.log1p(s.values) + csum = np.cumsum(log1p) + + def rolling_sum_k(k: int): + if k > n: + return np.array([]) + head = csum[k - 1:] + tail = np.concatenate(([0.0], csum[:-k])) + return head - tail + + for k in range(1, n + 1): + rs = rolling_sum_k(k) + if rs.size == 0: + break + roll_ret = np.exp(rs) - 1.0 + if np.all(roll_ret >= 0): + h_days = k + h_months = int(np.ceil(h_days / month_len)) + return h_days, h_months + + return np.nan, np.nan + +# ========================================= +# 1) UNIVERSO: ISIN + METADATI +# ========================================= +universo = pd.read_excel(UNIVERSO_XLSX) + +col_isin_uni = detect_column(universo, ["ISIN", "isin", "codice isin"]) +if col_isin_uni is None: + raise ValueError("Nel file universo non trovo una colonna ISIN.") + +col_name_uni = detect_column(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"]) +col_cat_uni = detect_column(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"]) +col_ac_uni = detect_column(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"]) + +isins = ( + universo[col_isin_uni].astype(str).str.strip() + .replace("", pd.NA).dropna().drop_duplicates().tolist() +) +print(f"[INFO] ISIN totali in universo: {len(isins)}") + +meta_df = pd.DataFrame({"ISIN": universo[col_isin_uni].astype(str).str.strip()}) +meta_df["Nome"] = universo[col_name_uni] if col_name_uni else None +meta_df["Categoria"] = universo[col_cat_uni] if col_cat_uni else None +meta_df["Asset Class"] = universo[col_ac_uni] if col_ac_uni else None +meta_df = meta_df.drop_duplicates(subset=["ISIN"]).reset_index(drop=True) + +# ========================================= +# 2) CONNESSIONE DB +# ========================================= +conn_str = read_connection_txt("connection.txt") +engine = sa.create_engine(conn_str, fast_executemany=True) +print("[INFO] Connessione pronta (SQLAlchemy + pyodbc).") + +# ========================================= +# 3) LOOP ISIN → SP → HURST + PATTERN (SOLO LONG per i trade) +# ========================================= +errors = [] +hurst_rows = [] +pattern_rows = [] +last_dates = [] + +sql_sp = text(f"EXEC {STORED_PROC} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") + +def detect_cols(df0): + col_date = detect_column(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"]) + col_ret = detect_column(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"]) + col_px = detect_column(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"]) + return col_date, col_ret, col_px + +ok_count = 0 +first_ok_reported = False + +for i, isin in enumerate(isins, 1): + try: + df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR}) + if df_isin.empty: + errors.append({"ISIN": isin, "Errore": "SP vuota"}) + continue + + col_date, col_ret, col_px = detect_cols(df_isin) + if col_date: + df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce") + df_isin = df_isin.sort_values(col_date) + + # --- Rendimenti --- + if col_ret and col_ret in df_isin.columns: + r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float).dropna() + elif col_px and col_px in df_isin.columns: + px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan) + r = np.log(px/px.shift(1)).dropna() + else: + errors.append({"ISIN": isin, "Errore": "Né rendimenti né prezzi 
utilizzabili"}) + continue + + if len(r) < max(200, WP + HA + 10): + errors.append({"ISIN": isin, "Errore": f"Serie troppo corta ({len(r)} punti)"}) + continue + + # --- HURST (sui rendimenti) --- + h_rs = hurst_rs_returns(r) + h_dfa = hurst_dfa_returns(r) + H = np.nanmedian([h_rs, h_dfa]) + H = clamp01(H) if np.isfinite(H) else np.nan + + if pd.isna(H): + regime = None + elif H < 0.45: + regime = "mean_reversion" + elif H > 0.55: + regime = "breakout" + else: + regime = "neutral" + + # --- Wavelet filter per i pattern (prima del riconoscimento) --- + r_pattern = r + if WAVELET_ENABLED: + r_wav = wavelet_denoise( + r, + wavelet=WAVELET_NAME, + level=WAVELET_LEVEL, + mode=WAVELET_MODE, + threshold_mode=WAVELET_THRESHOLD_MODE, + ) + if r_wav is not None and r_wav.notna().sum() >= WP + HA: + r_pattern = r_wav + else: + print(f"[WARN] Wavelet filter non applicabile su {isin}: uso la serie raw.") + + # --- Libreria pattern --- + lib_wins, lib_out = build_pattern_library(r_pattern, WP, HA, embargo=EMBARGO) + + # Finestra corrente + date_last = df_isin[col_date].iloc[-1] if col_date else None + if date_last is not None: + last_dates.append(pd.to_datetime(date_last)) + + if lib_wins is None or len(r_pattern) < WP + HA: + ptype, pconf = characterize_window(r_pattern, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) + signal = 0 + est_out, avg_dist = np.nan, np.nan + else: + curr = r_pattern.values[-WP:] + curr_zn = z_norm(curr) + if curr_zn is None: + ptype, pconf = characterize_window(r_pattern, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) + signal = 0; est_out = np.nan; avg_dist = np.nan + else: + est_out, avg_dist, idx_sel = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K) + # SOLO LONG: apri solo se est_out > THETA + signal = 1 if est_out > THETA else 0 + ptype, pconf = characterize_window(r_pattern, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) + + # Salva risultati + hurst_rows.append({ + "ISIN": isin, + "Hurst": None if pd.isna(H) else round(float(H), 4), + "Regime": regime + }) + pattern_rows.append({ + "ISIN": isin, + "DateLast": date_last, + "PatternType": ptype, + "Signal": {1:"long",-1:"short",0:"flat"}.get(int(signal), "flat"), + "Confidence": None if pconf is None else round(float(min(1.0, max(0.0, pconf))), 3), + "EstOutcome": None if pd.isna(est_out) else float(est_out), + "AvgDist": None if pd.isna(avg_dist) else float(avg_dist), + "Wp": WP, "Ha": HA, "k": KNN_K + }) + ok_count += 1 + + if not first_ok_reported: + print("[INFO] Colonne riconosciute sul primo ISIN valido:", + "Data:", col_date, "| Rendimenti:", col_ret, "| Prezzo:", col_px, + "| H:", round(H,4) if pd.notna(H) else None, "| Regime:", regime) + first_ok_reported = True + + if i % 10 == 0: + print(f"… {i}/{len(isins)} ISIN processati (ok finora: {ok_count})") + + except Exception as e: + errors.append({"ISIN": isin, "Errore": str(e)}) + +# ========================================= +# 4A) EXPORT: HURST + PATTERN (QualityScore) +# ========================================= +hurst_df = pd.DataFrame(hurst_rows) if hurst_rows else pd.DataFrame( + {"ISIN": [], "Hurst": [], "Regime": []} +) +meta_df["ISIN"] = meta_df["ISIN"].astype(str).str.strip() +hurst_df["ISIN"] = hurst_df["ISIN"].astype(str).str.strip() + +# Mappa ISIN -> Hurst (per usare H come theta_entry nel backtest) +hurst_map = { + str(row["ISIN"]).strip(): (float(row["Hurst"]) if pd.notna(row["Hurst"]) else np.nan) + for _, row in hurst_df.iterrows() +} + +summary_hurst = meta_df.merge(hurst_df, on="ISIN", how="left") 
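# ---------------------------------------------------------------------------
# Editor's note (illustrative sketch, not part of the patch): the loop above
# condenses the R/S and DFA estimators into one Hurst value per ISIN and tags
# the regime; section 4B later reuses the same H as the kNN entry threshold,
# theta_entry = H / 100 (e.g. H = 0.50 -> 0.005). A minimal example of that
# classification rule on synthetic i.i.d. returns, assuming the
# hurst_rs_returns / hurst_dfa_returns functions defined earlier in this file:
#
#     rng = np.random.default_rng(0)
#     r_demo = pd.Series(rng.normal(0.0, 0.01, 1500))   # synthetic daily returns
#     H_demo = np.nanmedian([hurst_rs_returns(r_demo), hurst_dfa_returns(r_demo)])
#     if pd.isna(H_demo):
#         regime_demo = None
#     elif H_demo < 0.45:
#         regime_demo = "mean_reversion"   # anti-persistent series
#     elif H_demo > 0.55:
#         regime_demo = "breakout"         # persistent / trending series
#     else:
#         regime_demo = "neutral"          # i.i.d. noise should land here
#     theta_entry_demo = H_demo / 100.0    # entry threshold used in section 4B
# ---------------------------------------------------------------------------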
+cols_hurst = ["ISIN", "Nome", "Categoria", "Asset Class", "Hurst", "Regime"] +summary_hurst = summary_hurst[[c for c in cols_hurst if c in summary_hurst.columns]] +summary_hurst = summary_hurst.sort_values(["Hurst", "ISIN"], na_position="last").reset_index(drop=True) +summary_hurst.to_excel(OUTPUT_HURST_XLSX, index=False) + +pat_df = pd.DataFrame(pattern_rows) if pattern_rows else pd.DataFrame( + {"ISIN": [], "DateLast": [], "PatternType": [], "Signal": [], + "Confidence": [], "EstOutcome": [], "AvgDist": [], "Wp": [], "Ha": [], "k": []} +) +pat_df["ISIN"] = pat_df["ISIN"].astype(str).str.strip() + +summary_pattern = ( + meta_df + .merge(hurst_df, on="ISIN", how="left") + .merge(pat_df, on="ISIN", how="left") +) +wanted_cols = ["ISIN","Nome","Categoria","Asset Class","Hurst","Regime", + "DateLast","PatternType","Signal","Confidence","EstOutcome","AvgDist","Wp","Ha","k"] +summary_pattern = summary_pattern[[c for c in wanted_cols if c in summary_pattern.columns]] + +def _add_quality_scores(df: pd.DataFrame) -> pd.DataFrame: + out = df.copy() + conf = pd.to_numeric(out.get("Confidence", np.nan), errors="coerce") + est = pd.to_numeric(out.get("EstOutcome", np.nan), errors="coerce") + dist = pd.to_numeric(out.get("AvgDist", np.nan), errors="coerce") + + max_abs_est = np.nanmax(np.abs(est)) if np.isfinite(np.nanmax(np.abs(est))) and (np.nanmax(np.abs(est)) > 0) else np.nan + outcome_score = np.where(np.isnan(max_abs_est) | (max_abs_est == 0), np.nan, np.abs(est) / max_abs_est) + similarity_score = 1.0 / (1.0 + dist.astype(float)) + confidence_score = conf.astype(float) + quality = confidence_score * similarity_score * outcome_score + + out["OutcomeScore"] = np.round(outcome_score, 4) + out["SimilarityScore"] = np.round(similarity_score, 4) + out["QualityScore"] = np.round(quality, 4) + return out + +summary_pattern = _add_quality_scores(summary_pattern) + +sort_cols = [c for c in ["QualityScore", "Confidence", "OutcomeScore"] if c in summary_pattern.columns] +if sort_cols: + summary_pattern = summary_pattern.sort_values(sort_cols, ascending=[False]*len(sort_cols), + na_position="last").reset_index(drop=True) +if "DateLast" in summary_pattern.columns: + summary_pattern = summary_pattern.sort_values(["QualityScore","DateLast"], ascending=[False, True], + na_position="last").reset_index(drop=True) + +summary_pattern.to_excel(OUTPUT_PATTERN_XLSX, index=False) + +print(f"[INFO] Salvato: {OUTPUT_HURST_XLSX} (righe: {len(summary_hurst)})") +print(f"[INFO] Salvato: {OUTPUT_PATTERN_XLSX} (righe: {len(summary_pattern)})") + +if errors: + pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False) + print(f"[INFO] Log errori: {ERROR_LOG_CSV} (tot: {len(errors)})") + +# ========================================= +# 4B) FORWARD-BACKTEST (walk-forward) — SOLO LONG +# ========================================= +def drawdown_stats_simple(ret_series: pd.Series): + eq = (ret_series.fillna(0)).cumsum() + rolling_max = eq.cummax() + dd = eq - rolling_max + maxdd = float(dd.min()) if len(dd) else 0.0 + cagr = np.exp(ret_series.mean()*DAYS_PER_YEAR) - 1 + annvol = ret_series.std() * np.sqrt(DAYS_PER_YEAR) + sharpe = (ret_series.mean() / (ret_series.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR) + calmar = (cagr / abs(maxdd)) if maxdd < 0 else np.nan + return { + "CAGR_%": round(cagr*100, 2), + "AnnVol_%": round(annvol*100, 2), + "Sharpe": round(float(sharpe), 2), + "MaxDD_%eq": round(float(maxdd*100), 2), + "Calmar": round(float(calmar), 2) if np.isfinite(calmar) else np.nan + } + +def knn_forward_backtest_one_asset(df_isin: 
pd.DataFrame, col_date: str, col_ret: str, + Wp: int, Ha: int, k: int, + theta_entry: float, + exec_ret: pd.Series | None = None, + fee_bps: float = 10, + # --- EXIT PARAMS (tutte opzionali) --- + sl_bps: float | None = 300.0, # Stop loss assoluto (bps sul PnL cumulato del trade) + tp_bps: float | None = 800.0, # Take profit assoluto (bps) + trail_bps: float | None = 300.0, # Trailing stop (drawdown dal picco, bps) + time_stop_bars: int | None = 20, # Massimo holding + theta_exit: float | None = 0.0, # esci se est_out <= theta_exit (se None, ignora) + weak_days_exit: int | None = None # esci se per N giorni est_out <= theta_exit + ): + """ + Walk-forward SOLO LONG con regole di EXIT (SL/TP/TS/time/flip). + Ritorna (signals_df, summary_metrics_dict). + Nota: usa solo dati daily → le soglie sono valutate a fine giornata, + l'uscita avviene sulla barra successiva (modello prudente). + """ + r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0 # rendimenti in decimali (close/close) + idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r)) + idx = pd.to_datetime(idx).dt.normalize() + if exec_ret is not None: + r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float) + r_exec.index = pd.to_datetime(r_exec.index).normalize() + # reindex robusto: usa l'ordine di idx, preserva NaN se manca la data + r_exec = r_exec.reindex(idx) + if len(r_exec) != len(r): + r_exec = pd.Series(r_exec.values, index=idx).reindex(idx) + else: + r_exec = r + fee = fee_bps / 10000.0 + + # helper per costruire libreria solo passato + def _lib_predict(past_returns: pd.Series, win_last: np.ndarray): + past_use = past_returns + if WAVELET_ENABLED: + r_wav = wavelet_denoise( + past_returns, + wavelet=WAVELET_NAME, + level=WAVELET_LEVEL + 1, + mode=WAVELET_MODE, + threshold_mode=WAVELET_THRESHOLD_MODE, + ) + if r_wav is not None and r_wav.notna().sum() >= (Wp + Ha): + past_use = r_wav + lib_wins, lib_out = build_pattern_library(past_use, Wp, Ha) + if lib_wins is None: + return np.nan, np.nan + curr_win = win_last + if WAVELET_ENABLED and 'past_use' in locals() and past_use is not None: + # riallinea win_last con eventuale smoothing + curr_idx = past_returns.index[t-Wp:t] # usa idx del loop esterno + smoothed_slice = past_use.reindex(curr_idx).values + if np.isfinite(smoothed_slice).all(): + curr_win = smoothed_slice + curr_zn = z_norm(curr_win) + if curr_zn is None: + return np.nan, np.nan + est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k) + return float(est_out), float(avg_dist) + + # Stato della posizione + in_pos = False + entry_t = None + trade_pnl = 0.0 + trade_peak = 0.0 + weak_streak = 0 + + rows = [] + + # Nota: scorriamo dalle prime barre dove possiamo calcolare una finestra completa + for t in range(Wp, len(r) - 1): + past = r.iloc[:t] + # se passato insufficiente per libreria, forza out + if past.dropna().shape[0] < (Wp + Ha): + sig_out, est_out, avg_dist = 0, np.nan, np.nan + # PnL a t+1 sempre riportato in colonna Ret+1 + rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan)) + continue + + win_last = r.iloc[t-Wp:t].values + est_out, avg_dist = _lib_predict(past, win_last) + # filtro di qualità sui pattern troppo distanti + if MAX_AVG_DIST is not None: + try: + dist_thr = float(MAX_AVG_DIST) + except Exception: + dist_thr = None + if dist_thr is not None and np.isfinite(avg_dist) and avg_dist > dist_thr: + est_out = np.nan + + # Default: portiamo avanti lo stato corrente (sig_out = 
1 se in_pos) + sig_out = 1 if in_pos else 0 + + # --- LOGICA DI INGRESSO --- + if (not in_pos) and (est_out > theta_entry): + # apri domani → oggi segnaliamo 1 (per avere PnL su t+1) + sig_out = 1 + in_pos = True + entry_t = t + trade_pnl = 0.0 + trade_peak = 0.0 + weak_streak = 0 + + # --- LOGICA DI USCITA (se in posizione) --- + elif in_pos: + # 1) aggiorna PnL del trade con il rendimento della barra che verrà *incassato* domani: + # Per coerenza EOD, PnL di oggi (da riportare) è su r[t+1] quando Signal(t)=1. + # Per controlli di stop a fine giornata, stimiamo la "pnl se restassi" accumulando r[t+1] ex-ante. + next_ret = r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan # rendimento che si applicherà se resto in posizione + pnl_if_stay = (1.0 + trade_pnl) * (1.0 + next_ret) - 1.0 + + # 2) aggiorna trailing peak ipotetico + peak_if_stay = max(trade_peak, pnl_if_stay) + + # 3) valuta condizioni di uscita sullo stato "if stay" + exit_reasons = [] + + # SL + if (sl_bps is not None) and (pnl_if_stay <= -sl_bps/10000.0): + exit_reasons.append("SL") + + # TP + if (tp_bps is not None) and (pnl_if_stay >= tp_bps/10000.0): + exit_reasons.append("TP") + + # Trailing + if (trail_bps is not None) and (peak_if_stay - pnl_if_stay >= trail_bps/10000.0): + exit_reasons.append("TRAIL") + + # Time stop + if (time_stop_bars is not None) and (t - entry_t + 1 >= time_stop_bars): + exit_reasons.append("TIME") + + # Flip / debolezza persistente + if theta_exit is not None: + if est_out <= theta_exit: + # Debole oggi → aggiorna streak + weak_streak = weak_streak + 1 if weak_days_exit else weak_streak + # exit immediata se non usi weak_days_exit + if weak_days_exit is None: + exit_reasons.append("FLIP") + else: + if weak_streak >= weak_days_exit: + exit_reasons.append("FLIP_STREAK") + else: + weak_streak = 0 + + # *** Se una qualunque condizione scatta, usciamo domani → oggi mettiamo 0 + if exit_reasons: + sig_out = 0 + in_pos = False + entry_t = None + trade_pnl = 0.0 + trade_peak = 0.0 + weak_streak = 0 + else: + # restiamo → aggiorniamo lo stato “vero” per il prossimo loop + trade_pnl = pnl_if_stay + trade_peak = peak_if_stay + + # Registra la riga odierna; il PnL riportato è sempre il r[t+1] + rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan)) + + sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"]) + + # Costi su variazione posizione + sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0) + trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs() + cost = trade_chg * fee + + sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost + sig_df.drop(columns=["Signal_prev"], inplace=True) + + # Metriche sintetiche + stats = drawdown_stats_simple(sig_df["PnL"]) + stats.update({ + "HitRate_%": round(100 * ((sig_df["PnL"] > 0).sum() / max(1, sig_df["PnL"].notna().sum())), 2), + "AvgTradeRet_bps": round(sig_df["PnL"].mean() * 10000, 2), + "Turnover_%/step": round(100 * trade_chg.mean(), 2), + "N_Steps": int(sig_df.shape[0]), + }) + # Aggiungi anche parametri di uscita usati (utile per grid search/trace) + stats.update({ + "theta_entry": theta_entry, + "theta_exit": (None if theta_exit is None else float(theta_exit)), + "sl_bps": (None if sl_bps is None else float(sl_bps)), + "tp_bps": (None if tp_bps is None else float(tp_bps)), + "trail_bps": (None if trail_bps is None else float(trail_bps)), + "time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)), + "weak_days_exit": (None if weak_days_exit is 
None else int(weak_days_exit)) + }) + return sig_df, stats + + +# ========= ESECUZIONE BACKTEST PER TUTTI GLI ISIN ========= +bt_signals = [] +bt_summary = [] + +total_t = 0.0 +start_all = time.perf_counter() + +for i, isin in enumerate(isins, 1): + t0 = time.perf_counter() # ---- INIZIO TIMER SINGOLO CICLO ---- + + try: + df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR}) + if df_isin.empty: + errors.append({"ISIN": isin, "Errore": "SP vuota (BT)"}) + continue + + col_date, col_ret, col_px = detect_cols(df_isin) + if col_date: + df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce") + df_isin = df_isin.sort_values(col_date) + + if col_ret and col_ret in df_isin.columns: + df_isin[col_ret] = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) # % raw + elif col_px and col_px in df_isin.columns: + px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan) + df_isin[col_ret] = np.log(px / px.shift(1)) * 100.0 # % + else: + errors.append({"ISIN": isin, "Errore": "Né rendimenti né prezzi (BT)"}) + continue + + if df_isin[col_ret].dropna().shape[0] < max(200, WP + HA + 10): + errors.append({"ISIN": isin, "Errore": f"Serie troppo corta (BT) ({df_isin[col_ret].dropna().shape[0]} punti)"}) + continue + + # --- Fetch open/close per calcolare rendimenti di esecuzione (open->open) --- + try: + date_min = df_isin[col_date].min().date() if col_date else None + date_max = df_isin[col_date].max().date() if col_date else None + if date_min and date_max: + px_hist_one = fetch_price_history( + isins=[isin], + universe=meta_df if 'meta_df' in globals() else pd.DataFrame(), + start_date=date_min.isoformat(), + end_date=date_max.isoformat() + ) + px_hist_one = px_hist_one.sort_values("Date") + open_series = px_hist_one[["Date","Open"]].dropna() + open_series["Date"] = pd.to_datetime(open_series["Date"]).dt.normalize() + open_series = open_series.drop_duplicates(subset=["Date"]).set_index("Date")["Open"] + open_ret = open_series.pct_change() + # riallinea sulla stessa sequenza di date del df_isin + idx_dates = pd.to_datetime(df_isin[col_date]).dt.normalize() + exec_ret = open_ret.reindex(idx_dates) + exec_ret.index = idx_dates + else: + exec_ret = None + except Exception as e: + print(f"[WARN] Fetch open/close fallito per {isin}: {e}") + exec_ret = None + + # ============================ + # THETA = HURST IN PERCENTUALE + # H = 0.50 -> theta_entry = 0.005 (0.5%) + # ============================ + isin_str = str(isin).strip() + H_val = hurst_map.get(isin_str, np.nan) + if H_val is None or pd.isna(H_val): + theta_entry = THETA # fallback se H mancante + else: + theta_entry = float(H_val) / 100.0 + + sl_cfg = float(SIGNALS_CONFIG.get("sl_bps", 300.0)) + tp_cfg = float(SIGNALS_CONFIG.get("tp_bps", 800.0)) + trail_cfg = float(SIGNALS_CONFIG.get("trail_bps", 300.0)) + time_cfg = int(SIGNALS_CONFIG.get("time_stop_bars", 20)) + theta_ex = SIGNALS_CONFIG.get("theta_exit", 0.0) + weak_ex = SIGNALS_CONFIG.get("weak_days_exit", None) + + sig_df, stats = knn_forward_backtest_one_asset( + df_isin=df_isin, + col_date=(col_date if col_date else df_isin.index.name or "idx"), + col_ret=col_ret, + Wp=WP, + Ha=HA, + k=KNN_K, + theta_entry=theta_entry, + exec_ret=exec_ret, + fee_bps=10, + sl_bps=sl_cfg, + tp_bps=tp_cfg, + trail_bps=trail_cfg, + time_stop_bars=time_cfg, + theta_exit=theta_ex, + weak_days_exit=weak_ex, + ) + + name = meta_df.loc[meta_df["ISIN"]==isin, "Nome"].iloc[0] if (meta_df["ISIN"]==isin).any() else None + cat = 
meta_df.loc[meta_df["ISIN"]==isin, "Categoria"].iloc[0] if (meta_df["ISIN"]==isin).any() else None + ac = meta_df.loc[meta_df["ISIN"]==isin, "Asset Class"].iloc[0] if (meta_df["ISIN"]==isin).any() else None + + tmp = sig_df.copy() + tmp.insert(0, "ISIN", isin) + tmp.insert(1, "Nome", name) + tmp.insert(2, "Categoria", cat) + tmp.insert(3, "Asset Class", ac) + tmp["Wp"] = WP; tmp["Ha"] = HA; tmp["k"] = KNN_K; tmp["Theta"] = theta_entry + bt_signals.append(tmp) + + stats_row = {"ISIN": isin, "Nome": name, "Categoria": cat, "Asset Class": ac} + stats_row.update(stats) + bt_summary.append(stats_row) + + except Exception as e: + errors.append({"ISIN": isin, "Errore": f"Backtest: {str(e)}"}) + + # ---- FINE TIMER SINGOLO CICLO ---- + dt = time.perf_counter() - t0 + total_t += dt + + avg_t = total_t / i + eta = avg_t * (len(isins) - i) + + print( + f"… backtest {i}/{len(isins)} completati — {dt:.2f} sec " + f"(avg {avg_t:.2f}s, ETA {format_eta(eta)} rimanenti)" + ) + +# ---- TIMER FINALE ---- +end_all = time.perf_counter() +total_elapsed = end_all - start_all +avg_elapsed = total_elapsed / max(len(isins), 1) +print(f"[INFO] Tempo totale: {format_eta(total_elapsed)} ({total_elapsed:.2f} sec)") +print(f"[INFO] Tempo medio per asset: {format_eta(avg_elapsed)} ({avg_elapsed:.2f} sec)") + + +bt_signals_df = pd.concat(bt_signals, ignore_index=True) if bt_signals else pd.DataFrame( + columns=["ISIN","Nome","Categoria","Asset Class","Date","Signal","EstOutcome","AvgDist","Ret+1","PnL","Wp","Ha","k","Theta"] +) +bt_summary_df = pd.DataFrame(bt_summary) if bt_summary else pd.DataFrame( + columns=["ISIN","Nome","Categoria","Asset Class","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar","HitRate_%","AvgTradeRet_bps","Turnover_%/step","N_Steps"] +) + +bt_signals_df.to_excel(FORWARD_BT_SIGNALS_XLSX, index=False) +bt_summary_df.to_excel(FORWARD_BT_SUMMARY_XLSX, index=False) +print(f"[INFO] Salvato: {FORWARD_BT_SIGNALS_XLSX} ({len(bt_signals_df):,} righe)") +print(f"[INFO] Salvato: {FORWARD_BT_SUMMARY_XLSX} ({len(bt_summary_df):,} righe)") + +if errors: + pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False) + print(f"[INFO] Log errori aggiornato: {ERROR_LOG_CSV} (tot: {len(errors)})") + +# Salva riepilogo prezzi (solo simboli primari, senza fallback) ogni run +try: + save_price_cache_summary(OPEN_CACHE_DIR, OPEN_CACHE_DIR / "prezzi_summary_no_fallback.xlsx") +except Exception as e: + print(f"[WARN] Riepilogo prezzi non creato: {e}") + +# Timer per fasi post-backtest (sezione 5 in poi) +start_post_timer(total_steps=4) + +# ====================================================================== +# 5) STRATEGIE PORTAFOGLIO DINAMICHE + EQUITY + HEATMAP + TRADE REPORT +# ====================================================================== +def _ensure_bt_summary(meta_df: pd.DataFrame, bt_summary_df: pd.DataFrame) -> pd.DataFrame: + if bt_summary_df is not None and not bt_summary_df.empty: + out = bt_summary_df.copy() + else: + out = pd.DataFrame({"ISIN": meta_df["ISIN"].copy()}) + for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]: + out[c] = np.nan + out = out.merge(meta_df[["ISIN","Nome","Categoria","Asset Class"]], on="ISIN", how="left") + out["ISIN"] = out["ISIN"].astype(str).str.strip() + return out + +def _ensure_bt_signals(meta_df: pd.DataFrame, bt_signals_df: pd.DataFrame, last_dates_hint: list) -> pd.DataFrame: + if bt_signals_df is not None and not bt_signals_df.empty: + fbsig = bt_signals_df.copy() + fbsig["Date"] = pd.to_datetime(fbsig["Date"]) + fbsig["ISIN"] = 
fbsig["ISIN"].astype(str).str.strip() + fbsig["Signal"] = pd.to_numeric(fbsig["Signal"], errors="coerce").fillna(0).astype(int) + fbsig["PnL"] = pd.to_numeric(fbsig["PnL"], errors="coerce").fillna(0.0) + return fbsig + + end_date = (max(last_dates_hint) if last_dates_hint else pd.Timestamp.today()).normalize() + dates = pd.bdate_range(end=end_date, periods=120, freq="C") + isins_small = meta_df["ISIN"].astype(str).str.strip().tolist()[:12] + rows = [] + for isin in isins_small: + for dt in dates: + rows.append({"Date": dt, "ISIN": isin, "Signal": 0, "PnL": 0.0}) + return pd.DataFrame(rows) + +forward_bt_summary = _ensure_bt_summary(meta_df, bt_summary_df) +forward_bt_signals = _ensure_bt_signals(meta_df, bt_signals_df, last_dates) + +# ----------------------------- +# 5.1 Funzioni di supporto (grafici e pesi) +# ----------------------------- +def equity_from_returns(r: pd.Series) -> pd.Series: + r = pd.to_numeric(r, errors="coerce").fillna(0.0) + return (1 + r).cumprod() * 100 + +def monthly_returns(r: pd.Series) -> pd.Series: + r = pd.to_numeric(r, errors="coerce").fillna(0.0) + if not isinstance(r.index, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)): + try: + r.index = pd.to_datetime(r.index) + except Exception: + return pd.Series(dtype=float) + return (1 + r).resample("M").prod() - 1 + +def plot_heatmap_monthly(r: pd.Series, title: str, save_path: str = None): + r = pd.to_numeric(r, errors="coerce").fillna(0.0) + m = monthly_returns(r) + df = m.to_frame("ret") + df["Year"], df["Month"] = df.index.year, df.index.month + pv = df.pivot(index="Year", columns="Month", values="ret") + fig, ax = plt.subplots(figsize=(10,6)) + im = ax.imshow(pv.fillna(0)*100, cmap="RdYlGn", vmin=-3, vmax=5, aspect="auto") + for i in range(pv.shape[0]): + for j in range(pv.shape[1]): + val = pv.iloc[i,j] + if not np.isnan(val): + ax.text(j, i, f"{val*100:.1f}", ha="center", va="center", fontsize=8) + ax.set_title(title); ax.set_xlabel("Mese"); ax.set_ylabel("Anno") + ax.set_xticks(range(12)); ax.set_xticklabels(range(1,13)) + fig.colorbar(im, ax=ax, label="%") + plt.tight_layout() + if save_path: + savefig_safe(save_path, dpi=150) + plt.close(fig) + # Non mostrare il plot durante l'esecuzione + +def inverse_vol_weights(df, window=60, max_weight=None): + vol = df.rolling(window).std() + inv = 1 / vol.replace(0, np.nan) + w = inv.div(inv.sum(axis=1), axis=0) + w = w.ffill().fillna(1 / max(1, df.shape[1])) + + if max_weight is not None: + w = w.clip(upper=max_weight) + + return w + +def portfolio_metrics(r: pd.Series): + r = pd.to_numeric(r, errors="coerce").fillna(0.0) + if len(r) == 0: + return dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan) + ann = DAYS_PER_YEAR + eq = equity_from_returns(r) + cagr = (eq.iloc[-1]/eq.iloc[0])**(ann/max(1, len(r))) - 1 + vol = r.std() * np.sqrt(ann) + sharpe = (r.mean()/r.std()) * np.sqrt(ann) if r.std() > 0 else np.nan + mdd = (eq/eq.cummax() - 1).min() + return dict(CAGR=cagr, Vol=vol, Sharpe=sharpe, MaxDD=mdd) + +# ----------------------------- +# 5.2 Selezione dinamica ISIN (ranking) +# ----------------------------- +df_sum = forward_bt_summary.copy() +for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]: + if c in df_sum.columns: + df_sum[c] = pd.to_numeric(df_sum[c], errors="coerce") + +def _safe_rank(s: pd.Series): + s = pd.to_numeric(s, errors="coerce") + if s.notna().sum() == 0: + return pd.Series(np.zeros(len(s)), index=s.index) + s_filled = s.fillna(s.median()) + return s_filled.rank(method="average") + +import numpy as np +import pandas as 
pd +from scipy import linalg + +# ---------------------------- +# Utils +# ---------------------------- +def _safe_rank_ser(s: pd.Series) -> pd.Series: + """Rank robusto (0..1), gestisce NaN.""" + s = s.copy() + return s.rank(method="average", na_option="keep") / s.notna().sum() + +def _winsorize(s: pd.Series, p=0.005): + lo, hi = s.quantile(p), s.quantile(1-p) + return s.clip(lo, hi) + +def _pos_normalize(w: np.ndarray, eps=1e-12): + w = np.where(w < 0, 0, w) + s = w.sum() + return (w / (s + eps)) if s > eps else np.ones_like(w)/len(w) + +def _corr_shrink(C: np.ndarray, alpha=0.10): + """Shrink verso I per stabilità numerica.""" + k = C.shape[0] + return (1-alpha)*C + alpha*np.eye(k) + +def _time_blocks_index(idx: pd.Index, k_folds=5): + """Split temporale semplice in k blocchi contigui.""" + n = len(idx) + fold_sizes = np.full(k_folds, n // k_folds, dtype=int) + fold_sizes[: n % k_folds] += 1 + splits, start = [], 0 + for fs in fold_sizes: + end = start + fs + splits.append(idx[start:end]) + start = end + return splits + +# ---------------------------- +# Calibrazione pesi +# ---------------------------- +def calibrate_score_weights( + df_sum: pd.DataFrame, + metrics_map=None, + target_col: str | None = None, + k_folds: int = 5, + shrink_equal: float = 0.25, # shrink verso equal weight per stabilità + corr_shrink: float = 0.10 # shrink della matrice di correlazione +): + """ + metrics_map: lista di tuple (colname, good_is_high) + good_is_high=True se valori alti sono migliori. + Esempio: [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)] + target_col: nome colonna target OOS (es. 'FWD_CAGR_%', 'FWD_Sharpe', ecc.) + Se None => usa ERC non supervisionato. + Ritorna: dict con 'weights' (pd.Series), 'X_ranked' (DataFrame) e 'mode' + """ + if metrics_map is None: + metrics_map = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)] + + # Costruisci matrice delle metriche X (rank 0..1) + X_cols, X_list = [], [] + for col, good_high in metrics_map: + s = df_sum.get(col, pd.Series(index=df_sum.index, dtype=float)).astype(float) + s = _winsorize(s) + if not good_high: + s = -s # inverti segno se "meno è meglio" + X_cols.append(col) + X_list.append(_safe_rank_ser(s)) + + X = pd.concat(X_list, axis=1) + X.columns = X_cols + X = X.loc[:, X.columns[X.notna().sum(0) > 0]] # droppa colonne tutte NaN + k = X.shape[1] + if k == 0: + raise ValueError("Nessuna metrica valida per la calibrazione.") + + # Se non hai target: ERC non supervisionato + if target_col is None or target_col not in df_sum.columns: + # cov su features rankate + C = np.cov(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False) + # stabilizza + C = _corr_shrink(C, alpha=corr_shrink) + # ERC: pesi proposti ~ inverse vol in spazio decorrelato + # approssimazione pratica: w ∝ diag(C)^(-1/2), poi normalizza riducendo l'effetto di correlazioni con Σ^-1/2 + vol = np.sqrt(np.clip(np.diag(C), 1e-12, None)) + w0 = 1.0 / vol + w = _pos_normalize(w0) + return { + "mode": "unsupervised_erc", + "weights": pd.Series(w, index=X.columns, name="weight"), + "X_ranked": X + } + + # Supervisionato: IC-Σ^-1 + y = df_sum[target_col].astype(float).copy() + y = _winsorize(y) + y_rank = _safe_rank_ser(y) # target in rank per allineare a Spearman + + # Drop righe senza target o tutte NaN nelle metriche + mask = y_rank.notna() & X.notna().any(1) + Xf, yf = X[mask].copy(), y_rank[mask].copy() + if len(Xf) < max(30, k*5): + # fallback se troppo pochi dati + C = np.corrcoef(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False) + C 
= _corr_shrink(C, alpha=corr_shrink) + ic = np.array([ + pd.Series(X.iloc[:, j]).corr(y_rank, method="spearman") + for j in range(k) + ]) + ic = np.nan_to_num(ic, nan=0.0) + w = linalg.solve(C + 1e-6*np.eye(k), ic, assume_a='sym') + w = (1-shrink_equal)*_pos_normalize(w) + shrink_equal*np.ones(k)/k + return { + "mode": "supervised_icSigmaInv_fallback", + "weights": pd.Series(w, index=X.columns, name="weight"), + "X_ranked": X + } + + # CV a blocchi temporali (walk-forward grossolano) + # Ordine temporale = index di df_sum (assumo ordinato) + folds = _time_blocks_index(Xf.index, k_folds=k_folds) + W_list = [] + for t in range(1, len(folds)): + train_idx = pd.Index([]).append(pd.Index([])) + for i in range(t): # usa tutti i blocchi precedenti come train + train_idx = train_idx.append(folds[i]) + valid_idx = folds[t] + + Xt, yt = Xf.loc[train_idx], yf.loc[train_idx] + Xv, yv = Xf.loc[valid_idx], yf.loc[valid_idx] + + # IC per colonna usando solo train + ic = np.array([Xt.iloc[:, j].corr(yt, method="spearman") for j in range(Xt.shape[1])]) + ic = np.nan_to_num(ic, nan=0.0) + + # Corr(X) su train + C = np.corrcoef(np.nan_to_num(Xt.values, nan=np.nanmean(Xt.values)), rowvar=False) + C = _corr_shrink(C, alpha=corr_shrink) + + try: + w_raw = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym') + except Exception: + w_raw = ic.copy() + + w_fold = _pos_normalize(w_raw) + # shrink verso equal + w_fold = (1-shrink_equal)*w_fold + shrink_equal*np.ones_like(w_fold)/len(w_fold) + W_list.append(w_fold) + + # media pesi sui fold + if not W_list: + # in casi limite (pochi dati), ripiega + ic = np.array([Xf.iloc[:, j].corr(yf, method="spearman") for j in range(Xf.shape[1])]) + ic = np.nan_to_num(ic, nan=0.0) + C = np.corrcoef(np.nan_to_num(Xf.values, nan=np.nanmean(Xf.values)), rowvar=False) + C = _corr_shrink(C, alpha=corr_shrink) + w = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym') + else: + w = np.mean(np.vstack(W_list), axis=0) + + w = _pos_normalize(w) + return { + "mode": "supervised_icSigmaInv_cv", + "weights": pd.Series(w, index=X.columns, name="weight"), + "X_ranked": X + } + +# --- PRE-FLIGHT METRICS GUARD --------------------------------------------- +def _coerce_num(s: pd.Series) -> pd.Series: + return pd.to_numeric(s, errors="coerce").replace([np.inf, -np.inf], np.nan) + +# Assicurati che df_sum esista ed abbia le colonne base +df_sum = forward_bt_summary.copy() +for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%", + "QualityScore","Confidence","OutcomeScore"]: + if c in df_sum.columns: + df_sum[c] = _coerce_num(df_sum[c]) + +# Se Sharpe/CAGR/MaxDD mancano o sono tutti NaN, prova a ricostruirli dai segnali +need_rebuild = ( + ("Sharpe" not in df_sum.columns or df_sum["Sharpe"].notna().sum()==0) or + ("CAGR_%" not in df_sum.columns) or + ("MaxDD_%eq" not in df_sum.columns) +) + +if need_rebuild: + try: + # Recompute metriche minime per ISIN da bt_signals_df (se presente) + tmp = bt_signals_df.copy() + if not tmp.empty: + tmp["PnL"] = pd.to_numeric(tmp["PnL"], errors="coerce").fillna(0.0) + agg_rows = [] + for isin, g in tmp.groupby("ISIN"): + stats = drawdown_stats_simple(g["PnL"]) + stats["ISIN"] = str(isin) + agg_rows.append(stats) + rebuilt = pd.DataFrame(agg_rows) + + # Coerce numerico e merge su df_sum + for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]: + if c in rebuilt.columns: + rebuilt[c] = _coerce_num(rebuilt[c]) + + df_sum = ( + df_sum.drop(columns=[c for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"] if c in df_sum.columns]) + 
.merge(rebuilt[["ISIN","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]], on="ISIN", how="left") + ) + except Exception as e: + print(f"[WARN] Ricostruzione metriche fallita: {e}") + +df_sum = _apply_score(df_sum) + +TOP_N = 15 +base_isins = ( + df_sum + .sort_values("Score", ascending=False) + .head(TOP_N)["ISIN"].astype(str).str.strip().tolist() +) + +print(f"[INFO] Ranking full-sample (solo debug, i portafogli usano ranking rolling): {base_isins}") + +# ----------------------------- +# 5.3 Costruzione portafogli +# (Equal Weight + Risk Parity con cap) +# ----------------------------- +bt = forward_bt_signals.copy() +bt["Date"] = pd.to_datetime(bt["Date"]) +bt["ISIN"] = bt["ISIN"].astype(str).str.strip() +bt = bt.sort_values(["Date", "ISIN"]) + +wide_pnl = ( + bt.pivot_table(index="Date", columns="ISIN", values="PnL", aggfunc="sum") + .fillna(0.0) +) +wide_sig = ( + bt.pivot_table(index="Date", columns="ISIN", values="Signal", aggfunc="last") + .fillna(0) + .astype(int) +) +wide_est = ( + bt.pivot_table(index="Date", columns="ISIN", values="EstOutcome", aggfunc="last") + .sort_index() +) + +# (Opzionale) ricostruzione PnL portafoglio con open->open: disattivata di default perché il PnL +# viene già calcolato a livello di singolo asset usando gli open. +if globals().get("RECOMPUTE_PORTF_FROM_OPEN", False): + try: + date_min = (bt["Date"].min() - pd.Timedelta(days=5)).date() + date_max = (bt["Date"].max() + pd.Timedelta(days=5)).date() + px_hist = fetch_price_history( + isins=bt["ISIN"].unique(), + universe=meta_df if 'meta_df' in globals() else pd.DataFrame(), + start_date=date_min.isoformat(), + end_date=date_max.isoformat() + ) + open_pivot = ( + px_hist.pivot(index="Date", columns="ISIN", values="Open") + .sort_index() + ) + open_ret = open_pivot.pct_change() + # segnale su giorno t, esecuzione a open t+1 + wide_pnl = wide_sig * open_ret.shift(-1) + common_idx = wide_sig.index.intersection(wide_pnl.index) + common_idx = pd.to_datetime(common_idx) + wide_sig = wide_sig.reindex(common_idx).fillna(0).astype(int) + wide_pnl = wide_pnl.reindex(common_idx).fillna(0.0) + wide_sig.index = pd.to_datetime(wide_sig.index) + wide_pnl.index = pd.to_datetime(wide_pnl.index) + print(f"[INFO] PnL ricostruito su open->open per {len(open_pivot.columns)} ISIN.") + except Exception as e: + print(f"[WARN] Ricostruzione PnL open->open fallita, uso PnL originale: {e}") + +# I portafogli verranno costruiti piu' sotto con ranking rolling (vedi _build_dynamic_portfolio_returns). + +def plot_portfolio_composition(weights: pd.DataFrame, + title: str, + save_path: str | None = None, + max_legend: int = 12): + """ + Stacked area dei pesi per ISIN nel tempo (un colore per asset). + - Accetta un DataFrame 'weights' indicizzato per Date, colonne = ISIN, valori = peso (0..1). + - Normalizza le righe per sicurezza. + - Raggruppa la coda lunga in 'Altri' se gli asset superano 'max_legend'. + - Salva su 'save_path' se fornito; se è solo un filename, salva nella cartella corrente. + - Non lancia eccezioni inutili: stampa messaggi SKIP quando i dati non sono plottabili. 
+ + Esempio: + plot_portfolio_composition(w_eq, "Equal Weight", "composition_equal_weight.png") + plot_portfolio_composition(w_rp, "Risk Parity", "composition_risk_parity.png") + """ + import os + import numpy as np + import pandas as pd + import matplotlib.pyplot as plt + + # ---- Guard basi ---- + if weights is None or getattr(weights, "empty", True): + print(f"[SKIP] Nessun peso disponibile per: {title}") + return + + # Copia e sanificazione + W = weights.copy() + + # Forza indice univoco e ordinato (se Date duplicati, tieni l’ultimo) + if W.index.has_duplicates: + W = W[~W.index.duplicated(keep="last")] + W = W.sort_index() + + # Coerce numerico e sostituisci NaN con 0 + W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0) + + # Droppa colonne totalmente nulle + keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0] + if not keep_cols: + print(f"[SKIP] Tutte le colonne hanno peso zero per: {title}") + return + W = W[keep_cols] + + # Normalizzazione riga (se somma=0, lascia 0) + row_sum = W.sum(axis=1) + with np.errstate(invalid="ignore", divide="ignore"): + W = W.div(row_sum.replace(0.0, np.nan), axis=0).fillna(0.0).clip(lower=0.0) + + # Se dopo la pulizia resta vuoto o una sola riga, non ha senso il plot area + if len(W.index) < 2 or W.shape[1] == 0: + print(f"[SKIP] Serie troppo corta o senza colonne valide per: {title}") + return + + # Ordina gli asset per peso medio decrescente (più leggibile) + avg_w = W.mean(axis=0).sort_values(ascending=False) + ordered = avg_w.index.tolist() + + # Raggruppa coda lunga in "Altri" se troppi asset + if len(ordered) > max_legend: + head, tail = ordered[:max_legend], ordered[max_legend:] + W_show = W[head].copy() + W_show["Altri"] = W[tail].sum(axis=1) + ordered = head + ["Altri"] + else: + W_show = W[ordered].copy() + + # Palette (tab20 riciclata se necessario) + cmap = plt.colormaps.get_cmap("tab20") + colors = [cmap(i % cmap.N) for i in range(len(ordered))] + + # Plot + fig, ax = plt.subplots(figsize=(11, 6)) + ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors) + ax.set_title(f"Composizione portafoglio nel tempo - {title}") + ax.set_ylim(0, 1) + ax.grid(True, alpha=0.3) + ax.set_ylabel("Peso") + + # Etichette Y in percentuale + yticks = ax.get_yticks() + ax.set_yticklabels([f"{y*100:.0f}%" for y in yticks]) + + # Legenda compatta + ncol = 2 if len(ordered) > 10 else 1 + ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN") + + fig.tight_layout() + + # Salvataggio robusto (gestisce filename senza cartella) + if save_path: + folder = os.path.dirname(save_path) or "." + try: + os.makedirs(folder, exist_ok=True) + except Exception as e: + print(f"[WARN] Impossibile creare la cartella '{folder}': {e}. 
Provo a salvare nella dir corrente.") + save_path = os.path.basename(save_path) + fig.savefig(save_path, dpi=150, bbox_inches="tight") + try: + full = os.path.abspath(save_path) + except Exception: + full = save_path + print(f"[INFO] Salvato: {full}") + + # Plot salvato senza visualizzazione interattiva + +def make_active_weights(w_base: pd.DataFrame, + sig: pd.DataFrame, + renorm_to_1: bool = False, # True: rialloca tra gli attivi; False: lascia quota in Cash + add_cash: bool = True, + cash_label: str = "Cash") -> pd.DataFrame: + """ + w_base : pesi teorici (righe=date, colonne=ISIN) + sig : matrice Signal (0/1) allineata (righe=date, colonne=ISIN) + """ + import numpy as np + import pandas as pd + + if w_base is None or w_base.empty: + return pd.DataFrame(index=sig.index, columns=[]) + + W = w_base.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0) + S = sig.reindex_like(W).fillna(0).astype(int) # allinea e riempi + # pesa solo gli attivi + W_active = W * (S > 0) + + row_sum = W_active.sum(axis=1) + if renorm_to_1: + # rialloca sui soli attivi: nessuna quota in cash + W_active = W_active.div(row_sum.replace(0, np.nan), axis=0).fillna(0.0) + if add_cash: + # se renorm True, cash è zero + W_active[cash_label] = 0.0 + else: + # non renorm: lasciamo la parte non investita in cash + if add_cash: + cash = (1.0 - row_sum).clip(lower=0.0, upper=1.0) + W_active[cash_label] = cash + + # rimuovi colonne sempre zero + keep = [c for c in W_active.columns if W_active[c].abs().sum() > 0] + return W_active[keep] + +# ----------------------------- +# Portafogli dinamici con ranking rolling +# ----------------------------- +_dynamic_portfolio_cache: dict[int, dict] = {} + +def _build_dynamic_portfolio_returns( + wide_pnl: pd.DataFrame, + wide_sig: pd.DataFrame, + wide_est: pd.DataFrame, + top_n: int, + window_bars: int = RANKING_WINDOW_BARS, + rp_lookback: int = RP_LOOKBACK +) -> dict: + if wide_pnl is None or wide_pnl.empty: + idx = pd.Index([]) + empty_w = pd.DataFrame(index=idx, columns=[]) + return { + "ret_eq": pd.Series(dtype=float), + "ret_rp": pd.Series(dtype=float), + "w_eq": empty_w, + "w_rp": empty_w, + "w_eq_act": empty_w, + "w_rp_act": empty_w, + "selection": {} + } + + dates = wide_pnl.index.sort_values() + all_cols = wide_pnl.columns.tolist() + + w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols) + w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols) + selection = {} + + for dt in dates: + # Considera solo gli ISIN con segnale attivo oggi + sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float) + on_cols = [c for c in all_cols if sig_row.get(c, 0) == 1] + if not on_cols: + selection[dt] = [] + continue + + window_est = wide_est.loc[:dt].tail(window_bars) if not wide_est.empty else pd.DataFrame() + scores = [] + for c in on_cols: + s = pd.to_numeric(window_est[c], errors="coerce") if c in window_est.columns else pd.Series(dtype=float) + est_score = s.mean(skipna=True) + if pd.isna(est_score): + continue + scores.append((c, est_score)) + + if not scores: + selection[dt] = [] + continue + + scores_sorted = sorted(scores, key=lambda x: x[1], reverse=True) + base_isins_dt = [c for c, _ in scores_sorted[:top_n]] + selection[dt] = base_isins_dt + if not base_isins_dt: + continue + + w_eq.loc[dt, base_isins_dt] = 1 / len(base_isins_dt) + + window_pnl = wide_pnl.loc[:dt].tail(window_bars) + rp_hist = window_pnl[base_isins_dt] + rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT) + if not rp_w.empty: + last = 
rp_w.iloc[-1].fillna(0.0) + last_sum = float(last.sum()) + if last_sum > 0: + last = last / last_sum + w_rp.loc[dt, last.index] = last.values + + w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") + w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") + + ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1) + ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1) + + return { + "ret_eq": ret_eq, + "ret_rp": ret_rp, + "w_eq": w_eq, + "w_rp": w_rp, + "w_eq_act": w_eq_act, + "w_rp_act": w_rp_act, + "selection": selection + } + +def _get_dynamic_portfolio(top_n: int) -> dict: + if top_n not in _dynamic_portfolio_cache: + _dynamic_portfolio_cache[top_n] = _build_dynamic_portfolio_returns( + wide_pnl=wide_pnl, + wide_sig=wide_sig, + wide_est=wide_est, + top_n=top_n, + window_bars=RANKING_WINDOW_BARS, + rp_lookback=RP_LOOKBACK + ) + return _dynamic_portfolio_cache[top_n] + +# Portafoglio principale (Top_N di default) calcolato in modo rolling +_main_port = _get_dynamic_portfolio(TOP_N) +ret_eq = _main_port["ret_eq"] +ret_rp = _main_port["ret_rp"] +w_eq = _main_port["w_eq"] +w_rp = _main_port["w_rp"] +w_eq_act = _main_port["w_eq_act"] +w_rp_act = _main_port["w_rp_act"] +selection_by_date = _main_port["selection"] +weights_rp = w_rp.copy() +print(f"[INFO] Portafoglio rolling calcolato (TopN={TOP_N}, finestra={RANKING_WINDOW_BARS} barre, rp_lookback={RP_LOOKBACK}).") +checkpoint_post_timer("Portafoglio rolling") + +# ----------------------------- +# 5.4 Equity line + Heatmap (salva PNG) +# ----------------------------- +# eq_eq, eq_rp = map(equity_from_returns, [ret_eq, ret_rp]) +# plt.figure(figsize=(10, 6)) +# plt.plot(eq_eq, label="Equal Weight") +# ============================= +# 5.4bis Composizione nel tempo (ATTIVI vs CASH) +# ============================= +import os +import numpy as np +import matplotlib.pyplot as plt + +def plot_portfolio_composition_fixed(weights: pd.DataFrame, + title: str, + save_path: str | None = None, + max_legend: int = 20): + """ + Stacked area dei pesi nel tempo. + 'weights' deve essere già quello ATTIVO (già mascherato con i Signal) + e può includere una colonna 'Cash'. + - NON ri-normalizza le righe: mostra la quota di Cash reale. + - Se vuoi forzare sempre 100% investito, passa prima da renorm_to_1=True in make_active_weights. 
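+
+    Esempio d'uso (stesse chiamate effettuate più sotto nello script, con i pesi attivi già mascherati):
+        plot_portfolio_composition_fixed(w_eq_act, "Equal Weight (attivi + Cash)",
+                                         str(PLOT_DIR / "composition_equal_weight_active.png"))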
+ """ + if weights is None or getattr(weights, "empty", True): + print(f"[SKIP] Nessun peso per: {title}") + return + + W = weights.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0) + if W.index.has_duplicates: + W = W[~W.index.duplicated(keep="last")] + W = W.sort_index() + + # drop colonne sempre zero + keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0] + if not keep_cols: + print(f"[SKIP] Tutti i pesi sono zero per: {title}") + return + W = W[keep_cols] + + if len(W.index) < 2: + print(f"[SKIP] Serie troppo corta per: {title}") + return + + # Ordina per peso medio (Cash in coda così resta sopra nello stack) + avg_w = W.mean(0).sort_values(ascending=False) + ordered = avg_w.index.tolist() + if "Cash" in ordered: + ordered = [c for c in ordered if c != "Cash"] + ["Cash"] + + # Raggruppa coda lunga in "Altri" + if len(ordered) > max_legend: + head = ordered[:max_legend] + # Garantisce che 'Cash' resti in legenda anche oltre il cap + if "Cash" not in head and "Cash" in ordered: + head = head[:-1] + ["Cash"] + tail = [c for c in ordered if c not in head] + W_show = W[head].copy() + if tail: + W_show["Altri"] = W[tail].sum(1) + ordered = head + ["Altri"] + else: + ordered = head + else: + W_show = W[ordered].copy() + + cmap = plt.colormaps.get_cmap("tab20") + colors = [cmap(i % cmap.N) for i in range(len(ordered))] + + fig, ax = plt.subplots(figsize=(11, 6)) + ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors) + ax.set_title(f"Composizione portafoglio nel tempo - {title}") + # limite Y = somma massima osservata (<= 1 se pesi + Cash corretti) + ymax = float(np.nanmax(W_show.sum(1).values)) + if not np.isfinite(ymax) or ymax <= 0: + ymax = 1.0 + ax.set_ylim(0, max(1.0, ymax)) + ax.grid(True, alpha=0.3) + ax.set_ylabel("Peso") + ax.set_yticks(ax.get_yticks()) + ax.set_yticklabels([f"{y*100:.0f}%" for y in ax.get_yticks()]) + + ncol = 2 if len(ordered) > 10 else 1 + ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN") + fig.tight_layout() + + if save_path: + folder = os.path.dirname(save_path) or "." 
+        try:
+            os.makedirs(folder, exist_ok=True)
+        except Exception as e:
+            print(f"[WARN] mkdir '{folder}': {e} -> salvo in cwd")
+            save_path = os.path.basename(save_path)
+        fig.savefig(save_path, dpi=150, bbox_inches="tight")
+        print(f"[INFO] Salvato: {os.path.abspath(save_path)}")
+
+    # Plot salvato senza visualizzazione interattiva
+
+# --- 1) Pesi teorici dei portafogli (già costruiti sopra) ---
+# w_eq : equal weight su 'cols'
+# w_rp : risk parity (weights_rp)
+
+def _sanitize_weights(W: pd.DataFrame, index_like: pd.Index) -> pd.DataFrame:
+    if W is None or W.empty:
+        return pd.DataFrame(index=index_like, columns=[])
+    W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0)
+    # normalizziamo a somma 1 (pesi TEORICI)
+    rs = W.sum(1).replace(0, np.nan)
+    return W.div(rs, axis=0).fillna(0.0).clip(lower=0.0)
+
+# ricostruisco coerentemente nel caso non fossero gia definiti
+if 'w_eq' not in globals():
+    w_eq = pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
+if 'w_rp' not in globals():
+    w_rp = weights_rp.copy() if isinstance(weights_rp, pd.DataFrame) else pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
+
+w_eq = _sanitize_weights(w_eq, wide_pnl.index)
+w_rp = _sanitize_weights(w_rp, wide_pnl.index)
+
+# --- 2) Pesi ATTIVI (mascherati con i Signal) ---
+# renorm_to_1=False → lascia la quota NON investita in 'Cash'
+w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
+w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
+
+# Export pesi giornalieri (Equal/Risk Parity) con cash normalizzato a 100%
+def _export_weights_daily(w_eq_act_df: pd.DataFrame, w_rp_act_df: pd.DataFrame, path=WEIGHTS_DAILY_XLSX):
+    try:
+        def _prep(df: pd.DataFrame) -> pd.DataFrame:
+            if df is None or df.empty:
+                return pd.DataFrame()
+            out = df.copy()
+            if "Cash" not in out.columns:
+                out["Cash"] = 0.0
+            out = out.apply(pd.to_numeric, errors="coerce").fillna(0.0)
+            if out.index.has_duplicates:
+                out = out[~out.index.duplicated(keep="last")]
+            out = out.sort_index()
+            row_sum = out.sum(axis=1).replace(0, np.nan)
+            out = out.div(row_sum, axis=0).fillna(0.0)
+            cols = [c for c in out.columns if c != "Cash"] + ["Cash"]
+            return out[cols]
+
+        w_eq_x = _prep(w_eq_act_df)
+        w_rp_x = _prep(w_rp_act_df)
+        if w_eq_x.empty and w_rp_x.empty:
+            print("[INFO] Nessun peso da esportare (weights_daily.xlsx non creato).")
+            return
+        with pd.ExcelWriter(path) as xw:
+            if not w_eq_x.empty:
+                w_eq_x.to_excel(xw, "Equal_Weight", index=True)
+            if not w_rp_x.empty:
+                w_rp_x.to_excel(xw, "Risk_Parity", index=True)
+        print(f"[INFO] Salvato: {path}")
+    except Exception as e:
+        print(f"[WARN] Export weights_daily.xlsx fallito: {e}")
+
+_export_weights_daily(w_eq_act, w_rp_act, path=WEIGHTS_DAILY_XLSX)
+
+# --- 3) Plot + salvataggio ---
+plot_portfolio_composition_fixed(w_eq_act, "Equal Weight (attivi + Cash)", str(PLOT_DIR / "composition_equal_weight_active.png"))
+plot_portfolio_composition_fixed(w_rp_act, "Risk Parity (attivi + Cash)", str(PLOT_DIR / "composition_risk_parity_active.png"))
+checkpoint_post_timer("Pesi/plot portafogli")
+
+
+# -----------------------------
+# 5.5 Report trades — SOLO LONG
+# -----------------------------
+def make_trades_report(sig:
pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFrame, name: str) -> tuple[pd.DataFrame, pd.DataFrame]: + """ + Report trade SOLO LONG coerente EOD: + - segnale laggato di 1 barra (apertura dal giorno successivo al primo 1) + - PnL allineato al giorno di esposizione: r = pnl.shift(-1) + - chiusura al primo 0 (CloseDate = dt), e a fine serie CloseDate = ultimo + BDay(1) + - Duration_bars = numero di barre accumulate nel PnL del trade + Ritorna (df_trades, df_daily) dove df_daily contiene PnL giorno per giorno per ogni trade valido. + """ + from pandas.tseries.offsets import BDay + + # Allinea indici + common_idx = sig.index.intersection(pnl.index) + if not weights.empty: + common_idx = common_idx.intersection(weights.index) + sig = sig.loc[common_idx].copy() + pnl = pnl.loc[common_idx].copy() + weights = weights.loc[common_idx].copy() if not weights.empty else pd.DataFrame(index=common_idx) + + # Sanitizza + sig = sig.fillna(0).astype(int).clip(lower=0) # solo long + pnl = pnl.apply(pd.to_numeric, errors="coerce") # mantieni NaN per buchi open + + rows = [] + daily_rows = [] + + for isin in sig.columns: + # 1) Segnale EOD (lag 1) + s = sig[isin].fillna(0).astype(int).shift(1).fillna(0).astype(int) + + # 2) PnL allineato al giorno di esposizione (EOD): usa pnl.shift(-1) + r = pnl[isin].shift(1) + + # 3) Pesi (se disponibili) + w = (weights[isin].fillna(0.0) if (isin in weights.columns) else pd.Series(0.0, index=s.index)) + + in_pos, start, acc, acc_dates = False, None, [], [] + + for dt in s.index: + sig_t = int(s.at[dt]) + + # CHIUSURA: primo 0 dopo un periodo in posizione → chiudi oggi (dt) + if in_pos and (sig_t == 0): + if any(pd.isna(acc)): + print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{dt.date()}") + else: + pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0 + w_start = float(w.get(start, 0.0)) + rows.append(dict( + Strategy=name, + ISIN=isin, + OpenDate=start, + CloseDate=dt, + Direction="long", + Size=w_start, + Duration_bars=len(acc), + **{"PnL_%": pnl_val * 100.0} + )) + # salva contributo giornaliero reale (senza spalmare) + for dd, rv in zip(acc_dates, acc): + daily_rows.append({ + "Strategy": name, + "ISIN": isin, + "Date": dd, + "Size": w_start, + "PnL_day": float(rv) + }) + in_pos, start, acc = False, None, [] + acc_dates = [] + + # APERTURA: primo 1 (laggato) quando non in posizione + if (not in_pos) and (sig_t == 1): + in_pos, start, acc, acc_dates = True, dt, [], [] + + # ACCUMULO: PnL del giorno di esposizione + if in_pos: + acc.append(r.at[dt]) + acc_dates.append(dt) + + # CHIUSURA A FINE SERIE → prossimo business day + if in_pos: + close_dt = s.index[-1] + BDay(1) + if any(pd.isna(acc)): + print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{close_dt.date()}") + else: + pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0 + w_start = float(w.get(start, 0.0)) + rows.append(dict( + Strategy=name, + ISIN=isin, + OpenDate=start, + CloseDate=close_dt, + Direction="long", + Size=w_start, + Duration_bars=len(acc), + **{"PnL_%": pnl_val * 100.0} + )) + for dd, rv in zip(acc_dates, acc): + daily_rows.append({ + "Strategy": name, + "ISIN": isin, + "Date": dd, + "Size": w_start, + "PnL_day": float(rv) + }) + + # Ordina colonne + cols = ["Strategy","ISIN","OpenDate","CloseDate","Direction","Size","Duration_bars","PnL_%"] + out = pd.DataFrame(rows) + out = out[[c for c in cols if c in out.columns] + [c for c in out.columns if c not in cols]] + daily_df = 
pd.DataFrame(daily_rows) + if not daily_df.empty: + daily_df["Date"] = pd.to_datetime(daily_df["Date"]) + return out, daily_df + +# Colonne asset effettivamente usate nel portafoglio principale +asset_cols = [c for c in w_eq.columns if float(pd.to_numeric(w_eq[c], errors="coerce").abs().sum()) > 0.0] +if not asset_cols: + asset_cols = list(wide_pnl.columns) + +rep_eq, daily_eq = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]], + wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]], + w_eq_act, "Equal Weight") +rep_rp, daily_rp = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]], + wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]], + w_rp_act, "Risk Parity") + +with pd.ExcelWriter(TRADES_REPORT_XLSX) as xw: + rep_eq.to_excel(xw, "Equal_Weight", index=False) + rep_rp.to_excel(xw, "Risk_Parity", index=False) +checkpoint_post_timer("Report trades") + +# Performance attribution per ISIN +def _build_performance_attribution(trades_df: pd.DataFrame, meta_df: pd.DataFrame | None) -> pd.DataFrame: + if trades_df is None or trades_df.empty: + return pd.DataFrame(columns=["ISIN","Nome","Tot_Trades","Positivi","Negativi","Positivi_%","Negativi_%","PnL_Cum_%"]) + df = trades_df.copy() + df["PnL_%"] = pd.to_numeric(df["PnL_%"], errors="coerce") + rows = [] + for isin, g in df.groupby("ISIN"): + tot = len(g) + pos = int((g["PnL_%"] > 0).sum()) + neg = int((g["PnL_%"] < 0).sum()) + rows.append({ + "ISIN": str(isin), + "Tot_Trades": tot, + "Positivi": pos, + "Negativi": neg, + "Positivi_%": (pos / tot * 100.0) if tot > 0 else np.nan, + "Negativi_%": (neg / tot * 100.0) if tot > 0 else np.nan, + "PnL_Cum_%": float(g["PnL_%"].sum()) + }) + out = pd.DataFrame(rows) + if meta_df is not None and "ISIN" in meta_df.columns: + meta_cols = [c for c in ["ISIN","Nome","Descrizione","Categoria","Asset Class"] if c in meta_df.columns] + if meta_cols: + out = out.merge(meta_df[meta_cols].drop_duplicates("ISIN"), on="ISIN", how="left") + # ordina colonne con Nome subito dopo ISIN + cols = [c for c in ["ISIN","Nome"] if c in out.columns] + [c for c in out.columns if c not in ["ISIN","Nome"]] + return out[cols] + +perf_attr_df = _build_performance_attribution(pd.concat([rep_eq, rep_rp], ignore_index=True), df_sum if 'df_sum' in globals() else None) +perf_attr_df.to_excel(PERF_ATTRIB_XLSX, index=False) +print(f"[INFO] Performance attribution salvata in {PERF_ATTRIB_XLSX}") + +print(f"[INFO] Report trades salvato in {TRADES_REPORT_XLSX}") +# ============================================================ +# 5.6 Rebuild DAILY PnL from trades_report (calendarized) +# → per rendere coerente il compounding dei trade con equity/heatmap +# ============================================================ +import pandas as pd +import numpy as np + +def rebuild_daily_from_trades_dict(trades_dict): + """ + trades_dict: {'Equal_Weight': df, 'Risk_Parity': df} + Ogni df deve avere: OpenDate, CloseDate, Size, Duration_bars, PnL_% + Regola: distribuiamo il PnL del trade su ciascun giorno di durata con + un rendimento giornaliero costante r tale che (1+r)^D - 1 = PnL. + I giorni attivi sono [OpenDate, CloseDate) sul calendario business. + Il contributo del trade ogni giorno è Size * r. + Ritorna: DataFrame con indice Date (business days) e colonne per strategia. 
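+
+    Esempio numerico: un trade con PnL_% = 2.0 e Duration_bars = 4 viene distribuito con
+    r = 1.02**(1/4) - 1 ≈ 0.496% su ciascuno dei 4 giorni lavorativi in [OpenDate, CloseDate),
+    quindi il contributo giornaliero alla strategia è Size * r.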
+ """ + # Costruisci indice calendario minimo→massimo + min_dt, max_dt = None, None + for df in trades_dict.values(): + if df is None or df.empty: + continue + df = df.copy() + df["OpenDate"] = pd.to_datetime(df["OpenDate"]) + df["CloseDate"] = pd.to_datetime(df["CloseDate"]) + lo, hi = df["OpenDate"].min(), df["CloseDate"].max() + if pd.notna(lo): min_dt = lo if (min_dt is None or lo < min_dt) else min_dt + if pd.notna(hi): max_dt = hi if (max_dt is None or hi > max_dt) else max_dt + if min_dt is None or max_dt is None: + return pd.DataFrame() + + cal = pd.bdate_range(start=min_dt, end=max_dt, freq="C") # business calendar + daily = pd.DataFrame(0.0, index=cal, columns=list(trades_dict.keys())) + + for strat, df in trades_dict.items(): + if df is None or df.empty: + continue + d = df.copy() + d["OpenDate"] = pd.to_datetime(d["OpenDate"]) + d["CloseDate"] = pd.to_datetime(d["CloseDate"]) + d["Size"] = pd.to_numeric(d.get("Size", 0.0), errors="coerce").fillna(0.0) + d["Duration_bars"] = pd.to_numeric(d.get("Duration_bars", 0), errors="coerce").fillna(0).astype(int) + d["PnL_%"] = pd.to_numeric(d.get("PnL_%", 0.0), errors="coerce").fillna(0.0) + + # Costruisci serie giornaliera sommando i contributi dei singoli trade + s = pd.Series(0.0, index=cal) + for _, row in d.iterrows(): + D = int(row["Duration_bars"]) if pd.notna(row["Duration_bars"]) else 0 + if D <= 0: # trade di durata nulla → salta + continue + pnl = float(row["PnL_%"]) / 100.0 + try: + r_daily = (1.0 + pnl) ** (1.0 / D) - 1.0 + except Exception: + r_daily = pnl / max(1, D) + size = float(row["Size"]) if pd.notna(row["Size"]) else 0.0 + + # Giorni attivi: [OpenDate, CloseDate) sul calendario business + rng = pd.bdate_range(start=row["OpenDate"], end=row["CloseDate"] - pd.tseries.offsets.BDay(1), freq="C") + if len(rng) != D: + # se per qualche motivo differisce (festivi ecc.) 
ricalibra usando la lunghezza effettiva + if len(rng) <= 0: + continue + try: + r_daily = (1.0 + pnl) ** (1.0 / len(rng)) - 1.0 + except Exception: + r_daily = pnl / len(rng) + s[rng] = s[rng] + size * r_daily + daily[strat] = s + + return daily + +# Costruisci il dict dai DataFrame di trade appena creati +trades_dict = { + "Equal_Weight": rep_eq if 'rep_eq' in globals() else pd.DataFrame(), + "Risk_Parity": rep_rp if 'rep_rp' in globals() else pd.DataFrame(), +} + +# Ricostruzione daily dai PnL giornalieri reali dei trade (senza spalmare) +daily_detail = pd.concat([daily_eq, daily_rp], ignore_index=True) +if not daily_detail.empty: + daily_detail["Date"] = pd.to_datetime(daily_detail["Date"]) + daily_detail["PnL_day"] = pd.to_numeric(daily_detail["PnL_day"], errors="coerce") + daily_detail["Size"] = pd.to_numeric(daily_detail["Size"], errors="coerce").fillna(0.0) + daily_detail = daily_detail.dropna(subset=["Date", "PnL_day"]) + daily_detail["Pnl_contrib"] = daily_detail["PnL_day"] * daily_detail["Size"] + daily_from_trades = ( + daily_detail.pivot_table(index="Date", columns="Strategy", values="Pnl_contrib", aggfunc="sum") + .sort_index() + .fillna(0.0) + ) +else: + daily_from_trades = pd.DataFrame() + +# Salva su disco (CSV + XLSX) per ispezione +if not daily_from_trades.empty: + daily_from_trades.to_csv(DAILY_FROM_TRADES_CSV, index_label="Date") + + # Plot equity & heatmap basati sui DAILY da trade (coerenti col compounding) + import matplotlib.pyplot as plt + + col_map = [ + ("Equal Weight", ["Equal Weight", "Equal_Weight"]), + ("Risk Parity", ["Risk Parity", "Risk_Parity"]), + ] + + def _find_col(df, aliases): + for c in aliases: + if c in df.columns: + return c + return None + + fig, ax = plt.subplots(figsize=(10, 6)) + for lab, aliases in col_map: + col = _find_col(daily_from_trades, aliases) + if col: + eq = (1.0 + daily_from_trades[col].fillna(0.0)).cumprod() * 100 + eq.plot(ax=ax, label=lab) + ax.legend() + ax.grid(True) + ax.set_title("Equity line ricostruita dai trades (calendarizzata)") + fig.tight_layout() + fig.savefig(PLOT_DIR / "equity_from_trades.png", dpi=150) + plt.close(fig) + print(f"[INFO] Salvato: {PLOT_DIR / 'equity_from_trades.png'}") + + # Heatmap per ciascuna strategia + for lab, aliases, fname in [ + ("Equal Weight", ["Equal Weight", "Equal_Weight"], "heatmap_equal_from_trades.png"), + ("Risk Parity", ["Risk Parity", "Risk_Parity"], "heatmap_rp_from_trades.png"), + ]: + col = _find_col(daily_from_trades, aliases) + if col: + try: + plot_heatmap_monthly( + daily_from_trades[col], + f"Heatmap mensile - {lab} (da trades)", + save_path=PLOT_DIR / fname, + ) + except Exception as e: + print(f"[WARN] Heatmap {lab} da trades: {e}") +else: + print("[INFO] daily_from_trades risulta vuoto: nessun plot/CSV generato.") + +checkpoint_post_timer("Ricostruzione daily/plot") + + +# ============================================================ +# METRICS UTILS (guard) — richieste da _calc_all_metrics_... 
+# ============================================================ +import numpy as np +import pandas as pd + +# r2 della equity line vs trend line +if "r2_equity_line" not in globals(): + def r2_equity_line(returns: pd.Series) -> float: + r = pd.to_numeric(returns, errors="coerce").fillna(0.0) + if r.size == 0: + return np.nan + eq = (1.0 + r).cumprod() + t = np.arange(eq.size, dtype=float) + X = np.vstack([t, np.ones_like(t)]).T + beta, alpha = np.linalg.lstsq(X, eq.values, rcond=None)[0] + yhat = alpha + beta * t + ss_res = np.sum((eq.values - yhat) ** 2) + ss_tot = np.sum((eq.values - eq.values.mean()) ** 2) + return float(1 - ss_res/ss_tot) if ss_tot > 0 else np.nan + +# metriche drawdown: MaxDD, durata massima, time-to-recovery +if "drawdown_metrics" not in globals(): + def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): + r = pd.to_numeric(returns, errors="coerce").fillna(0.0) + if r.size == 0: + return np.nan, np.nan, np.nan + eq = (1.0 + r).cumprod() + peak = eq.cummax() + dd = eq / peak - 1.0 + maxdd = float(dd.min()) if dd.size else np.nan + + # durata massima di drawdown (numero di barre consecutive sotto il picco) + dd_mask = dd < 0 + max_dur, cur = 0, 0 + for flag in dd_mask: + cur = cur + 1 if flag else 0 + if cur > max_dur: + max_dur = cur + + # time-to-recovery dal minimo assoluto + ttr = np.nan + if dd.size: + idx_min = dd.idxmin() + sub_eq = eq.loc[idx_min:] + rec_idx = sub_eq[sub_eq >= peak.loc[idx_min]].index.min() + if pd.notna(rec_idx): + # se indice datetime, durata in giorni; se indice numerico, in barre + if isinstance(rec_idx, pd.Timestamp): + ttr = (rec_idx - idx_min).days + else: + ttr = int(sub_eq.index.get_loc(rec_idx)) + else: + ttr = sentinel_ttr + return maxdd, int(max_dur), ttr + +# AAW/AUW/Heal Index (versione validata che mi hai passato) +if "heal_index_metrics" not in globals(): + def heal_index_metrics(returns: pd.Series): + s = returns.fillna(0.0).astype(float) + if s.size == 0: + return np.nan, np.nan, np.nan + equity = (1.0 + s).cumprod() + if equity.size == 0: + return np.nan, np.nan, np.nan + + run_max = equity.cummax() + dd = equity / run_max - 1.0 + AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan + + run_min = equity.cummin() + ru = equity / run_min - 1.0 + AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan + + heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan + return AAW, AUW, heal + +# H_min (100% finestre positive) in giorni/mesi (versione validata) +if "h_min_100" not in globals(): + def h_min_100(returns: pd.Series, month_len: int = 21): + s = returns.dropna().astype(float) + n = s.size + if n == 0: + return np.nan, np.nan + log1p = np.log1p(s.values) + csum = np.cumsum(log1p) + + def rolling_sum_k(k: int): + if k > n: + return np.array([]) + head = csum[k - 1:] + tail = np.concatenate(([0.0], csum[:-k])) + return head - tail + + for k in range(1, n + 1): + rs = rolling_sum_k(k) + if rs.size == 0: + break + roll_ret = np.exp(rs) - 1.0 + if np.all(roll_ret >= 0): + h_days = k + h_months = int(np.ceil(h_days / month_len)) + return h_days, h_months + return np.nan, np.nan + +# costante se non già definita +try: + DAYS_PER_YEAR +except NameError: + DAYS_PER_YEAR = 252 + +# ========= FUNZIONE RICHIESTA DAL LOOP ========= +def _calc_all_metrics_from_returns(r: pd.Series) -> dict: + r = pd.to_numeric(r, errors="coerce").fillna(0.0) + n = len(r) + if n == 0: + return {k: np.nan for k in [ + "Rendimento_Ann","Volatilita_Ann","CAGR","R2_Equity", + 
"MaxDD","DD_Duration_Max","TTR_from_MDD","AAW","AUW","Heal_Index","H_min_100m_5Y" + ]} + + rendimento_ann = float(r.mean() * DAYS_PER_YEAR) + volatilita_ann = float(r.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)) if n > 1 else np.nan + + eq = (1.0 + r).cumprod() + years_elapsed = n / DAYS_PER_YEAR if n > 0 else np.nan + if years_elapsed and years_elapsed > 0 and eq.iloc[0] > 0: + cagr = float(eq.iloc[-1] ** (1.0 / years_elapsed) - 1.0) + else: + cagr = np.nan + + r2 = r2_equity_line(r) + maxdd, dddur, ttr = drawdown_metrics(r, sentinel_ttr=1250) + aaw, auw, heal = heal_index_metrics(r) + + # 5 anni o meno se la serie è più corta + lookback = min(n, DAYS_PER_YEAR * 5) + r5 = r.iloc[-lookback:] + _, hmin_months = h_min_100(r5, month_len=21) + + return { + "Rendimento_Ann": rendimento_ann, + "Volatilita_Ann": volatilita_ann, + "CAGR": cagr, + "R2_Equity": r2, + "MaxDD": maxdd, + "DD_Duration_Max": dddur, + "TTR_from_MDD": ttr, + "AAW": aaw, + "AUW": auw, + "Heal_Index": heal, + "H_min_100m_5Y": hmin_months + } + + +# ====================================================================== +# 6) LOOP Top-N: metriche per N = 6..20 → final_metrics.xlsx +# ====================================================================== + +# Safety: DAYS_PER_YEAR se non definito +try: + DAYS_PER_YEAR +except NameError: + DAYS_PER_YEAR = 252 + +def _select_isins_for_topN(df_sum: pd.DataFrame, top_n: int): + """Seleziona i migliori 'top_n' ISIN in base allo Score.""" + df_sum_loc = df_sum.copy() + base_isins_N = ( + df_sum_loc + .sort_values("Score", ascending=False) + .head(top_n)["ISIN"].astype(str).str.strip().tolist() + ) + return base_isins_N + +def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl): + """ + Costruisce i rendimenti di portafoglio Equal Weight e Risk Parity + per l'insieme di ISIN in base_isins_N. + + Ritorna: + ret_eq_N : pd.Series + ret_rp_N : pd.Series + """ + + # Colonne effettivamente disponibili + colsN = [c for c in base_isins_N if c in wide_pnl.columns] + + if len(colsN) == 0: + # Nessun ISIN valido → portafogli in cash (linea piatta) + idx = wide_pnl.index + ret_eq_N = pd.Series(0.0, index=idx, name="Ret_EqW_N") + ret_rp_N = pd.Series(0.0, index=idx, name="Ret_RP_N") + return ret_eq_N, ret_rp_N + + # -------- Equal Weight -------- + ret_eq_N = wide_pnl[colsN].mean(axis=1) + + # -------- Risk Parity con cap -------- + weights_rp_N = inverse_vol_weights( + wide_pnl[colsN], + window=60, + max_weight=RP_MAX_WEIGHT # es. 
RP_MAX_WEIGHT = 2 / TOP_N_MAX = 0.1333 + ) + ret_rp_N = (wide_pnl[colsN] * weights_rp_N).sum(axis=1) + + return ret_eq_N, ret_rp_N + +# ============================== +# Metriche portafoglio (TOP_N corrente) → Excel +# ============================== +metrics_rows = [] +for strategy_name, rser in [ + ("Equal_Weight", ret_eq), + ("Risk_Parity", ret_rp), +]: + m = _calc_all_metrics_from_returns(rser) + m["TopN"] = TOP_N + m["Strategy"] = strategy_name + metrics_rows.append(m) + +df_metrics = pd.DataFrame(metrics_rows)[[ + "TopN", "Strategy", + "Rendimento_Ann", "Volatilita_Ann", "CAGR", "R2_Equity", + "MaxDD", "DD_Duration_Max", "TTR_from_MDD", + "AAW", "AUW", "Heal_Index", "H_min_100m_5Y", +]] + +try: + with pd.ExcelWriter(FINAL_METRICS_XLSX, engine="openpyxl", mode="a", if_sheet_exists="replace") as xw: + df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False) +except Exception: + with pd.ExcelWriter(FINAL_METRICS_XLSX) as xw: + df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False) +print(f"[INFO] Salvato: {FINAL_METRICS_XLSX} (Portfolio_Metrics)") diff --git a/config/pattern_knn_config.json b/config/pattern_knn_config.json index 3fa8684..5d4d5fd 100644 --- a/config/pattern_knn_config.json +++ b/config/pattern_knn_config.json @@ -11,6 +11,13 @@ "theta": 0.005, "embargo": null }, + "wavelet_filter": { + "enabled": true, + "wavelet": "db4", + "level": 4, + "mode": "symmetric", + "threshold_mode": "soft" + }, "tagging": { "z_rev": 2.0, "z_vol": 2.0, diff --git a/shared_utils.py b/shared_utils.py index 1d6982a..0f643be 100644 --- a/shared_utils.py +++ b/shared_utils.py @@ -9,6 +9,10 @@ from typing import Dict, List, Optional, Sequence, Tuple import numpy as np import pandas as pd import pyodbc +try: + import pywt +except ImportError: # pragma: no cover - optional dependency + pywt = None DEFAULT_CONFIG_PATH = Path("config/pattern_knn_config.json") @@ -87,6 +91,58 @@ def z_norm(arr: np.ndarray) -> Optional[np.ndarray]: return (arr - mu) / (sd + 1e-12) +def wavelet_denoise( + series: pd.Series, + wavelet: str = "db3", + level: int = 3, + mode: str = "symmetric", + threshold_mode: str = "soft", +) -> Optional[pd.Series]: + """Denoise/reshape the series with a wavelet decomposition. + + Keeps the original index length; if PyWavelets is missing the function + returns None so callers can gracefully fall back to the raw signal. 
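+
+    Example (illustrative; "db4", level 4 and "soft" mirror the values added to
+    pattern_knn_config.json, and ret_series stands for any raw return series):
+        filt = wavelet_denoise(ret_series, wavelet="db4", level=4, threshold_mode="soft")
+        clean = filt if filt is not None else ret_series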
+ """ + if pywt is None: + print("[WARN] pywt non installato: salto il filtraggio wavelet.") + return None + s = pd.to_numeric(series, errors="coerce") + if s.dropna().empty: + return None + + w = pywt.Wavelet(wavelet) + max_level = pywt.dwt_max_level(len(s.dropna()), w.dec_len) + lvl = max(1, min(level, max_level)) if max_level > 0 else 1 + + valid = s.dropna() + coeffs = pywt.wavedec(valid.values, w, mode=mode, level=lvl) + # Universal threshold (Donoho-Johnstone) + sigma = np.median(np.abs(coeffs[-1])) / 0.6745 if len(coeffs[-1]) > 0 else 0.0 + thresh = sigma * np.sqrt(2 * np.log(len(valid))) if sigma > 0 else 0.0 + if thresh <= 0: + coeffs_f = coeffs + else: + def _safe_thresh(c: np.ndarray) -> np.ndarray: + if c is None or c.size == 0: + return c + if threshold_mode == "hard": + return pywt.threshold(c, value=thresh, mode="hard") + # soft threshold without divide-by-zero warnings + mag = np.abs(c) + mask = mag > thresh + out = np.zeros_like(c) + out[mask] = np.sign(c[mask]) * (mag[mask] - thresh) + return out + + coeffs_f = [coeffs[0]] + [_safe_thresh(c) for c in coeffs[1:]] + + rec = pywt.waverec(coeffs_f, w, mode=mode) + rec = rec[: len(valid)] + filt = pd.Series(rec, index=valid.index) + # Re-allineamento all'indice originale + return filt.reindex(s.index).interpolate(limit_direction="both") + + def build_pattern_library( ret_series: pd.Series, wp: int,