# -*- coding: utf-8 -*- """ Created on Mon Oct 27 13:59:10 2025 Script end-to-end: - Carica universo (Excel) e dati (DB) una sola volta - Calcola Hurst + Pattern signals (solo long per i trade) - Esegue walk-forward k-NN (solo long) e salva forward_bt_signals/summary - Fase 5: selezione dinamica portafogli + Equity + Heatmap + Trade report (solo long) - Fase 6: metriche finali (come "ottimizzatore") + salvataggio grafici su file Note: - Non rilegge file appena salvati; usa i DataFrame in memoria - Salva: hurst_by_isin.xlsx, pattern_signals.xlsx, forward_bt_*.xlsx, trades_report.xlsx, final_metrics.xlsx - Salva PNG: equity_line_portafogli.png, heatmap_*.png """ import pandas as pd import numpy as np import sqlalchemy as sa from sqlalchemy import text import matplotlib.pyplot as plt from pathlib import Path import json import ssl import re from urllib.request import urlopen from urllib.error import URLError, HTTPError from shared_utils import ( build_pattern_library, characterize_window, detect_column, load_config, predict_from_library, read_connection_txt, require_section, require_value, wavelet_denoise, z_norm, ) #from math import isfinite import time # ============================= # Plot saving helper (non-recursive) # ============================= try: import os as _os_sf import matplotlib.pyplot as _plt_sf except Exception: _plt_sf = None SAVE_PNG = globals().get("SAVE_PNG", True) def savefig_safe(path, **kwargs): """ Save a matplotlib figure to disk safely, honoring SAVE_PNG. Usage: savefig_safe("plot/myfig.png", dpi=150, bbox_inches="tight") """ if not SAVE_PNG or _plt_sf is None: return # Ensure directory exists try: d = _os_sf.path.dirname(path) if d and not _os_sf.path.exists(d): _os_sf.makedirs(d, exist_ok=True) except Exception as _e: print(f"[savefig_safe] Directory creation warning: {_e}") try: _plt_sf.savefig(path, **kwargs) except Exception as e: print(f"[savefig_safe] Warning while saving '{path}': {e}") # Calcolo Score (riusabile anche rolling) def _apply_score(df_sum: pd.DataFrame) -> pd.DataFrame: """Applica la calibrazione dei pesi su df_sum e aggiunge la colonna Score.""" def _available_cols(df, cols): return [c for c in cols if (c in df.columns and df[c].notna().sum() > 0)] primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)] alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)] mm = [(c,gh) for (c,gh) in primary_cols if c in df_sum.columns and df_sum[c].notna().sum()>0] if len(mm) < 2: mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum()>0] # Se ancora insufficienti, prova ad allargare al set unito if len(mm) < 2: union_candidates = list({x[0] for x in primary_cols+alt_cols}) mm = [(c, True) for c in _available_cols(df_sum, union_candidates)] if len(mm) == 0: print("[WARN] Nessuna metrica numerica disponibile: uso Score=0 e ordino per ISIN.") df_sum["Score"] = 0.0 df_sum["Score_mode"] = "degenerate_equal" return df_sum # Se sono definiti pesi fissi in config, usali; altrimenti calibra automaticamente use_fixed = False if SCORE_WEIGHTS: weights_raw = {k: float(v) for k, v in SCORE_WEIGHTS.items() if k in df_sum.columns} weights_raw = {k: v for k, v in weights_raw.items() if df_sum[k].notna().sum() > 0} if weights_raw: use_fixed = True w = pd.Series(weights_raw) w = w / w.sum() X_ranked = df_sum[w.index].rank(pct=True) df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1) df_sum["Score_mode"] = "fixed_weights" if SCORE_VERBOSE: print("Pesi fissi (config):", w.to_dict()) else: 
print("[WARN] score_weights in config non compatibili con le metriche disponibili. Uso calibrazione automatica.") if not use_fixed: res = calibrate_score_weights( df_sum, metrics_map=mm, target_col=None ) X_ranked = res["X_ranked"] w = res["weights"] df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1) df_sum["Score_mode"] = res["mode"] if SCORE_VERBOSE: print("Pesi stimati automaticamente (metriche usate):") print("Disponibilita' metriche (righe non-NaN):", {c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]}) print(w) return df_sum # ============================= # PRICE FETCH (OPEN/CLOSE) - storico # ============================= def _build_symbol_euronext(row: pd.Series) -> tuple[str, str]: isin = str(row.get("ISIN", "")).strip() venue = str(row.get("Mercato", "")).strip() tok = str(row.get("TickerOpen", "") or "").strip() base = OPEN_PRICE_BASE_URL if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper(): return base, tok if isin and venue: return base, f"{isin}-{venue}" if isin: return base, f"{isin}-ETFP" # fallback generico per endpoint history return base, isin def fetch_price_history(isins, universe: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame: """ Scarica la serie storica open/close per una lista di ISIN usando l'endpoint storico. - API chiamata 1 ISIN alla volta: https://fin.scorer.app/finance/etf-inv/history/{ticker}?fromDate=YYYYMMDD&toDate=YYYYMMDD - Caching locale su CSV per ridurre le richieste; se l'API fallisce, tenta di usare la cache. - Fallback mercati: ETFP → XPAR → XAMS. Se si estende una serie con un altro mercato, la giunta avviene solo se il prezzo all'ultimo punto del segmento precedente e al primo del successivo differisce < 2% (per evitare salti di valuta/quotazione). Ritorna DataFrame con colonne: Date (datetime), ISIN, Open, Close. """ start_dt = pd.to_datetime(start_date).date() end_dt = pd.to_datetime(end_date).date() def _symbol_cache_path(symbol: str) -> Path: safe = re.sub(r"[^A-Za-z0-9_-]+", "_", str(symbol)) return OPEN_CACHE_DIR / f"{safe}.csv" def _load_cache(path: Path) -> pd.DataFrame | None: try: if path.exists(): dfc = pd.read_csv(path, parse_dates=["Date"]) dfc["ISIN"] = dfc["ISIN"].astype(str) return dfc except Exception as e: print(f"[WARN] Cache prezzi corrotta {path}: {e}") return None def _normalize_payload_to_df(payload, isin): # Il nuovo endpoint ritorna [{"ticker": "...", "data": [ {...}, ... 
]}] data_block = payload if isinstance(payload, list) and payload: if isinstance(payload[0], dict) and "data" in payload[0]: data_block = payload[0].get("data", []) else: data_block = payload if isinstance(payload, dict) and "data" in payload: data_block = payload.get("data", []) rows = [] for d in data_block or []: dt_raw = d.get("date") or d.get("Date") or d.get("data") or d.get("timestamp") if dt_raw is None: continue try: if isinstance(dt_raw, (int, float)): dt_parsed = pd.to_datetime(int(dt_raw), unit="ms").tz_localize(None) else: dt_parsed = pd.to_datetime(dt_raw).tz_localize(None) except Exception: continue rows.append({ "Date": dt_parsed, "ISIN": str(isin), "Open": _to_float_safe(d.get("open")), "Close": _to_float_safe(d.get("close") or d.get("last")) }) return pd.DataFrame(rows) if rows else pd.DataFrame(columns=["Date","ISIN","Open","Close"]) def _fetch_symbol(symbol: str, isin: str): url = f"{OPEN_PRICE_BASE_URL}/{symbol}?fromDate={start_dt.strftime('%Y%m%d')}&toDate={end_dt.strftime('%Y%m%d')}" cache_path = _symbol_cache_path(symbol) cache_df = _load_cache(cache_path) df_api = pd.DataFrame() ok = False for attempt in range(1, OPEN_MAX_RETRY + 1): try: with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp: payload = json.loads(resp.read().decode("utf-8")) df_api = _normalize_payload_to_df(payload, isin) if df_api.empty: print(f"[WARN] Nessun dato per {symbol}") ok = True break except (HTTPError, URLError, ssl.SSLError, json.JSONDecodeError) as e: if attempt < OPEN_MAX_RETRY: print(f"[WARN] Download {symbol} tentativo {attempt}/{OPEN_MAX_RETRY} fallito: {e}. Retry in {OPEN_SLEEP_SEC}s") time.sleep(OPEN_SLEEP_SEC) else: print(f"[ERROR] Download {symbol} fallito: {e}") df_use = pd.DataFrame() if ok and not df_api.empty: df_api = df_api.sort_values("Date") if cache_df is not None and not cache_df.empty: df_use = ( pd.concat([cache_df, df_api], ignore_index=True) .drop_duplicates(subset=["Date"]) .sort_values("Date") ) else: df_use = df_api try: OPEN_CACHE_DIR.mkdir(parents=True, exist_ok=True) df_use.to_csv(cache_path, index=False) except Exception as e: print(f"[WARN] Salvataggio cache prezzi fallito ({cache_path}): {e}") elif cache_df is not None and not cache_df.empty: df_use = cache_df print(f"[INFO] Uso cache prezzi per {symbol} (API indisponibile).") return df_use def _merge_with_check(df_base: pd.DataFrame, df_add: pd.DataFrame, label_prev: str, label_next: str): """ Estende df_base aggiungendo il tratto df_add antecedente al primo punto di df_base. Controlla il salto di prezzo all'incrocio: se > 2%, non fonde e avvisa. 
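        Illustrative example (numbers assumed, not taken from real data): if the
        fallback segment ends at Close=100.0 and the primary series starts at
        Close=101.5, gap = |100.0 - 101.5| / 101.5 ≈ 1.48% < 2% and the merge is
        accepted; at Close=103.0 the gap would be ≈ 2.91% and the fallback segment
        is discarded.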
""" if df_base is None or df_base.empty: return df_add, False if df_add is None or df_add.empty: return df_base, False cutoff = df_base["Date"].min() prev_part = df_add[df_add["Date"] < cutoff] if prev_part.empty: return df_base, False merged = pd.concat([prev_part, df_base], ignore_index=True) merged = merged.sort_values("Date").drop_duplicates(subset=["Date"], keep="last") # controllo salto: ultimo prezzo del segmento precedente vs primo del successivo prev_last = prev_part.sort_values("Date").iloc[-1] next_first = df_base[df_base["Date"] >= cutoff].sort_values("Date").iloc[0] def _price(row): return _to_float_safe(row.get("Close")) if pd.notna(row.get("Close")) else _to_float_safe(row.get("Open")) p_prev = _price(prev_last) p_next = _price(next_first) if p_prev is None or p_next is None or not np.isfinite(p_prev) or not np.isfinite(p_next) or p_next == 0: return merged, True gap = abs(p_prev - p_next) / abs(p_next) if gap > 0.02: print(f"[WARN] Salto prezzo >2% tra {label_prev} e {label_next} su {prev_last['Date'].date()} -> {next_first['Date'].date()} (gap {gap:.2%}). Fallback non applicato.") return df_base, False return merged, True records = [] for i, isin in enumerate(isins, 1): try: row = universe.loc[universe["ISIN"] == str(isin)].iloc[0] except Exception: print(f"[WARN] ISIN {isin} non trovato nell'universo.") continue base, symbol = _build_symbol_euronext(row) df_primary = _fetch_symbol(symbol, isin) # Fallback mercati aggiuntivi (XPAR, poi XAMS) per estendere indietro la serie fallback_symbols = [] if "-" in symbol: root = symbol.rsplit("-", 1)[0] fallback_symbols.append(f"{root}-XPAR") fallback_symbols.append(f"{root}-XAMS") else: fallback_symbols.append(f"{symbol}-XPAR") fallback_symbols.append(f"{symbol}-XAMS") df_use = df_primary applied_any = False for fb_sym in fallback_symbols: # servono solo se la serie non parte da start_dt need_fb = df_use.empty or (df_use["Date"].min().date() > start_dt) if not need_fb: continue df_fb = _fetch_symbol(fb_sym, isin) if df_fb.empty: print(f"[WARN] Fallback {fb_sym} assente per {isin}") continue if df_use.empty: df_use = df_fb applied_any = True print(f"[INFO] Uso fallback {fb_sym} per tutto il periodo.") else: merged, merged_ok = _merge_with_check(df_use, df_fb, fb_sym, symbol) if merged_ok: df_use = merged applied_any = True cutoff = df_use["Date"].min() print(f"[INFO] Serie estesa con {fb_sym} fino a {cutoff.date()} per {isin}") else: print(f"[WARN] Fallback {fb_sym} scartato per gap >2% su {isin}") if df_use.empty: print(f"[WARN] Serie open/close non disponibile per {isin}") continue # Filtro range richiesto df_use["Date"] = pd.to_datetime(df_use["Date"]) mask = (df_use["Date"].dt.date >= start_dt) & (df_use["Date"].dt.date <= end_dt) df_use = df_use.loc[mask] if df_use.empty: print(f"[WARN] Nessun dato nel range richiesto per {symbol}") continue records.append(df_use) if not records: return pd.DataFrame(columns=["Date","ISIN","Open","Close"]) df_px = pd.concat(records, ignore_index=True) df_px = df_px.sort_values(["ISIN","Date"]).reset_index(drop=True) return df_px def save_price_cache_summary(cache_dir: Path, outfile: Path, pattern: str = "*ETFP.csv"): """ Salva un riepilogo delle serie prezzi in cache (senza fallback) con min/max date e numero righe. pattern di default: solo i simboli ETFP. 
""" try: if not cache_dir.exists(): print(f"[WARN] Cache prezzi non trovata: {cache_dir}") return rows = [] for f in sorted(cache_dir.glob(pattern)): try: df = pd.read_csv(f, parse_dates=["Date"]) except Exception as e: rows.append({"Symbol": f.stem, "Errore": str(e)}) continue if df.empty: rows.append({"Symbol": f.stem, "Rows": 0}) continue rows.append({ "Symbol": f.stem, "min_date": df["Date"].min().date(), "max_date": df["Date"].max().date(), "rows": len(df) }) if not rows: print(f"[WARN] Nessun file prezzi in cache ({cache_dir}).") return out_df = pd.DataFrame(rows).sort_values("Symbol") outfile.parent.mkdir(parents=True, exist_ok=True) out_df.to_excel(outfile, index=False) print(f"[INFO] Salvato riepilogo prezzi (no fallback) in {outfile} ({len(out_df)} righe)") except Exception as e: print(f"[WARN] Impossibile salvare riepilogo prezzi: {e}") def _to_float_safe(x): try: return float(x) except Exception: return np.nan # LEGACY: blocco originale mantenuto ma non eseguito (usiamo _apply_score sopra) # ========================================= # PARAMETRI GLOBALI # ========================================= CONFIG = load_config() DB_CONFIG = require_section(CONFIG, "db") PATTERN_CONFIG = require_section(CONFIG, "pattern") TAGGING_CONFIG = require_section(CONFIG, "tagging") RANKING_CONFIG = require_section(CONFIG, "ranking") PATHS_CONFIG = require_section(CONFIG, "paths") HURST_CONFIG = CONFIG.get("hurst", {}) RUN_CONFIG = CONFIG.get("run", {}) SIGNALS_CONFIG = CONFIG.get("signals", {}) PRICES_CONFIG = CONFIG.get("prices", {}) OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "output")) PLOT_DIR = Path(PATHS_CONFIG.get("plot_dir", "plot")) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) PLOT_DIR.mkdir(parents=True, exist_ok=True) UNIVERSO_XLSX = PATHS_CONFIG.get("input_universe", "Input/Universo per Trading System.xlsx") # Export OUTPUT_HURST_XLSX = OUTPUT_DIR / "hurst_by_isin.xlsx" OUTPUT_PATTERN_XLSX = OUTPUT_DIR / "pattern_signals.xlsx" ERROR_LOG_CSV = OUTPUT_DIR / "errori_isin.csv" FORWARD_BT_SIGNALS_XLSX = OUTPUT_DIR / "forward_bt_signals.xlsx" FORWARD_BT_SUMMARY_XLSX = OUTPUT_DIR / "forward_bt_summary.xlsx" TRADES_REPORT_XLSX = OUTPUT_DIR / "trades_report.xlsx" PERF_ATTRIB_XLSX = OUTPUT_DIR / "performance_attribution.xlsx" DAILY_FROM_TRADES_CSV = OUTPUT_DIR / "daily_from_trades.csv" DAILY_FROM_TRADES_XLSX = OUTPUT_DIR / "daily_from_trades.xlsx" WEIGHTS_DAILY_XLSX = OUTPUT_DIR / "weights_daily.xlsx" FINAL_METRICS_XLSX = OUTPUT_DIR / "final_metrics.xlsx" # Stored Procedure & parametri STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db")) N_BARS = int(require_value(DB_CONFIG, "n_bars", "db")) PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db")) RANKING_WINDOW_BARS = int(RANKING_CONFIG.get("rolling_window_bars", N_BARS)) RP_LOOKBACK = int(SIGNALS_CONFIG.get("risk_parity_lookback", 60)) OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/etf-inv/history")) OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3)) OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1)) OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10)) OPEN_CACHE_DIR = Path(PRICES_CONFIG.get("cache_dir", OUTPUT_DIR / "price_cache")) RECOMPUTE_PORTF_FROM_OPEN = bool(PRICES_CONFIG.get("recompute_portfolio_open", False)) # Pattern-matching (iper-parametri) WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) # lunghezza finestra pattern (barre) HA = int(require_value(PATTERN_CONFIG, "ha", "pattern")) # orizzonte outcome (barre) KNN_K = 
int(require_value(PATTERN_CONFIG, "knn_k", "pattern")) # numero di vicini THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern")) # soglia su outcome per generare segnale EMBARGO = require_value(PATTERN_CONFIG, "embargo", "pattern") WAVELET_CFG = CONFIG.get("wavelet_filter", {}) WAVELET_ENABLED = bool(WAVELET_CFG.get("enabled", False)) WAVELET_NAME = str(WAVELET_CFG.get("wavelet", "db3")) WAVELET_LEVEL = int(WAVELET_CFG.get("level", 3)) WAVELET_MODE = str(WAVELET_CFG.get("mode", "symmetric")) WAVELET_THRESHOLD_MODE = str(WAVELET_CFG.get("threshold_mode", "soft")) MAX_AVG_DIST = PATTERN_CONFIG.get("max_avg_dist", None) if EMBARGO is None: EMBARGO = WP + HA else: EMBARGO = int(EMBARGO) # Tagging rule-based (soglie) Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging")) Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging")) STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging")) TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking")) # numero massimo di asset ammessi RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking") # 2 x 1/15 ≈ 0.1333 = 13,33% if RP_MAX_WEIGHT is None: RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1) else: RP_MAX_WEIGHT = float(RP_MAX_WEIGHT) SCORE_VERBOSE = bool(RANKING_CONFIG.get("score_verbose", False)) SCORE_WEIGHTS = RANKING_CONFIG.get("score_weights") HURST_MIN_LENGTH = int(HURST_CONFIG.get("min_length", 200)) HURST_WIN_GRID = HURST_CONFIG.get("win_grid") HURST_MIN_SEGMENTS = int(HURST_CONFIG.get("min_segments", 1)) DAYS_PER_YEAR = int(RUN_CONFIG.get("days_per_year", 252)) TOP_N = int(RUN_CONFIG.get("top_n_default", TOP_N_MAX)) # ========================================= # UTILS GENERALI # ========================================= def clamp01(x): if not np.isfinite(x): return np.nan return float(min(1.0, max(0.0, x))) def format_eta(seconds): """Format a duration (seconds) as Xm Ys or Xh Ym Ys for readability.""" if not np.isfinite(seconds): return "n/a" seconds = max(0, int(round(seconds))) minutes, secs = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) if hours: return f"{hours}h {minutes:02d}m {secs:02d}s" return f"{minutes}m {secs:02d}s" # Timer helper per fasi post-backtest _post_timer = {"t0": None, "tprev": None, "total": None, "done": 0} def start_post_timer(total_steps: int): _post_timer["t0"] = time.perf_counter() _post_timer["tprev"] = _post_timer["t0"] _post_timer["total"] = total_steps _post_timer["done"] = 0 def checkpoint_post_timer(label: str): if _post_timer["t0"] is None or _post_timer["total"] is None: return _post_timer["done"] += 1 now = time.perf_counter() step_dt = now - _post_timer["tprev"] total_dt = now - _post_timer["t0"] avg = total_dt / max(_post_timer["done"], 1) eta = avg * max(_post_timer["total"] - _post_timer["done"], 0) print(f"[TIMER] post { _post_timer['done']}/{_post_timer['total']} {label} — step {step_dt:.2f}s, total {total_dt:.2f}s, ETA {format_eta(eta)}") _post_timer["tprev"] = now # ================= HURST (sui RENDIMENTI) ================= def hurst_rs_returns(r, win_grid=None, min_seg=None): r = pd.Series(r).dropna().astype("float64").values n = len(r) seg_min = HURST_MIN_SEGMENTS if min_seg is None else int(min_seg) if n < HURST_MIN_LENGTH: return np.nan if win_grid is None: base = HURST_WIN_GRID or [16,24,32,48,64,96,128,192,256,384] base = np.array(base, dtype=int) win_grid = [w for w in base if w <= n//2] if len(win_grid) < 4: max_w = max(16, n//4) g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)])) win_grid = [w 
for w in g if 8 <= w <= max_w] RS_vals, sizes = [], [] for w in win_grid: if w < 8 or w > n: continue m = n//w if m < seg_min: continue rs_list = [] for i in range(m): seg = r[i*w:(i+1)*w] seg = seg - np.mean(seg) sd = seg.std(ddof=1) if sd == 0 or not np.isfinite(sd): continue y = np.cumsum(seg) R = np.max(y) - np.min(y) rs = R/sd if np.isfinite(rs) and rs > 0: rs_list.append(rs) if rs_list: RS_vals.append(np.mean(rs_list)); sizes.append(w) if len(RS_vals) < 3: return np.nan sizes = np.array(sizes, float); RS_vals = np.array(RS_vals, float) mask = np.isfinite(RS_vals) & (RS_vals > 0) sizes, RS_vals = sizes[mask], RS_vals[mask] if sizes.size < 3: return np.nan slope, _ = np.polyfit(np.log(sizes), np.log(RS_vals), 1) return clamp01(slope) def hurst_dfa_returns(r, win_grid=None): r = pd.Series(r).dropna().astype("float64").values n = len(r) if n < HURST_MIN_LENGTH: return np.nan r_dm = r - np.mean(r) y = np.cumsum(r_dm) if win_grid is None: base = HURST_WIN_GRID or [16,24,32,48,64,96,128,192,256] base = np.array(base, dtype=int) win_grid = [w for w in base if w <= n//2] if len(win_grid) < 4: max_w = max(16, n//4) g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)])) win_grid = [w for w in g if 8 <= w <= max_w] F_vals, sizes = [], [] for s in win_grid: if s < 8: continue m = n//s if m < 2: continue rms_list = [] for i in range(m): seg = y[i*s:(i+1)*s] t = np.arange(s, dtype=float) A = np.vstack([t, np.ones(s)]).T coeff, *_ = np.linalg.lstsq(A, seg, rcond=None) trend = A @ coeff detr = seg - trend rms = np.sqrt(np.mean(detr**2)) if np.isfinite(rms) and rms > 0: rms_list.append(rms) if rms_list: F_vals.append(np.mean(rms_list)); sizes.append(s) if len(F_vals) < 3: return np.nan sizes = np.array(sizes, float); F_vals = np.array(F_vals, float) mask = np.isfinite(F_vals) & (F_vals > 0) sizes, F_vals = sizes[mask], F_vals[mask] if sizes.size < 3: return np.nan slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1) return clamp01(slope) # --------------------------------- # R^2 su equity line (log-equity vs tempo) # --------------------------------- def r2_equity_line(returns: pd.Series) -> float: r = pd.to_numeric(returns, errors="coerce").fillna(0.0) eq = (1.0 + r).cumprod().replace(0, np.nan) y = np.log(eq.dropna()) if len(y) < 10: return np.nan x = np.arange(len(y), dtype=float) x = (x - x.mean()) / (x.std() + 1e-12) X = np.vstack([np.ones_like(x), x]).T beta, *_ = np.linalg.lstsq(X, y.values, rcond=None) y_hat = X @ beta ss_res = ((y.values - y_hat) ** 2).sum() ss_tot = ((y.values - y.values.mean()) ** 2).sum() return float(1 - ss_res / ss_tot) if ss_tot > 0 else np.nan # --------------------------------- # Drawdown metrics path-based # --------------------------------- def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): r = pd.to_numeric(returns, errors="coerce").fillna(0.0) eq = (1.0 + r).cumprod() if eq.empty: return np.nan, np.nan, np.nan roll_max = eq.cummax() dd = eq / roll_max - 1.0 maxdd = float(dd.min()) episodes = [] peak_i, peak_val = None, -np.inf trough_i, trough_val = None, np.inf in_uw = False v = eq.values for i in range(len(v)): if v[i] >= peak_val: if in_uw: episodes.append((peak_i, trough_i, i)) in_uw = False trough_i, trough_val = None, np.inf peak_i, peak_val = i, v[i] else: if not in_uw: in_uw = True trough_i, trough_val = i, v[i] elif v[i] < trough_val: trough_i, trough_val = i, v[i] if in_uw: episodes.append((peak_i, trough_i, None)) dd_dur_max = np.nan if episodes: durs = [] for p, t, rcv in episodes: if p is None: continue end_i = rcv 
if rcv is not None else len(eq) - 1 durs.append(end_i - p) if durs: dd_dur_max = int(max(durs)) ttr = np.nan if episodes: mdd_val = 0.0 mdd_ep = None for p, t, rcv in episodes: if p is None or t is None: continue dd_here = eq.iloc[t] / eq.iloc[p] - 1.0 if dd_here < mdd_val: mdd_val = dd_here mdd_ep = (p, t, rcv) if mdd_ep is not None: p, t, rcv = mdd_ep if rcv is not None: ttr = int(rcv - t) else: ttr = sentinel_ttr return maxdd, dd_dur_max, ttr # --------------------------------- # Utility per AAW, AUW e Heal Index (come nell'ottimizzatore) # --------------------------------- def heal_index_metrics(returns: pd.Series): """ Calcola: - AAW: area sopra acqua (run-up vs minimo cumulato) - AUW: area sotto acqua (drawdown vs massimo cumulato) - Heal Index: (AAW - AUW) / AUW """ s = returns.fillna(0.0).astype(float) if s.size == 0: return np.nan, np.nan, np.nan equity = (1.0 + s).cumprod() if equity.size == 0: return np.nan, np.nan, np.nan run_max = equity.cummax() dd = equity / run_max - 1.0 AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan run_min = equity.cummin() ru = equity / run_min - 1.0 AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan return AAW, AUW, heal # --------------------------------- # Utility per H_min (100% finestre positive) — come nell'ottimizzatore # --------------------------------- def h_min_100(returns: pd.Series, month_len: int = 21): """ Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)). """ s = returns.dropna().astype(float) n = s.size if n == 0: return np.nan, np.nan log1p = np.log1p(s.values) csum = np.cumsum(log1p) def rolling_sum_k(k: int): if k > n: return np.array([]) head = csum[k - 1:] tail = np.concatenate(([0.0], csum[:-k])) return head - tail for k in range(1, n + 1): rs = rolling_sum_k(k) if rs.size == 0: break roll_ret = np.exp(rs) - 1.0 if np.all(roll_ret >= 0): h_days = k h_months = int(np.ceil(h_days / month_len)) return h_days, h_months return np.nan, np.nan # ========================================= # 1) UNIVERSO: ISIN + METADATI # ========================================= universo = pd.read_excel(UNIVERSO_XLSX) col_isin_uni = detect_column(universo, ["ISIN", "isin", "codice isin"]) if col_isin_uni is None: raise ValueError("Nel file universo non trovo una colonna ISIN.") col_name_uni = detect_column(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"]) col_cat_uni = detect_column(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"]) col_ac_uni = detect_column(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"]) isins = ( universo[col_isin_uni].astype(str).str.strip() .replace("", pd.NA).dropna().drop_duplicates().tolist() ) print(f"[INFO] ISIN totali in universo: {len(isins)}") meta_df = pd.DataFrame({"ISIN": universo[col_isin_uni].astype(str).str.strip()}) meta_df["Nome"] = universo[col_name_uni] if col_name_uni else None meta_df["Categoria"] = universo[col_cat_uni] if col_cat_uni else None meta_df["Asset Class"] = universo[col_ac_uni] if col_ac_uni else None meta_df = meta_df.drop_duplicates(subset=["ISIN"]).reset_index(drop=True) # ========================================= # 2) CONNESSIONE DB # ========================================= conn_str = read_connection_txt("connection.txt") engine = sa.create_engine(conn_str, fast_executemany=True) print("[INFO] 
Connessione pronta (SQLAlchemy + pyodbc).") # ========================================= # 3) LOOP ISIN → SP → HURST + PATTERN (SOLO LONG per i trade) # ========================================= errors = [] hurst_rows = [] pattern_rows = [] last_dates = [] sql_sp = text(f"EXEC {STORED_PROC} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") def detect_cols(df0): col_date = detect_column(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"]) col_ret = detect_column(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"]) col_px = detect_column(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"]) return col_date, col_ret, col_px ok_count = 0 first_ok_reported = False for i, isin in enumerate(isins, 1): try: df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR}) if df_isin.empty: errors.append({"ISIN": isin, "Errore": "SP vuota"}) continue col_date, col_ret, col_px = detect_cols(df_isin) if col_date: df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce") df_isin = df_isin.sort_values(col_date) # --- Rendimenti --- if col_ret and col_ret in df_isin.columns: r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float).dropna() elif col_px and col_px in df_isin.columns: px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan) r = np.log(px/px.shift(1)).dropna() else: errors.append({"ISIN": isin, "Errore": "Né rendimenti né prezzi utilizzabili"}) continue if len(r) < max(200, WP + HA + 10): errors.append({"ISIN": isin, "Errore": f"Serie troppo corta ({len(r)} punti)"}) continue # --- HURST (sui rendimenti) --- h_rs = hurst_rs_returns(r) h_dfa = hurst_dfa_returns(r) H = np.nanmedian([h_rs, h_dfa]) H = clamp01(H) if np.isfinite(H) else np.nan if pd.isna(H): regime = None elif H < 0.45: regime = "mean_reversion" elif H > 0.55: regime = "breakout" else: regime = "neutral" # --- Wavelet filter per i pattern (prima del riconoscimento) --- r_pattern = r if WAVELET_ENABLED: r_wav = wavelet_denoise( r, wavelet=WAVELET_NAME, level=WAVELET_LEVEL, mode=WAVELET_MODE, threshold_mode=WAVELET_THRESHOLD_MODE, ) if r_wav is not None and r_wav.notna().sum() >= WP + HA: r_pattern = r_wav else: print(f"[WARN] Wavelet filter non applicabile su {isin}: uso la serie raw.") # --- Libreria pattern --- lib_wins, lib_out = build_pattern_library(r_pattern, WP, HA, embargo=EMBARGO) # Finestra corrente date_last = df_isin[col_date].iloc[-1] if col_date else None if date_last is not None: last_dates.append(pd.to_datetime(date_last)) if lib_wins is None or len(r_pattern) < WP + HA: ptype, pconf = characterize_window(r_pattern, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) signal = 0 est_out, avg_dist = np.nan, np.nan else: curr = r_pattern.values[-WP:] curr_zn = z_norm(curr) if curr_zn is None: ptype, pconf = characterize_window(r_pattern, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) signal = 0; est_out = np.nan; avg_dist = np.nan else: est_out, avg_dist, idx_sel = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K) # SOLO LONG: apri solo se est_out > THETA signal = 1 if est_out > THETA else 0 ptype, pconf = characterize_window(r_pattern, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) # Salva risultati hurst_rows.append({ "ISIN": isin, "Hurst": None if pd.isna(H) else round(float(H), 4), "Regime": regime }) pattern_rows.append({ "ISIN": isin, "DateLast": date_last, "PatternType": ptype, "Signal": {1:"long",-1:"short",0:"flat"}.get(int(signal), "flat"), 
"Confidence": None if pconf is None else round(float(min(1.0, max(0.0, pconf))), 3), "EstOutcome": None if pd.isna(est_out) else float(est_out), "AvgDist": None if pd.isna(avg_dist) else float(avg_dist), "Wp": WP, "Ha": HA, "k": KNN_K }) ok_count += 1 if not first_ok_reported: print("[INFO] Colonne riconosciute sul primo ISIN valido:", "Data:", col_date, "| Rendimenti:", col_ret, "| Prezzo:", col_px, "| H:", round(H,4) if pd.notna(H) else None, "| Regime:", regime) first_ok_reported = True if i % 10 == 0: print(f"… {i}/{len(isins)} ISIN processati (ok finora: {ok_count})") except Exception as e: errors.append({"ISIN": isin, "Errore": str(e)}) # ========================================= # 4A) EXPORT: HURST + PATTERN (QualityScore) # ========================================= hurst_df = pd.DataFrame(hurst_rows) if hurst_rows else pd.DataFrame( {"ISIN": [], "Hurst": [], "Regime": []} ) meta_df["ISIN"] = meta_df["ISIN"].astype(str).str.strip() hurst_df["ISIN"] = hurst_df["ISIN"].astype(str).str.strip() # Mappa ISIN -> Hurst (per usare H come theta_entry nel backtest) hurst_map = { str(row["ISIN"]).strip(): (float(row["Hurst"]) if pd.notna(row["Hurst"]) else np.nan) for _, row in hurst_df.iterrows() } summary_hurst = meta_df.merge(hurst_df, on="ISIN", how="left") cols_hurst = ["ISIN", "Nome", "Categoria", "Asset Class", "Hurst", "Regime"] summary_hurst = summary_hurst[[c for c in cols_hurst if c in summary_hurst.columns]] summary_hurst = summary_hurst.sort_values(["Hurst", "ISIN"], na_position="last").reset_index(drop=True) summary_hurst.to_excel(OUTPUT_HURST_XLSX, index=False) pat_df = pd.DataFrame(pattern_rows) if pattern_rows else pd.DataFrame( {"ISIN": [], "DateLast": [], "PatternType": [], "Signal": [], "Confidence": [], "EstOutcome": [], "AvgDist": [], "Wp": [], "Ha": [], "k": []} ) pat_df["ISIN"] = pat_df["ISIN"].astype(str).str.strip() summary_pattern = ( meta_df .merge(hurst_df, on="ISIN", how="left") .merge(pat_df, on="ISIN", how="left") ) wanted_cols = ["ISIN","Nome","Categoria","Asset Class","Hurst","Regime", "DateLast","PatternType","Signal","Confidence","EstOutcome","AvgDist","Wp","Ha","k"] summary_pattern = summary_pattern[[c for c in wanted_cols if c in summary_pattern.columns]] def _add_quality_scores(df: pd.DataFrame) -> pd.DataFrame: out = df.copy() conf = pd.to_numeric(out.get("Confidence", np.nan), errors="coerce") est = pd.to_numeric(out.get("EstOutcome", np.nan), errors="coerce") dist = pd.to_numeric(out.get("AvgDist", np.nan), errors="coerce") max_abs_est = np.nanmax(np.abs(est)) if np.isfinite(np.nanmax(np.abs(est))) and (np.nanmax(np.abs(est)) > 0) else np.nan outcome_score = np.where(np.isnan(max_abs_est) | (max_abs_est == 0), np.nan, np.abs(est) / max_abs_est) similarity_score = 1.0 / (1.0 + dist.astype(float)) confidence_score = conf.astype(float) quality = confidence_score * similarity_score * outcome_score out["OutcomeScore"] = np.round(outcome_score, 4) out["SimilarityScore"] = np.round(similarity_score, 4) out["QualityScore"] = np.round(quality, 4) return out summary_pattern = _add_quality_scores(summary_pattern) sort_cols = [c for c in ["QualityScore", "Confidence", "OutcomeScore"] if c in summary_pattern.columns] if sort_cols: summary_pattern = summary_pattern.sort_values(sort_cols, ascending=[False]*len(sort_cols), na_position="last").reset_index(drop=True) if "DateLast" in summary_pattern.columns: summary_pattern = summary_pattern.sort_values(["QualityScore","DateLast"], ascending=[False, True], na_position="last").reset_index(drop=True) 
summary_pattern.to_excel(OUTPUT_PATTERN_XLSX, index=False) print(f"[INFO] Salvato: {OUTPUT_HURST_XLSX} (righe: {len(summary_hurst)})") print(f"[INFO] Salvato: {OUTPUT_PATTERN_XLSX} (righe: {len(summary_pattern)})") if errors: pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False) print(f"[INFO] Log errori: {ERROR_LOG_CSV} (tot: {len(errors)})") # ========================================= # 4B) FORWARD-BACKTEST (walk-forward) — SOLO LONG # ========================================= def drawdown_stats_simple(ret_series: pd.Series): eq = (ret_series.fillna(0)).cumsum() rolling_max = eq.cummax() dd = eq - rolling_max maxdd = float(dd.min()) if len(dd) else 0.0 cagr = np.exp(ret_series.mean()*DAYS_PER_YEAR) - 1 annvol = ret_series.std() * np.sqrt(DAYS_PER_YEAR) sharpe = (ret_series.mean() / (ret_series.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR) calmar = (cagr / abs(maxdd)) if maxdd < 0 else np.nan return { "CAGR_%": round(cagr*100, 2), "AnnVol_%": round(annvol*100, 2), "Sharpe": round(float(sharpe), 2), "MaxDD_%eq": round(float(maxdd*100), 2), "Calmar": round(float(calmar), 2) if np.isfinite(calmar) else np.nan } def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret: str, Wp: int, Ha: int, k: int, theta_entry: float, exec_ret: pd.Series | None = None, fee_bps: float = 10, # --- EXIT PARAMS (tutte opzionali) --- sl_bps: float | None = 300.0, # Stop loss assoluto (bps sul PnL cumulato del trade) tp_bps: float | None = 800.0, # Take profit assoluto (bps) trail_bps: float | None = 300.0, # Trailing stop (drawdown dal picco, bps) time_stop_bars: int | None = 20, # Massimo holding theta_exit: float | None = 0.0, # esci se est_out <= theta_exit (se None, ignora) weak_days_exit: int | None = None # esci se per N giorni est_out <= theta_exit ): """ Walk-forward SOLO LONG con regole di EXIT (SL/TP/TS/time/flip). Ritorna (signals_df, summary_metrics_dict). Nota: usa solo dati daily → le soglie sono valutate a fine giornata, l'uscita avviene sulla barra successiva (modello prudente). 
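    With the default exit parameters (expressed in basis points, 1 bps = 0.0001):
    sl_bps=300 closes the trade once its cumulative PnL reaches -3%, tp_bps=800 at +8%,
    trail_bps=300 after a 3% giveback from the trade's peak PnL, and time_stop_bars=20
    after 20 bars in position.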
""" r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0 # rendimenti in decimali (close/close) idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r)) idx = pd.to_datetime(idx).dt.normalize() if exec_ret is not None: r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float) r_exec.index = pd.to_datetime(r_exec.index).normalize() # reindex robusto: usa l'ordine di idx, preserva NaN se manca la data r_exec = r_exec.reindex(idx) if len(r_exec) != len(r): r_exec = pd.Series(r_exec.values, index=idx).reindex(idx) else: r_exec = r fee = fee_bps / 10000.0 # helper per costruire libreria solo passato def _lib_predict(past_returns: pd.Series, win_last: np.ndarray): past_use = past_returns if WAVELET_ENABLED: r_wav = wavelet_denoise( past_returns, wavelet=WAVELET_NAME, level=WAVELET_LEVEL + 1, mode=WAVELET_MODE, threshold_mode=WAVELET_THRESHOLD_MODE, ) if r_wav is not None and r_wav.notna().sum() >= (Wp + Ha): past_use = r_wav lib_wins, lib_out = build_pattern_library(past_use, Wp, Ha) if lib_wins is None: return np.nan, np.nan curr_win = win_last if WAVELET_ENABLED and 'past_use' in locals() and past_use is not None: # riallinea win_last con eventuale smoothing curr_idx = past_returns.index[t-Wp:t] # usa idx del loop esterno smoothed_slice = past_use.reindex(curr_idx).values if np.isfinite(smoothed_slice).all(): curr_win = smoothed_slice curr_zn = z_norm(curr_win) if curr_zn is None: return np.nan, np.nan est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k) return float(est_out), float(avg_dist) # Stato della posizione in_pos = False entry_t = None trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 rows = [] # Nota: scorriamo dalle prime barre dove possiamo calcolare una finestra completa for t in range(Wp, len(r) - 1): past = r.iloc[:t] # se passato insufficiente per libreria, forza out if past.dropna().shape[0] < (Wp + Ha): sig_out, est_out, avg_dist = 0, np.nan, np.nan # PnL a t+1 sempre riportato in colonna Ret+1 rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan)) continue win_last = r.iloc[t-Wp:t].values est_out, avg_dist = _lib_predict(past, win_last) # filtro di qualità sui pattern troppo distanti if MAX_AVG_DIST is not None: try: dist_thr = float(MAX_AVG_DIST) except Exception: dist_thr = None if dist_thr is not None and np.isfinite(avg_dist) and avg_dist > dist_thr: est_out = np.nan # Default: portiamo avanti lo stato corrente (sig_out = 1 se in_pos) sig_out = 1 if in_pos else 0 # --- LOGICA DI INGRESSO --- if (not in_pos) and (est_out > theta_entry): # apri domani → oggi segnaliamo 1 (per avere PnL su t+1) sig_out = 1 in_pos = True entry_t = t trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 # --- LOGICA DI USCITA (se in posizione) --- elif in_pos: # 1) aggiorna PnL del trade con il rendimento della barra che verrà *incassato* domani: # Per coerenza EOD, PnL di oggi (da riportare) è su r[t+1] quando Signal(t)=1. # Per controlli di stop a fine giornata, stimiamo la "pnl se restassi" accumulando r[t+1] ex-ante. 
next_ret = r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan # rendimento che si applicherà se resto in posizione pnl_if_stay = (1.0 + trade_pnl) * (1.0 + next_ret) - 1.0 # 2) aggiorna trailing peak ipotetico peak_if_stay = max(trade_peak, pnl_if_stay) # 3) valuta condizioni di uscita sullo stato "if stay" exit_reasons = [] # SL if (sl_bps is not None) and (pnl_if_stay <= -sl_bps/10000.0): exit_reasons.append("SL") # TP if (tp_bps is not None) and (pnl_if_stay >= tp_bps/10000.0): exit_reasons.append("TP") # Trailing if (trail_bps is not None) and (peak_if_stay - pnl_if_stay >= trail_bps/10000.0): exit_reasons.append("TRAIL") # Time stop if (time_stop_bars is not None) and (t - entry_t + 1 >= time_stop_bars): exit_reasons.append("TIME") # Flip / debolezza persistente if theta_exit is not None: if est_out <= theta_exit: # Debole oggi → aggiorna streak weak_streak = weak_streak + 1 if weak_days_exit else weak_streak # exit immediata se non usi weak_days_exit if weak_days_exit is None: exit_reasons.append("FLIP") else: if weak_streak >= weak_days_exit: exit_reasons.append("FLIP_STREAK") else: weak_streak = 0 # *** Se una qualunque condizione scatta, usciamo domani → oggi mettiamo 0 if exit_reasons: sig_out = 0 in_pos = False entry_t = None trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 else: # restiamo → aggiorniamo lo stato “vero” per il prossimo loop trade_pnl = pnl_if_stay trade_peak = peak_if_stay # Registra la riga odierna; il PnL riportato è sempre il r[t+1] rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan)) sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"]) # Costi su variazione posizione sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0) trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs() cost = trade_chg * fee sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost sig_df.drop(columns=["Signal_prev"], inplace=True) # Metriche sintetiche stats = drawdown_stats_simple(sig_df["PnL"]) stats.update({ "HitRate_%": round(100 * ((sig_df["PnL"] > 0).sum() / max(1, sig_df["PnL"].notna().sum())), 2), "AvgTradeRet_bps": round(sig_df["PnL"].mean() * 10000, 2), "Turnover_%/step": round(100 * trade_chg.mean(), 2), "N_Steps": int(sig_df.shape[0]), }) # Aggiungi anche parametri di uscita usati (utile per grid search/trace) stats.update({ "theta_entry": theta_entry, "theta_exit": (None if theta_exit is None else float(theta_exit)), "sl_bps": (None if sl_bps is None else float(sl_bps)), "tp_bps": (None if tp_bps is None else float(tp_bps)), "trail_bps": (None if trail_bps is None else float(trail_bps)), "time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)), "weak_days_exit": (None if weak_days_exit is None else int(weak_days_exit)) }) return sig_df, stats # ========= ESECUZIONE BACKTEST PER TUTTI GLI ISIN ========= bt_signals = [] bt_summary = [] total_t = 0.0 start_all = time.perf_counter() for i, isin in enumerate(isins, 1): t0 = time.perf_counter() # ---- INIZIO TIMER SINGOLO CICLO ---- try: df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR}) if df_isin.empty: errors.append({"ISIN": isin, "Errore": "SP vuota (BT)"}) continue col_date, col_ret, col_px = detect_cols(df_isin) if col_date: df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce") df_isin = df_isin.sort_values(col_date) if col_ret and col_ret in df_isin.columns: df_isin[col_ret] = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) # % 
raw elif col_px and col_px in df_isin.columns: px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan) df_isin[col_ret] = np.log(px / px.shift(1)) * 100.0 # % else: errors.append({"ISIN": isin, "Errore": "Né rendimenti né prezzi (BT)"}) continue if df_isin[col_ret].dropna().shape[0] < max(200, WP + HA + 10): errors.append({"ISIN": isin, "Errore": f"Serie troppo corta (BT) ({df_isin[col_ret].dropna().shape[0]} punti)"}) continue # --- Fetch open/close per calcolare rendimenti di esecuzione (open->open) --- try: date_min = df_isin[col_date].min().date() if col_date else None date_max = df_isin[col_date].max().date() if col_date else None if date_min and date_max: px_hist_one = fetch_price_history( isins=[isin], universe=meta_df if 'meta_df' in globals() else pd.DataFrame(), start_date=date_min.isoformat(), end_date=date_max.isoformat() ) px_hist_one = px_hist_one.sort_values("Date") open_series = px_hist_one[["Date","Open"]].dropna() open_series["Date"] = pd.to_datetime(open_series["Date"]).dt.normalize() open_series = open_series.drop_duplicates(subset=["Date"]).set_index("Date")["Open"] open_ret = open_series.pct_change() # riallinea sulla stessa sequenza di date del df_isin idx_dates = pd.to_datetime(df_isin[col_date]).dt.normalize() exec_ret = open_ret.reindex(idx_dates) exec_ret.index = idx_dates else: exec_ret = None except Exception as e: print(f"[WARN] Fetch open/close fallito per {isin}: {e}") exec_ret = None # ============================ # THETA = HURST IN PERCENTUALE # H = 0.50 -> theta_entry = 0.005 (0.5%) # ============================ isin_str = str(isin).strip() H_val = hurst_map.get(isin_str, np.nan) if H_val is None or pd.isna(H_val): theta_entry = THETA # fallback se H mancante else: theta_entry = float(H_val) / 100.0 sl_cfg = float(SIGNALS_CONFIG.get("sl_bps", 300.0)) tp_cfg = float(SIGNALS_CONFIG.get("tp_bps", 800.0)) trail_cfg = float(SIGNALS_CONFIG.get("trail_bps", 300.0)) time_cfg = int(SIGNALS_CONFIG.get("time_stop_bars", 20)) theta_ex = SIGNALS_CONFIG.get("theta_exit", 0.0) weak_ex = SIGNALS_CONFIG.get("weak_days_exit", None) sig_df, stats = knn_forward_backtest_one_asset( df_isin=df_isin, col_date=(col_date if col_date else df_isin.index.name or "idx"), col_ret=col_ret, Wp=WP, Ha=HA, k=KNN_K, theta_entry=theta_entry, exec_ret=exec_ret, fee_bps=10, sl_bps=sl_cfg, tp_bps=tp_cfg, trail_bps=trail_cfg, time_stop_bars=time_cfg, theta_exit=theta_ex, weak_days_exit=weak_ex, ) name = meta_df.loc[meta_df["ISIN"]==isin, "Nome"].iloc[0] if (meta_df["ISIN"]==isin).any() else None cat = meta_df.loc[meta_df["ISIN"]==isin, "Categoria"].iloc[0] if (meta_df["ISIN"]==isin).any() else None ac = meta_df.loc[meta_df["ISIN"]==isin, "Asset Class"].iloc[0] if (meta_df["ISIN"]==isin).any() else None tmp = sig_df.copy() tmp.insert(0, "ISIN", isin) tmp.insert(1, "Nome", name) tmp.insert(2, "Categoria", cat) tmp.insert(3, "Asset Class", ac) tmp["Wp"] = WP; tmp["Ha"] = HA; tmp["k"] = KNN_K; tmp["Theta"] = theta_entry bt_signals.append(tmp) stats_row = {"ISIN": isin, "Nome": name, "Categoria": cat, "Asset Class": ac} stats_row.update(stats) bt_summary.append(stats_row) except Exception as e: errors.append({"ISIN": isin, "Errore": f"Backtest: {str(e)}"}) # ---- FINE TIMER SINGOLO CICLO ---- dt = time.perf_counter() - t0 total_t += dt avg_t = total_t / i eta = avg_t * (len(isins) - i) print( f"… backtest {i}/{len(isins)} completati — {dt:.2f} sec " f"(avg {avg_t:.2f}s, ETA {format_eta(eta)} rimanenti)" ) # ---- TIMER FINALE ---- end_all = time.perf_counter() 
total_elapsed = end_all - start_all avg_elapsed = total_elapsed / max(len(isins), 1) print(f"[INFO] Tempo totale: {format_eta(total_elapsed)} ({total_elapsed:.2f} sec)") print(f"[INFO] Tempo medio per asset: {format_eta(avg_elapsed)} ({avg_elapsed:.2f} sec)") bt_signals_df = pd.concat(bt_signals, ignore_index=True) if bt_signals else pd.DataFrame( columns=["ISIN","Nome","Categoria","Asset Class","Date","Signal","EstOutcome","AvgDist","Ret+1","PnL","Wp","Ha","k","Theta"] ) bt_summary_df = pd.DataFrame(bt_summary) if bt_summary else pd.DataFrame( columns=["ISIN","Nome","Categoria","Asset Class","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar","HitRate_%","AvgTradeRet_bps","Turnover_%/step","N_Steps"] ) bt_signals_df.to_excel(FORWARD_BT_SIGNALS_XLSX, index=False) bt_summary_df.to_excel(FORWARD_BT_SUMMARY_XLSX, index=False) print(f"[INFO] Salvato: {FORWARD_BT_SIGNALS_XLSX} ({len(bt_signals_df):,} righe)") print(f"[INFO] Salvato: {FORWARD_BT_SUMMARY_XLSX} ({len(bt_summary_df):,} righe)") if errors: pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False) print(f"[INFO] Log errori aggiornato: {ERROR_LOG_CSV} (tot: {len(errors)})") # Salva riepilogo prezzi (solo simboli primari, senza fallback) ogni run try: save_price_cache_summary(OPEN_CACHE_DIR, OPEN_CACHE_DIR / "prezzi_summary_no_fallback.xlsx") except Exception as e: print(f"[WARN] Riepilogo prezzi non creato: {e}") # Timer per fasi post-backtest (sezione 5 in poi) start_post_timer(total_steps=4) # ====================================================================== # 5) STRATEGIE PORTAFOGLIO DINAMICHE + EQUITY + HEATMAP + TRADE REPORT # ====================================================================== def _ensure_bt_summary(meta_df: pd.DataFrame, bt_summary_df: pd.DataFrame) -> pd.DataFrame: if bt_summary_df is not None and not bt_summary_df.empty: out = bt_summary_df.copy() else: out = pd.DataFrame({"ISIN": meta_df["ISIN"].copy()}) for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]: out[c] = np.nan out = out.merge(meta_df[["ISIN","Nome","Categoria","Asset Class"]], on="ISIN", how="left") out["ISIN"] = out["ISIN"].astype(str).str.strip() return out def _ensure_bt_signals(meta_df: pd.DataFrame, bt_signals_df: pd.DataFrame, last_dates_hint: list) -> pd.DataFrame: if bt_signals_df is not None and not bt_signals_df.empty: fbsig = bt_signals_df.copy() fbsig["Date"] = pd.to_datetime(fbsig["Date"]) fbsig["ISIN"] = fbsig["ISIN"].astype(str).str.strip() fbsig["Signal"] = pd.to_numeric(fbsig["Signal"], errors="coerce").fillna(0).astype(int) fbsig["PnL"] = pd.to_numeric(fbsig["PnL"], errors="coerce").fillna(0.0) return fbsig end_date = (max(last_dates_hint) if last_dates_hint else pd.Timestamp.today()).normalize() dates = pd.bdate_range(end=end_date, periods=120, freq="C") isins_small = meta_df["ISIN"].astype(str).str.strip().tolist()[:12] rows = [] for isin in isins_small: for dt in dates: rows.append({"Date": dt, "ISIN": isin, "Signal": 0, "PnL": 0.0}) return pd.DataFrame(rows) forward_bt_summary = _ensure_bt_summary(meta_df, bt_summary_df) forward_bt_signals = _ensure_bt_signals(meta_df, bt_signals_df, last_dates) # ----------------------------- # 5.1 Funzioni di supporto (grafici e pesi) # ----------------------------- def equity_from_returns(r: pd.Series) -> pd.Series: r = pd.to_numeric(r, errors="coerce").fillna(0.0) return (1 + r).cumprod() * 100 def monthly_returns(r: pd.Series) -> pd.Series: r = pd.to_numeric(r, errors="coerce").fillna(0.0) if not isinstance(r.index, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)): 
try: r.index = pd.to_datetime(r.index) except Exception: return pd.Series(dtype=float) return (1 + r).resample("M").prod() - 1 def plot_heatmap_monthly(r: pd.Series, title: str, save_path: str = None): r = pd.to_numeric(r, errors="coerce").fillna(0.0) m = monthly_returns(r) df = m.to_frame("ret") df["Year"], df["Month"] = df.index.year, df.index.month pv = df.pivot(index="Year", columns="Month", values="ret") fig, ax = plt.subplots(figsize=(10,6)) im = ax.imshow(pv.fillna(0)*100, cmap="RdYlGn", vmin=-3, vmax=5, aspect="auto") for i in range(pv.shape[0]): for j in range(pv.shape[1]): val = pv.iloc[i,j] if not np.isnan(val): ax.text(j, i, f"{val*100:.1f}", ha="center", va="center", fontsize=8) ax.set_title(title); ax.set_xlabel("Mese"); ax.set_ylabel("Anno") ax.set_xticks(range(12)); ax.set_xticklabels(range(1,13)) fig.colorbar(im, ax=ax, label="%") plt.tight_layout() if save_path: savefig_safe(save_path, dpi=150) plt.close(fig) # Non mostrare il plot durante l'esecuzione def inverse_vol_weights(df, window=60, max_weight=None): vol = df.rolling(window).std() inv = 1 / vol.replace(0, np.nan) w = inv.div(inv.sum(axis=1), axis=0) w = w.ffill().fillna(1 / max(1, df.shape[1])) if max_weight is not None: w = w.clip(upper=max_weight) return w def portfolio_metrics(r: pd.Series): r = pd.to_numeric(r, errors="coerce").fillna(0.0) if len(r) == 0: return dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan) ann = DAYS_PER_YEAR eq = equity_from_returns(r) cagr = (eq.iloc[-1]/eq.iloc[0])**(ann/max(1, len(r))) - 1 vol = r.std() * np.sqrt(ann) sharpe = (r.mean()/r.std()) * np.sqrt(ann) if r.std() > 0 else np.nan mdd = (eq/eq.cummax() - 1).min() return dict(CAGR=cagr, Vol=vol, Sharpe=sharpe, MaxDD=mdd) # ----------------------------- # 5.2 Selezione dinamica ISIN (ranking) # ----------------------------- df_sum = forward_bt_summary.copy() for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]: if c in df_sum.columns: df_sum[c] = pd.to_numeric(df_sum[c], errors="coerce") def _safe_rank(s: pd.Series): s = pd.to_numeric(s, errors="coerce") if s.notna().sum() == 0: return pd.Series(np.zeros(len(s)), index=s.index) s_filled = s.fillna(s.median()) return s_filled.rank(method="average") import numpy as np import pandas as pd from scipy import linalg # ---------------------------- # Utils # ---------------------------- def _safe_rank_ser(s: pd.Series) -> pd.Series: """Rank robusto (0..1), gestisce NaN.""" s = s.copy() return s.rank(method="average", na_option="keep") / s.notna().sum() def _winsorize(s: pd.Series, p=0.005): lo, hi = s.quantile(p), s.quantile(1-p) return s.clip(lo, hi) def _pos_normalize(w: np.ndarray, eps=1e-12): w = np.where(w < 0, 0, w) s = w.sum() return (w / (s + eps)) if s > eps else np.ones_like(w)/len(w) def _corr_shrink(C: np.ndarray, alpha=0.10): """Shrink verso I per stabilità numerica.""" k = C.shape[0] return (1-alpha)*C + alpha*np.eye(k) def _time_blocks_index(idx: pd.Index, k_folds=5): """Split temporale semplice in k blocchi contigui.""" n = len(idx) fold_sizes = np.full(k_folds, n // k_folds, dtype=int) fold_sizes[: n % k_folds] += 1 splits, start = [], 0 for fs in fold_sizes: end = start + fs splits.append(idx[start:end]) start = end return splits # ---------------------------- # Calibrazione pesi # ---------------------------- def calibrate_score_weights( df_sum: pd.DataFrame, metrics_map=None, target_col: str | None = None, k_folds: int = 5, shrink_equal: float = 0.25, # shrink verso equal weight per stabilità corr_shrink: float = 0.10 # shrink della matrice di 
correlazione ): """ metrics_map: lista di tuple (colname, good_is_high) good_is_high=True se valori alti sono migliori. Esempio: [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)] target_col: nome colonna target OOS (es. 'FWD_CAGR_%', 'FWD_Sharpe', ecc.) Se None => usa ERC non supervisionato. Ritorna: dict con 'weights' (pd.Series), 'X_ranked' (DataFrame) e 'mode' """ if metrics_map is None: metrics_map = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)] # Costruisci matrice delle metriche X (rank 0..1) X_cols, X_list = [], [] for col, good_high in metrics_map: s = df_sum.get(col, pd.Series(index=df_sum.index, dtype=float)).astype(float) s = _winsorize(s) if not good_high: s = -s # inverti segno se "meno è meglio" X_cols.append(col) X_list.append(_safe_rank_ser(s)) X = pd.concat(X_list, axis=1) X.columns = X_cols X = X.loc[:, X.columns[X.notna().sum(0) > 0]] # droppa colonne tutte NaN k = X.shape[1] if k == 0: raise ValueError("Nessuna metrica valida per la calibrazione.") # Se non hai target: ERC non supervisionato if target_col is None or target_col not in df_sum.columns: # cov su features rankate C = np.cov(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False) # stabilizza C = _corr_shrink(C, alpha=corr_shrink) # ERC: pesi proposti ~ inverse vol in spazio decorrelato # approssimazione pratica: w ∝ diag(C)^(-1/2), poi normalizza riducendo l'effetto di correlazioni con Σ^-1/2 vol = np.sqrt(np.clip(np.diag(C), 1e-12, None)) w0 = 1.0 / vol w = _pos_normalize(w0) return { "mode": "unsupervised_erc", "weights": pd.Series(w, index=X.columns, name="weight"), "X_ranked": X } # Supervisionato: IC-Σ^-1 y = df_sum[target_col].astype(float).copy() y = _winsorize(y) y_rank = _safe_rank_ser(y) # target in rank per allineare a Spearman # Drop righe senza target o tutte NaN nelle metriche mask = y_rank.notna() & X.notna().any(1) Xf, yf = X[mask].copy(), y_rank[mask].copy() if len(Xf) < max(30, k*5): # fallback se troppo pochi dati C = np.corrcoef(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False) C = _corr_shrink(C, alpha=corr_shrink) ic = np.array([ pd.Series(X.iloc[:, j]).corr(y_rank, method="spearman") for j in range(k) ]) ic = np.nan_to_num(ic, nan=0.0) w = linalg.solve(C + 1e-6*np.eye(k), ic, assume_a='sym') w = (1-shrink_equal)*_pos_normalize(w) + shrink_equal*np.ones(k)/k return { "mode": "supervised_icSigmaInv_fallback", "weights": pd.Series(w, index=X.columns, name="weight"), "X_ranked": X } # CV a blocchi temporali (walk-forward grossolano) # Ordine temporale = index di df_sum (assumo ordinato) folds = _time_blocks_index(Xf.index, k_folds=k_folds) W_list = [] for t in range(1, len(folds)): train_idx = pd.Index([]).append(pd.Index([])) for i in range(t): # usa tutti i blocchi precedenti come train train_idx = train_idx.append(folds[i]) valid_idx = folds[t] Xt, yt = Xf.loc[train_idx], yf.loc[train_idx] Xv, yv = Xf.loc[valid_idx], yf.loc[valid_idx] # IC per colonna usando solo train ic = np.array([Xt.iloc[:, j].corr(yt, method="spearman") for j in range(Xt.shape[1])]) ic = np.nan_to_num(ic, nan=0.0) # Corr(X) su train C = np.corrcoef(np.nan_to_num(Xt.values, nan=np.nanmean(Xt.values)), rowvar=False) C = _corr_shrink(C, alpha=corr_shrink) try: w_raw = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym') except Exception: w_raw = ic.copy() w_fold = _pos_normalize(w_raw) # shrink verso equal w_fold = (1-shrink_equal)*w_fold + shrink_equal*np.ones_like(w_fold)/len(w_fold) W_list.append(w_fold) # media pesi sui fold if not W_list: # in casi limite 
(pochi dati), ripiega ic = np.array([Xf.iloc[:, j].corr(yf, method="spearman") for j in range(Xf.shape[1])]) ic = np.nan_to_num(ic, nan=0.0) C = np.corrcoef(np.nan_to_num(Xf.values, nan=np.nanmean(Xf.values)), rowvar=False) C = _corr_shrink(C, alpha=corr_shrink) w = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym') else: w = np.mean(np.vstack(W_list), axis=0) w = _pos_normalize(w) return { "mode": "supervised_icSigmaInv_cv", "weights": pd.Series(w, index=X.columns, name="weight"), "X_ranked": X } # --- PRE-FLIGHT METRICS GUARD --------------------------------------------- def _coerce_num(s: pd.Series) -> pd.Series: return pd.to_numeric(s, errors="coerce").replace([np.inf, -np.inf], np.nan) # Assicurati che df_sum esista ed abbia le colonne base df_sum = forward_bt_summary.copy() for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%", "QualityScore","Confidence","OutcomeScore"]: if c in df_sum.columns: df_sum[c] = _coerce_num(df_sum[c]) # Se Sharpe/CAGR/MaxDD mancano o sono tutti NaN, prova a ricostruirli dai segnali need_rebuild = ( ("Sharpe" not in df_sum.columns or df_sum["Sharpe"].notna().sum()==0) or ("CAGR_%" not in df_sum.columns) or ("MaxDD_%eq" not in df_sum.columns) ) if need_rebuild: try: # Recompute metriche minime per ISIN da bt_signals_df (se presente) tmp = bt_signals_df.copy() if not tmp.empty: tmp["PnL"] = pd.to_numeric(tmp["PnL"], errors="coerce").fillna(0.0) agg_rows = [] for isin, g in tmp.groupby("ISIN"): stats = drawdown_stats_simple(g["PnL"]) stats["ISIN"] = str(isin) agg_rows.append(stats) rebuilt = pd.DataFrame(agg_rows) # Coerce numerico e merge su df_sum for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]: if c in rebuilt.columns: rebuilt[c] = _coerce_num(rebuilt[c]) df_sum = ( df_sum.drop(columns=[c for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"] if c in df_sum.columns]) .merge(rebuilt[["ISIN","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]], on="ISIN", how="left") ) except Exception as e: print(f"[WARN] Ricostruzione metriche fallita: {e}") df_sum = _apply_score(df_sum) TOP_N = 15 base_isins = ( df_sum .sort_values("Score", ascending=False) .head(TOP_N)["ISIN"].astype(str).str.strip().tolist() ) print(f"[INFO] Ranking full-sample (solo debug, i portafogli usano ranking rolling): {base_isins}") # ----------------------------- # 5.3 Costruzione portafogli # (Equal Weight + Risk Parity con cap) # ----------------------------- bt = forward_bt_signals.copy() bt["Date"] = pd.to_datetime(bt["Date"]) bt["ISIN"] = bt["ISIN"].astype(str).str.strip() bt = bt.sort_values(["Date", "ISIN"]) wide_pnl = ( bt.pivot_table(index="Date", columns="ISIN", values="PnL", aggfunc="sum") .fillna(0.0) ) wide_sig = ( bt.pivot_table(index="Date", columns="ISIN", values="Signal", aggfunc="last") .fillna(0) .astype(int) ) wide_est = ( bt.pivot_table(index="Date", columns="ISIN", values="EstOutcome", aggfunc="last") .sort_index() ) # (Opzionale) ricostruzione PnL portafoglio con open->open: disattivata di default perché il PnL # viene già calcolato a livello di singolo asset usando gli open. 
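# In the optional block below a signal on day t is executed at the next day's open:
# hence wide_pnl = wide_sig * open_ret.shift(-1), aligning the t+1 open->open return
# with the signal date t.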
if globals().get("RECOMPUTE_PORTF_FROM_OPEN", False): try: date_min = (bt["Date"].min() - pd.Timedelta(days=5)).date() date_max = (bt["Date"].max() + pd.Timedelta(days=5)).date() px_hist = fetch_price_history( isins=bt["ISIN"].unique(), universe=meta_df if 'meta_df' in globals() else pd.DataFrame(), start_date=date_min.isoformat(), end_date=date_max.isoformat() ) open_pivot = ( px_hist.pivot(index="Date", columns="ISIN", values="Open") .sort_index() ) open_ret = open_pivot.pct_change() # segnale su giorno t, esecuzione a open t+1 wide_pnl = wide_sig * open_ret.shift(-1) common_idx = wide_sig.index.intersection(wide_pnl.index) common_idx = pd.to_datetime(common_idx) wide_sig = wide_sig.reindex(common_idx).fillna(0).astype(int) wide_pnl = wide_pnl.reindex(common_idx).fillna(0.0) wide_sig.index = pd.to_datetime(wide_sig.index) wide_pnl.index = pd.to_datetime(wide_pnl.index) print(f"[INFO] PnL ricostruito su open->open per {len(open_pivot.columns)} ISIN.") except Exception as e: print(f"[WARN] Ricostruzione PnL open->open fallita, uso PnL originale: {e}") # I portafogli verranno costruiti piu' sotto con ranking rolling (vedi _build_dynamic_portfolio_returns). def plot_portfolio_composition(weights: pd.DataFrame, title: str, save_path: str | None = None, max_legend: int = 12): """ Stacked area dei pesi per ISIN nel tempo (un colore per asset). - Accetta un DataFrame 'weights' indicizzato per Date, colonne = ISIN, valori = peso (0..1). - Normalizza le righe per sicurezza. - Raggruppa la coda lunga in 'Altri' se gli asset superano 'max_legend'. - Salva su 'save_path' se fornito; se è solo un filename, salva nella cartella corrente. - Non lancia eccezioni inutili: stampa messaggi SKIP quando i dati non sono plottabili. Esempio: plot_portfolio_composition(w_eq, "Equal Weight", "composition_equal_weight.png") plot_portfolio_composition(w_rp, "Risk Parity", "composition_risk_parity.png") """ import os import numpy as np import pandas as pd import matplotlib.pyplot as plt # ---- Guard basi ---- if weights is None or getattr(weights, "empty", True): print(f"[SKIP] Nessun peso disponibile per: {title}") return # Copia e sanificazione W = weights.copy() # Forza indice univoco e ordinato (se Date duplicati, tieni l’ultimo) if W.index.has_duplicates: W = W[~W.index.duplicated(keep="last")] W = W.sort_index() # Coerce numerico e sostituisci NaN con 0 W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0) # Droppa colonne totalmente nulle keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0] if not keep_cols: print(f"[SKIP] Tutte le colonne hanno peso zero per: {title}") return W = W[keep_cols] # Normalizzazione riga (se somma=0, lascia 0) row_sum = W.sum(axis=1) with np.errstate(invalid="ignore", divide="ignore"): W = W.div(row_sum.replace(0.0, np.nan), axis=0).fillna(0.0).clip(lower=0.0) # Se dopo la pulizia resta vuoto o una sola riga, non ha senso il plot area if len(W.index) < 2 or W.shape[1] == 0: print(f"[SKIP] Serie troppo corta o senza colonne valide per: {title}") return # Ordina gli asset per peso medio decrescente (più leggibile) avg_w = W.mean(axis=0).sort_values(ascending=False) ordered = avg_w.index.tolist() # Raggruppa coda lunga in "Altri" se troppi asset if len(ordered) > max_legend: head, tail = ordered[:max_legend], ordered[max_legend:] W_show = W[head].copy() W_show["Altri"] = W[tail].sum(axis=1) ordered = head + ["Altri"] else: W_show = W[ordered].copy() # Palette (tab20 riciclata se necessario) cmap = plt.colormaps.get_cmap("tab20") colors = [cmap(i % cmap.N) for i in 
range(len(ordered))] # Plot fig, ax = plt.subplots(figsize=(11, 6)) ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors) ax.set_title(f"Composizione portafoglio nel tempo - {title}") ax.set_ylim(0, 1) ax.grid(True, alpha=0.3) ax.set_ylabel("Peso") # Etichette Y in percentuale yticks = ax.get_yticks() ax.set_yticklabels([f"{y*100:.0f}%" for y in yticks]) # Legenda compatta ncol = 2 if len(ordered) > 10 else 1 ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN") fig.tight_layout() # Salvataggio robusto (gestisce filename senza cartella) if save_path: folder = os.path.dirname(save_path) or "." try: os.makedirs(folder, exist_ok=True) except Exception as e: print(f"[WARN] Impossibile creare la cartella '{folder}': {e}. Provo a salvare nella dir corrente.") save_path = os.path.basename(save_path) fig.savefig(save_path, dpi=150, bbox_inches="tight") try: full = os.path.abspath(save_path) except Exception: full = save_path print(f"[INFO] Salvato: {full}") # Plot salvato senza visualizzazione interattiva def make_active_weights(w_base: pd.DataFrame, sig: pd.DataFrame, renorm_to_1: bool = False, # True: rialloca tra gli attivi; False: lascia quota in Cash add_cash: bool = True, cash_label: str = "Cash") -> pd.DataFrame: """ w_base : pesi teorici (righe=date, colonne=ISIN) sig : matrice Signal (0/1) allineata (righe=date, colonne=ISIN) """ import numpy as np import pandas as pd if w_base is None or w_base.empty: return pd.DataFrame(index=sig.index, columns=[]) W = w_base.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0) S = sig.reindex_like(W).fillna(0).astype(int) # allinea e riempi # pesa solo gli attivi W_active = W * (S > 0) row_sum = W_active.sum(axis=1) if renorm_to_1: # rialloca sui soli attivi: nessuna quota in cash W_active = W_active.div(row_sum.replace(0, np.nan), axis=0).fillna(0.0) if add_cash: # se renorm True, cash è zero W_active[cash_label] = 0.0 else: # non renorm: lasciamo la parte non investita in cash if add_cash: cash = (1.0 - row_sum).clip(lower=0.0, upper=1.0) W_active[cash_label] = cash # rimuovi colonne sempre zero keep = [c for c in W_active.columns if W_active[c].abs().sum() > 0] return W_active[keep] # ----------------------------- # Portafogli dinamici con ranking rolling # ----------------------------- _dynamic_portfolio_cache: dict[int, dict] = {} def _build_dynamic_portfolio_returns( wide_pnl: pd.DataFrame, wide_sig: pd.DataFrame, wide_est: pd.DataFrame, top_n: int, window_bars: int = RANKING_WINDOW_BARS, rp_lookback: int = RP_LOOKBACK ) -> dict: if wide_pnl is None or wide_pnl.empty: idx = pd.Index([]) empty_w = pd.DataFrame(index=idx, columns=[]) return { "ret_eq": pd.Series(dtype=float), "ret_rp": pd.Series(dtype=float), "w_eq": empty_w, "w_rp": empty_w, "w_eq_act": empty_w, "w_rp_act": empty_w, "selection": {} } dates = wide_pnl.index.sort_values() all_cols = wide_pnl.columns.tolist() w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols) w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols) selection = {} for dt in dates: # Considera solo gli ISIN con segnale attivo oggi sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float) on_cols = [c for c in all_cols if sig_row.get(c, 0) == 1] if not on_cols: selection[dt] = [] continue window_est = wide_est.loc[:dt].tail(window_bars) if not wide_est.empty else pd.DataFrame() scores = [] for c in on_cols: s = pd.to_numeric(window_est[c], errors="coerce") if c in window_est.columns else pd.Series(dtype=float) 
est_score = s.mean(skipna=True) if pd.isna(est_score): continue scores.append((c, est_score)) if not scores: selection[dt] = [] continue scores_sorted = sorted(scores, key=lambda x: x[1], reverse=True) base_isins_dt = [c for c, _ in scores_sorted[:top_n]] selection[dt] = base_isins_dt if not base_isins_dt: continue w_eq.loc[dt, base_isins_dt] = 1 / len(base_isins_dt) window_pnl = wide_pnl.loc[:dt].tail(window_bars) rp_hist = window_pnl[base_isins_dt] rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT) if not rp_w.empty: last = rp_w.iloc[-1].fillna(0.0) last_sum = float(last.sum()) if last_sum > 0: last = last / last_sum w_rp.loc[dt, last.index] = last.values w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1) ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1) return { "ret_eq": ret_eq, "ret_rp": ret_rp, "w_eq": w_eq, "w_rp": w_rp, "w_eq_act": w_eq_act, "w_rp_act": w_rp_act, "selection": selection } def _get_dynamic_portfolio(top_n: int) -> dict: if top_n not in _dynamic_portfolio_cache: _dynamic_portfolio_cache[top_n] = _build_dynamic_portfolio_returns( wide_pnl=wide_pnl, wide_sig=wide_sig, wide_est=wide_est, top_n=top_n, window_bars=RANKING_WINDOW_BARS, rp_lookback=RP_LOOKBACK ) return _dynamic_portfolio_cache[top_n] # Portafoglio principale (Top_N di default) calcolato in modo rolling _main_port = _get_dynamic_portfolio(TOP_N) ret_eq = _main_port["ret_eq"] ret_rp = _main_port["ret_rp"] w_eq = _main_port["w_eq"] w_rp = _main_port["w_rp"] w_eq_act = _main_port["w_eq_act"] w_rp_act = _main_port["w_rp_act"] selection_by_date = _main_port["selection"] weights_rp = w_rp.copy() print(f"[INFO] Portafoglio rolling calcolato (TopN={TOP_N}, finestra={RANKING_WINDOW_BARS} barre, rp_lookback={RP_LOOKBACK}).") checkpoint_post_timer("Portafoglio rolling") # ----------------------------- # 5.4 Equity line + Heatmap (salva PNG) # ----------------------------- # eq_eq, eq_rp = map(equity_from_returns, [ret_eq, ret_rp]) # plt.figure(figsize=(10, 6)) # plt.plot(eq_eq, label="Equal Weight") # ============================= # 5.4bis Composizione nel tempo (ATTIVI vs CASH) # ============================= import os import numpy as np import matplotlib.pyplot as plt def plot_portfolio_composition_fixed(weights: pd.DataFrame, title: str, save_path: str | None = None, max_legend: int = 20): """ Stacked area dei pesi nel tempo. 'weights' deve essere già quello ATTIVO (già mascherato con i Signal) e può includere una colonna 'Cash'. - NON ri-normalizza le righe: mostra la quota di Cash reale. - Se vuoi forzare sempre 100% investito, passa prima da renorm_to_1=True in make_active_weights. 
""" if weights is None or getattr(weights, "empty", True): print(f"[SKIP] Nessun peso per: {title}") return W = weights.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0) if W.index.has_duplicates: W = W[~W.index.duplicated(keep="last")] W = W.sort_index() # drop colonne sempre zero keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0] if not keep_cols: print(f"[SKIP] Tutti i pesi sono zero per: {title}") return W = W[keep_cols] if len(W.index) < 2: print(f"[SKIP] Serie troppo corta per: {title}") return # Ordina per peso medio (Cash in coda così resta sopra nello stack) avg_w = W.mean(0).sort_values(ascending=False) ordered = avg_w.index.tolist() if "Cash" in ordered: ordered = [c for c in ordered if c != "Cash"] + ["Cash"] # Raggruppa coda lunga in "Altri" if len(ordered) > max_legend: head = ordered[:max_legend] # Garantisce che 'Cash' resti in legenda anche oltre il cap if "Cash" not in head and "Cash" in ordered: head = head[:-1] + ["Cash"] tail = [c for c in ordered if c not in head] W_show = W[head].copy() if tail: W_show["Altri"] = W[tail].sum(1) ordered = head + ["Altri"] else: ordered = head else: W_show = W[ordered].copy() cmap = plt.colormaps.get_cmap("tab20") colors = [cmap(i % cmap.N) for i in range(len(ordered))] fig, ax = plt.subplots(figsize=(11, 6)) ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors) ax.set_title(f"Composizione portafoglio nel tempo - {title}") # limite Y = somma massima osservata (<= 1 se pesi + Cash corretti) ymax = float(np.nanmax(W_show.sum(1).values)) if not np.isfinite(ymax) or ymax <= 0: ymax = 1.0 ax.set_ylim(0, max(1.0, ymax)) ax.grid(True, alpha=0.3) ax.set_ylabel("Peso") ax.set_yticks(ax.get_yticks()) ax.set_yticklabels([f"{y*100:.0f}%" for y in ax.get_yticks()]) ncol = 2 if len(ordered) > 10 else 1 ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN") fig.tight_layout() if save_path: folder = os.path.dirname(save_path) or "." 
        try:
            os.makedirs(folder, exist_ok=True)
        except Exception as e:
            print(f"[WARN] mkdir '{folder}': {e} -> salvo in cwd")
            save_path = os.path.basename(save_path)
        fig.savefig(save_path, dpi=150, bbox_inches="tight")
        print(f"[INFO] Salvato: {os.path.abspath(save_path)}")
    # Plot saved without interactive display


# --- 1) Theoretical portfolio weights (already built above) ---
# w_eq : equal weight over the selected columns
# w_rp : risk parity (weights_rp)

def _sanitize_weights(W: pd.DataFrame, index_like: pd.Index) -> pd.DataFrame:
    if W is None or W.empty:
        return pd.DataFrame(index=index_like, columns=[])
    W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0)
    # normalize rows to sum 1 (THEORETICAL weights)
    rs = W.sum(1).replace(0, np.nan)
    return W.div(rs, axis=0).fillna(0.0).clip(lower=0.0)

# Rebuild consistently in case they were not defined yet
if 'w_eq' not in globals():
    w_eq = pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
if 'w_rp' not in globals():
    w_rp = weights_rp.copy() if isinstance(weights_rp, pd.DataFrame) else pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)

w_eq = _sanitize_weights(w_eq, wide_pnl.index)
w_rp = _sanitize_weights(w_rp, wide_pnl.index)

# --- 2) ACTIVE weights (masked with the Signals) ---
# renorm_to_1=False -> the non-invested share stays in 'Cash'
w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")

# Export daily weights (Equal/Risk Parity) with cash, rows normalized to 100%
def _export_weights_daily(w_eq_act_df: pd.DataFrame, w_rp_act_df: pd.DataFrame, path=WEIGHTS_DAILY_XLSX):
    try:
        def _prep(df: pd.DataFrame) -> pd.DataFrame:
            if df is None or df.empty:
                return pd.DataFrame()
            out = df.copy()
            if "Cash" not in out.columns:
                out["Cash"] = 0.0
            out = out.apply(pd.to_numeric, errors="coerce").fillna(0.0)
            if out.index.has_duplicates:
                out = out[~out.index.duplicated(keep="last")]
            out = out.sort_index()
            row_sum = out.sum(axis=1).replace(0, np.nan)
            out = out.div(row_sum, axis=0).fillna(0.0)
            cols = [c for c in out.columns if c != "Cash"] + ["Cash"]
            return out[cols]

        w_eq_x = _prep(w_eq_act_df)
        w_rp_x = _prep(w_rp_act_df)
        if w_eq_x.empty and w_rp_x.empty:
            print("[INFO] Nessun peso da esportare (weights_daily.xlsx non creato).")
            return
        with pd.ExcelWriter(path) as xw:
            if not w_eq_x.empty:
                w_eq_x.to_excel(xw, "Equal_Weight", index=True)
            if not w_rp_x.empty:
                w_rp_x.to_excel(xw, "Risk_Parity", index=True)
        print(f"[INFO] Salvato: {path}")
    except Exception as e:
        print(f"[WARN] Export weights_daily.xlsx fallito: {e}")

_export_weights_daily(w_eq_act, w_rp_act, path=WEIGHTS_DAILY_XLSX)

# --- 3) Plot + save ---
plot_portfolio_composition_fixed(w_eq_act, "Equal Weight (attivi + Cash)",
                                 str(PLOT_DIR / "composition_equal_weight_active.png"))
plot_portfolio_composition_fixed(w_rp_act, "Risk Parity (attivi + Cash)",
                                 str(PLOT_DIR / "composition_risk_parity_active.png"))

checkpoint_post_timer("Pesi/plot portafogli")
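# ---------------------------------------------------------------------------
# Small worked example (illustrative only, never called by the pipeline): with
# renorm_to_1=False the weight of assets whose Signal is 0 is moved to 'Cash',
# so each row of the active weights still sums to 1. The toy dates and ISIN
# labels below are made up for the example.
# ---------------------------------------------------------------------------
def _active_weights_demo():
    import pandas as pd
    idx = pd.to_datetime(["2024-01-02", "2024-01-03"])
    w_base = pd.DataFrame({"AAA": [0.5, 0.5], "BBB": [0.5, 0.5]}, index=idx)
    sig = pd.DataFrame({"AAA": [1, 1], "BBB": [1, 0]}, index=idx)
    # Day 1: AAA 0.5, BBB 0.5, Cash 0.0 ; Day 2: AAA 0.5, BBB 0.0, Cash 0.5
    return make_active_weights(w_base, sig, renorm_to_1=False, add_cash=True)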
# -----------------------------
# 5.5 Trades report — LONG ONLY
# -----------------------------
def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame,
                       weights: pd.DataFrame, name: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    LONG-ONLY trade report, EOD-consistent:
    - signal lagged by 1 bar (the position opens the day after the first 1)
    - PnL aligned to the exposure day: r = pnl.shift(1), same lag as the signal
    - close on the first 0 (CloseDate = dt); at end of series CloseDate = last bar + BDay(1)
    - Duration_bars = number of bars accumulated in the trade PnL

    Returns (df_trades, df_daily), where df_daily holds the day-by-day PnL of each valid trade.
    """
    from pandas.tseries.offsets import BDay

    # Align indices
    common_idx = sig.index.intersection(pnl.index)
    if not weights.empty:
        common_idx = common_idx.intersection(weights.index)
    sig = sig.loc[common_idx].copy()
    pnl = pnl.loc[common_idx].copy()
    weights = weights.loc[common_idx].copy() if not weights.empty else pd.DataFrame(index=common_idx)

    # Sanitize
    sig = sig.fillna(0).astype(int).clip(lower=0)       # long only
    pnl = pnl.apply(pd.to_numeric, errors="coerce")     # keep NaN where opens are missing

    rows = []
    daily_rows = []
    for isin in sig.columns:
        # 1) EOD signal (lag 1)
        s = sig[isin].fillna(0).astype(int).shift(1).fillna(0).astype(int)
        # 2) PnL aligned to the exposure day (EOD): shifted by 1 bar, same lag as the signal
        r = pnl[isin].shift(1)
        # 3) Weights (if available)
        w = (weights[isin].fillna(0.0) if (isin in weights.columns)
             else pd.Series(0.0, index=s.index))

        in_pos, start, acc, acc_dates = False, None, [], []
        for dt in s.index:
            sig_t = int(s.at[dt])

            # CLOSE: first 0 after a period in position -> close today (dt)
            if in_pos and (sig_t == 0):
                if any(pd.isna(acc)):
                    print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{dt.date()}")
                else:
                    pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
                    w_start = float(w.get(start, 0.0))
                    rows.append(dict(
                        Strategy=name, ISIN=isin,
                        OpenDate=start, CloseDate=dt,
                        Direction="long", Size=w_start,
                        Duration_bars=len(acc),
                        **{"PnL_%": pnl_val * 100.0}
                    ))
                    # store the actual daily contribution (no smoothing across days)
                    for dd, rv in zip(acc_dates, acc):
                        daily_rows.append({
                            "Strategy": name, "ISIN": isin, "Date": dd,
                            "Size": w_start, "PnL_day": float(rv)
                        })
                in_pos, start, acc = False, None, []
                acc_dates = []

            # OPEN: first 1 (lagged) while not in position
            if (not in_pos) and (sig_t == 1):
                in_pos, start, acc, acc_dates = True, dt, [], []

            # ACCUMULATE: PnL of the exposure day
            if in_pos:
                acc.append(r.at[dt])
                acc_dates.append(dt)

        # CLOSE AT END OF SERIES -> next business day
        if in_pos:
            close_dt = s.index[-1] + BDay(1)
            if any(pd.isna(acc)):
                print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{close_dt.date()}")
            else:
                pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
                w_start = float(w.get(start, 0.0))
                rows.append(dict(
                    Strategy=name, ISIN=isin,
                    OpenDate=start, CloseDate=close_dt,
                    Direction="long", Size=w_start,
                    Duration_bars=len(acc),
                    **{"PnL_%": pnl_val * 100.0}
                ))
                for dd, rv in zip(acc_dates, acc):
                    daily_rows.append({
                        "Strategy": name, "ISIN": isin, "Date": dd,
                        "Size": w_start, "PnL_day": float(rv)
                    })

    # Order columns
    cols = ["Strategy", "ISIN", "OpenDate", "CloseDate", "Direction", "Size", "Duration_bars", "PnL_%"]
    out = pd.DataFrame(rows)
    out = out[[c for c in cols if c in out.columns] + [c for c in out.columns if c not in cols]]

    daily_df = pd.DataFrame(daily_rows)
    if not daily_df.empty:
        daily_df["Date"] = pd.to_datetime(daily_df["Date"])
    return out, daily_df


# Asset columns actually used in the main portfolio
asset_cols = [c for c in w_eq.columns if float(pd.to_numeric(w_eq[c], errors="coerce").abs().sum()) > 0.0]
if not asset_cols:
    asset_cols = list(wide_pnl.columns)

rep_eq, daily_eq = \
make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]], wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]], w_eq_act, "Equal Weight") rep_rp, daily_rp = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]], wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]], w_rp_act, "Risk Parity") with pd.ExcelWriter(TRADES_REPORT_XLSX) as xw: rep_eq.to_excel(xw, "Equal_Weight", index=False) rep_rp.to_excel(xw, "Risk_Parity", index=False) checkpoint_post_timer("Report trades") # Performance attribution per ISIN def _build_performance_attribution(trades_df: pd.DataFrame, meta_df: pd.DataFrame | None) -> pd.DataFrame: if trades_df is None or trades_df.empty: return pd.DataFrame(columns=["ISIN","Nome","Tot_Trades","Positivi","Negativi","Positivi_%","Negativi_%","PnL_Cum_%"]) df = trades_df.copy() df["PnL_%"] = pd.to_numeric(df["PnL_%"], errors="coerce") rows = [] for isin, g in df.groupby("ISIN"): tot = len(g) pos = int((g["PnL_%"] > 0).sum()) neg = int((g["PnL_%"] < 0).sum()) rows.append({ "ISIN": str(isin), "Tot_Trades": tot, "Positivi": pos, "Negativi": neg, "Positivi_%": (pos / tot * 100.0) if tot > 0 else np.nan, "Negativi_%": (neg / tot * 100.0) if tot > 0 else np.nan, "PnL_Cum_%": float(g["PnL_%"].sum()) }) out = pd.DataFrame(rows) if meta_df is not None and "ISIN" in meta_df.columns: meta_cols = [c for c in ["ISIN","Nome","Descrizione","Categoria","Asset Class"] if c in meta_df.columns] if meta_cols: out = out.merge(meta_df[meta_cols].drop_duplicates("ISIN"), on="ISIN", how="left") # ordina colonne con Nome subito dopo ISIN cols = [c for c in ["ISIN","Nome"] if c in out.columns] + [c for c in out.columns if c not in ["ISIN","Nome"]] return out[cols] perf_attr_df = _build_performance_attribution(pd.concat([rep_eq, rep_rp], ignore_index=True), df_sum if 'df_sum' in globals() else None) perf_attr_df.to_excel(PERF_ATTRIB_XLSX, index=False) print(f"[INFO] Performance attribution salvata in {PERF_ATTRIB_XLSX}") print(f"[INFO] Report trades salvato in {TRADES_REPORT_XLSX}") # ============================================================ # 5.6 Rebuild DAILY PnL from trades_report (calendarized) # → per rendere coerente il compounding dei trade con equity/heatmap # ============================================================ import pandas as pd import numpy as np def rebuild_daily_from_trades_dict(trades_dict): """ trades_dict: {'Equal_Weight': df, 'Risk_Parity': df} Ogni df deve avere: OpenDate, CloseDate, Size, Duration_bars, PnL_% Regola: distribuiamo il PnL del trade su ciascun giorno di durata con un rendimento giornaliero costante r tale che (1+r)^D - 1 = PnL. I giorni attivi sono [OpenDate, CloseDate) sul calendario business. Il contributo del trade ogni giorno è Size * r. Ritorna: DataFrame con indice Date (business days) e colonne per strategia. 
""" # Costruisci indice calendario minimo→massimo min_dt, max_dt = None, None for df in trades_dict.values(): if df is None or df.empty: continue df = df.copy() df["OpenDate"] = pd.to_datetime(df["OpenDate"]) df["CloseDate"] = pd.to_datetime(df["CloseDate"]) lo, hi = df["OpenDate"].min(), df["CloseDate"].max() if pd.notna(lo): min_dt = lo if (min_dt is None or lo < min_dt) else min_dt if pd.notna(hi): max_dt = hi if (max_dt is None or hi > max_dt) else max_dt if min_dt is None or max_dt is None: return pd.DataFrame() cal = pd.bdate_range(start=min_dt, end=max_dt, freq="C") # business calendar daily = pd.DataFrame(0.0, index=cal, columns=list(trades_dict.keys())) for strat, df in trades_dict.items(): if df is None or df.empty: continue d = df.copy() d["OpenDate"] = pd.to_datetime(d["OpenDate"]) d["CloseDate"] = pd.to_datetime(d["CloseDate"]) d["Size"] = pd.to_numeric(d.get("Size", 0.0), errors="coerce").fillna(0.0) d["Duration_bars"] = pd.to_numeric(d.get("Duration_bars", 0), errors="coerce").fillna(0).astype(int) d["PnL_%"] = pd.to_numeric(d.get("PnL_%", 0.0), errors="coerce").fillna(0.0) # Costruisci serie giornaliera sommando i contributi dei singoli trade s = pd.Series(0.0, index=cal) for _, row in d.iterrows(): D = int(row["Duration_bars"]) if pd.notna(row["Duration_bars"]) else 0 if D <= 0: # trade di durata nulla → salta continue pnl = float(row["PnL_%"]) / 100.0 try: r_daily = (1.0 + pnl) ** (1.0 / D) - 1.0 except Exception: r_daily = pnl / max(1, D) size = float(row["Size"]) if pd.notna(row["Size"]) else 0.0 # Giorni attivi: [OpenDate, CloseDate) sul calendario business rng = pd.bdate_range(start=row["OpenDate"], end=row["CloseDate"] - pd.tseries.offsets.BDay(1), freq="C") if len(rng) != D: # se per qualche motivo differisce (festivi ecc.) ricalibra usando la lunghezza effettiva if len(rng) <= 0: continue try: r_daily = (1.0 + pnl) ** (1.0 / len(rng)) - 1.0 except Exception: r_daily = pnl / len(rng) s[rng] = s[rng] + size * r_daily daily[strat] = s return daily # Costruisci il dict dai DataFrame di trade appena creati trades_dict = { "Equal_Weight": rep_eq if 'rep_eq' in globals() else pd.DataFrame(), "Risk_Parity": rep_rp if 'rep_rp' in globals() else pd.DataFrame(), } # Ricostruzione daily dai PnL giornalieri reali dei trade (senza spalmare) daily_detail = pd.concat([daily_eq, daily_rp], ignore_index=True) if not daily_detail.empty: daily_detail["Date"] = pd.to_datetime(daily_detail["Date"]) daily_detail["PnL_day"] = pd.to_numeric(daily_detail["PnL_day"], errors="coerce") daily_detail["Size"] = pd.to_numeric(daily_detail["Size"], errors="coerce").fillna(0.0) daily_detail = daily_detail.dropna(subset=["Date", "PnL_day"]) daily_detail["Pnl_contrib"] = daily_detail["PnL_day"] * daily_detail["Size"] daily_from_trades = ( daily_detail.pivot_table(index="Date", columns="Strategy", values="Pnl_contrib", aggfunc="sum") .sort_index() .fillna(0.0) ) else: daily_from_trades = pd.DataFrame() # Salva su disco (CSV + XLSX) per ispezione if not daily_from_trades.empty: daily_from_trades.to_csv(DAILY_FROM_TRADES_CSV, index_label="Date") # Plot equity & heatmap basati sui DAILY da trade (coerenti col compounding) import matplotlib.pyplot as plt col_map = [ ("Equal Weight", ["Equal Weight", "Equal_Weight"]), ("Risk Parity", ["Risk Parity", "Risk_Parity"]), ] def _find_col(df, aliases): for c in aliases: if c in df.columns: return c return None fig, ax = plt.subplots(figsize=(10, 6)) for lab, aliases in col_map: col = _find_col(daily_from_trades, aliases) if col: eq = (1.0 + 
daily_from_trades[col].fillna(0.0)).cumprod() * 100 eq.plot(ax=ax, label=lab) ax.legend() ax.grid(True) ax.set_title("Equity line ricostruita dai trades (calendarizzata)") fig.tight_layout() fig.savefig(PLOT_DIR / "equity_from_trades.png", dpi=150) plt.close(fig) print(f"[INFO] Salvato: {PLOT_DIR / 'equity_from_trades.png'}") # Heatmap per ciascuna strategia for lab, aliases, fname in [ ("Equal Weight", ["Equal Weight", "Equal_Weight"], "heatmap_equal_from_trades.png"), ("Risk Parity", ["Risk Parity", "Risk_Parity"], "heatmap_rp_from_trades.png"), ]: col = _find_col(daily_from_trades, aliases) if col: try: plot_heatmap_monthly( daily_from_trades[col], f"Heatmap mensile - {lab} (da trades)", save_path=PLOT_DIR / fname, ) except Exception as e: print(f"[WARN] Heatmap {lab} da trades: {e}") else: print("[INFO] daily_from_trades risulta vuoto: nessun plot/CSV generato.") checkpoint_post_timer("Ricostruzione daily/plot") # ============================================================ # METRICS UTILS (guard) — richieste da _calc_all_metrics_... # ============================================================ import numpy as np import pandas as pd # r2 della equity line vs trend line if "r2_equity_line" not in globals(): def r2_equity_line(returns: pd.Series) -> float: r = pd.to_numeric(returns, errors="coerce").fillna(0.0) if r.size == 0: return np.nan eq = (1.0 + r).cumprod() t = np.arange(eq.size, dtype=float) X = np.vstack([t, np.ones_like(t)]).T beta, alpha = np.linalg.lstsq(X, eq.values, rcond=None)[0] yhat = alpha + beta * t ss_res = np.sum((eq.values - yhat) ** 2) ss_tot = np.sum((eq.values - eq.values.mean()) ** 2) return float(1 - ss_res/ss_tot) if ss_tot > 0 else np.nan # metriche drawdown: MaxDD, durata massima, time-to-recovery if "drawdown_metrics" not in globals(): def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): r = pd.to_numeric(returns, errors="coerce").fillna(0.0) if r.size == 0: return np.nan, np.nan, np.nan eq = (1.0 + r).cumprod() peak = eq.cummax() dd = eq / peak - 1.0 maxdd = float(dd.min()) if dd.size else np.nan # durata massima di drawdown (numero di barre consecutive sotto il picco) dd_mask = dd < 0 max_dur, cur = 0, 0 for flag in dd_mask: cur = cur + 1 if flag else 0 if cur > max_dur: max_dur = cur # time-to-recovery dal minimo assoluto ttr = np.nan if dd.size: idx_min = dd.idxmin() sub_eq = eq.loc[idx_min:] rec_idx = sub_eq[sub_eq >= peak.loc[idx_min]].index.min() if pd.notna(rec_idx): # se indice datetime, durata in giorni; se indice numerico, in barre if isinstance(rec_idx, pd.Timestamp): ttr = (rec_idx - idx_min).days else: ttr = int(sub_eq.index.get_loc(rec_idx)) else: ttr = sentinel_ttr return maxdd, int(max_dur), ttr # AAW/AUW/Heal Index (versione validata che mi hai passato) if "heal_index_metrics" not in globals(): def heal_index_metrics(returns: pd.Series): s = returns.fillna(0.0).astype(float) if s.size == 0: return np.nan, np.nan, np.nan equity = (1.0 + s).cumprod() if equity.size == 0: return np.nan, np.nan, np.nan run_max = equity.cummax() dd = equity / run_max - 1.0 AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan run_min = equity.cummin() ru = equity / run_min - 1.0 AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan return AAW, AUW, heal # H_min (100% finestre positive) in giorni/mesi (versione validata) if "h_min_100" not in globals(): def h_min_100(returns: pd.Series, month_len: int = 21): s = returns.dropna().astype(float) n = 
s.size if n == 0: return np.nan, np.nan log1p = np.log1p(s.values) csum = np.cumsum(log1p) def rolling_sum_k(k: int): if k > n: return np.array([]) head = csum[k - 1:] tail = np.concatenate(([0.0], csum[:-k])) return head - tail for k in range(1, n + 1): rs = rolling_sum_k(k) if rs.size == 0: break roll_ret = np.exp(rs) - 1.0 if np.all(roll_ret >= 0): h_days = k h_months = int(np.ceil(h_days / month_len)) return h_days, h_months return np.nan, np.nan # costante se non già definita try: DAYS_PER_YEAR except NameError: DAYS_PER_YEAR = 252 # ========= FUNZIONE RICHIESTA DAL LOOP ========= def _calc_all_metrics_from_returns(r: pd.Series) -> dict: r = pd.to_numeric(r, errors="coerce").fillna(0.0) n = len(r) if n == 0: return {k: np.nan for k in [ "Rendimento_Ann","Volatilita_Ann","CAGR","R2_Equity", "MaxDD","DD_Duration_Max","TTR_from_MDD","AAW","AUW","Heal_Index","H_min_100m_5Y" ]} rendimento_ann = float(r.mean() * DAYS_PER_YEAR) volatilita_ann = float(r.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)) if n > 1 else np.nan eq = (1.0 + r).cumprod() years_elapsed = n / DAYS_PER_YEAR if n > 0 else np.nan if years_elapsed and years_elapsed > 0 and eq.iloc[0] > 0: cagr = float(eq.iloc[-1] ** (1.0 / years_elapsed) - 1.0) else: cagr = np.nan r2 = r2_equity_line(r) maxdd, dddur, ttr = drawdown_metrics(r, sentinel_ttr=1250) aaw, auw, heal = heal_index_metrics(r) # 5 anni o meno se la serie è più corta lookback = min(n, DAYS_PER_YEAR * 5) r5 = r.iloc[-lookback:] _, hmin_months = h_min_100(r5, month_len=21) return { "Rendimento_Ann": rendimento_ann, "Volatilita_Ann": volatilita_ann, "CAGR": cagr, "R2_Equity": r2, "MaxDD": maxdd, "DD_Duration_Max": dddur, "TTR_from_MDD": ttr, "AAW": aaw, "AUW": auw, "Heal_Index": heal, "H_min_100m_5Y": hmin_months } # ====================================================================== # 6) LOOP Top-N: metriche per N = 6..20 → final_metrics.xlsx # ====================================================================== # Safety: DAYS_PER_YEAR se non definito try: DAYS_PER_YEAR except NameError: DAYS_PER_YEAR = 252 def _select_isins_for_topN(df_sum: pd.DataFrame, top_n: int): """Seleziona i migliori 'top_n' ISIN in base allo Score.""" df_sum_loc = df_sum.copy() base_isins_N = ( df_sum_loc .sort_values("Score", ascending=False) .head(top_n)["ISIN"].astype(str).str.strip().tolist() ) return base_isins_N def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl): """ Costruisce i rendimenti di portafoglio Equal Weight e Risk Parity per l'insieme di ISIN in base_isins_N. Ritorna: ret_eq_N : pd.Series ret_rp_N : pd.Series """ # Colonne effettivamente disponibili colsN = [c for c in base_isins_N if c in wide_pnl.columns] if len(colsN) == 0: # Nessun ISIN valido → portafogli in cash (linea piatta) idx = wide_pnl.index ret_eq_N = pd.Series(0.0, index=idx, name="Ret_EqW_N") ret_rp_N = pd.Series(0.0, index=idx, name="Ret_RP_N") return ret_eq_N, ret_rp_N # -------- Equal Weight -------- ret_eq_N = wide_pnl[colsN].mean(axis=1) # -------- Risk Parity con cap -------- weights_rp_N = inverse_vol_weights( wide_pnl[colsN], window=60, max_weight=RP_MAX_WEIGHT # es. 
RP_MAX_WEIGHT = 2 / TOP_N_MAX = 0.1333
    )
    ret_rp_N = (wide_pnl[colsN] * weights_rp_N).sum(axis=1)

    return ret_eq_N, ret_rp_N


# ==============================
# Portfolio metrics (current TOP_N) -> Excel
# ==============================
metrics_rows = []
for strategy_name, rser in [
    ("Equal_Weight", ret_eq),
    ("Risk_Parity", ret_rp),
]:
    m = _calc_all_metrics_from_returns(rser)
    m["TopN"] = TOP_N
    m["Strategy"] = strategy_name
    metrics_rows.append(m)

df_metrics = pd.DataFrame(metrics_rows)[[
    "TopN", "Strategy",
    "Rendimento_Ann", "Volatilita_Ann", "CAGR", "R2_Equity",
    "MaxDD", "DD_Duration_Max", "TTR_from_MDD",
    "AAW", "AUW", "Heal_Index", "H_min_100m_5Y",
]]

# Append to final_metrics.xlsx if it already exists, otherwise create it from scratch
try:
    with pd.ExcelWriter(FINAL_METRICS_XLSX, engine="openpyxl", mode="a", if_sheet_exists="replace") as xw:
        df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False)
except Exception:
    with pd.ExcelWriter(FINAL_METRICS_XLSX) as xw:
        df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False)

print(f"[INFO] Salvato: {FINAL_METRICS_XLSX} (Portfolio_Metrics)")
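# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the pipeline): how the helpers above
# could be combined for the Top-N sweep announced in the section 6 header
# (N = 6..20). The actual sweep and its output layout may differ; this only
# shows the intended call pattern on the in-memory df_sum / wide_pnl objects.
# ---------------------------------------------------------------------------
def _topn_sweep_sketch(df_sum, wide_pnl, n_min=6, n_max=20):
    import pandas as pd
    rows = []
    for n in range(n_min, n_max + 1):
        isins_n = _select_isins_for_topN(df_sum, n)
        ret_eq_n, ret_rp_n = _build_portfolio_returns_for_isins(isins_n, wide_pnl)
        for strat, rser in [("Equal_Weight", ret_eq_n), ("Risk_Parity", ret_rp_n)]:
            m = _calc_all_metrics_from_returns(rser)
            m.update({"TopN": n, "Strategy": strat})
            rows.append(m)
    return pd.DataFrame(rows)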