283 lines
11 KiB
Python
283 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
knn_backtest_multiday.py
|
|
========================
|
|
Estensione "multi-day" del walk-forward kNN esistente.
|
|
|
|
Filosofia:
|
|
- NON modifica le funzioni di produzione (knn_forward_backtest_one_asset,
|
|
build_pattern_library, predict_from_library, z_norm).
|
|
- Riusa shared_utils.* per coerenza al 100% con il sistema live.
|
|
- Aggiunge un parametro `decision_every` (N giorni di calendario tra una
|
|
decisione e la successiva) per ridurre il turnover senza alterare la
|
|
logica di entry/exit.
|
|
- Restituisce sia i signals daily sia uno stats dict identico nel formato
|
|
a quello prodotto dalla funzione daily, così le funzioni di valutazione
|
|
(ranking, portfolio building, ecc.) possono essere riutilizzate as-is.
|
|
|
|
Compatibilità:
|
|
- decision_every=1 → comportamento IDENTICO al motore daily attuale
|
|
(è il default, garantisce backward compatibility)
|
|
- decision_every=N → ricalcola EstOutcome ogni N barre; in mezzo "mantiene
|
|
in vita" lo stato del segnale e applica solo le exit di rischio (SL/TP/TRAIL/TIME)
|
|
|
|
L'idea è coerente con la natura del kNN su finestre WP=60 / Ha=10:
|
|
non c'è motivo strutturale per rifare la stessa stima ogni 24 ore.
|
|
|
|
Author: refactor proposto per il sistema esistente
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Aggiunge la cartella padre al path Python per trovare shared_utils.py
|
|
_PARENT_DIR = Path(__file__).resolve().parent.parent
|
|
if str(_PARENT_DIR) not in sys.path:
|
|
sys.path.insert(0, str(_PARENT_DIR))
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from shared_utils import (
|
|
build_pattern_library,
|
|
predict_from_library,
|
|
z_norm,
|
|
)
|
|
|
|
|
|
def knn_forward_backtest_multiday(
|
|
df_isin: pd.DataFrame,
|
|
col_date: str,
|
|
col_ret: str,
|
|
Wp: int,
|
|
Ha: int,
|
|
k: int,
|
|
theta_entry: float,
|
|
*,
|
|
exec_ret: Optional[pd.Series] = None,
|
|
fee_bps: float = 10.0,
|
|
# --- exit params (stessa semantica del motore daily) ---
|
|
sl_bps: Optional[float] = 300.0,
|
|
tp_bps: Optional[float] = 800.0,
|
|
trail_bps: Optional[float] = 300.0,
|
|
time_stop_bars: Optional[int] = 20,
|
|
theta_exit: Optional[float] = 0.0,
|
|
weak_days_exit: Optional[int] = None,
|
|
# --- nuovi parametri ---
|
|
decision_every: int = 1,
|
|
min_holding_bars: int = 0,
|
|
# --- nuovi parametri di entry/exit avanzati ---
|
|
only_first_signal: bool = False, # se True, dopo l'apertura non rivaluta entry
|
|
) -> tuple[pd.DataFrame, dict]:
|
|
"""
|
|
Variante "multi-day" del walk-forward kNN.
|
|
|
|
Parametri NUOVI rispetto a knn_forward_backtest_one_asset:
|
|
--------------------------------------------------------
|
|
decision_every : int (>=1)
|
|
Ricalcola EstOutcome e decide entry/flip solo ogni `decision_every`
|
|
barre. Le exit di rischio (SL/TP/TRAIL/TIME) restano valutate ogni
|
|
giorno per coerenza con le pratiche di risk management.
|
|
decision_every=1 → comportamento IDENTICO al motore daily.
|
|
|
|
min_holding_bars : int
|
|
Numero minimo di barre per cui una posizione non può chiudere per
|
|
RANK/FLIP (le exit di rischio rimangono attive). Utile per evitare
|
|
di uccidere subito un trade per noise del primo giorno.
|
|
|
|
only_first_signal : bool
|
|
Se True, una volta in posizione non rivaluta EstOutcome per decidere
|
|
eventuali flip; resta in posizione finché una exit di rischio o un
|
|
time stop la chiude.
|
|
|
|
Tutti gli altri parametri sono identici a knn_forward_backtest_one_asset.
|
|
|
|
Returns
|
|
-------
|
|
sig_df : DataFrame con colonne ["Date","Signal","EstOutcome","AvgDist","Ret+1","PnL"]
|
|
stats : dict con le metriche di sintesi (stesso schema della funzione daily)
|
|
"""
|
|
if decision_every < 1:
|
|
raise ValueError("decision_every deve essere >= 1")
|
|
|
|
r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0
|
|
idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r))
|
|
idx = pd.to_datetime(idx).dt.normalize()
|
|
|
|
if exec_ret is not None:
|
|
r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float)
|
|
r_exec.index = pd.to_datetime(r_exec.index).normalize()
|
|
r_exec = r_exec.reindex(idx)
|
|
if len(r_exec) != len(r):
|
|
r_exec = pd.Series(r_exec.values, index=idx).reindex(idx)
|
|
else:
|
|
r_exec = r
|
|
|
|
fee = fee_bps / 10000.0
|
|
|
|
def _lib_predict(past_returns: pd.Series, win_last: np.ndarray):
|
|
lib_wins, lib_out = build_pattern_library(past_returns, Wp, Ha)
|
|
if lib_wins is None:
|
|
return np.nan, np.nan
|
|
curr_zn = z_norm(win_last)
|
|
if curr_zn is None:
|
|
return np.nan, np.nan
|
|
est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k)
|
|
return float(est_out), float(avg_dist)
|
|
|
|
in_pos = False
|
|
entry_t = None
|
|
trade_pnl = 0.0
|
|
trade_peak = 0.0
|
|
weak_streak = 0
|
|
# Memorizziamo l'ultima stima per non doverla rifare ogni barra
|
|
last_est_out: float = np.nan
|
|
last_avg_dist: float = np.nan
|
|
|
|
rows = []
|
|
|
|
for t in range(Wp, len(r) - 1):
|
|
past = r.iloc[:t]
|
|
next_ret = r_exec.iloc[t + 1] if t + 1 < len(r_exec) else np.nan
|
|
|
|
# Decidi se è un giorno di "ranking refresh"
|
|
is_decision_day = ((t - Wp) % decision_every == 0)
|
|
|
|
# Calcola EstOutcome solo nei giorni di decisione (o se non abbiamo ancora una stima)
|
|
if is_decision_day or pd.isna(last_est_out):
|
|
if past.dropna().shape[0] < (Wp + Ha):
|
|
est_out, avg_dist = np.nan, np.nan
|
|
else:
|
|
win_last = r.iloc[t - Wp:t].values
|
|
est_out, avg_dist = _lib_predict(past, win_last)
|
|
last_est_out = est_out
|
|
last_avg_dist = avg_dist
|
|
else:
|
|
est_out = last_est_out
|
|
avg_dist = last_avg_dist
|
|
|
|
sig_out = 1 if in_pos else 0
|
|
bars_in_trade = (t - entry_t + 1) if (in_pos and entry_t is not None) else 0
|
|
|
|
# === ENTRATA ===
|
|
# Solo nei giorni di decisione e se non già in posizione
|
|
if (not in_pos) and is_decision_day and np.isfinite(est_out) and (est_out > theta_entry):
|
|
sig_out = 1
|
|
in_pos = True
|
|
entry_t = t
|
|
trade_pnl = 0.0
|
|
trade_peak = 0.0
|
|
weak_streak = 0
|
|
|
|
# === USCITE ===
|
|
elif in_pos:
|
|
pnl_if_stay = (1.0 + trade_pnl) * (1.0 + (next_ret if np.isfinite(next_ret) else 0.0)) - 1.0
|
|
peak_if_stay = max(trade_peak, pnl_if_stay)
|
|
exit_reasons = []
|
|
|
|
# SL/TP/TRAIL: sempre attive (anche nei giorni non-decisione)
|
|
if (sl_bps is not None) and (pnl_if_stay <= -sl_bps / 10000.0) and (bars_in_trade >= min_holding_bars):
|
|
exit_reasons.append("SL")
|
|
if (tp_bps is not None) and (pnl_if_stay >= tp_bps / 10000.0):
|
|
exit_reasons.append("TP")
|
|
if (trail_bps is not None) and (peak_if_stay - pnl_if_stay >= trail_bps / 10000.0):
|
|
exit_reasons.append("TRAIL")
|
|
if (time_stop_bars is not None) and (bars_in_trade >= time_stop_bars):
|
|
exit_reasons.append("TIME")
|
|
|
|
# FLIP/WEAK: solo nei giorni di decisione e dopo min_holding_bars
|
|
if (
|
|
is_decision_day
|
|
and (not only_first_signal)
|
|
and (theta_exit is not None)
|
|
and (bars_in_trade >= min_holding_bars)
|
|
and np.isfinite(est_out)
|
|
):
|
|
if est_out <= theta_exit:
|
|
weak_streak = weak_streak + 1 if weak_days_exit else weak_streak
|
|
if weak_days_exit is None:
|
|
exit_reasons.append("FLIP")
|
|
elif weak_streak >= weak_days_exit:
|
|
exit_reasons.append("FLIP_STREAK")
|
|
else:
|
|
weak_streak = 0
|
|
|
|
if exit_reasons:
|
|
sig_out = 0
|
|
in_pos = False
|
|
entry_t = None
|
|
trade_pnl = 0.0
|
|
trade_peak = 0.0
|
|
weak_streak = 0
|
|
else:
|
|
trade_pnl = pnl_if_stay
|
|
trade_peak = peak_if_stay
|
|
|
|
rows.append((idx.iloc[t], sig_out, est_out, avg_dist,
|
|
r_exec.iloc[t + 1] if t + 1 < len(r_exec) else np.nan))
|
|
|
|
sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"])
|
|
sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0)
|
|
trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs()
|
|
cost = trade_chg * fee
|
|
sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost
|
|
sig_df.drop(columns=["Signal_prev"], inplace=True)
|
|
|
|
# === metriche di sintesi (stesso schema della funzione daily) ===
|
|
pnl_series = sig_df["PnL"].fillna(0.0)
|
|
n = max(1, pnl_series.notna().sum())
|
|
eq = (1.0 + pnl_series).cumprod()
|
|
peak = eq.cummax()
|
|
dd = (eq / peak - 1.0)
|
|
mdd = float(dd.min()) if dd.size else np.nan
|
|
cagr = float((eq.iloc[-1]) ** (252.0 / max(1, len(pnl_series))) - 1.0) if eq.iloc[-1] > 0 else np.nan
|
|
sigma = float(pnl_series.std(ddof=1)) if len(pnl_series) > 1 else np.nan
|
|
vol_a = sigma * np.sqrt(252) if np.isfinite(sigma) else np.nan
|
|
sharpe = float(pnl_series.mean() / sigma * np.sqrt(252)) if (np.isfinite(sigma) and sigma > 0) else np.nan
|
|
downside = pnl_series[pnl_series < 0]
|
|
sortino = (
|
|
float(pnl_series.mean() / downside.std(ddof=1) * np.sqrt(252))
|
|
if (len(downside) > 1 and downside.std(ddof=1) > 0)
|
|
else np.nan
|
|
)
|
|
calmar = float(cagr / abs(mdd)) if (np.isfinite(mdd) and mdd < 0 and np.isfinite(cagr)) else np.nan
|
|
|
|
# numero di trade reali
|
|
sig = sig_df["Signal"].astype(int).values
|
|
n_trades = int(((sig[1:] - sig[:-1]) > 0).sum()) # transizioni 0→1
|
|
|
|
stats = {
|
|
"CAGR_%": round(cagr * 100, 3) if np.isfinite(cagr) else np.nan,
|
|
"AnnVol_%": round(vol_a * 100, 3) if np.isfinite(vol_a) else np.nan,
|
|
"Sharpe": round(sharpe, 3) if np.isfinite(sharpe) else np.nan,
|
|
"Sortino": round(sortino, 3) if np.isfinite(sortino) else np.nan,
|
|
"MaxDD_%eq": round(mdd * 100, 3) if np.isfinite(mdd) else np.nan,
|
|
"Calmar": round(calmar, 3) if np.isfinite(calmar) else np.nan,
|
|
"HitRate_%": round(100 * (pnl_series > 0).sum() / max(1, (pnl_series != 0).sum()), 2),
|
|
"AvgTradeRet_bps": round(pnl_series.mean() * 10000, 2),
|
|
"Turnover_%/step": round(100 * trade_chg.mean(), 2),
|
|
"N_Steps": int(sig_df.shape[0]),
|
|
"N_Trades": n_trades,
|
|
# parametri usati (per tracciare la cella della grid search)
|
|
"Wp": int(Wp),
|
|
"Ha": int(Ha),
|
|
"k": int(k),
|
|
"theta_entry": float(theta_entry),
|
|
"theta_exit": (None if theta_exit is None else float(theta_exit)),
|
|
"sl_bps": (None if sl_bps is None else float(sl_bps)),
|
|
"tp_bps": (None if tp_bps is None else float(tp_bps)),
|
|
"trail_bps": (None if trail_bps is None else float(trail_bps)),
|
|
"time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)),
|
|
"weak_days_exit": (None if weak_days_exit is None else int(weak_days_exit)),
|
|
"decision_every": int(decision_every),
|
|
"min_holding_bars": int(min_holding_bars),
|
|
"only_first_signal": bool(only_first_signal),
|
|
}
|
|
|
|
return sig_df, stats
|
|
|
|
|
|
__all__ = ["knn_forward_backtest_multiday"]
|