# -*- coding: utf-8 -*- """ knn_backtest_multiday.py ======================== Estensione "multi-day" del walk-forward kNN esistente. Filosofia: - NON modifica le funzioni di produzione (knn_forward_backtest_one_asset, build_pattern_library, predict_from_library, z_norm). - Riusa shared_utils.* per coerenza al 100% con il sistema live. - Aggiunge un parametro `decision_every` (N giorni di calendario tra una decisione e la successiva) per ridurre il turnover senza alterare la logica di entry/exit. - Restituisce sia i signals daily sia uno stats dict identico nel formato a quello prodotto dalla funzione daily, così le funzioni di valutazione (ranking, portfolio building, ecc.) possono essere riutilizzate as-is. Compatibilità: - decision_every=1 → comportamento IDENTICO al motore daily attuale (è il default, garantisce backward compatibility) - decision_every=N → ricalcola EstOutcome ogni N barre; in mezzo "mantiene in vita" lo stato del segnale e applica solo le exit di rischio (SL/TP/TRAIL/TIME) L'idea è coerente con la natura del kNN su finestre WP=60 / Ha=10: non c'è motivo strutturale per rifare la stessa stima ogni 24 ore. Author: refactor proposto per il sistema esistente """ from __future__ import annotations import sys from pathlib import Path from typing import Optional # Aggiunge la cartella padre al path Python per trovare shared_utils.py _PARENT_DIR = Path(__file__).resolve().parent.parent if str(_PARENT_DIR) not in sys.path: sys.path.insert(0, str(_PARENT_DIR)) import numpy as np import pandas as pd from shared_utils import ( build_pattern_library, predict_from_library, z_norm, ) def knn_forward_backtest_multiday( df_isin: pd.DataFrame, col_date: str, col_ret: str, Wp: int, Ha: int, k: int, theta_entry: float, *, exec_ret: Optional[pd.Series] = None, fee_bps: float = 10.0, # --- exit params (stessa semantica del motore daily) --- sl_bps: Optional[float] = 300.0, tp_bps: Optional[float] = 800.0, trail_bps: Optional[float] = 300.0, time_stop_bars: Optional[int] = 20, theta_exit: Optional[float] = 0.0, weak_days_exit: Optional[int] = None, # --- nuovi parametri --- decision_every: int = 1, min_holding_bars: int = 0, # --- nuovi parametri di entry/exit avanzati --- only_first_signal: bool = False, # se True, dopo l'apertura non rivaluta entry ) -> tuple[pd.DataFrame, dict]: """ Variante "multi-day" del walk-forward kNN. Parametri NUOVI rispetto a knn_forward_backtest_one_asset: -------------------------------------------------------- decision_every : int (>=1) Ricalcola EstOutcome e decide entry/flip solo ogni `decision_every` barre. Le exit di rischio (SL/TP/TRAIL/TIME) restano valutate ogni giorno per coerenza con le pratiche di risk management. decision_every=1 → comportamento IDENTICO al motore daily. min_holding_bars : int Numero minimo di barre per cui una posizione non può chiudere per RANK/FLIP (le exit di rischio rimangono attive). Utile per evitare di uccidere subito un trade per noise del primo giorno. only_first_signal : bool Se True, una volta in posizione non rivaluta EstOutcome per decidere eventuali flip; resta in posizione finché una exit di rischio o un time stop la chiude. Tutti gli altri parametri sono identici a knn_forward_backtest_one_asset. Returns ------- sig_df : DataFrame con colonne ["Date","Signal","EstOutcome","AvgDist","Ret+1","PnL"] stats : dict con le metriche di sintesi (stesso schema della funzione daily) """ if decision_every < 1: raise ValueError("decision_every deve essere >= 1") r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0 idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r)) idx = pd.to_datetime(idx).dt.normalize() if exec_ret is not None: r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float) r_exec.index = pd.to_datetime(r_exec.index).normalize() r_exec = r_exec.reindex(idx) if len(r_exec) != len(r): r_exec = pd.Series(r_exec.values, index=idx).reindex(idx) else: r_exec = r fee = fee_bps / 10000.0 def _lib_predict(past_returns: pd.Series, win_last: np.ndarray): lib_wins, lib_out = build_pattern_library(past_returns, Wp, Ha) if lib_wins is None: return np.nan, np.nan curr_zn = z_norm(win_last) if curr_zn is None: return np.nan, np.nan est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k) return float(est_out), float(avg_dist) in_pos = False entry_t = None trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 # Memorizziamo l'ultima stima per non doverla rifare ogni barra last_est_out: float = np.nan last_avg_dist: float = np.nan rows = [] for t in range(Wp, len(r) - 1): past = r.iloc[:t] next_ret = r_exec.iloc[t + 1] if t + 1 < len(r_exec) else np.nan # Decidi se è un giorno di "ranking refresh" is_decision_day = ((t - Wp) % decision_every == 0) # Calcola EstOutcome solo nei giorni di decisione (o se non abbiamo ancora una stima) if is_decision_day or pd.isna(last_est_out): if past.dropna().shape[0] < (Wp + Ha): est_out, avg_dist = np.nan, np.nan else: win_last = r.iloc[t - Wp:t].values est_out, avg_dist = _lib_predict(past, win_last) last_est_out = est_out last_avg_dist = avg_dist else: est_out = last_est_out avg_dist = last_avg_dist sig_out = 1 if in_pos else 0 bars_in_trade = (t - entry_t + 1) if (in_pos and entry_t is not None) else 0 # === ENTRATA === # Solo nei giorni di decisione e se non già in posizione if (not in_pos) and is_decision_day and np.isfinite(est_out) and (est_out > theta_entry): sig_out = 1 in_pos = True entry_t = t trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 # === USCITE === elif in_pos: pnl_if_stay = (1.0 + trade_pnl) * (1.0 + (next_ret if np.isfinite(next_ret) else 0.0)) - 1.0 peak_if_stay = max(trade_peak, pnl_if_stay) exit_reasons = [] # SL/TP/TRAIL: sempre attive (anche nei giorni non-decisione) if (sl_bps is not None) and (pnl_if_stay <= -sl_bps / 10000.0) and (bars_in_trade >= min_holding_bars): exit_reasons.append("SL") if (tp_bps is not None) and (pnl_if_stay >= tp_bps / 10000.0): exit_reasons.append("TP") if (trail_bps is not None) and (peak_if_stay - pnl_if_stay >= trail_bps / 10000.0): exit_reasons.append("TRAIL") if (time_stop_bars is not None) and (bars_in_trade >= time_stop_bars): exit_reasons.append("TIME") # FLIP/WEAK: solo nei giorni di decisione e dopo min_holding_bars if ( is_decision_day and (not only_first_signal) and (theta_exit is not None) and (bars_in_trade >= min_holding_bars) and np.isfinite(est_out) ): if est_out <= theta_exit: weak_streak = weak_streak + 1 if weak_days_exit else weak_streak if weak_days_exit is None: exit_reasons.append("FLIP") elif weak_streak >= weak_days_exit: exit_reasons.append("FLIP_STREAK") else: weak_streak = 0 if exit_reasons: sig_out = 0 in_pos = False entry_t = None trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 else: trade_pnl = pnl_if_stay trade_peak = peak_if_stay rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t + 1] if t + 1 < len(r_exec) else np.nan)) sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"]) sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0) trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs() cost = trade_chg * fee sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost sig_df.drop(columns=["Signal_prev"], inplace=True) # === metriche di sintesi (stesso schema della funzione daily) === pnl_series = sig_df["PnL"].fillna(0.0) n = max(1, pnl_series.notna().sum()) eq = (1.0 + pnl_series).cumprod() peak = eq.cummax() dd = (eq / peak - 1.0) mdd = float(dd.min()) if dd.size else np.nan cagr = float((eq.iloc[-1]) ** (252.0 / max(1, len(pnl_series))) - 1.0) if eq.iloc[-1] > 0 else np.nan sigma = float(pnl_series.std(ddof=1)) if len(pnl_series) > 1 else np.nan vol_a = sigma * np.sqrt(252) if np.isfinite(sigma) else np.nan sharpe = float(pnl_series.mean() / sigma * np.sqrt(252)) if (np.isfinite(sigma) and sigma > 0) else np.nan downside = pnl_series[pnl_series < 0] sortino = ( float(pnl_series.mean() / downside.std(ddof=1) * np.sqrt(252)) if (len(downside) > 1 and downside.std(ddof=1) > 0) else np.nan ) calmar = float(cagr / abs(mdd)) if (np.isfinite(mdd) and mdd < 0 and np.isfinite(cagr)) else np.nan # numero di trade reali sig = sig_df["Signal"].astype(int).values n_trades = int(((sig[1:] - sig[:-1]) > 0).sum()) # transizioni 0→1 stats = { "CAGR_%": round(cagr * 100, 3) if np.isfinite(cagr) else np.nan, "AnnVol_%": round(vol_a * 100, 3) if np.isfinite(vol_a) else np.nan, "Sharpe": round(sharpe, 3) if np.isfinite(sharpe) else np.nan, "Sortino": round(sortino, 3) if np.isfinite(sortino) else np.nan, "MaxDD_%eq": round(mdd * 100, 3) if np.isfinite(mdd) else np.nan, "Calmar": round(calmar, 3) if np.isfinite(calmar) else np.nan, "HitRate_%": round(100 * (pnl_series > 0).sum() / max(1, (pnl_series != 0).sum()), 2), "AvgTradeRet_bps": round(pnl_series.mean() * 10000, 2), "Turnover_%/step": round(100 * trade_chg.mean(), 2), "N_Steps": int(sig_df.shape[0]), "N_Trades": n_trades, # parametri usati (per tracciare la cella della grid search) "Wp": int(Wp), "Ha": int(Ha), "k": int(k), "theta_entry": float(theta_entry), "theta_exit": (None if theta_exit is None else float(theta_exit)), "sl_bps": (None if sl_bps is None else float(sl_bps)), "tp_bps": (None if tp_bps is None else float(tp_bps)), "trail_bps": (None if trail_bps is None else float(trail_bps)), "time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)), "weak_days_exit": (None if weak_days_exit is None else int(weak_days_exit)), "decision_every": int(decision_every), "min_holding_bars": int(min_holding_bars), "only_first_signal": bool(only_first_signal), } return sig_df, stats __all__ = ["knn_forward_backtest_multiday"]