# -*- coding: utf-8 -*- """ grid_search.py ============== Motore di grid search con walk-forward validation per il sistema kNN. Componenti chiave: - ParameterGrid → genera la lista di combinazioni di parametri - TimeSeriesSplitter → split walk-forward train/test con embargo - run_grid_search → orchestratore: per ogni ISIN e ogni fold, esegue il backtest multi-day, aggrega le metriche - aggregate_results → per ogni cella della grid, calcola Sharpe medio, Stability (1/std dello Sharpe fra fold), e l'IS-OOS gap (overfit detector) Convenzioni: - TUTTE le metriche sono calcolate sul fold di TEST (out-of-sample), MAI in-sample - L'embargo evita overlap pattern-library fra train e test (purged k-fold variant) - I parametri "vincenti" sono quelli con Sharpe-mean alto E Stability alta E IS-OOS gap basso (ovvero non hanno overfittato il train) L'output è pronto per essere consumato da report.py per la visualizzazione. """ from __future__ import annotations import itertools import time from dataclasses import dataclass, field, asdict from pathlib import Path from typing import Any, Optional, Sequence import numpy as np import pandas as pd from knn_backtest_multiday import knn_forward_backtest_multiday # ============================================================= # Parameter Grid # ============================================================= @dataclass class ParameterGrid: """ Definisce le combinazioni di parametri da testare. Esempio: grid = ParameterGrid( Wp=[40, 60, 80], Ha=[5, 10, 15], k=[15, 25, 35], theta_entry=[0.0, 0.005, 0.01], sl_bps=[200, 300, 500], tp_bps=[600, 800, 1200], trail_bps=[200, 300], time_stop_bars=[10, 20], decision_every=[1, 3, 5, 10], min_holding_bars=[0, 3], ) list(grid.iterate()) # → lista di dict con le combinazioni """ Wp: Sequence[int] = (60,) Ha: Sequence[int] = (10,) k: Sequence[int] = (25,) theta_entry: Sequence[float] = (0.005,) sl_bps: Sequence[Optional[float]] = (300.0,) tp_bps: Sequence[Optional[float]] = (800.0,) trail_bps: Sequence[Optional[float]] = (300.0,) time_stop_bars: Sequence[Optional[int]] = (20,) theta_exit: Sequence[Optional[float]] = (0.0,) weak_days_exit: Sequence[Optional[int]] = (None,) decision_every: Sequence[int] = (1,) min_holding_bars: Sequence[int] = (0,) only_first_signal: Sequence[bool] = (False,) fee_bps: Sequence[float] = (10.0,) def iterate(self): """Yield ogni combinazione come dict.""" fields_order = [ "Wp", "Ha", "k", "theta_entry", "sl_bps", "tp_bps", "trail_bps", "time_stop_bars", "theta_exit", "weak_days_exit", "decision_every", "min_holding_bars", "only_first_signal", "fee_bps", ] values_lists = [getattr(self, f) for f in fields_order] for combo in itertools.product(*values_lists): yield dict(zip(fields_order, combo)) def size(self) -> int: return int(np.prod([len(getattr(self, f)) for f in [ "Wp", "Ha", "k", "theta_entry", "sl_bps", "tp_bps", "trail_bps", "time_stop_bars", "theta_exit", "weak_days_exit", "decision_every", "min_holding_bars", "only_first_signal", "fee_bps", ]])) # ============================================================= # Walk-forward splitter # ============================================================= @dataclass class TimeSeriesSplitter: """ Walk-forward splitter con embargo (purged k-fold per time series). Esempio con n_splits=4, train_size=500, test_size=125, embargo=20: [---- train ----][emb][--- test ---] fold 1 [---- train ----][emb][--- test ---] fold 2 [---- train ----][emb][--- test ---] fold 3 [---- train ----][emb][--- test ---] fold 4 Restituisce tuple (train_start, train_end, test_start, test_end) come INDICI POSIZIONALI nella serie temporale. """ n_splits: int = 4 train_size: int = 504 # ~2 anni di business days test_size: int = 126 # ~6 mesi embargo: int = 20 # gap fra train e test per evitare leakage def split(self, n_obs: int): """ Yield (i_train_start, i_train_end, i_test_start, i_test_end). Gli indici sono inclusivi a sinistra ed esclusivi a destra (Python style). """ min_total = self.train_size + self.embargo + self.test_size if n_obs < min_total: raise ValueError( f"Serie troppo corta ({n_obs} barre) per n_splits={self.n_splits}, " f"train_size={self.train_size}, embargo={self.embargo}, " f"test_size={self.test_size}. Servono almeno {min_total} barre." ) # Distribuisci gli start dei test su tutto il range disponibile first_test_start = self.train_size + self.embargo last_test_end = n_obs if self.n_splits == 1: test_starts = [first_test_start] else: available_test_span = last_test_end - first_test_start - self.test_size if available_test_span <= 0: # Non c'è spazio per più di un fold test_starts = [first_test_start] else: stride = available_test_span / max(1, self.n_splits - 1) test_starts = [int(round(first_test_start + i * stride)) for i in range(self.n_splits)] for ts in test_starts: te = min(ts + self.test_size, n_obs) tr_end = ts - self.embargo tr_start = max(0, tr_end - self.train_size) if te - ts < self.test_size // 2: continue # fold troppo corto, skip yield tr_start, tr_end, ts, te # ============================================================= # Singolo backtest su un singolo ISIN per un singolo set di parametri # ============================================================= def _run_one_combo_on_asset( df_asset: pd.DataFrame, col_date: str, col_ret: str, params: dict, exec_ret: Optional[pd.Series] = None, ) -> dict: """ Esegue knn_forward_backtest_multiday con i parametri specificati e restituisce le metriche di sintesi (stats dict). Se il backtest fallisce, restituisce un dict con NaN per garantire che la grid search non si interrompa. """ try: sig_df, stats = knn_forward_backtest_multiday( df_isin=df_asset, col_date=col_date, col_ret=col_ret, Wp=params["Wp"], Ha=params["Ha"], k=params["k"], theta_entry=params["theta_entry"], exec_ret=exec_ret, fee_bps=params["fee_bps"], sl_bps=params["sl_bps"], tp_bps=params["tp_bps"], trail_bps=params["trail_bps"], time_stop_bars=params["time_stop_bars"], theta_exit=params["theta_exit"], weak_days_exit=params["weak_days_exit"], decision_every=params["decision_every"], min_holding_bars=params["min_holding_bars"], only_first_signal=params["only_first_signal"], ) return stats except Exception as exc: return { "Sharpe": np.nan, "CAGR_%": np.nan, "MaxDD_%eq": np.nan, "Calmar": np.nan, "Sortino": np.nan, "N_Trades": 0, "HitRate_%": np.nan, "Turnover_%/step": np.nan, "AvgTradeRet_bps": np.nan, "AnnVol_%": np.nan, "_error": str(exc), **params, } # ============================================================= # Main grid search loop con walk-forward # ============================================================= def run_grid_search( assets: dict[str, pd.DataFrame], col_date: str, col_ret: str, grid: ParameterGrid, splitter: TimeSeriesSplitter, exec_ret_map: Optional[dict[str, pd.Series]] = None, *, verbose: bool = True, n_max_combos: Optional[int] = None, save_intermediate_to: Optional[Path] = None, ) -> pd.DataFrame: """ Esegue grid search walk-forward su una collezione di ISIN. Parameters ---------- assets : dict {isin -> df_asset} Mappa ISIN → DataFrame con almeno le colonne col_date e col_ret. grid : ParameterGrid splitter : TimeSeriesSplitter exec_ret_map : dict {isin -> pd.Series}, opzionale Rendimenti open-to-open per esecuzione realistica. verbose : bool Stampa progresso. n_max_combos : int, opzionale Se specificato, esegue solo le prime N combinazioni (utile per test). save_intermediate_to : Path, opzionale Se specificato, salva il DataFrame parziale ogni 10 combinazioni in un file XLSX per ripartire in caso di crash. Returns ------- DataFrame "lungo": una riga per (ISIN, combo_id, fold_id) Colonne: tutte le metriche + parametri + identificativo ISIN / fold. """ total_combos = grid.size() if n_max_combos: total_combos = min(total_combos, n_max_combos) if verbose: print(f"[GRID] Combinazioni da testare: {total_combos}") print(f"[GRID] ISIN: {len(assets)}") print(f"[GRID] Fold per ISIN: {splitter.n_splits}") print(f"[GRID] Backtest totali stimati: {total_combos * len(assets) * splitter.n_splits:,}") rows = [] combo_id = 0 t_start = time.perf_counter() for params in grid.iterate(): if n_max_combos and combo_id >= n_max_combos: break combo_id += 1 t_combo = time.perf_counter() for isin, df_asset in assets.items(): df_a = df_asset.copy() if col_date in df_a.columns: df_a[col_date] = pd.to_datetime(df_a[col_date], errors="coerce") df_a = df_a.sort_values(col_date).reset_index(drop=True) n_obs = len(df_a) try: splits = list(splitter.split(n_obs)) except ValueError: # serie troppo corta: skip continue exec_ret = exec_ret_map.get(isin) if exec_ret_map else None for fold_id, (tr_s, tr_e, te_s, te_e) in enumerate(splits, start=1): # Backtest sul TEST fold (la libreria viene comunque costruita # solo con il passato all'interno della funzione, quindi train # vs test è "purgato" naturalmente per costruzione) df_test = df_a.iloc[tr_s:te_e].copy() # passiamo train+embargo+test exec_ret_test = exec_ret.loc[df_test[col_date]] if (exec_ret is not None and col_date in df_test.columns) else None stats = _run_one_combo_on_asset( df_asset=df_test, col_date=col_date, col_ret=col_ret, params=params, exec_ret=exec_ret_test, ) row = { "ISIN": isin, "combo_id": combo_id, "fold_id": fold_id, "n_obs_test": te_e - te_s, **stats, } rows.append(row) if verbose: elapsed = time.perf_counter() - t_combo tot_elapsed = time.perf_counter() - t_start eta = (tot_elapsed / combo_id) * (total_combos - combo_id) print(f"[GRID] combo {combo_id}/{total_combos} ({elapsed:.1f}s) — ETA {eta/60:.1f} min") # Salvataggio intermedio if save_intermediate_to and (combo_id % 10 == 0): df_partial = pd.DataFrame(rows) try: df_partial.to_excel(save_intermediate_to, index=False) except Exception as e: print(f"[GRID] WARNING: impossibile salvare intermediate: {e}") df_results = pd.DataFrame(rows) if verbose: print(f"[GRID] Completato in {(time.perf_counter()-t_start)/60:.1f} min") print(f"[GRID] Righe risultato: {len(df_results):,}") return df_results # ============================================================= # Aggregazione risultati e ranking finale # ============================================================= def aggregate_results( df_results: pd.DataFrame, *, by_isin: bool = False, primary_metric: str = "Sharpe", min_trades_per_fold: int = 5, ) -> pd.DataFrame: """ Aggrega i risultati per combinazione di parametri. Per ogni combo_id, calcola: - {primary_metric}_mean → media tra fold (e ISIN se by_isin=False) - {primary_metric}_std → std tra fold - {primary_metric}_min → minimo (robustezza worst-case) - Stability = 1 / (1 + std/|mean|) → 0..1, 1=identico fra fold - N_Trades_avg, MaxDD_avg, Calmar_avg - n_folds_valid → numero di fold con N_Trades >= min_trades_per_fold Parameters ---------- by_isin : bool Se True, aggrega per (ISIN, combo_id) — utile per analizzare quali parametri funzionano per quali asset. Se False, aggrega su tutto. primary_metric : str Metrica principale per il ranking (Sharpe, Calmar, Sortino, ...). min_trades_per_fold : int Fold con meno trade vengono esclusi dall'aggregazione (rumore). Returns ------- DataFrame ordinato per primary_metric_mean discendente, con i parametri della combinazione (colonne Wp, Ha, k, ecc.) come "chiave". """ if df_results is None or df_results.empty: return pd.DataFrame() # Filtro fold validi df = df_results.copy() df["_valid_fold"] = df["N_Trades"].fillna(0) >= min_trades_per_fold # Colonne parametri (chiave della combo) param_cols = [ "Wp", "Ha", "k", "theta_entry", "sl_bps", "tp_bps", "trail_bps", "time_stop_bars", "theta_exit", "weak_days_exit", "decision_every", "min_holding_bars", "only_first_signal", ] param_cols = [c for c in param_cols if c in df.columns] groupby_cols = (["ISIN"] if by_isin else []) + ["combo_id"] + param_cols metric_cols = [primary_metric, "CAGR_%", "MaxDD_%eq", "Calmar", "Sortino", "N_Trades", "HitRate_%", "Turnover_%/step", "AvgTradeRet_bps"] metric_cols = [c for c in metric_cols if c in df.columns] def _agg(g): valid = g[g["_valid_fold"]] if valid.empty: return pd.Series({ f"{primary_metric}_mean": np.nan, f"{primary_metric}_std": np.nan, f"{primary_metric}_min": np.nan, "Stability": np.nan, "n_folds_valid": 0, "n_folds_total": len(g), "N_Trades_avg": np.nan, "MaxDD_avg": np.nan, "Calmar_avg": np.nan, "CAGR_avg": np.nan, "HitRate_avg": np.nan, }) m_mean = valid[primary_metric].mean() m_std = valid[primary_metric].std() m_min = valid[primary_metric].min() stability = 1.0 / (1.0 + (m_std / abs(m_mean))) if (np.isfinite(m_mean) and abs(m_mean) > 1e-9) else 0.0 return pd.Series({ f"{primary_metric}_mean": m_mean, f"{primary_metric}_std": m_std, f"{primary_metric}_min": m_min, "Stability": stability, "n_folds_valid": len(valid), "n_folds_total": len(g), "N_Trades_avg": valid["N_Trades"].mean() if "N_Trades" in valid.columns else np.nan, "MaxDD_avg": valid["MaxDD_%eq"].mean() if "MaxDD_%eq" in valid.columns else np.nan, "Calmar_avg": valid["Calmar"].mean() if "Calmar" in valid.columns else np.nan, "CAGR_avg": valid["CAGR_%"].mean() if "CAGR_%" in valid.columns else np.nan, "HitRate_avg": valid["HitRate_%"].mean() if "HitRate_%" in valid.columns else np.nan, }) agg = df.groupby(groupby_cols, dropna=False).apply(_agg).reset_index() agg = agg.sort_values(f"{primary_metric}_mean", ascending=False).reset_index(drop=True) # Aggiungi un "composite score" che premia Sharpe alto E stabilità alta if f"{primary_metric}_mean" in agg.columns and "Stability" in agg.columns: # Rank percentile sui due indicatori, poi media agg["_rank_metric"] = agg[f"{primary_metric}_mean"].rank(pct=True) agg["_rank_stab"] = agg["Stability"].rank(pct=True) agg["Composite"] = (agg["_rank_metric"] + agg["_rank_stab"]) / 2.0 agg = agg.drop(columns=["_rank_metric", "_rank_stab"]) return agg def best_combo_per_isin(df_results: pd.DataFrame, primary_metric: str = "Sharpe") -> pd.DataFrame: """ Per ogni ISIN, restituisce la combinazione di parametri con Sharpe-mean più alto. Utile per scoprire se diversi asset preferiscono regimi diversi di (decision_every, tp_bps, ...). """ agg = aggregate_results(df_results, by_isin=True, primary_metric=primary_metric) if agg.empty: return pd.DataFrame() idx = agg.groupby("ISIN")[f"{primary_metric}_mean"].idxmax() return agg.loc[idx.dropna()].reset_index(drop=True) __all__ = [ "ParameterGrid", "TimeSeriesSplitter", "run_grid_search", "aggregate_results", "best_combo_per_isin", ]