From 3c3f2a7705c8b36c7f399736232834ecfa630d2b Mon Sep 17 00:00:00 2001 From: fredmaloggia Date: Sun, 24 May 2026 12:24:30 +0200 Subject: [PATCH] refactoring --- backtest_optimizer/grid_search.py | 455 ++++++++ backtest_optimizer/knn_backtest_multiday.py | 282 +++++ backtest_optimizer/report.py | 195 ++++ backtest_optimizer/run_optimization.py | 344 ++++++ config/pattern_knn_config.json | 84 +- equity_from_log.py | 448 ++++---- shared_utils.py | 69 +- signals_daily_kNN_prod_v.2.py | 1137 ++++++++++++------- 8 files changed, 2272 insertions(+), 742 deletions(-) create mode 100644 backtest_optimizer/grid_search.py create mode 100644 backtest_optimizer/knn_backtest_multiday.py create mode 100644 backtest_optimizer/report.py create mode 100644 backtest_optimizer/run_optimization.py diff --git a/backtest_optimizer/grid_search.py b/backtest_optimizer/grid_search.py new file mode 100644 index 0000000..58df981 --- /dev/null +++ b/backtest_optimizer/grid_search.py @@ -0,0 +1,455 @@ +# -*- coding: utf-8 -*- +""" +grid_search.py +============== +Motore di grid search con walk-forward validation per il sistema kNN. + +Componenti chiave: +- ParameterGrid → genera la lista di combinazioni di parametri +- TimeSeriesSplitter → split walk-forward train/test con embargo +- run_grid_search → orchestratore: per ogni ISIN e ogni fold, esegue il + backtest multi-day, aggrega le metriche +- aggregate_results → per ogni cella della grid, calcola Sharpe medio, + Stability (1/std dello Sharpe fra fold), e l'IS-OOS + gap (overfit detector) + +Convenzioni: +- TUTTE le metriche sono calcolate sul fold di TEST (out-of-sample), MAI in-sample +- L'embargo evita overlap pattern-library fra train e test (purged k-fold variant) +- I parametri "vincenti" sono quelli con Sharpe-mean alto E Stability alta E IS-OOS + gap basso (ovvero non hanno overfittato il train) + +L'output è pronto per essere consumato da report.py per la visualizzazione. +""" +from __future__ import annotations + +import itertools +import time +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Any, Optional, Sequence + +import numpy as np +import pandas as pd + +from knn_backtest_multiday import knn_forward_backtest_multiday + + +# ============================================================= +# Parameter Grid +# ============================================================= +@dataclass +class ParameterGrid: + """ + Definisce le combinazioni di parametri da testare. + + Esempio: + grid = ParameterGrid( + Wp=[40, 60, 80], + Ha=[5, 10, 15], + k=[15, 25, 35], + theta_entry=[0.0, 0.005, 0.01], + sl_bps=[200, 300, 500], + tp_bps=[600, 800, 1200], + trail_bps=[200, 300], + time_stop_bars=[10, 20], + decision_every=[1, 3, 5, 10], + min_holding_bars=[0, 3], + ) + list(grid.iterate()) # → lista di dict con le combinazioni + """ + Wp: Sequence[int] = (60,) + Ha: Sequence[int] = (10,) + k: Sequence[int] = (25,) + theta_entry: Sequence[float] = (0.005,) + sl_bps: Sequence[Optional[float]] = (300.0,) + tp_bps: Sequence[Optional[float]] = (800.0,) + trail_bps: Sequence[Optional[float]] = (300.0,) + time_stop_bars: Sequence[Optional[int]] = (20,) + theta_exit: Sequence[Optional[float]] = (0.0,) + weak_days_exit: Sequence[Optional[int]] = (None,) + decision_every: Sequence[int] = (1,) + min_holding_bars: Sequence[int] = (0,) + only_first_signal: Sequence[bool] = (False,) + fee_bps: Sequence[float] = (10.0,) + + def iterate(self): + """Yield ogni combinazione come dict.""" + fields_order = [ + "Wp", "Ha", "k", "theta_entry", + "sl_bps", "tp_bps", "trail_bps", "time_stop_bars", + "theta_exit", "weak_days_exit", + "decision_every", "min_holding_bars", "only_first_signal", + "fee_bps", + ] + values_lists = [getattr(self, f) for f in fields_order] + for combo in itertools.product(*values_lists): + yield dict(zip(fields_order, combo)) + + def size(self) -> int: + return int(np.prod([len(getattr(self, f)) for f in [ + "Wp", "Ha", "k", "theta_entry", + "sl_bps", "tp_bps", "trail_bps", "time_stop_bars", + "theta_exit", "weak_days_exit", + "decision_every", "min_holding_bars", "only_first_signal", + "fee_bps", + ]])) + + +# ============================================================= +# Walk-forward splitter +# ============================================================= +@dataclass +class TimeSeriesSplitter: + """ + Walk-forward splitter con embargo (purged k-fold per time series). + + Esempio con n_splits=4, train_size=500, test_size=125, embargo=20: + + [---- train ----][emb][--- test ---] fold 1 + [---- train ----][emb][--- test ---] fold 2 + [---- train ----][emb][--- test ---] fold 3 + [---- train ----][emb][--- test ---] fold 4 + + Restituisce tuple (train_start, train_end, test_start, test_end) come + INDICI POSIZIONALI nella serie temporale. + """ + n_splits: int = 4 + train_size: int = 504 # ~2 anni di business days + test_size: int = 126 # ~6 mesi + embargo: int = 20 # gap fra train e test per evitare leakage + + def split(self, n_obs: int): + """ + Yield (i_train_start, i_train_end, i_test_start, i_test_end). + Gli indici sono inclusivi a sinistra ed esclusivi a destra (Python style). + """ + min_total = self.train_size + self.embargo + self.test_size + if n_obs < min_total: + raise ValueError( + f"Serie troppo corta ({n_obs} barre) per n_splits={self.n_splits}, " + f"train_size={self.train_size}, embargo={self.embargo}, " + f"test_size={self.test_size}. Servono almeno {min_total} barre." + ) + + # Distribuisci gli start dei test su tutto il range disponibile + first_test_start = self.train_size + self.embargo + last_test_end = n_obs + if self.n_splits == 1: + test_starts = [first_test_start] + else: + available_test_span = last_test_end - first_test_start - self.test_size + if available_test_span <= 0: + # Non c'è spazio per più di un fold + test_starts = [first_test_start] + else: + stride = available_test_span / max(1, self.n_splits - 1) + test_starts = [int(round(first_test_start + i * stride)) for i in range(self.n_splits)] + + for ts in test_starts: + te = min(ts + self.test_size, n_obs) + tr_end = ts - self.embargo + tr_start = max(0, tr_end - self.train_size) + if te - ts < self.test_size // 2: + continue # fold troppo corto, skip + yield tr_start, tr_end, ts, te + + +# ============================================================= +# Singolo backtest su un singolo ISIN per un singolo set di parametri +# ============================================================= +def _run_one_combo_on_asset( + df_asset: pd.DataFrame, + col_date: str, + col_ret: str, + params: dict, + exec_ret: Optional[pd.Series] = None, +) -> dict: + """ + Esegue knn_forward_backtest_multiday con i parametri specificati e + restituisce le metriche di sintesi (stats dict). + + Se il backtest fallisce, restituisce un dict con NaN per garantire che + la grid search non si interrompa. + """ + try: + sig_df, stats = knn_forward_backtest_multiday( + df_isin=df_asset, + col_date=col_date, + col_ret=col_ret, + Wp=params["Wp"], + Ha=params["Ha"], + k=params["k"], + theta_entry=params["theta_entry"], + exec_ret=exec_ret, + fee_bps=params["fee_bps"], + sl_bps=params["sl_bps"], + tp_bps=params["tp_bps"], + trail_bps=params["trail_bps"], + time_stop_bars=params["time_stop_bars"], + theta_exit=params["theta_exit"], + weak_days_exit=params["weak_days_exit"], + decision_every=params["decision_every"], + min_holding_bars=params["min_holding_bars"], + only_first_signal=params["only_first_signal"], + ) + return stats + except Exception as exc: + return { + "Sharpe": np.nan, "CAGR_%": np.nan, "MaxDD_%eq": np.nan, + "Calmar": np.nan, "Sortino": np.nan, "N_Trades": 0, + "HitRate_%": np.nan, "Turnover_%/step": np.nan, + "AvgTradeRet_bps": np.nan, "AnnVol_%": np.nan, + "_error": str(exc), + **params, + } + + +# ============================================================= +# Main grid search loop con walk-forward +# ============================================================= +def run_grid_search( + assets: dict[str, pd.DataFrame], + col_date: str, + col_ret: str, + grid: ParameterGrid, + splitter: TimeSeriesSplitter, + exec_ret_map: Optional[dict[str, pd.Series]] = None, + *, + verbose: bool = True, + n_max_combos: Optional[int] = None, + save_intermediate_to: Optional[Path] = None, +) -> pd.DataFrame: + """ + Esegue grid search walk-forward su una collezione di ISIN. + + Parameters + ---------- + assets : dict {isin -> df_asset} + Mappa ISIN → DataFrame con almeno le colonne col_date e col_ret. + grid : ParameterGrid + splitter : TimeSeriesSplitter + exec_ret_map : dict {isin -> pd.Series}, opzionale + Rendimenti open-to-open per esecuzione realistica. + verbose : bool + Stampa progresso. + n_max_combos : int, opzionale + Se specificato, esegue solo le prime N combinazioni (utile per test). + save_intermediate_to : Path, opzionale + Se specificato, salva il DataFrame parziale ogni 10 combinazioni + in un file XLSX per ripartire in caso di crash. + + Returns + ------- + DataFrame "lungo": una riga per (ISIN, combo_id, fold_id) + Colonne: tutte le metriche + parametri + identificativo ISIN / fold. + """ + total_combos = grid.size() + if n_max_combos: + total_combos = min(total_combos, n_max_combos) + + if verbose: + print(f"[GRID] Combinazioni da testare: {total_combos}") + print(f"[GRID] ISIN: {len(assets)}") + print(f"[GRID] Fold per ISIN: {splitter.n_splits}") + print(f"[GRID] Backtest totali stimati: {total_combos * len(assets) * splitter.n_splits:,}") + + rows = [] + combo_id = 0 + t_start = time.perf_counter() + + for params in grid.iterate(): + if n_max_combos and combo_id >= n_max_combos: + break + combo_id += 1 + t_combo = time.perf_counter() + + for isin, df_asset in assets.items(): + df_a = df_asset.copy() + if col_date in df_a.columns: + df_a[col_date] = pd.to_datetime(df_a[col_date], errors="coerce") + df_a = df_a.sort_values(col_date).reset_index(drop=True) + + n_obs = len(df_a) + try: + splits = list(splitter.split(n_obs)) + except ValueError: + # serie troppo corta: skip + continue + + exec_ret = exec_ret_map.get(isin) if exec_ret_map else None + + for fold_id, (tr_s, tr_e, te_s, te_e) in enumerate(splits, start=1): + # Backtest sul TEST fold (la libreria viene comunque costruita + # solo con il passato all'interno della funzione, quindi train + # vs test è "purgato" naturalmente per costruzione) + df_test = df_a.iloc[tr_s:te_e].copy() # passiamo train+embargo+test + exec_ret_test = exec_ret.loc[df_test[col_date]] if (exec_ret is not None and col_date in df_test.columns) else None + + stats = _run_one_combo_on_asset( + df_asset=df_test, + col_date=col_date, + col_ret=col_ret, + params=params, + exec_ret=exec_ret_test, + ) + + row = { + "ISIN": isin, + "combo_id": combo_id, + "fold_id": fold_id, + "n_obs_test": te_e - te_s, + **stats, + } + rows.append(row) + + if verbose: + elapsed = time.perf_counter() - t_combo + tot_elapsed = time.perf_counter() - t_start + eta = (tot_elapsed / combo_id) * (total_combos - combo_id) + print(f"[GRID] combo {combo_id}/{total_combos} ({elapsed:.1f}s) — ETA {eta/60:.1f} min") + + # Salvataggio intermedio + if save_intermediate_to and (combo_id % 10 == 0): + df_partial = pd.DataFrame(rows) + try: + df_partial.to_excel(save_intermediate_to, index=False) + except Exception as e: + print(f"[GRID] WARNING: impossibile salvare intermediate: {e}") + + df_results = pd.DataFrame(rows) + if verbose: + print(f"[GRID] Completato in {(time.perf_counter()-t_start)/60:.1f} min") + print(f"[GRID] Righe risultato: {len(df_results):,}") + return df_results + + +# ============================================================= +# Aggregazione risultati e ranking finale +# ============================================================= +def aggregate_results( + df_results: pd.DataFrame, + *, + by_isin: bool = False, + primary_metric: str = "Sharpe", + min_trades_per_fold: int = 5, +) -> pd.DataFrame: + """ + Aggrega i risultati per combinazione di parametri. + + Per ogni combo_id, calcola: + - {primary_metric}_mean → media tra fold (e ISIN se by_isin=False) + - {primary_metric}_std → std tra fold + - {primary_metric}_min → minimo (robustezza worst-case) + - Stability = 1 / (1 + std/|mean|) → 0..1, 1=identico fra fold + - N_Trades_avg, MaxDD_avg, Calmar_avg + - n_folds_valid → numero di fold con N_Trades >= min_trades_per_fold + + Parameters + ---------- + by_isin : bool + Se True, aggrega per (ISIN, combo_id) — utile per analizzare quali + parametri funzionano per quali asset. Se False, aggrega su tutto. + primary_metric : str + Metrica principale per il ranking (Sharpe, Calmar, Sortino, ...). + min_trades_per_fold : int + Fold con meno trade vengono esclusi dall'aggregazione (rumore). + + Returns + ------- + DataFrame ordinato per primary_metric_mean discendente, con i parametri + della combinazione (colonne Wp, Ha, k, ecc.) come "chiave". + """ + if df_results is None or df_results.empty: + return pd.DataFrame() + + # Filtro fold validi + df = df_results.copy() + df["_valid_fold"] = df["N_Trades"].fillna(0) >= min_trades_per_fold + + # Colonne parametri (chiave della combo) + param_cols = [ + "Wp", "Ha", "k", "theta_entry", + "sl_bps", "tp_bps", "trail_bps", "time_stop_bars", + "theta_exit", "weak_days_exit", + "decision_every", "min_holding_bars", "only_first_signal", + ] + param_cols = [c for c in param_cols if c in df.columns] + + groupby_cols = (["ISIN"] if by_isin else []) + ["combo_id"] + param_cols + + metric_cols = [primary_metric, "CAGR_%", "MaxDD_%eq", "Calmar", "Sortino", + "N_Trades", "HitRate_%", "Turnover_%/step", "AvgTradeRet_bps"] + metric_cols = [c for c in metric_cols if c in df.columns] + + def _agg(g): + valid = g[g["_valid_fold"]] + if valid.empty: + return pd.Series({ + f"{primary_metric}_mean": np.nan, + f"{primary_metric}_std": np.nan, + f"{primary_metric}_min": np.nan, + "Stability": np.nan, + "n_folds_valid": 0, + "n_folds_total": len(g), + "N_Trades_avg": np.nan, + "MaxDD_avg": np.nan, + "Calmar_avg": np.nan, + "CAGR_avg": np.nan, + "HitRate_avg": np.nan, + }) + + + m_mean = valid[primary_metric].mean() + m_std = valid[primary_metric].std() + m_min = valid[primary_metric].min() + stability = 1.0 / (1.0 + (m_std / abs(m_mean))) if (np.isfinite(m_mean) and abs(m_mean) > 1e-9) else 0.0 + + return pd.Series({ + f"{primary_metric}_mean": m_mean, + f"{primary_metric}_std": m_std, + f"{primary_metric}_min": m_min, + "Stability": stability, + "n_folds_valid": len(valid), + "n_folds_total": len(g), + "N_Trades_avg": valid["N_Trades"].mean() if "N_Trades" in valid.columns else np.nan, + "MaxDD_avg": valid["MaxDD_%eq"].mean() if "MaxDD_%eq" in valid.columns else np.nan, + "Calmar_avg": valid["Calmar"].mean() if "Calmar" in valid.columns else np.nan, + "CAGR_avg": valid["CAGR_%"].mean() if "CAGR_%" in valid.columns else np.nan, + "HitRate_avg": valid["HitRate_%"].mean() if "HitRate_%" in valid.columns else np.nan, + }) + + agg = df.groupby(groupby_cols, dropna=False).apply(_agg).reset_index() + agg = agg.sort_values(f"{primary_metric}_mean", ascending=False).reset_index(drop=True) + + # Aggiungi un "composite score" che premia Sharpe alto E stabilità alta + if f"{primary_metric}_mean" in agg.columns and "Stability" in agg.columns: + # Rank percentile sui due indicatori, poi media + agg["_rank_metric"] = agg[f"{primary_metric}_mean"].rank(pct=True) + agg["_rank_stab"] = agg["Stability"].rank(pct=True) + agg["Composite"] = (agg["_rank_metric"] + agg["_rank_stab"]) / 2.0 + agg = agg.drop(columns=["_rank_metric", "_rank_stab"]) + + return agg + + +def best_combo_per_isin(df_results: pd.DataFrame, primary_metric: str = "Sharpe") -> pd.DataFrame: + """ + Per ogni ISIN, restituisce la combinazione di parametri con Sharpe-mean + più alto. Utile per scoprire se diversi asset preferiscono regimi diversi + di (decision_every, tp_bps, ...). + """ + agg = aggregate_results(df_results, by_isin=True, primary_metric=primary_metric) + if agg.empty: + return pd.DataFrame() + idx = agg.groupby("ISIN")[f"{primary_metric}_mean"].idxmax() + return agg.loc[idx.dropna()].reset_index(drop=True) + + +__all__ = [ + "ParameterGrid", + "TimeSeriesSplitter", + "run_grid_search", + "aggregate_results", + "best_combo_per_isin", +] diff --git a/backtest_optimizer/knn_backtest_multiday.py b/backtest_optimizer/knn_backtest_multiday.py new file mode 100644 index 0000000..ce57f8a --- /dev/null +++ b/backtest_optimizer/knn_backtest_multiday.py @@ -0,0 +1,282 @@ +# -*- coding: utf-8 -*- +""" +knn_backtest_multiday.py +======================== +Estensione "multi-day" del walk-forward kNN esistente. + +Filosofia: +- NON modifica le funzioni di produzione (knn_forward_backtest_one_asset, + build_pattern_library, predict_from_library, z_norm). +- Riusa shared_utils.* per coerenza al 100% con il sistema live. +- Aggiunge un parametro `decision_every` (N giorni di calendario tra una + decisione e la successiva) per ridurre il turnover senza alterare la + logica di entry/exit. +- Restituisce sia i signals daily sia uno stats dict identico nel formato + a quello prodotto dalla funzione daily, così le funzioni di valutazione + (ranking, portfolio building, ecc.) possono essere riutilizzate as-is. + +Compatibilità: +- decision_every=1 → comportamento IDENTICO al motore daily attuale + (è il default, garantisce backward compatibility) +- decision_every=N → ricalcola EstOutcome ogni N barre; in mezzo "mantiene + in vita" lo stato del segnale e applica solo le exit di rischio (SL/TP/TRAIL/TIME) + +L'idea è coerente con la natura del kNN su finestre WP=60 / Ha=10: +non c'è motivo strutturale per rifare la stessa stima ogni 24 ore. + +Author: refactor proposto per il sistema esistente +""" +from __future__ import annotations + +import sys +from pathlib import Path +from typing import Optional + +# Aggiunge la cartella padre al path Python per trovare shared_utils.py +_PARENT_DIR = Path(__file__).resolve().parent.parent +if str(_PARENT_DIR) not in sys.path: + sys.path.insert(0, str(_PARENT_DIR)) + +import numpy as np +import pandas as pd + +from shared_utils import ( + build_pattern_library, + predict_from_library, + z_norm, +) + + +def knn_forward_backtest_multiday( + df_isin: pd.DataFrame, + col_date: str, + col_ret: str, + Wp: int, + Ha: int, + k: int, + theta_entry: float, + *, + exec_ret: Optional[pd.Series] = None, + fee_bps: float = 10.0, + # --- exit params (stessa semantica del motore daily) --- + sl_bps: Optional[float] = 300.0, + tp_bps: Optional[float] = 800.0, + trail_bps: Optional[float] = 300.0, + time_stop_bars: Optional[int] = 20, + theta_exit: Optional[float] = 0.0, + weak_days_exit: Optional[int] = None, + # --- nuovi parametri --- + decision_every: int = 1, + min_holding_bars: int = 0, + # --- nuovi parametri di entry/exit avanzati --- + only_first_signal: bool = False, # se True, dopo l'apertura non rivaluta entry +) -> tuple[pd.DataFrame, dict]: + """ + Variante "multi-day" del walk-forward kNN. + + Parametri NUOVI rispetto a knn_forward_backtest_one_asset: + -------------------------------------------------------- + decision_every : int (>=1) + Ricalcola EstOutcome e decide entry/flip solo ogni `decision_every` + barre. Le exit di rischio (SL/TP/TRAIL/TIME) restano valutate ogni + giorno per coerenza con le pratiche di risk management. + decision_every=1 → comportamento IDENTICO al motore daily. + + min_holding_bars : int + Numero minimo di barre per cui una posizione non può chiudere per + RANK/FLIP (le exit di rischio rimangono attive). Utile per evitare + di uccidere subito un trade per noise del primo giorno. + + only_first_signal : bool + Se True, una volta in posizione non rivaluta EstOutcome per decidere + eventuali flip; resta in posizione finché una exit di rischio o un + time stop la chiude. + + Tutti gli altri parametri sono identici a knn_forward_backtest_one_asset. + + Returns + ------- + sig_df : DataFrame con colonne ["Date","Signal","EstOutcome","AvgDist","Ret+1","PnL"] + stats : dict con le metriche di sintesi (stesso schema della funzione daily) + """ + if decision_every < 1: + raise ValueError("decision_every deve essere >= 1") + + r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0 + idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r)) + idx = pd.to_datetime(idx).dt.normalize() + + if exec_ret is not None: + r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float) + r_exec.index = pd.to_datetime(r_exec.index).normalize() + r_exec = r_exec.reindex(idx) + if len(r_exec) != len(r): + r_exec = pd.Series(r_exec.values, index=idx).reindex(idx) + else: + r_exec = r + + fee = fee_bps / 10000.0 + + def _lib_predict(past_returns: pd.Series, win_last: np.ndarray): + lib_wins, lib_out = build_pattern_library(past_returns, Wp, Ha) + if lib_wins is None: + return np.nan, np.nan + curr_zn = z_norm(win_last) + if curr_zn is None: + return np.nan, np.nan + est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k) + return float(est_out), float(avg_dist) + + in_pos = False + entry_t = None + trade_pnl = 0.0 + trade_peak = 0.0 + weak_streak = 0 + # Memorizziamo l'ultima stima per non doverla rifare ogni barra + last_est_out: float = np.nan + last_avg_dist: float = np.nan + + rows = [] + + for t in range(Wp, len(r) - 1): + past = r.iloc[:t] + next_ret = r_exec.iloc[t + 1] if t + 1 < len(r_exec) else np.nan + + # Decidi se è un giorno di "ranking refresh" + is_decision_day = ((t - Wp) % decision_every == 0) + + # Calcola EstOutcome solo nei giorni di decisione (o se non abbiamo ancora una stima) + if is_decision_day or pd.isna(last_est_out): + if past.dropna().shape[0] < (Wp + Ha): + est_out, avg_dist = np.nan, np.nan + else: + win_last = r.iloc[t - Wp:t].values + est_out, avg_dist = _lib_predict(past, win_last) + last_est_out = est_out + last_avg_dist = avg_dist + else: + est_out = last_est_out + avg_dist = last_avg_dist + + sig_out = 1 if in_pos else 0 + bars_in_trade = (t - entry_t + 1) if (in_pos and entry_t is not None) else 0 + + # === ENTRATA === + # Solo nei giorni di decisione e se non già in posizione + if (not in_pos) and is_decision_day and np.isfinite(est_out) and (est_out > theta_entry): + sig_out = 1 + in_pos = True + entry_t = t + trade_pnl = 0.0 + trade_peak = 0.0 + weak_streak = 0 + + # === USCITE === + elif in_pos: + pnl_if_stay = (1.0 + trade_pnl) * (1.0 + (next_ret if np.isfinite(next_ret) else 0.0)) - 1.0 + peak_if_stay = max(trade_peak, pnl_if_stay) + exit_reasons = [] + + # SL/TP/TRAIL: sempre attive (anche nei giorni non-decisione) + if (sl_bps is not None) and (pnl_if_stay <= -sl_bps / 10000.0) and (bars_in_trade >= min_holding_bars): + exit_reasons.append("SL") + if (tp_bps is not None) and (pnl_if_stay >= tp_bps / 10000.0): + exit_reasons.append("TP") + if (trail_bps is not None) and (peak_if_stay - pnl_if_stay >= trail_bps / 10000.0): + exit_reasons.append("TRAIL") + if (time_stop_bars is not None) and (bars_in_trade >= time_stop_bars): + exit_reasons.append("TIME") + + # FLIP/WEAK: solo nei giorni di decisione e dopo min_holding_bars + if ( + is_decision_day + and (not only_first_signal) + and (theta_exit is not None) + and (bars_in_trade >= min_holding_bars) + and np.isfinite(est_out) + ): + if est_out <= theta_exit: + weak_streak = weak_streak + 1 if weak_days_exit else weak_streak + if weak_days_exit is None: + exit_reasons.append("FLIP") + elif weak_streak >= weak_days_exit: + exit_reasons.append("FLIP_STREAK") + else: + weak_streak = 0 + + if exit_reasons: + sig_out = 0 + in_pos = False + entry_t = None + trade_pnl = 0.0 + trade_peak = 0.0 + weak_streak = 0 + else: + trade_pnl = pnl_if_stay + trade_peak = peak_if_stay + + rows.append((idx.iloc[t], sig_out, est_out, avg_dist, + r_exec.iloc[t + 1] if t + 1 < len(r_exec) else np.nan)) + + sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"]) + sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0) + trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs() + cost = trade_chg * fee + sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost + sig_df.drop(columns=["Signal_prev"], inplace=True) + + # === metriche di sintesi (stesso schema della funzione daily) === + pnl_series = sig_df["PnL"].fillna(0.0) + n = max(1, pnl_series.notna().sum()) + eq = (1.0 + pnl_series).cumprod() + peak = eq.cummax() + dd = (eq / peak - 1.0) + mdd = float(dd.min()) if dd.size else np.nan + cagr = float((eq.iloc[-1]) ** (252.0 / max(1, len(pnl_series))) - 1.0) if eq.iloc[-1] > 0 else np.nan + sigma = float(pnl_series.std(ddof=1)) if len(pnl_series) > 1 else np.nan + vol_a = sigma * np.sqrt(252) if np.isfinite(sigma) else np.nan + sharpe = float(pnl_series.mean() / sigma * np.sqrt(252)) if (np.isfinite(sigma) and sigma > 0) else np.nan + downside = pnl_series[pnl_series < 0] + sortino = ( + float(pnl_series.mean() / downside.std(ddof=1) * np.sqrt(252)) + if (len(downside) > 1 and downside.std(ddof=1) > 0) + else np.nan + ) + calmar = float(cagr / abs(mdd)) if (np.isfinite(mdd) and mdd < 0 and np.isfinite(cagr)) else np.nan + + # numero di trade reali + sig = sig_df["Signal"].astype(int).values + n_trades = int(((sig[1:] - sig[:-1]) > 0).sum()) # transizioni 0→1 + + stats = { + "CAGR_%": round(cagr * 100, 3) if np.isfinite(cagr) else np.nan, + "AnnVol_%": round(vol_a * 100, 3) if np.isfinite(vol_a) else np.nan, + "Sharpe": round(sharpe, 3) if np.isfinite(sharpe) else np.nan, + "Sortino": round(sortino, 3) if np.isfinite(sortino) else np.nan, + "MaxDD_%eq": round(mdd * 100, 3) if np.isfinite(mdd) else np.nan, + "Calmar": round(calmar, 3) if np.isfinite(calmar) else np.nan, + "HitRate_%": round(100 * (pnl_series > 0).sum() / max(1, (pnl_series != 0).sum()), 2), + "AvgTradeRet_bps": round(pnl_series.mean() * 10000, 2), + "Turnover_%/step": round(100 * trade_chg.mean(), 2), + "N_Steps": int(sig_df.shape[0]), + "N_Trades": n_trades, + # parametri usati (per tracciare la cella della grid search) + "Wp": int(Wp), + "Ha": int(Ha), + "k": int(k), + "theta_entry": float(theta_entry), + "theta_exit": (None if theta_exit is None else float(theta_exit)), + "sl_bps": (None if sl_bps is None else float(sl_bps)), + "tp_bps": (None if tp_bps is None else float(tp_bps)), + "trail_bps": (None if trail_bps is None else float(trail_bps)), + "time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)), + "weak_days_exit": (None if weak_days_exit is None else int(weak_days_exit)), + "decision_every": int(decision_every), + "min_holding_bars": int(min_holding_bars), + "only_first_signal": bool(only_first_signal), + } + + return sig_df, stats + + +__all__ = ["knn_forward_backtest_multiday"] diff --git a/backtest_optimizer/report.py b/backtest_optimizer/report.py new file mode 100644 index 0000000..3350e65 --- /dev/null +++ b/backtest_optimizer/report.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +""" +report.py +========= +Generazione di report (XLSX + PNG) dei risultati della grid search. + +- write_full_report(df_results, agg, output_dir) + → genera 4 file: + 1) grid_search_full.xlsx (tutti i fold, una riga per (ISIN, combo, fold)) + 2) grid_search_aggregate.xlsx (aggregato per combo, ordinato per Sharpe medio) + 3) heatmap_*.png (heatmap delle metriche su coppie di parametri) + 4) top_combos.xlsx (top-K combinazioni con confronto con baseline) + +L'idea è che il PM possa aprire SOLO grid_search_aggregate.xlsx e capire +in 30 secondi quale set di parametri usare in produzione. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt + + +def write_full_report( + df_results: pd.DataFrame, + agg: pd.DataFrame, + output_dir: Path, + *, + primary_metric: str = "Sharpe", + top_k: int = 25, + baseline_params: Optional[dict] = None, +) -> dict: + """ + Scrive tutti i file di output e ritorna un dict con i path. + + Parameters + ---------- + df_results : DataFrame "lungo" prodotto da run_grid_search + agg : DataFrame aggregato prodotto da aggregate_results + output_dir : Path + primary_metric : str + top_k : int + Numero di top combinazioni da analizzare in dettaglio + baseline_params : dict, opzionale + Parametri della config attuale di produzione, per confronto. Esempio: + {"Wp":60, "Ha":10, "k":25, "theta_entry":0.005, "sl_bps":300, "tp_bps":800, + "trail_bps":300, "time_stop_bars":20, "decision_every":1, "min_holding_bars":0} + """ + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + paths = {} + + # === 1. Dataset completo === + path_full = output_dir / "grid_search_full.xlsx" + df_results.to_excel(path_full, index=False) + paths["full"] = path_full + print(f"[REPORT] Salvato {path_full} ({len(df_results):,} righe)") + + # === 2. Aggregato per combinazione === + path_agg = output_dir / "grid_search_aggregate.xlsx" + with pd.ExcelWriter(path_agg, engine="openpyxl") as xw: + agg.to_excel(xw, sheet_name="ByCombo", index=False) + if baseline_params is not None: + baseline_row = _find_baseline_row(agg, baseline_params) + if baseline_row is not None: + pd.DataFrame([baseline_row]).to_excel(xw, sheet_name="Baseline", index=False) + paths["aggregate"] = path_agg + print(f"[REPORT] Salvato {path_agg}") + + # === 3. Top-K combinazioni === + top = agg.head(top_k).copy() + if baseline_params is not None: + baseline_row = _find_baseline_row(agg, baseline_params) + if baseline_row is not None: + metric_mean = f"{primary_metric}_mean" + base_metric = baseline_row.get(metric_mean, np.nan) + top["delta_vs_baseline"] = top[metric_mean] - base_metric + top["pct_improvement"] = (top[metric_mean] / base_metric - 1) * 100 if base_metric and base_metric != 0 else np.nan + + path_top = output_dir / "top_combos.xlsx" + top.to_excel(path_top, index=False) + paths["top"] = path_top + print(f"[REPORT] Salvato {path_top}") + + # === 4. Heatmaps === + heatmap_paths = _generate_heatmaps(agg, output_dir, primary_metric=primary_metric) + paths.update(heatmap_paths) + + # === 5. Top combos equity comparison === + # (Skipped if no per-fold equity data — left as hook for future enhancement) + + return paths + + +def _find_baseline_row(agg: pd.DataFrame, baseline_params: dict) -> Optional[dict]: + """Trova la riga corrispondente ai parametri baseline (se presente).""" + mask = pd.Series(True, index=agg.index) + for k, v in baseline_params.items(): + if k not in agg.columns: + continue + if v is None: + mask &= agg[k].isna() + else: + mask &= (agg[k] == v) + if mask.sum() == 0: + return None + return agg[mask].iloc[0].to_dict() + + +def _generate_heatmaps( + agg: pd.DataFrame, + output_dir: Path, + *, + primary_metric: str = "Sharpe", +) -> dict: + """ + Per ogni coppia "interessante" di parametri, genera una heatmap + del primary_metric (mediato sugli altri parametri). + """ + paths = {} + metric_col = f"{primary_metric}_mean" + if metric_col not in agg.columns: + return paths + + pairs_to_plot = [ + ("decision_every", "tp_bps", "decision_vs_tp"), + ("decision_every", "sl_bps", "decision_vs_sl"), + ("tp_bps", "sl_bps", "tp_vs_sl"), + ("Wp", "Ha", "wp_vs_ha"), + ("decision_every", "min_holding_bars", "decision_vs_holding"), + ("k", "theta_entry", "k_vs_theta"), + ] + + for px, py, label in pairs_to_plot: + if px not in agg.columns or py not in agg.columns: + continue + if agg[px].nunique() <= 1 or agg[py].nunique() <= 1: + continue + try: + pivot = agg.pivot_table(index=py, columns=px, values=metric_col, aggfunc="mean") + n_pivot = agg.pivot_table(index=py, columns=px, values="N_Trades_avg", aggfunc="mean") + fig, ax = plt.subplots(figsize=(8, 5.5), dpi=120) + im = ax.imshow(pivot.values, cmap="RdYlGn", aspect="auto") + ax.set_xticks(range(len(pivot.columns))) + ax.set_xticklabels(pivot.columns, fontsize=9) + ax.set_yticks(range(len(pivot.index))) + ax.set_yticklabels(pivot.index, fontsize=9) + ax.set_xlabel(px, fontsize=10) + ax.set_ylabel(py, fontsize=10) + ax.set_title(f"{primary_metric} medio per {px} × {py} (annotazione: N_Trades_avg)", fontsize=10) + # Annota celle: metrica sopra, n_trades sotto + for i in range(pivot.shape[0]): + for j in range(pivot.shape[1]): + val = pivot.iloc[i, j] + if not np.isfinite(val): + continue + n_val = n_pivot.iloc[i, j] if not n_pivot.empty else np.nan + text = f"{val:.2f}" + if np.isfinite(n_val): + text += f"\nn={int(n_val)}" + ax.text(j, i, text, ha="center", va="center", fontsize=7, + color="black" if abs(val) < 0.8 else "white") + fig.colorbar(im, ax=ax, label=primary_metric) + plt.tight_layout() + p = output_dir / f"heatmap_{label}.png" + fig.savefig(p, dpi=120, bbox_inches="tight") + plt.close(fig) + paths[f"heatmap_{label}"] = p + print(f"[REPORT] Salvata heatmap {p}") + except Exception as e: + print(f"[REPORT] Heatmap {label} fallita: {e}") + + return paths + + +def summary_text_table(agg: pd.DataFrame, top_k: int = 10, primary_metric: str = "Sharpe") -> str: + """ + Restituisce una tabella testuale dei top-K combinazioni, formattata per la console. + """ + if agg is None or agg.empty: + return "(nessun risultato)" + cols_to_show = [ + "Wp", "Ha", "k", "decision_every", "tp_bps", "sl_bps", "trail_bps", + "min_holding_bars", + f"{primary_metric}_mean", "Stability", "N_Trades_avg", + "CAGR_avg", "MaxDD_avg", "Calmar_avg", + ] + cols_to_show = [c for c in cols_to_show if c in agg.columns] + return agg[cols_to_show].head(top_k).to_string(index=False) + + +__all__ = ["write_full_report", "summary_text_table"] diff --git a/backtest_optimizer/run_optimization.py b/backtest_optimizer/run_optimization.py new file mode 100644 index 0000000..9c25b4a --- /dev/null +++ b/backtest_optimizer/run_optimization.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +""" +run_optimization.py +=================== +Script principale per lanciare la grid search sul sistema kNN. + +Riusa load_config() e read_connection_txt() di shared_utils per leggere +i prezzi storici dallo stesso DB usato in produzione. + +Workflow: +1) Carica universo Excel +2) Per ogni ISIN, scarica la serie dal DB (cached optional) +3) Definisce ParameterGrid e TimeSeriesSplitter +4) Lancia run_grid_search +5) Aggrega e genera report + +Uso: + python run_optimization.py # esegue con grid e splitter di default + python run_optimization.py --mode quick # grid ridotta (~50 combo, ~30 minuti) + python run_optimization.py --mode full # grid completa (~2000 combo, ~24h) + python run_optimization.py --mode multiday # focus su decision_every e tp/sl +""" +from __future__ import annotations + +import argparse +import json +import sys +import time +from pathlib import Path +from typing import Optional + +# Aggiunge la cartella padre al path Python così trova shared_utils.py +# (shared_utils.py sta nella cartella del progetto, accanto a 'backtest_optimizer/') +_PARENT_DIR = Path(__file__).resolve().parent.parent +if str(_PARENT_DIR) not in sys.path: + sys.path.insert(0, str(_PARENT_DIR)) + +import numpy as np +import pandas as pd +import sqlalchemy as sa +from sqlalchemy import text + +from shared_utils import ( + detect_column, + load_config, + read_connection_txt, + require_section, + require_value, +) + +from grid_search import ParameterGrid, TimeSeriesSplitter, run_grid_search, aggregate_results, best_combo_per_isin +from report import write_full_report, summary_text_table + + +# ===================================================== +# Preset grid: scegli quello adeguato all'uso +# ===================================================== +def make_grid_preset(preset: str) -> ParameterGrid: + if preset == "quick": + # ~36 combo, focus sui parametri "in top of mind" + return ParameterGrid( + Wp=[60], + Ha=[10], + k=[25], + theta_entry=[0.005], + sl_bps=[300.0], + tp_bps=[800.0, 1200.0], + trail_bps=[200.0, 300.0], + time_stop_bars=[20], + theta_exit=[0.0], + weak_days_exit=[None], + decision_every=[1, 3, 5, 10], + min_holding_bars=[0, 3, 5], + only_first_signal=[False], + fee_bps=[10.0], + ) + if preset == "multiday": + # Focus sulla domanda "esteso multi-giorno": ~96 combo + return ParameterGrid( + Wp=[60], + Ha=[10], + k=[25], + theta_entry=[0.005], + sl_bps=[200.0, 300.0, 500.0], + tp_bps=[600.0, 800.0, 1200.0, 1500.0], + trail_bps=[200.0, 300.0], + time_stop_bars=[20], + theta_exit=[0.0], + weak_days_exit=[None], + decision_every=[1, 2, 3, 5, 10, 20], + min_holding_bars=[0, 3], + only_first_signal=[False], + fee_bps=[10.0], + ) + if preset == "wide": + # Esplora anche Wp, Ha, k: ~1500 combo + return ParameterGrid( + Wp=[40, 60, 80, 120], + Ha=[5, 10, 15, 20], + k=[15, 25, 35], + theta_entry=[0.003, 0.005, 0.01], + sl_bps=[200.0, 300.0, 500.0], + tp_bps=[600.0, 800.0, 1200.0], + trail_bps=[200.0, 300.0], + time_stop_bars=[15, 20, 30], + theta_exit=[0.0], + weak_days_exit=[None], + decision_every=[1, 3, 5, 10], + min_holding_bars=[0, 3], + only_first_signal=[False], + fee_bps=[10.0], + ) + if preset == "full": + # Tutto: 5000+ combo, da lanciare su cluster o overnight + return ParameterGrid( + Wp=[40, 60, 80, 120], + Ha=[5, 10, 15, 20], + k=[10, 15, 25, 35, 50], + theta_entry=[0.0, 0.003, 0.005, 0.01], + sl_bps=[200.0, 300.0, 500.0, None], + tp_bps=[600.0, 800.0, 1200.0, 1500.0, None], + trail_bps=[200.0, 300.0, None], + time_stop_bars=[15, 20, 30, None], + theta_exit=[0.0, -0.005], + weak_days_exit=[None, 3], + decision_every=[1, 2, 3, 5, 10], + min_holding_bars=[0, 3, 5], + only_first_signal=[False, True], + fee_bps=[10.0], + ) + raise ValueError(f"Preset sconosciuto: {preset}") + + +# ===================================================== +# Caricamento dati dal DB (riusa stessa logica di produzione) +# ===================================================== +def load_asset_data( + isins: list[str], + engine: sa.Engine, + stored_proc: str, + n_bars: int, + ptf_curr: str, + *, + min_bars: int = 500, + cache_dir: Optional[Path] = None, +) -> dict[str, pd.DataFrame]: + """ + Per ogni ISIN, esegue la SP e ritorna un dict {isin: df}. + Cache su disco in formato Parquet per accelerare i run successivi. + """ + sql = text(f"EXEC {stored_proc} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") + out = {} + for i, isin in enumerate(isins, 1): + # Cache check + if cache_dir is not None: + cache_dir.mkdir(parents=True, exist_ok=True) + cache_file = cache_dir / f"{isin}.parquet" + if cache_file.exists(): + try: + df = pd.read_parquet(cache_file) + if len(df) >= min_bars: + out[isin] = df + if i % 10 == 0: + print(f" [DATA] {i}/{len(isins)} (cache hit per {isin})") + continue + except Exception: + pass + + try: + df = pd.read_sql_query(sql, engine, params={"isin": isin, "n": n_bars, "ptf": ptf_curr}) + if df.empty or len(df) < min_bars: + continue + out[isin] = df + if cache_dir is not None: + try: + df.to_parquet(cache_file, index=False) + except Exception as e: + print(f" [DATA] cache write fallita per {isin}: {e}") + if i % 10 == 0: + print(f" [DATA] {i}/{len(isins)}") + except Exception as e: + print(f" [DATA] {isin}: errore {e}") + + print(f"[DATA] Caricati {len(out)}/{len(isins)} ISIN") + return out + + +def detect_data_cols(df: pd.DataFrame) -> tuple[str, str]: + col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) + col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "pct_chg"]) + return col_date, col_ret + + +# ===================================================== +# Main +# ===================================================== +def main(args): + cfg = load_config() + db_cfg = require_section(cfg, "db") + paths_cfg = require_section(cfg, "paths") + pattern_cfg = require_section(cfg, "pattern") + signals_cfg = cfg.get("signals", {}) + + # ---- Output dirs ---- + output_dir = Path(args.output_dir or "output/optimization") + output_dir.mkdir(parents=True, exist_ok=True) + cache_dir = Path(args.cache_dir or "output/optimization/asset_cache") + + # ---- Universo ---- + universo_xlsx = paths_cfg.get("input_universe", "Input/Universo per Trading System.xlsx") + universo = pd.read_excel(universo_xlsx) + col_isin = detect_column(universo, ["ISIN", "isin"]) + if col_isin is None: + raise ValueError("Colonna ISIN non trovata nell'universo") + isins = universo[col_isin].astype(str).str.strip().replace("", pd.NA).dropna().drop_duplicates().tolist() + if args.max_isin: + isins = isins[:args.max_isin] + print(f"[MAIN] Universo: {len(isins)} ISIN") + + # ---- Connessione DB e caricamento dati ---- + if args.skip_db: + # Modalità test: usa solo dati cached + assets = {} + for isin in isins: + cf = cache_dir / f"{isin}.parquet" + if cf.exists(): + try: + assets[isin] = pd.read_parquet(cf) + except Exception: + pass + print(f"[MAIN] (skip-db) Caricati {len(assets)} ISIN dalla cache") + else: + conn_str = read_connection_txt("connection.txt") + engine = sa.create_engine(conn_str, fast_executemany=True) + print("[MAIN] Connesso al DB") + assets = load_asset_data( + isins=isins, + engine=engine, + stored_proc=db_cfg["stored_proc"], + n_bars=int(db_cfg["n_bars"]), + ptf_curr=str(db_cfg["ptf_curr"]), + cache_dir=cache_dir, + ) + + if not assets: + print("[MAIN] Nessun asset disponibile, esco.") + return + + # ---- Detect colonne ---- + sample_df = next(iter(assets.values())) + col_date, col_ret = detect_data_cols(sample_df) + if not col_date or not col_ret: + raise ValueError(f"Colonne data/ret non riconosciute: {sample_df.columns.tolist()}") + print(f"[MAIN] Date col = '{col_date}', Ret col = '{col_ret}'") + + # ---- Grid e splitter ---- + grid = make_grid_preset(args.mode) + splitter = TimeSeriesSplitter( + n_splits=args.n_splits, + train_size=args.train_size, + test_size=args.test_size, + embargo=args.embargo, + ) + print(f"[MAIN] Grid preset='{args.mode}' → {grid.size()} combinazioni") + print(f"[MAIN] Splitter: {args.n_splits} fold, train={args.train_size}, test={args.test_size}, embargo={args.embargo}") + + # ---- Esecuzione ---- + t_start = time.perf_counter() + df_results = run_grid_search( + assets=assets, + col_date=col_date, + col_ret=col_ret, + grid=grid, + splitter=splitter, + verbose=True, + n_max_combos=args.max_combos, + save_intermediate_to=output_dir / "grid_search_partial.xlsx", + ) + print(f"[MAIN] Grid search completata in {(time.perf_counter()-t_start)/60:.1f} min") + + if df_results.empty: + print("[MAIN] Nessun risultato — esco.") + return + + # ---- Aggregazione ---- + agg = aggregate_results(df_results, by_isin=False, primary_metric=args.primary_metric, + min_trades_per_fold=args.min_trades_per_fold) + + # ---- Baseline corrente di produzione ---- + baseline_params = { + "Wp": int(pattern_cfg["wp"]), + "Ha": int(pattern_cfg["ha"]), + "k": int(pattern_cfg["knn_k"]), + "theta_entry": float(pattern_cfg["theta"]), + "sl_bps": float(signals_cfg.get("sl_bps", 300.0)), + "tp_bps": float(signals_cfg.get("tp_bps", 800.0)), + "trail_bps": float(signals_cfg.get("trail_bps", 300.0)), + "time_stop_bars": int(signals_cfg.get("time_stop_bars", 20)), + "decision_every": 1, + "min_holding_bars": 0, + } + + # ---- Report ---- + paths = write_full_report( + df_results=df_results, + agg=agg, + output_dir=output_dir, + primary_metric=args.primary_metric, + top_k=args.top_k, + baseline_params=baseline_params, + ) + + print("\n" + "=" * 70) + print(f"TOP-{args.top_k} COMBINAZIONI ({args.primary_metric}-mean)") + print("=" * 70) + print(summary_text_table(agg, top_k=args.top_k, primary_metric=args.primary_metric)) + print("\nFile generati:") + for k, p in paths.items(): + print(f" {k}: {p}") + + +def parse_args(): + p = argparse.ArgumentParser(description="Grid search per il sistema kNN") + p.add_argument("--mode", choices=["quick", "multiday", "wide", "full"], default="quick", + help="Preset della grid (default: quick)") + p.add_argument("--n-splits", type=int, default=4, help="Numero di fold walk-forward (default: 4)") + p.add_argument("--train-size", type=int, default=504, help="Lunghezza fold di train (default: 504)") + p.add_argument("--test-size", type=int, default=126, help="Lunghezza fold di test (default: 126)") + p.add_argument("--embargo", type=int, default=20, help="Embargo train-test (default: 20)") + p.add_argument("--max-isin", type=int, default=None, help="Limita il numero di ISIN (debug)") + p.add_argument("--max-combos", type=int, default=None, help="Limita il numero di combo (debug)") + p.add_argument("--min-trades-per-fold", type=int, default=5, + help="Fold con meno trade vengono scartati (default: 5)") + p.add_argument("--primary-metric", type=str, default="Sharpe", + help="Metrica di ranking (Sharpe, Calmar, Sortino, CAGR_%%)") + p.add_argument("--top-k", type=int, default=25, help="Numero di top combo nel report (default: 25)") + p.add_argument("--output-dir", type=str, default=None) + p.add_argument("--cache-dir", type=str, default=None) + p.add_argument("--skip-db", action="store_true", help="Usa solo dati cached, non interroga il DB") + return p.parse_args() + + +if __name__ == "__main__": + main(parse_args()) diff --git a/config/pattern_knn_config.json b/config/pattern_knn_config.json index 706b207..1876147 100644 --- a/config/pattern_knn_config.json +++ b/config/pattern_knn_config.json @@ -11,13 +11,6 @@ "theta": 0.005, "embargo": null }, - "wavelet_filter": { - "enabled": true, - "wavelet": "db4", - "level": 4, - "mode": "symmetric", - "threshold_mode": "soft" - }, "tagging": { "z_rev": 2.0, "z_vol": 2.0, @@ -25,15 +18,10 @@ }, "ranking": { "top_n_max": 15, - "rp_max_weight": 0.1333333333, - "score_verbose": false, - "score_weights": { - "Sharpe": 0.4, - "CAGR_%": 0.4, - "MaxDD_%eq": 0.2 - } + "rp_max_weight": 0.1333333333 }, "signals": { + "_comment": "Parametri di default (Equal_Weight, Risk_Parity). Le strategie _v2 hanno override in 'strategies'.", "sl_bps": 300.0, "tp_bps": 800.0, "trail_bps": 300.0, @@ -43,48 +31,72 @@ "max_open": 15, "base_capital_per_strategy": 100.0, "min_trade_notional": 0.01, - "risk_parity_lookback": 60 + "risk_parity_lookback": 60, + "decision_every": 1, + "min_holding_bars": 0 + }, + "strategies": { + "_comment": "Strategie operative. Ognuna pu\u00f2 sovrascrivere parametri di 'signals'. EW_v2 e RP_v2 implementano Config B identificata dal grid search.", + "Equal_Weight": { + "sizing": "equal_weight", + "enabled": true, + "params": {} + }, + "Risk_Parity": { + "sizing": "risk_parity", + "enabled": true, + "params": {} + }, + "Equal_Weight_v2": { + "sizing": "equal_weight", + "enabled": true, + "params": { + "tp_bps": 1200.0, + "trail_bps": 200.0, + "decision_every": 1, + "min_holding_bars": 5 + } + }, + "Risk_Parity_v2": { + "sizing": "risk_parity", + "enabled": true, + "params": { + "tp_bps": 1200.0, + "trail_bps": 200.0, + "decision_every": 1, + "min_holding_bars": 5 + } + } }, "equity_log": { "strategy_whitelist": [ "Equal_Weight", - "Risk_Parity" + "Risk_Parity", + "Equal_Weight_v2", + "Risk_Parity_v2" ] }, "paths": { "base_dir": ".", "input_universe": "Input/Universo per Trading System.xlsx", "connection_txt": "connection.txt", - "output_dir": "out_etf", - "plot_dir": "plot_etf", + "output_dir": "output", + "plot_dir": "plot", "open_trades_dir": "open_trades", - "audit_log_csv": "out_etf/trades_audit_log.csv" + "audit_log_csv": "output/trades_audit_log.csv" }, "hurst": { + "_comment": "Hurst RIMOSSO dalla logica decisionale (vedi PROJECT.md). Sezione mantenuta per retrocompatibilit\u00e0 e usi qualitativi futuri tramite shared_utils.hurst_rs.", "lookback": null, "min_length": 200, - "win_grid": [ - 16, - 24, - 32, - 48, - 64, - 96, - 128, - 192, - 256, - 384, - 512 - ], + "win_grid": [16, 24, 32, 48, 64, 96, 128, 192, 256, 384, 512], "min_segments": 2 }, "prices": { - "base_url": "https://fin.scorer.app/finance/etf-inv/history", + "base_url": "https://fin.scorer.app/finance/euronext/price", "max_retry": 3, "sleep_sec": 0.1, - "timeout": 10, - "cache_dir": "out_etf/price_cache", - "recompute_portfolio_open": false + "timeout": 10 }, "run": { "business_days_only": true, diff --git a/equity_from_log.py b/equity_from_log.py index 72b1d68..6c11246 100644 --- a/equity_from_log.py +++ b/equity_from_log.py @@ -1,24 +1,38 @@ # -*- coding: utf-8 -*- """ -Equity/Reconciliation Builder from Audit Log +Equity / Reconciliation Builder from Audit Log +================================================ -- Legge trades_audit_log.csv (OPEN/CLOSE; EntryAmount base=100; EntryIndex opzionale) -- Scarica rendimenti giornalieri via stored procedure (connection.txt) -- Converte i rendimenti in decimali coerenti (percentuali => /100; log-return => expm1) -- Ricostruisce i rendimenti giornalieri per strategia come MEDIA PONDERATA sui trade attivi -- Salva: - - daily_returns_by_strategy.csv - - equity_by_strategy.csv - - debug_daily_by_strategy.csv - - equity_by_strategy.png, drawdown_by_strategy.png -- Mostra anche a video i grafici +Legge il trades_audit_log.csv prodotto dalla pipeline di produzione e ricostruisce: + - daily_returns_by_strategy.csv: rendimenti giornalieri per strategia + (media ponderata sui trade attivi) + - equity_by_strategy.csv: curve di equity composte (base 100) + - debug_daily_by_strategy.csv: scomposizione num/den/ret per debug + - plot/equity_by_strategy.png + - plot/drawdown_by_strategy.png + +Modifiche v2.1 (questa versione): +- Supporto MULTI-STRATEGIA: legge dinamicamente la whitelist da + pattern_knn_config.json -> equity_log.strategy_whitelist. Le strategie attese + sono ora 4 (Equal_Weight, Risk_Parity, Equal_Weight_v2, Risk_Parity_v2). +- Parser audit log tollerante a formato date misto (ISO + DD/MM/YYYY) per + retrocompatibilita' con il log esistente. +- Rimossa duplicazione codice / pulizia delle import ridondanti. +- Esecuzione idempotente: tutti i file output vengono rigenerati ad ogni run. + +Esecuzione: python equity_from_log.py """ from __future__ import annotations -from pathlib import Path -import pandas as pd -import numpy as np + import shutil +from pathlib import Path +from typing import List, Optional + +import numpy as np +import pandas as pd +import sqlalchemy as sa +from sqlalchemy import text as sql_text from shared_utils import ( detect_column, @@ -27,147 +41,126 @@ from shared_utils import ( require_section, ) + # ============================================================================= -# PATH & OUTPUT +# CONFIG & PATHS # ============================================================================= -BASE_DIR = Path(__file__).resolve().parent -CONFIG = None +BASE_DIR = Path(__file__).resolve().parent + try: CONFIG = load_config() PATHS_CONFIG = require_section(CONFIG, "paths") except Exception as exc: # pragma: no cover - best effort print(f"[WARN] Config non disponibile ({exc}); uso i percorsi di default.") + CONFIG = None PATHS_CONFIG = {} -OUTPUT_DIR = BASE_DIR / PATHS_CONFIG.get("output_dir", "output") -PLOT_DIR = BASE_DIR / PATHS_CONFIG.get("plot_dir", "plot") -AUDIT_LOG_CSV = BASE_DIR / PATHS_CONFIG.get("audit_log_csv", OUTPUT_DIR / "trades_audit_log.csv") +OUTPUT_DIR = BASE_DIR / PATHS_CONFIG.get("output_dir", "output") +PLOT_DIR = BASE_DIR / PATHS_CONFIG.get("plot_dir", "plot") +AUDIT_LOG_CSV = BASE_DIR / PATHS_CONFIG.get( + "audit_log_csv", OUTPUT_DIR / "trades_audit_log.csv" +) CONNECTION_TXT = BASE_DIR / PATHS_CONFIG.get("connection_txt", "connection.txt") -OUT_DAILY_CSV = OUTPUT_DIR / "daily_returns_by_strategy.csv" +OUT_DAILY_CSV = OUTPUT_DIR / "daily_returns_by_strategy.csv" OUT_EQUITY_CSV = OUTPUT_DIR / "equity_by_strategy.csv" -OUT_DEBUG_CSV = OUTPUT_DIR / "debug_daily_by_strategy.csv" -PLOT_EQUITY = PLOT_DIR / "equity_by_strategy.png" -PLOT_DD = PLOT_DIR / "drawdown_by_strategy.png" +OUT_DEBUG_CSV = OUTPUT_DIR / "debug_daily_by_strategy.csv" +PLOT_EQUITY = PLOT_DIR / "equity_by_strategy.png" +PLOT_DD = PLOT_DIR / "drawdown_by_strategy.png" -DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF") +DROPBOX_EXPORT_DIR = Path( + r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF" +) +# Stored procedure defaults (sovrascritti se presenti in config) +SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL" +SP_N_DEFAULT = 1260 +PTF_CURR_DEFAULT = "EUR" + +if CONFIG is not None: + try: + DB_CONFIG = require_section(CONFIG, "db") + SP_NAME_DEFAULT = str(DB_CONFIG.get("stored_proc", SP_NAME_DEFAULT)) + SP_N_DEFAULT = int(DB_CONFIG.get("n_bars", SP_N_DEFAULT)) + PTF_CURR_DEFAULT = str(DB_CONFIG.get("ptf_curr", PTF_CURR_DEFAULT)) + except KeyError: + pass + +# Whitelist strategie (con fallback alle 4 attese in v2.1) +DEFAULT_STRATEGIES = ["Equal_Weight", "Risk_Parity", "Equal_Weight_v2", "Risk_Parity_v2"] +EQUITY_CFG = CONFIG.get("equity_log", {}) if CONFIG else {} +raw_whitelist = EQUITY_CFG.get("strategy_whitelist") if isinstance(EQUITY_CFG, dict) else None +if raw_whitelist: + cleaned = [str(x).strip() for x in raw_whitelist if str(x).strip()] + VALID_STRATEGIES = cleaned if cleaned else DEFAULT_STRATEGIES +else: + VALID_STRATEGIES = DEFAULT_STRATEGIES + + +# ============================================================================= +# DROPBOX EXPORT +# ============================================================================= def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR) -> bool: - if not src or not dst_dir: - return False - if not src.exists(): - print(f"[WARN] file non trovato per copia Dropbox: {src}") + if not src or not dst_dir or not src.exists(): + if src and not src.exists(): + print(f"[WARN] file non trovato per copia Dropbox: {src}") return False try: dst_dir.mkdir(parents=True, exist_ok=True) - dst = dst_dir / src.name - shutil.copy2(src, dst) + shutil.copy2(src, dst_dir / src.name) print(f"[DROPBOX] Copiato {src.name} in {dst_dir}") return True except Exception as exc: print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}") return False -# Stored procedure -SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL" -SP_N_DEFAULT = 1260 -PTF_CURR_DEFAULT = "EUR" - -try: - DB_CONFIG = require_section(CONFIG, "db") if CONFIG else {} -except Exception as exc: # pragma: no cover - best effort - print(f"[WARN] Config DB non disponibile ({exc}); uso i default interni.") - DB_CONFIG = {} -else: - SP_NAME_DEFAULT = str(DB_CONFIG.get("stored_proc", SP_NAME_DEFAULT)) - SP_N_DEFAULT = int(DB_CONFIG.get("n_bars", SP_N_DEFAULT)) - PTF_CURR_DEFAULT = str(DB_CONFIG.get("ptf_curr", PTF_CURR_DEFAULT)) - - -DEFAULT_STRATEGIES = ["Equal_Weight", "Risk_Parity"] -VALID_STRATEGIES = DEFAULT_STRATEGIES - -EQUITY_CFG = CONFIG.get("equity_log", {}) if CONFIG else {} -raw_whitelist = EQUITY_CFG.get("strategy_whitelist") if isinstance(EQUITY_CFG, dict) else None -if raw_whitelist: - whitelist = [str(x).strip() for x in raw_whitelist if str(x).strip()] - if whitelist: - VALID_STRATEGIES = whitelist # ============================================================================= -# AUDIT LOG LOADER (FORMAT CHECKS) +# AUDIT LOG LOADER (PARSER ROBUSTO) # ============================================================================= REQUIRED_AUDIT_COLS = ["Strategy", "ISIN", "Action", "TradeDate"] CANONICAL_AUDIT_COLS = [ - "Strategy", - "ISIN", - "Action", - "TradeDate", - "EntryIndex", - "EntryAmount", - "SizeWeight", - "Price", - "PnL_%", - "ExitReason", - "LinkedOpenDate", - "Duration_bars", - "Notes", -] -NUMERIC_COLS = [ - "EntryIndex", - "EntryAmount", - "SizeWeight", - "Price", - "PnL_%", - "Duration_bars", + "Strategy", "ISIN", "Action", "TradeDate", + "EntryIndex", "EntryAmount", "SizeWeight", "Price", + "PnL_%", "ExitReason", "LinkedOpenDate", "Duration_bars", "Notes", ] +NUMERIC_COLS = ["EntryIndex", "EntryAmount", "SizeWeight", "Price", "PnL_%", "Duration_bars"] def _clean_numeric_series(s: pd.Series) -> pd.Series: + """Conversione robusta a numerico, tollera separatori italiani/europei.""" if pd.api.types.is_numeric_dtype(s): return s - txt = s.astype(str).str.strip() - txt = txt.str.replace("%", "", regex=False) + txt = s.astype(str).str.strip().str.replace("%", "", regex=False) txt = txt.replace({"": np.nan, "nan": np.nan, "None": np.nan}) - def _fix_one(val: str) -> str: + def _fix_one(val): if val is None or (isinstance(val, float) and np.isnan(val)): return val v = str(val).strip() if not v: return v - dot_n = v.count(".") - comma_n = v.count(",") - - # Heuristic: - # - multiple dots with no commas => dots are thousands separators + dot_n, comma_n = v.count("."), v.count(",") if dot_n > 1 and comma_n == 0: - return v.replace(".", "") - # - both comma and dot present => decide decimal by last separator + return v.replace(".", "") # dots are thousands sep if dot_n > 0 and comma_n > 0: - last_dot = v.rfind(".") - last_comma = v.rfind(",") - if last_comma > last_dot: - # comma as decimal, dots as thousands - return v.replace(".", "").replace(",", ".") - # dot as decimal, commas as thousands - return v.replace(",", "") - # - only comma present => comma as decimal + return (v.replace(".", "").replace(",", ".") + if v.rfind(",") > v.rfind(".") + else v.replace(",", "")) if comma_n > 0 and dot_n == 0: - return v.replace(",", ".") + return v.replace(",", ".") # comma is decimal return v - cleaned = txt.map(_fix_one) - return pd.to_numeric(cleaned, errors="coerce") + return pd.to_numeric(txt.map(_fix_one), errors="coerce") + def _parse_mixed_dates(series: pd.Series) -> pd.Series: - s = series.astype(str).str.strip() - s = s.replace({"": np.nan, "nan": np.nan, "None": np.nan}) - dt_iso = pd.to_datetime(s, format="%Y-%m-%d", errors="coerce") - dt_iso_ts = pd.to_datetime(s, format="%Y-%m-%d %H:%M:%S", errors="coerce") - dt_dmy = pd.to_datetime(s, format="%d/%m/%Y", errors="coerce") - dt_dmy_ts = pd.to_datetime(s, format="%d/%m/%Y %H:%M:%S", errors="coerce") - return dt_iso.fillna(dt_iso_ts).fillna(dt_dmy).fillna(dt_dmy_ts) + """Parser per date in formato misto ISO + europeo.""" + s = series.astype(str).str.strip().replace({"": np.nan, "nan": np.nan, "None": np.nan}) + return (pd.to_datetime(s, format="%Y-%m-%d", errors="coerce") + .fillna(pd.to_datetime(s, format="%Y-%m-%d %H:%M:%S", errors="coerce")) + .fillna(pd.to_datetime(s, format="%d/%m/%Y", errors="coerce")) + .fillna(pd.to_datetime(s, format="%d/%m/%Y %H:%M:%S", errors="coerce"))) def load_audit_log(path: Path) -> pd.DataFrame: @@ -189,8 +182,7 @@ def load_audit_log(path: Path) -> pd.DataFrame: if not header or "TradeDate" not in header: header = CANONICAL_AUDIT_COLS.copy() - rows = [] - mixed_rows = 0 + rows, mixed_rows = [], 0 for line in lines[1:]: if not line or not line.strip(): continue @@ -210,7 +202,7 @@ def load_audit_log(path: Path) -> pd.DataFrame: df = pd.DataFrame(rows, columns=header) if mixed_rows > 0: - print(f"[WARN] Audit log con {mixed_rows} righe in formato legacy/misto: normalizzate in lettura.") + print(f"[WARN] Audit log con {mixed_rows} righe in formato legacy: normalizzate in lettura.") missing = [c for c in REQUIRED_AUDIT_COLS if c not in df.columns] if missing: @@ -219,70 +211,63 @@ def load_audit_log(path: Path) -> pd.DataFrame: f"Colonne trovate: {list(df.columns)}" ) - # Normalize key columns + # Normalizzazione df["Action"] = df["Action"].astype(str).str.upper().str.strip() df["Strategy"] = df["Strategy"].astype(str).str.strip() df["ISIN"] = df["ISIN"].astype(str).str.strip() - - # Dates df["TradeDate"] = _parse_mixed_dates(df["TradeDate"]) if "LinkedOpenDate" in df.columns: df["LinkedOpenDate"] = _parse_mixed_dates(df["LinkedOpenDate"]) - # Drop rows with invalid dates + # Pulizia righe before = len(df) df = df.dropna(subset=["TradeDate"]) - dropped = before - len(df) - if dropped > 0: + if (dropped := before - len(df)) > 0: print(f"[WARN] Rimosse {dropped} righe con TradeDate non valido.") - # Keep only OPEN/CLOSE if present - if "Action" in df.columns: - before = len(df) - df = df[df["Action"].isin(["OPEN", "CLOSE"])] - dropped = before - len(df) - if dropped > 0: - print(f"[WARN] Rimosse {dropped} righe con Action non valida.") + before = len(df) + df = df[df["Action"].isin(["OPEN", "CLOSE"])] + if (dropped := before - len(df)) > 0: + print(f"[WARN] Rimosse {dropped} righe con Action non valida.") - # Numeric cleanup + # Conversione numerici for col in NUMERIC_COLS: if col in df.columns: df[col] = _clean_numeric_series(df[col]) return df -# ============================================================================= -# FETCH RENDIMENTI DAL DB # ============================================================================= -def fetch_returns_from_db(isins, start_date, end_date) -> pd.DataFrame: - import sqlalchemy as sa - from sqlalchemy import text as sql_text - +# DB FETCH RETURNS +# ============================================================================= +def fetch_returns_from_db( + isins: List[str], start_date, end_date +) -> pd.DataFrame: + """Scarica i rendimenti giornalieri per gli ISIN dell'audit log.""" conn_str = read_connection_txt(CONNECTION_TXT) - engine = sa.create_engine(conn_str, fast_executemany=True) - - sp = SP_NAME_DEFAULT - nbar = SP_N_DEFAULT - ptf = PTF_CURR_DEFAULT - - sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") + engine = sa.create_engine(conn_str, fast_executemany=True) + sql_sp = sql_text( + f"EXEC {SP_NAME_DEFAULT} @ISIN = :isin, @n = :n, @PtfCurr = :ptf" + ) frames = [] with engine.begin() as conn: for isin in isins: try: - df = pd.read_sql_query(sql_sp, conn, params={"isin": isin, "n": nbar, "ptf": ptf}) - except Exception as e: - print(f"[ERROR] SP {sp} fallita per {isin}: {e}") + df = pd.read_sql_query( + sql_sp, conn, + params={"isin": isin, "n": SP_N_DEFAULT, "ptf": PTF_CURR_DEFAULT}, + ) + except Exception as exc: + print(f"[ERROR] SP {SP_NAME_DEFAULT} fallita per {isin}: {exc}") continue if df.empty: continue col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) - col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) - col_px = detect_column(df, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"]) - + col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) + col_px = detect_column(df, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"]) if not col_date: continue @@ -295,26 +280,23 @@ def fetch_returns_from_db(isins, start_date, end_date) -> pd.DataFrame: elif col_px: px = pd.to_numeric(df[col_px], errors="coerce").astype(float).replace(0, np.nan) log_r = np.log(px / px.shift(1)) - r = np.expm1(log_r) # log-return -> semplice decimale - out = pd.DataFrame({"Date": df[col_date], "ISIN": isin, "Ret": r}) + out = pd.DataFrame({ + "Date": df[col_date], "ISIN": isin, + "Ret": np.expm1(log_r), + }) else: continue - frames.append(out) if not frames: return pd.DataFrame(index=pd.DatetimeIndex([], name="Date")) long = pd.concat(frames, ignore_index=True).dropna(subset=["Date"]) - - mask = ( - (long["Date"].dt.date >= start_date) - & (long["Date"].dt.date <= end_date) - ) + mask = (long["Date"].dt.date >= start_date) & (long["Date"].dt.date <= end_date) long = long.loc[mask] - wide = long.pivot(index="Date", columns="ISIN", values="Ret").sort_index() + # Auto-detect percent vs decimal if not wide.empty: max_abs = np.nanmax(np.abs(wide.values)) if np.isfinite(max_abs) and max_abs > 0.5: @@ -322,40 +304,50 @@ def fetch_returns_from_db(isins, start_date, end_date) -> pd.DataFrame: return wide + # ============================================================================= -# RICOSTRUZIONE DAILY RETURNS +# RICOSTRUZIONE DAILY RETURNS PER STRATEGIA # ============================================================================= -def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> pd.DataFrame: +def rebuild_daily_from_log( + audit: pd.DataFrame, returns_wide: pd.DataFrame +) -> pd.DataFrame: + """ + Per ogni strategia ricostruisce il rendimento giornaliero come media + ponderata sui trade attivi: r_strat(d) = sum(amount_i * ret_isin_i(d)) / + sum(amount_i), dove i somma sui trade aperti nel giorno d. + """ strategies = sorted(audit["Strategy"].dropna().astype(str).unique()) if not strategies: return pd.DataFrame(index=returns_wide.index, columns=[]) idx = returns_wide.index - daily_num = pd.DataFrame(0.0, index=idx, columns=strategies) daily_den = pd.DataFrame(0.0, index=idx, columns=strategies) + # Mappa chiusure ISIN+OpenDate -> CloseDate closes = audit[audit["Action"] == "CLOSE"].copy() if not closes.empty: if "LinkedOpenDate" in closes.columns: closes["_key"] = ( - closes["ISIN"].astype(str) - + "|" + closes["ISIN"].astype(str) + "|" + pd.to_datetime(closes["LinkedOpenDate"]).dt.strftime("%Y-%m-%d") ) else: closes["_key"] = ( - closes["ISIN"].astype(str) - + "|" + closes["ISIN"].astype(str) + "|" + pd.to_datetime(closes["TradeDate"]).dt.strftime("%Y-%m-%d") ) closes["TradeDate"] = pd.to_datetime(closes["TradeDate"]) - closes_agg = closes.sort_values("TradeDate").groupby("_key", as_index=False)["TradeDate"].last() + closes_agg = ( + closes.sort_values("TradeDate") + .groupby("_key", as_index=False)["TradeDate"] + .last() + ) close_map = closes_agg.set_index("_key") else: close_map = pd.DataFrame().set_index(pd.Index([], name="_key")) - # debug counters + # Counters per debug total_opens = 0 used_opens = 0 skipped_missing_isin = 0 @@ -363,14 +355,12 @@ def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> p skipped_bad_window = 0 for strat in strategies: - aud_s = audit[audit["Strategy"] == strat] - opens = aud_s[aud_s["Action"] == "OPEN"].copy() + opens = audit[(audit["Strategy"] == strat) & (audit["Action"] == "OPEN")].copy() if opens.empty: continue opens["_key"] = ( - opens["ISIN"].astype(str) - + "|" + opens["ISIN"].astype(str) + "|" + pd.to_datetime(opens["TradeDate"]).dt.strftime("%Y-%m-%d") ) @@ -397,8 +387,9 @@ def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> p close_val = close_map.loc[key, "TradeDate"] if isinstance(close_val, pd.Series): close_val = close_val.iloc[-1] - d_close = pd.Timestamp(close_val).normalize() - exit_idx = int(ser.index.searchsorted(d_close, side="left")) + exit_idx = int(ser.index.searchsorted( + pd.Timestamp(close_val).normalize(), side="left" + )) else: exit_idx = len(ser) @@ -406,9 +397,8 @@ def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> p skipped_bad_window += 1 continue - idx_seg = ser.index[entry_idx:exit_idx] + idx_seg = ser.index[entry_idx:exit_idx] vals_seg = ser.values[entry_idx:exit_idx] - daily_num.loc[idx_seg, strat] += entry_amount * vals_seg daily_den.loc[idx_seg, strat] += entry_amount used_opens += 1 @@ -417,11 +407,12 @@ def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> p mask = daily_den > 0 daily[mask] = daily_num[mask] / daily_den[mask] + # Salva debug debug = pd.concat( {f"num_{c}": daily_num[c] for c in strategies} | {f"den_{c}": daily_den[c] for c in strategies} - | {f"ret_{c}": daily[c] for c in strategies}, - axis=1 + | {f"ret_{c}": daily[c] for c in strategies}, + axis=1, ) debug.to_csv(OUT_DEBUG_CSV, index_label="Date") @@ -431,56 +422,14 @@ def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> p f"EntryAmount<=0: {skipped_bad_amount}, " f"finestra non valida: {skipped_bad_window}" ) - return daily + # ============================================================================= -# MAIN +# PLOT # ============================================================================= -def main(): - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - PLOT_DIR.mkdir(parents=True, exist_ok=True) - if not AUDIT_LOG_CSV.exists(): - raise FileNotFoundError("Missing trades_audit_log.csv") - - # parsing robusto con controllo formato - audit = load_audit_log(AUDIT_LOG_CSV) - - if audit.empty: - raise SystemExit("Audit log vuoto.") - - if "Strategy" not in audit.columns: - raise SystemExit("Colonna 'Strategy' mancante nell'audit log.") - - # === filtro whitelist: solo strategie volute === - audit["Strategy"] = audit["Strategy"].astype(str) - before = len(audit) - audit = audit[audit["Strategy"].isin(VALID_STRATEGIES)] - removed = before - len(audit) - if removed > 0: - print( - f"[INFO] Filtrate {removed} righe con strategie non incluse in {VALID_STRATEGIES}." - ) - if audit.empty: - raise SystemExit(f"Nessuna riga con strategie in {VALID_STRATEGIES} nell'audit log.") - - start_date = (audit["TradeDate"].min() - pd.Timedelta(days=10)).date() - end_date = (audit["TradeDate"].max() + pd.Timedelta(days=10)).date() - - isins = sorted(audit["ISIN"].dropna().astype(str).unique()) - ret_wide = fetch_returns_from_db(isins, start_date, end_date) - if ret_wide.empty: - raise RuntimeError("Nessun rendimento recuperato dal DB nell'intervallo richiesto.") - - daily = rebuild_daily_from_log(audit, ret_wide).sort_index() - daily.to_csv(OUT_DAILY_CSV, index_label="Date") - - equity = (1.0 + daily.fillna(0.0)).cumprod() * 100.0 - equity.to_csv(OUT_EQUITY_CSV, index_label="Date") - +def _plot_equity(equity: pd.DataFrame, out_path: Path) -> None: import matplotlib.pyplot as plt - - # Equity plt.figure(figsize=(10, 6)) for col in equity.columns: plt.plot(equity.index, equity[col], label=col) @@ -488,11 +437,12 @@ def main(): plt.grid(True) plt.legend() plt.tight_layout() - plt.savefig(str(PLOT_EQUITY), dpi=150) + plt.savefig(str(out_path), dpi=150) plt.close() - copy_to_dropbox(PLOT_EQUITY) - # Drawdown + +def _plot_drawdown(equity: pd.DataFrame, out_path: Path) -> None: + import matplotlib.pyplot as plt dd = equity / equity.cummax() - 1.0 plt.figure(figsize=(10, 5)) for col in dd.columns: @@ -501,16 +451,66 @@ def main(): plt.grid(True) plt.legend() plt.tight_layout() - plt.savefig(str(PLOT_DD), dpi=150) + plt.savefig(str(out_path), dpi=150) plt.close() + + +# ============================================================================= +# MAIN +# ============================================================================= +def main() -> None: + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + PLOT_DIR.mkdir(parents=True, exist_ok=True) + + if not AUDIT_LOG_CSV.exists(): + raise FileNotFoundError("Missing trades_audit_log.csv") + + audit = load_audit_log(AUDIT_LOG_CSV) + if audit.empty: + raise SystemExit("Audit log vuoto.") + if "Strategy" not in audit.columns: + raise SystemExit("Colonna 'Strategy' mancante nell'audit log.") + + # Filtro whitelist + audit["Strategy"] = audit["Strategy"].astype(str) + before = len(audit) + audit = audit[audit["Strategy"].isin(VALID_STRATEGIES)] + if (removed := before - len(audit)) > 0: + print( + f"[INFO] Filtrate {removed} righe con strategie non incluse " + f"in {VALID_STRATEGIES}." + ) + if audit.empty: + raise SystemExit( + f"Nessuna riga con strategie in {VALID_STRATEGIES} nell'audit log." + ) + + start_date = (audit["TradeDate"].min() - pd.Timedelta(days=10)).date() + end_date = (audit["TradeDate"].max() + pd.Timedelta(days=10)).date() + + isins = sorted(audit["ISIN"].dropna().astype(str).unique()) + ret_wide = fetch_returns_from_db(isins, start_date, end_date) + if ret_wide.empty: + raise RuntimeError("Nessun rendimento recuperato dal DB.") + + # Daily returns + equity + daily = rebuild_daily_from_log(audit, ret_wide).sort_index() + daily.to_csv(OUT_DAILY_CSV, index_label="Date") + + equity = (1.0 + daily.fillna(0.0)).cumprod() * 100.0 + equity.to_csv(OUT_EQUITY_CSV, index_label="Date") + + # Plots + _plot_equity(equity, PLOT_EQUITY) + _plot_drawdown(equity, PLOT_DD) + + # Dropbox export + copy_to_dropbox(PLOT_EQUITY) copy_to_dropbox(PLOT_DD) print("Salvati:") - print(" -", OUT_DAILY_CSV) - print(" -", OUT_EQUITY_CSV) - print(" -", OUT_DEBUG_CSV) - print(" -", PLOT_EQUITY) - print(" -", PLOT_DD) + for path in [OUT_DAILY_CSV, OUT_EQUITY_CSV, OUT_DEBUG_CSV, PLOT_EQUITY, PLOT_DD]: + print(" -", path) print(" -", DROPBOX_EXPORT_DIR / PLOT_EQUITY.name) print(" -", DROPBOX_EXPORT_DIR / PLOT_DD.name) diff --git a/shared_utils.py b/shared_utils.py index 0f643be..54ef3d3 100644 --- a/shared_utils.py +++ b/shared_utils.py @@ -9,10 +9,6 @@ from typing import Dict, List, Optional, Sequence, Tuple import numpy as np import pandas as pd import pyodbc -try: - import pywt -except ImportError: # pragma: no cover - optional dependency - pywt = None DEFAULT_CONFIG_PATH = Path("config/pattern_knn_config.json") @@ -91,58 +87,6 @@ def z_norm(arr: np.ndarray) -> Optional[np.ndarray]: return (arr - mu) / (sd + 1e-12) -def wavelet_denoise( - series: pd.Series, - wavelet: str = "db3", - level: int = 3, - mode: str = "symmetric", - threshold_mode: str = "soft", -) -> Optional[pd.Series]: - """Denoise/reshape the series with a wavelet decomposition. - - Keeps the original index length; if PyWavelets is missing the function - returns None so callers can gracefully fall back to the raw signal. - """ - if pywt is None: - print("[WARN] pywt non installato: salto il filtraggio wavelet.") - return None - s = pd.to_numeric(series, errors="coerce") - if s.dropna().empty: - return None - - w = pywt.Wavelet(wavelet) - max_level = pywt.dwt_max_level(len(s.dropna()), w.dec_len) - lvl = max(1, min(level, max_level)) if max_level > 0 else 1 - - valid = s.dropna() - coeffs = pywt.wavedec(valid.values, w, mode=mode, level=lvl) - # Universal threshold (Donoho-Johnstone) - sigma = np.median(np.abs(coeffs[-1])) / 0.6745 if len(coeffs[-1]) > 0 else 0.0 - thresh = sigma * np.sqrt(2 * np.log(len(valid))) if sigma > 0 else 0.0 - if thresh <= 0: - coeffs_f = coeffs - else: - def _safe_thresh(c: np.ndarray) -> np.ndarray: - if c is None or c.size == 0: - return c - if threshold_mode == "hard": - return pywt.threshold(c, value=thresh, mode="hard") - # soft threshold without divide-by-zero warnings - mag = np.abs(c) - mask = mag > thresh - out = np.zeros_like(c) - out[mask] = np.sign(c[mask]) * (mag[mask] - thresh) - return out - - coeffs_f = [coeffs[0]] + [_safe_thresh(c) for c in coeffs[1:]] - - rec = pywt.waverec(coeffs_f, w, mode=mode) - rec = rec[: len(valid)] - filt = pd.Series(rec, index=valid.index) - # Re-allineamento all'indice originale - return filt.reindex(s.index).interpolate(limit_direction="both") - - def build_pattern_library( ret_series: pd.Series, wp: int, @@ -262,23 +206,16 @@ def hurst_rs(series: pd.Series) -> Optional[float]: return float(h) -def build_hurst_map( - returns_long: pd.DataFrame, - lookback: Optional[int] = None, - min_length: int = 100, -) -> Dict[str, float]: +def build_hurst_map(returns_long: pd.DataFrame, lookback: int = 252) -> Dict[str, float]: if returns_long.empty: return {} ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index() hurst_map: Dict[str, float] = {} for isin in ret_wide.columns: series = ret_wide[isin].dropna().astype(float) - if len(series) < max(1, int(min_length)): + if len(series) < max(lookback, 100): continue - window = len(series) if lookback is None else min(len(series), int(lookback)) - if window <= 0: - continue - h_val = hurst_rs(series.iloc[-window:]) + h_val = hurst_rs(series.iloc[-lookback:]) if h_val is None or not np.isfinite(h_val): continue hurst_map[str(isin)] = float(h_val) diff --git a/signals_daily_kNN_prod_v.2.py b/signals_daily_kNN_prod_v.2.py index 3215244..5679f52 100644 --- a/signals_daily_kNN_prod_v.2.py +++ b/signals_daily_kNN_prod_v.2.py @@ -1,47 +1,57 @@ # -*- coding: utf-8 -*- """ -Daily Signals Generator (kNN) – PRODUCTION (coerente al backtest v3.1.5) +Daily Signals Generator (kNN) - PRODUCTION +============================================ -Novità principali: -- Cap giornaliero MAX_OPEN=15: ranking unico dei buy e revisione per differenza su ogni strategia -- Risk Parity con cap per singolo asset (RP_MAX_WEIGHT) allineato al backtest v3.1.5 -- Fetch OPEN una sola volta per ISIN coinvolti in OPEN/CLOSE (cache condivisa tra strategie) -- Audit log per TUTTE le strategie operative (Equal_Weight, Risk_Parity) +Allineato al backtest v3.1.5 con estensioni Config B (grid search out-of-sample). + +Modifiche v2.1 (questa versione): +- HURST RIMOSSO dalla logica decisionale: theta_entry usa il valore globale del + config (PATTERN_CONFIG.theta). Le funzioni hurst_rs/build_hurst_map restano + disponibili in shared_utils per usi qualitativi (regime classification, report). +- Aggiunto supporto MULTI-STRATEGIA configurabile da pattern_knn_config.json + sezione "strategies". Ogni strategia pu\u00f2 sovrascrivere singoli parametri di + signals (tp_bps, sl_bps, trail_bps, time_stop_bars, decision_every, + min_holding_bars, ecc.). +- Nuovi parametri di exit: decision_every (N giorni tra refresh ranking) e + min_holding_bars (blocca SL/RANK per le prime N barre, TP/TRAIL sempre attivi). +- Fix bug "RANK duplicato": un ISIN che esce per TP/SL non viene ri-aggiunto alla + lista chiusure per RANK nello stesso ciclo. +- Formato TradeDate uniforme ISO (YYYY-MM-DD) nel trades_audit_log.csv. +- Rimossa duplicazione del codice di lettura config. Pipeline (giorno D, EOD -> t+1 OPEN): -1) Carica universo e serie rendimenti dal DB (stored procedure) -2) Pattern kNN (WP=60, HA=10, K=25), Signal=1 se EstOutcome > THETA (decimali) +1) Carica universo e serie rendimenti dal DB (stored procedure, una volta sola) +2) Pattern kNN (WP, HA, K, theta globale), Signal=1 se EstOutcome > theta 3) Ranking unico dei buy e selezione Top-N (MAX_OPEN) come target giornaliero -4) Revisione per differenza: chiudi aperture fuori top, apri nuove entrate nel top (+ risk exits) -5) Sizing (Equal Weight / Risk Parity con cap) -6) Fetch OPEN prices UNA VOLTA per ISIN interessati (OPEN/CLOSE) e popolamento ordini -7) Log ordini e snapshot Excel con fogli: Open_Equal_Weight / Open_Risk_Parity +4) Per ogni strategia: applica eventuale override parametri, esegui revisione per + differenza vs posizioni aperte, gestisci risk exits con regole specifiche +5) Sizing (Equal Weight oppure Risk Parity con cap) +6) Fetch OPEN prices UNA VOLTA per ISIN interessati e popolamento ordini +7) Log ordini e snapshot Excel """ from __future__ import annotations -import os -import ssl -import json -import time -import shutil -import warnings import datetime as dt +import json +import os +import shutil +import ssl +import time +import warnings from dataclasses import dataclass from pathlib import Path -from typing import Dict, List, Optional, Tuple, Iterable, Set +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple import numpy as np import pandas as pd -from urllib.request import urlopen -from urllib.error import URLError, HTTPError - -# DB import sqlalchemy as sa from sqlalchemy import text as sql_text +from urllib.error import HTTPError, URLError +from urllib.request import urlopen from shared_utils import ( - build_hurst_map, build_pattern_library, characterize_window, detect_column, @@ -53,30 +63,80 @@ from shared_utils import ( z_norm, ) -# ========================= -# CONFIG -# ========================= + +# ============================================================================= +# CONFIG LOADING - una sola volta, parametri immutabili a livello modulo +# ============================================================================= CONFIG = load_config() DB_CONFIG = require_section(CONFIG, "db") PATTERN_CONFIG = require_section(CONFIG, "pattern") TAGGING_CONFIG = require_section(CONFIG, "tagging") RANKING_CONFIG = require_section(CONFIG, "ranking") SIGNALS_CONFIG = require_section(CONFIG, "signals") -DB_CONFIG = CONFIG.get("db", {}) -PATTERN_CONFIG = CONFIG.get("pattern", {}) -TAGGING_CONFIG = CONFIG.get("tagging", {}) -RANKING_CONFIG = CONFIG.get("ranking", {}) -SIGNALS_CONFIG = CONFIG.get("signals", {}) +STRATEGIES_CONFIG: Dict[str, Any] = CONFIG.get("strategies", {}) -BASE_DIR = Path(".") -OUTPUT_DIR = BASE_DIR / "output" -# Universe now expected inside Input folder -UNIVERSO_XLSX = BASE_DIR / "Input" / "Universo per Trading System.xlsx" -CONNECTION_TXT = BASE_DIR / "connection.txt" -AUDIT_LOG_CSV = OUTPUT_DIR / "trades_audit_log.csv" -OPEN_TRADES_DIR = BASE_DIR / "open_trades" -DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF") -AUDIT_COLUMNS = [ +# DB / SP +SP_NAME_DEFAULT = str(require_value(DB_CONFIG, "stored_proc", "db")) +SP_N_DEFAULT = int(require_value(DB_CONFIG, "n_bars", "db")) +PTF_CURR_DEFAULT = str(require_value(DB_CONFIG, "ptf_curr", "db")) + +# Pattern recognition (parametri kNN globali) +WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) +HA = int(require_value(PATTERN_CONFIG, "ha", "pattern")) +KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern")) +THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern")) + +# Pattern tagging (informativo) +Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging")) +Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging")) +STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging")) + +# Ranking globali +MAX_OPEN = int(require_value(SIGNALS_CONFIG, "max_open", "signals")) +TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking")) +RP_MAX_WEIGHT_RAW = RANKING_CONFIG.get("rp_max_weight") +RP_MAX_WEIGHT_DEFAULT = ( + float(RP_MAX_WEIGHT_RAW) if RP_MAX_WEIGHT_RAW is not None else 2.0 / max(TOP_N_MAX, 1) +) + +# Sizing globali +BASE_CAPITAL_PER_STRATEGY = float( + require_value(SIGNALS_CONFIG, "base_capital_per_strategy", "signals") +) +MIN_TRADE_NOTIONAL = float( + require_value(SIGNALS_CONFIG, "min_trade_notional", "signals") +) +RISK_PARITY_LOOKBACK = int( + require_value(SIGNALS_CONFIG, "risk_parity_lookback", "signals") +) + +# Calendario +BUSINESS_DAYS_ONLY = True +SEED = 42 + +warnings.filterwarnings("ignore") +np.random.seed(SEED) + + +# ============================================================================= +# PATHS +# ============================================================================= +PATHS_CONFIG = CONFIG.get("paths", {}) +BASE_DIR = Path(PATHS_CONFIG.get("base_dir", ".")) +OUTPUT_DIR = BASE_DIR / PATHS_CONFIG.get("output_dir", "output") +UNIVERSO_XLSX = BASE_DIR / PATHS_CONFIG.get( + "input_universe", "Input/Universo per Trading System.xlsx" +) +CONNECTION_TXT = BASE_DIR / PATHS_CONFIG.get("connection_txt", "connection.txt") +AUDIT_LOG_CSV = BASE_DIR / PATHS_CONFIG.get( + "audit_log_csv", "output/trades_audit_log.csv" +) +OPEN_TRADES_DIR = BASE_DIR / PATHS_CONFIG.get("open_trades_dir", "open_trades") +DROPBOX_EXPORT_DIR = Path( + r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF" +) + +AUDIT_COLUMNS = [ "Strategy", "ISIN", "Action", @@ -92,128 +152,164 @@ AUDIT_COLUMNS = [ "Notes", ] -def _dated_signals_filename() -> Path: - date_prefix = pd.Timestamp.today().strftime("%Y%m%d") - return OUTPUT_DIR / f"{date_prefix}_signals.xlsx" +OPEN_MAX_RETRY = 3 +OPEN_SLEEP_SEC = 0.1 +OPEN_TIMEOUT = 10 -# Stored procedure / parametri DB -SP_NAME_DEFAULT = str(require_value(DB_CONFIG, "stored_proc", "db")) -SP_N_DEFAULT = int(require_value(DB_CONFIG, "n_bars", "db")) -PTF_CURR_DEFAULT = str(require_value(DB_CONFIG, "ptf_curr", "db")) -# Pattern recognition (come backtest) -WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) -HA = int(require_value(PATTERN_CONFIG, "ha", "pattern")) -KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern")) -THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern")) # 0,005% in decimali (identico al backtest) -Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging")) -Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging")) -STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging")) +# ============================================================================= +# STRATEGY DEFINITIONS +# ============================================================================= +@dataclass +class StrategyConfig: + """ + Configurazione di una singola strategia operativa. -# Exit rules (identiche al backtest) -SL_BPS = float(require_value(SIGNALS_CONFIG, "sl_bps", "signals")) -TP_BPS = float(require_value(SIGNALS_CONFIG, "tp_bps", "signals")) -TRAIL_BPS = float(require_value(SIGNALS_CONFIG, "trail_bps", "signals")) -TIME_STOP_BARS = int(require_value(SIGNALS_CONFIG, "time_stop_bars", "signals")) -THETA_EXIT = float(require_value(SIGNALS_CONFIG, "theta_exit", "signals")) # soglia debolezza -WEAK_DAYS_EXIT = require_value(SIGNALS_CONFIG, "weak_days_exit", "signals") # uscita IMMEDIATA in caso di debolezza (come backtest) + I parametri override leggono dalla sezione 'strategies..params' di + pattern_knn_config.json. Se non presenti, ereditano dalla sezione 'signals'. + """ -# Ranking e selezione Top-N per APERTURE -MAX_OPEN = int(require_value(SIGNALS_CONFIG, "max_open", "signals")) # cap strumenti aperti oggi (come backtest) + name: str + sizing: str # "equal_weight" | "risk_parity" + enabled: bool + sl_bps: Optional[float] + tp_bps: Optional[float] + trail_bps: Optional[float] + time_stop_bars: Optional[int] + theta_exit: Optional[float] + weak_days_exit: Optional[int] + decision_every: int + min_holding_bars: int + rp_max_weight: float -# Allineamento al backtest v3.1.5 per il cap del Risk Parity -TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking")) -RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking") # ≈ 0.1333 = 13,33% per singolo asset -if RP_MAX_WEIGHT is None: - RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1) -else: - RP_MAX_WEIGHT = float(RP_MAX_WEIGHT) -# Sizing -BASE_CAPITAL_PER_STRATEGY = float(require_value(SIGNALS_CONFIG, "base_capital_per_strategy", "signals")) -MIN_TRADE_NOTIONAL = float(require_value(SIGNALS_CONFIG, "min_trade_notional", "signals")) -RISK_PARITY_LOOKBACK = int(require_value(SIGNALS_CONFIG, "risk_parity_lookback", "signals")) -SP_NAME_DEFAULT = DB_CONFIG.get("stored_proc", "opt_RendimentoGiornaliero1_ALL") -SP_N_DEFAULT = DB_CONFIG.get("n_bars", 1305) -PTF_CURR_DEFAULT = DB_CONFIG.get("ptf_curr", "EUR") +def _get_param(strategy_params: Dict, key: str, signals_default: Dict, cast=None): + """Restituisce il valore della strategia se presente, altrimenti il default globale.""" + if key in strategy_params: + val = strategy_params[key] + else: + val = signals_default.get(key) + if cast is not None and val is not None: + try: + return cast(val) + except (TypeError, ValueError): + return val + return val -# Pattern recognition (come backtest) -WP = PATTERN_CONFIG.get("wp", 60) -HA = PATTERN_CONFIG.get("ha", 10) -KNN_K = PATTERN_CONFIG.get("knn_k", 25) -THETA = PATTERN_CONFIG.get("theta", 0.005) # 0,005% in decimali (identico al backtest) -Z_REV = TAGGING_CONFIG.get("z_rev", 2.0) -Z_VOL = TAGGING_CONFIG.get("z_vol", 2.0) -STD_COMP_PCT = TAGGING_CONFIG.get("std_comp_pct", 0.15) -# Exit rules (identiche al backtest) -SL_BPS = SIGNALS_CONFIG.get("sl_bps", 300.0) -TP_BPS = SIGNALS_CONFIG.get("tp_bps", 800.0) -TRAIL_BPS = SIGNALS_CONFIG.get("trail_bps", 300.0) -TIME_STOP_BARS = SIGNALS_CONFIG.get("time_stop_bars", 20) -THETA_EXIT = SIGNALS_CONFIG.get("theta_exit", 0.0) # soglia debolezza -WEAK_DAYS_EXIT = SIGNALS_CONFIG.get("weak_days_exit") # uscita IMMEDIATA in caso di debolezza (come backtest) +def build_strategies_from_config() -> List[StrategyConfig]: + """ + Costruisce la lista delle strategie attive da pattern_knn_config.json. -# Ranking e selezione Top-N per APERTURE -MAX_OPEN = SIGNALS_CONFIG.get("max_open", 15) # cap strumenti aperti oggi (come backtest) + Se la sezione 'strategies' non esiste o \u00e8 vuota, fallback alle due + strategie legacy Equal_Weight + Risk_Parity con i parametri globali. + """ + strategies: List[StrategyConfig] = [] + raw = STRATEGIES_CONFIG or {} -# Allineamento al backtest v3.1.5 per il cap del Risk Parity -TOP_N_MAX = RANKING_CONFIG.get("top_n_max", MAX_OPEN) -RP_MAX_WEIGHT = RANKING_CONFIG.get("rp_max_weight", 2 / max(TOP_N_MAX, 1)) # ≈ 0.1333 = 13,33% per singolo asset + # Filtra metadata keys + items = {k: v for k, v in raw.items() if not k.startswith("_") and isinstance(v, dict)} -# Sizing -BASE_CAPITAL_PER_STRATEGY = SIGNALS_CONFIG.get("base_capital_per_strategy", 100.0) -MIN_TRADE_NOTIONAL = SIGNALS_CONFIG.get("min_trade_notional", 0.01) -RISK_PARITY_LOOKBACK = SIGNALS_CONFIG.get("risk_parity_lookback", 60) + # Fallback legacy + if not items: + items = { + "Equal_Weight": {"sizing": "equal_weight", "enabled": True, "params": {}}, + "Risk_Parity": {"sizing": "risk_parity", "enabled": True, "params": {}}, + } -# Calendario -BUSINESS_DAYS_ONLY = True -SEED = 42 + for name, spec in items.items(): + if not spec.get("enabled", True): + continue + params = spec.get("params", {}) or {} + sizing = str(spec.get("sizing", "equal_weight")).lower() -warnings.filterwarnings("ignore") -np.random.seed(SEED) + rp_max = params.get("rp_max_weight", RP_MAX_WEIGHT_DEFAULT) + strategies.append( + StrategyConfig( + name=str(name), + sizing=sizing, + enabled=True, + sl_bps=_get_param(params, "sl_bps", SIGNALS_CONFIG, float), + tp_bps=_get_param(params, "tp_bps", SIGNALS_CONFIG, float), + trail_bps=_get_param(params, "trail_bps", SIGNALS_CONFIG, float), + time_stop_bars=_get_param(params, "time_stop_bars", SIGNALS_CONFIG, int), + theta_exit=_get_param(params, "theta_exit", SIGNALS_CONFIG, float), + weak_days_exit=_get_param(params, "weak_days_exit", SIGNALS_CONFIG, int), + decision_every=int(_get_param(params, "decision_every", SIGNALS_CONFIG) or 1), + min_holding_bars=int(_get_param(params, "min_holding_bars", SIGNALS_CONFIG) or 0), + rp_max_weight=float(rp_max) if rp_max is not None else RP_MAX_WEIGHT_DEFAULT, + ) + ) + return strategies -# ========================= + +STRATEGIES: List[StrategyConfig] = build_strategies_from_config() + + +# ============================================================================= # UTILS -# ========================= -def ensure_dir(p: Path): +# ============================================================================= +def ensure_dir(p: Path) -> None: p.mkdir(parents=True, exist_ok=True) -def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR): - if not src or not dst_dir: - return - if not src.exists(): + +def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR) -> None: + if not src or not dst_dir or not src.exists(): return try: ensure_dir(dst_dir) - dst = dst_dir / src.name - shutil.copy2(src, dst) + shutil.copy2(src, dst_dir / src.name) except Exception as exc: print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}") + def next_business_day(d: dt.date) -> dt.date: nd = d + dt.timedelta(days=1) if not BUSINESS_DAYS_ONLY: return nd - while nd.weekday() >= 5: # 5=Sat, 6=Sun + while nd.weekday() >= 5: nd += dt.timedelta(days=1) return nd -def _safe_to_float(x) -> Optional[float]: + +def _safe_to_float(x: Any) -> Optional[float]: try: return float(x) - except Exception: + except (TypeError, ValueError): return None -def _db_fetch_returns(conn_str: str, - isins: List[str], - sp_name: Optional[str] = None, - n_bars: Optional[int] = None, - ptf_curr: Optional[str] = None) -> pd.DataFrame: + +def _format_date_iso(d: Any) -> str: + """Formato uniforme ISO YYYY-MM-DD per tutte le date nel trades_audit_log.""" + if d is None or pd.isna(d): + return "" + if isinstance(d, str): + return d + if isinstance(d, (dt.date, dt.datetime, pd.Timestamp)): + return pd.Timestamp(d).strftime("%Y-%m-%d") + return str(d) + + +def _dated_signals_filename() -> Path: + date_prefix = pd.Timestamp.today().strftime("%Y%m%d") + return OUTPUT_DIR / f"{date_prefix}_signals.xlsx" + + +# ============================================================================= +# DB FETCH +# ============================================================================= +def _db_fetch_returns( + conn_str: str, + isins: List[str], + sp_name: Optional[str] = None, + n_bars: Optional[int] = None, + ptf_curr: Optional[str] = None, +) -> pd.DataFrame: + """Scarica via stored procedure le serie storiche dei rendimenti giornalieri.""" engine = sa.create_engine(conn_str, fast_executemany=True) - sp = sp_name or os.environ.get("SP_NAME", SP_NAME_DEFAULT) - n_val = n_bars if n_bars is not None else int(os.environ.get("SP_N", SP_N_DEFAULT)) - ptf = (ptf_curr or os.environ.get("PTF_CURR", PTF_CURR_DEFAULT) or "").strip() or PTF_CURR_DEFAULT + sp = sp_name or os.environ.get("SP_NAME", SP_NAME_DEFAULT) + n_val = n_bars if n_bars is not None else int(os.environ.get("SP_N", SP_N_DEFAULT)) + ptf = (ptf_curr or os.environ.get("PTF_CURR", PTF_CURR_DEFAULT) or "").strip() or PTF_CURR_DEFAULT sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") frames: List[pd.DataFrame] = [] @@ -222,19 +318,22 @@ def _db_fetch_returns(conn_str: str, for i, isin in enumerate(isins, start=1): print(f"[DB] ({i}/{len(isins)}) scarico serie storica per {isin} ...", flush=True) try: - df = pd.read_sql_query(sql_sp, conn, params={"isin": str(isin), "n": int(n_val), "ptf": ptf}) - except Exception as e: - print(f"[ERROR] SP {sp} fallita per {isin}: {e}") + df = pd.read_sql_query( + sql_sp, + conn, + params={"isin": str(isin), "n": int(n_val), "ptf": ptf}, + ) + except Exception as exc: + print(f"[ERROR] SP {sp} fallita per {isin}: {exc}") continue - if df.empty: - print(f"[WARN] Nessun dato per {isin}") + print(f"[WARN] nessun dato per {isin}") continue col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) - col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) + col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) if not col_date or not col_ret: - print(f"[WARN] Colonne mancanti per {isin}") + print(f"[WARN] colonne mancanti per {isin}") continue out = df[[col_date, col_ret]].copy() @@ -242,11 +341,12 @@ def _db_fetch_returns(conn_str: str, out["Date"] = pd.to_datetime(out["Date"], errors="coerce").dt.tz_localize(None) out["ISIN"] = str(isin) + # Auto-detect scala (percent vs decimal) med = pd.to_numeric(out["Ret"], errors="coerce").abs().median() if pd.notnull(med) and med > 1.5: out["Ret"] = out["Ret"].astype(float) / 100.0 - print(f" ↳ righe scaricate: {len(out)}") + print(f" \u21b3 righe scaricate: {len(out)}") frames.append(out[["Date", "ISIN", "Ret"]]) if not frames: @@ -256,13 +356,10 @@ def _db_fetch_returns(conn_str: str, out_all = out_all.dropna(subset=["Date"]).sort_values(["ISIN", "Date"]).reset_index(drop=True) return out_all[["Date", "ISIN", "Ret"]] -# ========================= -# UNIVERSO + OPEN PRICE API (schema checker) -# ========================= -OPEN_MAX_RETRY = 3 -OPEN_SLEEP_SEC = 0.1 -OPEN_TIMEOUT = 10 +# ============================================================================= +# UNIVERSO + OPEN PRICE +# ============================================================================= def load_universe(path: Path) -> pd.DataFrame: df = pd.read_excel(path) if "ISIN" not in df.columns: @@ -274,26 +371,25 @@ def load_universe(path: Path) -> pd.DataFrame: df[col] = df[col].astype(str).str.strip() return df + def _build_symbol_euronext(row: pd.Series) -> Tuple[str, str]: - isin = str(row.get("ISIN", "")).strip() + isin = str(row.get("ISIN", "")).strip() venue = str(row.get("Mercato", "")).strip() - tok = str(row.get("TickerOpen", "") or "").strip() - base = "https://fin.scorer.app/finance/euronext/price" + tok = str(row.get("TickerOpen", "") or "").strip() + base = "https://fin.scorer.app/finance/euronext/price" if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper(): return base, tok if isin and venue: return base, f"{isin}-{venue}" return base, isin + def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]: - """ - Endpoint euronext/price/ISIN-Mercato con retry/backoff. - Log dettagliato stile checker: [TRY]/[RETRY]/[OK]/[FAIL]. - """ + """Fetch del prezzo open via API con retry.""" try: row = universe.loc[universe["ISIN"] == str(isin)].iloc[0] - except Exception: - print(f"[WARN] ISIN {isin} non trovato nell’universo.") + except (KeyError, IndexError): + print(f"[WARN] ISIN {isin} non trovato nell'universo.") return None base, symbol = _build_symbol_euronext(row) @@ -307,73 +403,89 @@ def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]: with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp: data = json.loads(resp.read().decode("utf-8")) if not isinstance(data, list) or not data: - print(" ↳ NO DATA") + print(" \u21b3 NO DATA") continue - d = (data[0] or {}).get("data") or {} px = d.get("open") if d.get("open") is not None else d.get("prevClose") if px is None: - print(" ↳ WARN: 'open' e 'prevClose' assenti") + print(" \u21b3 WARN: 'open' e 'prevClose' assenti") continue - px = float(px) - print(f" ↳ OK open={d.get('open')} close={d.get('close')} (ritorno prezzo={px})") + print(f" \u21b3 OK open={d.get('open')} close={d.get('close')} (ritorno prezzo={px})") return px - - except (HTTPError, URLError, ssl.SSLError) as e: + except (HTTPError, URLError, ssl.SSLError) as exc: if attempt < OPEN_MAX_RETRY: - print(f" ↳ ERR {e}\nritento tra {OPEN_SLEEP_SEC}s") + print(f" \u21b3 ERR {exc}\nritento tra {OPEN_SLEEP_SEC}s") time.sleep(OPEN_SLEEP_SEC) else: - print(f" ↳ ERR {e}") + print(f" \u21b3 ERR {exc}") print(f"[ERROR] nessun prezzo per {symbol} dopo {OPEN_MAX_RETRY} tentativi") return None -# ========================= -# HURST ESTIMATOR & MAP -# ========================= -# ========================= + +def _fetch_open_prices_once( + isins: Iterable[str], universe: pd.DataFrame +) -> Dict[str, Optional[float]]: + return {isin: get_open_price(isin, universe) for isin in sorted(set(isins))} + + +# ============================================================================= # GENERAZIONE SEGNALI (EOD su D) -# ========================= -def generate_signals_today(universe: pd.DataFrame, - returns_long: pd.DataFrame, - today: dt.date, - hurst_map: Optional[Dict[str, float]] = None) -> pd.DataFrame: +# ============================================================================= +def generate_signals_today( + universe: pd.DataFrame, + returns_long: pd.DataFrame, + today: dt.date, +) -> pd.DataFrame: + """ + Genera segnali kNN per ogni ISIN dell'universo. + + Nota v2.1: theta_entry NON e' piu' personalizzata per asset via Hurst. + Tutti gli ISIN usano la stessa soglia globale (PATTERN_CONFIG.theta). + """ ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index() decision_date = ret_wide.index[-1].date() rows = [] for isin in ret_wide.columns: r = ret_wide[isin].dropna().astype(float) - # THETA = HURST IN PERCENTUALE (H = 0.50 -> theta_entry = 0.005 = 0.5%) - theta_entry = THETA # fallback globale - H_val = None - if hurst_map is not None: - H_val = hurst_map.get(str(isin)) - if H_val is not None and not pd.isna(H_val): - theta_entry = float(H_val) / 100.0 + theta_entry = THETA # soglia globale (Hurst rimosso) if len(r) < max(200, WP + HA + 10): - rows.append({"Date": decision_date, "ISIN": isin, - "Signal": 0, "EstOutcome": np.nan, "AvgDist": np.nan, - "PatternType": None, "Confidence": np.nan}) + rows.append({ + "Date": decision_date, "ISIN": isin, + "Signal": 0, "EstOutcome": np.nan, "AvgDist": np.nan, + "PatternType": None, "Confidence": np.nan, + "Theta": float(theta_entry), + }) continue lib_wins, lib_out = build_pattern_library(r, WP, HA) if lib_wins is None or len(r) < WP + HA: est_out, avg_dist, sig = np.nan, np.nan, 0 - ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) + ptype, pconf = characterize_window( + r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT + ) else: - curr = r.values[-WP:] - curr_zn = z_norm(curr) + curr_zn = z_norm(r.values[-WP:]) if curr_zn is None: est_out, avg_dist, sig = np.nan, np.nan, 0 - ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) + ptype, pconf = characterize_window( + r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT + ) else: - est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K) - sig = 1 if (pd.notna(est_out) and float(est_out) > float(theta_entry)) else 0 - ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT) + est_out, avg_dist, _ = predict_from_library( + curr_zn, lib_wins, lib_out, k=KNN_K + ) + sig = ( + 1 + if (pd.notna(est_out) and float(est_out) > float(theta_entry)) + else 0 + ) + ptype, pconf = characterize_window( + r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT + ) rows.append({ "Date": decision_date, "ISIN": isin, @@ -382,71 +494,68 @@ def generate_signals_today(universe: pd.DataFrame, "AvgDist": (None if pd.isna(avg_dist) else float(avg_dist)), "PatternType": ptype, "Confidence": (None if pconf is None else float(max(0.0, min(1.0, pconf)))), - "Hurst": (None if H_val is None else float(H_val)), "Theta": float(theta_entry), }) - sig_df = pd.DataFrame(rows).set_index(["Date","ISIN"]).sort_index() - return sig_df + return pd.DataFrame(rows).set_index(["Date", "ISIN"]).sort_index() -# ========================= -# TOP-N SELECTION & PRICE CACHE -# ========================= + +# ============================================================================= +# RANKING & SELECTION +# ============================================================================= def _rank_buy(signals_today: pd.DataFrame, decision_date: dt.date) -> pd.DataFrame: - """ - Ritorna un DataFrame con i soli buy del giorno, ordinati per: - EstOutcome desc, Confidence desc, AvgDist asc. - Colonne: ['ISIN','EstOutcome','Confidence','AvgDist'] - """ + """Ritorna i buy del giorno ordinati per EstOutcome desc, Confidence desc, AvgDist asc.""" if signals_today.empty: - return pd.DataFrame(columns=["ISIN","EstOutcome","Confidence","AvgDist"]) + return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"]) day_df = signals_today.reset_index().query("Date == @decision_date") if day_df.empty: - return pd.DataFrame(columns=["ISIN","EstOutcome","Confidence","AvgDist"]) + return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"]) + + buy_mask = day_df["Signal"] == 1 + buy = day_df[buy_mask].copy() + buy["EstOutcome"] = pd.to_numeric(buy["EstOutcome"], errors="coerce") + buy["Confidence"] = pd.to_numeric(buy["Confidence"], errors="coerce") + buy["AvgDist"] = pd.to_numeric(buy["AvgDist"], errors="coerce") + buy = buy.sort_values( + by=["EstOutcome", "Confidence", "AvgDist"], + ascending=[False, False, True], + na_position="last", + ) + return buy[["ISIN", "EstOutcome", "Confidence", "AvgDist"]].reset_index(drop=True) - buy = (day_df[day_df["Signal"] == 1] - .assign( - EstOutcome=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "EstOutcome"], errors="coerce"), - Confidence=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "Confidence"], errors="coerce"), - AvgDist=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "AvgDist"], errors="coerce"), - ) - .sort_values(by=["EstOutcome","Confidence","AvgDist"], ascending=[False,False,True], na_position="last")) - return buy[["ISIN","EstOutcome","Confidence","AvgDist"]].reset_index(drop=True) def _select_top_signals(buy_rank_df: pd.DataFrame, max_open: int) -> List[str]: if buy_rank_df.empty: return [] return buy_rank_df["ISIN"].head(max_open).tolist() -def _fetch_open_prices_once(isins: Iterable[str], universe: pd.DataFrame) -> Dict[str, Optional[float]]: - cache: Dict[str, Optional[float]] = {} - for isin in sorted(set(isins)): - cache[isin] = get_open_price(isin, universe) - return cache -# ========================= +# ============================================================================= # POSIZIONI / ORDINI / LOG -# ========================= +# ============================================================================= @dataclass class OpenPos: - Strategy: str - ISIN: str - EntryDate: dt.date + Strategy: str + ISIN: str + EntryDate: dt.date EntryIndex: int EntryAmount: float SizeWeight: float - PeakPnL: float = 0.0 - Notes: str = "" + PeakPnL: float = 0.0 + Notes: str = "" + def open_trades_path(strategy: str) -> Path: ensure_dir(OPEN_TRADES_DIR) return OPEN_TRADES_DIR / f"open_{strategy}.csv" + def load_open_trades(strategy: str) -> pd.DataFrame: p = open_trades_path(strategy) if not p.exists(): return pd.DataFrame(columns=[ - "Strategy","ISIN","AssetName","EntryDate","EntryIndex","EntryAmount","SizeWeight","PeakPnL","WeakDays","Notes" + "Strategy", "ISIN", "AssetName", "EntryDate", "EntryIndex", + "EntryAmount", "SizeWeight", "PeakPnL", "WeakDays", "Notes", ]) df = pd.read_csv(p) if "EntryDate" in df.columns: @@ -458,11 +567,17 @@ def load_open_trades(strategy: str) -> pd.DataFrame: df["Strategy"] = strategy return df -def save_open_trades(strategy: str, df: pd.DataFrame): + +def save_open_trades(strategy: str, df: pd.DataFrame) -> None: p = open_trades_path(strategy) df.to_csv(p, index=False) + def _read_audit_log_mixed(path: Path) -> pd.DataFrame: + """ + Lettura tollerante del log esistente. + Supporta sia formato ISO che europeo per le date (per retrocompatibilita'). + """ if not path.exists(): return pd.DataFrame(columns=AUDIT_COLUMNS) @@ -493,77 +608,130 @@ def _read_audit_log_mixed(path: Path) -> pd.DataFrame: if df.empty: return df - # Normalize string keys and force expected action values. for col in ["Strategy", "ISIN", "Action", "ExitReason", "Notes"]: if col in df.columns: df[col] = df[col].astype(str).str.strip() df["Action"] = df["Action"].str.upper() return df -def append_audit_rows(rows: List[Dict]): + +def append_audit_rows(rows: List[Dict]) -> None: + """Append delle nuove righe nel trades_audit_log con TradeDate sempre in ISO.""" if not rows: return ensure_dir(AUDIT_LOG_CSV.parent) + new_rows = pd.DataFrame(rows) + + # Normalizza TradeDate/LinkedOpenDate a ISO + for col in ["TradeDate", "LinkedOpenDate"]: + if col in new_rows.columns: + new_rows[col] = new_rows[col].apply(_format_date_iso) + for c in AUDIT_COLUMNS: if c not in new_rows.columns: new_rows[c] = np.nan new_rows = new_rows[AUDIT_COLUMNS] - old = _read_audit_log_mixed(AUDIT_LOG_CSV) if AUDIT_LOG_CSV.exists() else pd.DataFrame(columns=AUDIT_COLUMNS) if AUDIT_LOG_CSV.exists(): + old = _read_audit_log_mixed(AUDIT_LOG_CSV) log = pd.concat([old, new_rows], ignore_index=True) else: log = new_rows log.to_csv(AUDIT_LOG_CSV, index=False, sep=";", encoding="utf-8") -# sizing -def compute_current_capital_from_log(strategy: str, - returns_wide: pd.DataFrame, - asof_date: dt.date) -> float: + +# ============================================================================= +# SIZING +# ============================================================================= +def compute_current_capital_from_log( + strategy: str, returns_wide: pd.DataFrame, asof_date: dt.date +) -> float: return BASE_CAPITAL_PER_STRATEGY + def size_equal_weight(candidates: List[str]) -> Dict[str, float]: if not candidates: return {} w = 1.0 / max(1, len(candidates)) return {isin: w for isin in candidates} -def size_risk_parity(candidates: List[str], - returns_wide: pd.DataFrame, - asof_idx: int) -> Dict[str, float]: - """ - Risk Parity sui soli candidati, con cap per singolo asset (RP_MAX_WEIGHT), - allineato alla logica usata nel backtest v3.1.5. - Eventuale parte non allocata resta in cash (non rinormalizziamo dopo il cap). - """ + +def size_risk_parity( + candidates: List[str], + returns_wide: pd.DataFrame, + asof_idx: int, + rp_max_weight: float, +) -> Dict[str, float]: + """Risk Parity sui soli candidati con cap per singolo asset.""" if not candidates: return {} L = RISK_PARITY_LOOKBACK start = max(0, asof_idx - L + 1) - sub = returns_wide.iloc[start:asof_idx+1][candidates] + sub = returns_wide.iloc[start : asof_idx + 1][candidates] vol = sub.std().replace(0, np.nan) inv_vol = 1.0 / vol inv_vol = inv_vol.replace([np.inf, -np.inf], np.nan).fillna(0.0) if inv_vol.sum() == 0: return size_equal_weight(candidates) w = (inv_vol / inv_vol.sum()).to_dict() - # Cap per singolo peso, come RP_MAX_WEIGHT nel Pattern Recon - if RP_MAX_WEIGHT is not None: - w = {k: min(RP_MAX_WEIGHT, float(v)) for k, v in w.items()} + if rp_max_weight is not None: + w = {k: min(rp_max_weight, float(v)) for k, v in w.items()} return w -def _risk_exit_flags(isin: str, - entry_date: Optional[dt.date], - ret_wide: pd.DataFrame, - decision_date: dt.date, - est_map_today: pd.Series) -> List[str]: - """Calcola motivi di uscita (SL/TP/Trailing/Time/Weak) per una posizione.""" + +def compute_sizing( + strategy: StrategyConfig, + candidates: List[str], + returns_wide: pd.DataFrame, + asof_idx: int, +) -> Dict[str, float]: + if strategy.sizing == "risk_parity": + return size_risk_parity(candidates, returns_wide, asof_idx, strategy.rp_max_weight) + return size_equal_weight(candidates) + + +# ============================================================================= +# RISK EXITS +# ============================================================================= +def _bars_in_trade( + isin: str, + entry_date: Optional[dt.date], + ret_wide: pd.DataFrame, + decision_date: dt.date, +) -> int: + """Numero di barre trascorse da entry_date a decision_date (esclusa la barra di entrata).""" + if entry_date is None or isin not in ret_wide.columns: + return 0 + sub = ret_wide[isin].loc[pd.Timestamp(entry_date) : pd.Timestamp(decision_date)] + bars = len(sub) + if not sub.empty and sub.index[0].date() == entry_date: + bars -= 1 + return max(0, bars) + + +def _risk_exit_flags( + strategy: StrategyConfig, + isin: str, + entry_date: Optional[dt.date], + ret_wide: pd.DataFrame, + decision_date: dt.date, + est_map_today: pd.Series, +) -> List[str]: + """ + Calcola motivi di uscita di rischio per una posizione, applicando i parametri + SPECIFICI della strategia (TP/SL/TRAIL/TIME/WEAK + min_holding_bars). + + Regole min_holding_bars: + - TP e TRAIL sono sempre attivi (un movimento favorevole va sempre incassato) + - SL, TIME, WEAK sono bloccati nei primi min_holding_bars + """ reasons: List[str] = [] pnl_if_stay = 0.0 peak_dd = 0.0 + if entry_date is not None and isin in ret_wide.columns: - sub = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)] + sub = ret_wide[isin].loc[pd.Timestamp(entry_date) : pd.Timestamp(decision_date)] if not sub.empty and sub.index[0].date() == entry_date: sub = sub.iloc[1:] if not sub.empty: @@ -573,124 +741,189 @@ def _risk_exit_flags(isin: str, dd_series = eq_path / peak_curve - 1.0 peak_dd = float(-dd_series.min()) - if SL_BPS is not None and pnl_if_stay <= -SL_BPS/10000.0: - reasons.append("SL") - if TP_BPS is not None and pnl_if_stay >= TP_BPS/10000.0: + bars = _bars_in_trade(isin, entry_date, ret_wide, decision_date) + can_stop_out = bars >= strategy.min_holding_bars + + if strategy.tp_bps is not None and pnl_if_stay >= strategy.tp_bps / 10000.0: reasons.append("TP") - if TRAIL_BPS is not None and peak_dd >= TRAIL_BPS/10000.0: + if strategy.trail_bps is not None and peak_dd >= strategy.trail_bps / 10000.0: reasons.append("TRAIL") + if can_stop_out and strategy.sl_bps is not None and pnl_if_stay <= -strategy.sl_bps / 10000.0: + reasons.append("SL") + if can_stop_out and strategy.time_stop_bars is not None and bars >= int(strategy.time_stop_bars): + reasons.append("TIME") - if TIME_STOP_BARS is not None and entry_date is not None and isin in ret_wide.columns: - sub_all = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)] - bars = len(sub_all) - (1 if (not sub_all.empty and sub_all.index[0].date() == entry_date) else 0) - if bars >= int(TIME_STOP_BARS): - reasons.append("TIME") - - if THETA_EXIT is not None and est_map_today is not None and isin in est_map_today.index: - est_out_today = est_map_today.loc[isin] - if pd.notna(est_out_today) and float(est_out_today) <= float(THETA_EXIT): - reasons.append("WEAK") + if can_stop_out and strategy.theta_exit is not None and est_map_today is not None: + if isin in est_map_today.index: + est_out_today = est_map_today.loc[isin] + if pd.notna(est_out_today) and float(est_out_today) <= float(strategy.theta_exit): + reasons.append("WEAK") return reasons -def update_positions_and_build_orders(universe: pd.DataFrame, - returns_long: pd.DataFrame, - signals_today: pd.DataFrame, - today: dt.date, - buy_rank_df: Optional[pd.DataFrame], - allowed_open_isins: Optional[List[str]] = None, - asset_name_map: Optional[pd.Series] = None) -> Tuple[pd.DataFrame, List[Dict]]: + +# ============================================================================= +# DECISION DAY (decision_every) +# ============================================================================= +def _is_decision_day( + strategy: StrategyConfig, decision_date: dt.date, anchor_date: Optional[dt.date] +) -> bool: """ - - decision_date = ultima data disponibile (EOD) - - target giornaliero = primi MAX_OPEN del ranking buy (uguale per tutte le strategie) - - per ogni strategia: revisione per differenza vs posizioni già aperte - - CLOSE anche per risk exits (SL/TP/Trailing/Time/Weak) - - Esecuzione a t+1 open; fetch prezzi UNA VOLTA per tutti gli ISIN con ordini + Determina se oggi e' un giorno di "ranking refresh" per la strategia. - NB: strategia Aggressiva/Crypto rimossa. Restano ONLY: - - Equal_Weight - - Risk_Parity (con cap per singolo asset) + - decision_every=1 -> sempre True (decisione giornaliera) + - decision_every>1 -> True solo ogni N giorni di calendario rispetto all'anchor """ - strategies = ["Equal_Weight", "Risk_Parity"] + if strategy.decision_every <= 1 or anchor_date is None: + return True + delta_days = (decision_date - anchor_date).days + return (delta_days % strategy.decision_every) == 0 - ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index() - decision_date: dt.date = ret_wide.index[-1].date() - asof_idx = len(ret_wide.index) - 1 - next_open_date = next_business_day(decision_date) - # Ranking e maps di oggi - est_map_all = None - if buy_rank_df is not None and not buy_rank_df.empty: - est_map_all = buy_rank_df.set_index("ISIN")["EstOutcome"] - else: - # fallback: usa tutti i segnali di oggi - day_df = signals_today.reset_index().query("Date == @decision_date") - if not day_df.empty: - est_map_all = day_df.set_index("ISIN")["EstOutcome"] +def _get_strategy_anchor(strategy_name: str) -> Optional[dt.date]: + """Recupera la data del primo OPEN della strategia dal log per ancorare decision_every.""" + if not AUDIT_LOG_CSV.exists(): + return None + try: + df = _read_audit_log_mixed(AUDIT_LOG_CSV) + if df.empty: + return None + opens = df[(df["Strategy"] == strategy_name) & (df["Action"] == "OPEN")] + if opens.empty: + return None + # Parse robusto delle date (mix ISO + europeo legacy) + dates = pd.to_datetime(opens["TradeDate"], errors="coerce") + if dates.isna().all(): + dates = pd.to_datetime(opens["TradeDate"], errors="coerce", dayfirst=True) + valid = dates.dropna() + if valid.empty: + return None + return valid.min().date() + except Exception as exc: + print(f"[WARN] impossibile leggere anchor per {strategy_name}: {exc}") + return None - target_isins: List[str] = allowed_open_isins or [] - target_set: Set[str] = set(target_isins) - # Sizing per strategia basato sul target - size_eq = size_equal_weight(target_isins) - size_rp = size_risk_parity(target_isins, ret_wide, asof_idx) - sizing_by_strategy = {"Equal_Weight": size_eq, "Risk_Parity": size_rp} +# ============================================================================= +# CORE LOGIC: ordini per una singola strategia +# ============================================================================= +def _process_strategy( + strategy: StrategyConfig, + universe: pd.DataFrame, + ret_wide: pd.DataFrame, + target_isins: List[str], + target_set: Set[str], + est_map_all: Optional[pd.Series], + decision_date: dt.date, + next_open_date: dt.date, + asof_idx: int, + asset_name_map: Optional[pd.Series], +) -> Tuple[pd.DataFrame, List[Dict], Set[str], Set[str]]: + """ + Esegue il ciclo decisionale per una singola strategia. - open_concat: List[pd.DataFrame] = [] - audit_rows_all: List[Dict] = [] - isins_for_open_fetch: Set[str] = set() - isins_for_close_fetch: Set[str] = set() + Restituisce: + - df_open aggiornato + - audit_rows da scrivere nel log + - ISIN per cui fare fetch del prezzo open (apertura) + - ISIN per cui fare fetch del prezzo open (chiusura) + """ + audit_rows: List[Dict] = [] + isins_open_fetch: Set[str] = set() + isins_close_fetch: Set[str] = set() - for strat in strategies: - df_open = load_open_trades(strat) - current_set: Set[str] = set(df_open["ISIN"].astype(str).tolist()) + df_open = load_open_trades(strategy.name) + current_set: Set[str] = set(df_open["ISIN"].astype(str).tolist()) - # --- risk exits su posizioni correnti --- - closers: List[Tuple[str, str]] = [] - if not df_open.empty: - if "WeakDays" not in df_open.columns: - df_open["WeakDays"] = 0 + # decision_every: oggi e' un giorno di refresh ranking? + anchor_date = _get_strategy_anchor(strategy.name) + refresh_today = _is_decision_day(strategy, decision_date, anchor_date) - for _, row_open in df_open.iterrows(): - isin = str(row_open["ISIN"]) - entry_date = pd.to_datetime(row_open["EntryDate"]).date() if pd.notna(row_open["EntryDate"]) else None - reasons = _risk_exit_flags(isin, entry_date, ret_wide, decision_date, est_map_all if est_map_all is not None else pd.Series(dtype=float)) - if reasons: - closers.append((isin, "+".join(sorted(set(reasons))))) + # ====== STEP 1: identifica chiusure ====== + # Set di ISIN gia' marcati per chiusura (anti-duplicato RANK) + closed_this_cycle: Set[str] = set() + closers: List[Tuple[str, str]] = [] - # --- chiusure per ranking (scivolati fuori dal top-N) --- - drop_by_rank = list(current_set - target_set) - for isin in drop_by_rank: - closers.append((isin, "RANK")) + if not df_open.empty: + for _, row_open in df_open.iterrows(): + isin = str(row_open["ISIN"]) + entry_date = ( + pd.to_datetime(row_open["EntryDate"]).date() + if pd.notna(row_open["EntryDate"]) + else None + ) + reasons = _risk_exit_flags( + strategy, + isin, + entry_date, + ret_wide, + decision_date, + est_map_all if est_map_all is not None else pd.Series(dtype=float), + ) + if reasons: + closers.append((isin, "+".join(sorted(set(reasons))))) + closed_this_cycle.add(isin) - # --- applica chiusure (accumula ordini) --- - for isin, reason in closers: - row_open = df_open.loc[df_open["ISIN"] == isin] - linked_date = row_open["EntryDate"].iloc[0] if not row_open.empty else None + # Chiusure per ranking (solo nei giorni di refresh, e solo se non gia' chiuso) + if refresh_today: + for isin in current_set - target_set: + if isin not in closed_this_cycle: + # min_holding_bars filtra anche le chiusure RANK + row_open = df_open.loc[df_open["ISIN"] == isin] + if not row_open.empty: + entry_date = ( + pd.to_datetime(row_open["EntryDate"].iloc[0]).date() + if pd.notna(row_open["EntryDate"].iloc[0]) + else None + ) + bars = _bars_in_trade(isin, entry_date, ret_wide, decision_date) + if bars < strategy.min_holding_bars: + continue # blocca RANK durante min_holding + closers.append((isin, "RANK")) + closed_this_cycle.add(isin) - isins_for_close_fetch.add(isin) - audit_rows_all.append({ - "Strategy": strat, - "ISIN": isin, - "Action": "CLOSE", - "TradeDate": next_open_date, - "EntryIndex": np.nan, - "EntryAmount": np.nan, - "SizeWeight": np.nan, - "Price": None, # riempito dopo fetch - "PnL_%": np.nan, - "ExitReason": reason, - "LinkedOpenDate": linked_date, - "Duration_bars": np.nan, - "Notes": f"DecisionDate={decision_date}" - }) - df_open = df_open[df_open["ISIN"] != isin] - current_set.discard(isin) + # ====== STEP 2: applica chiusure ====== + for isin, reason in closers: + row_open = df_open.loc[df_open["ISIN"] == isin] + linked_date = row_open["EntryDate"].iloc[0] if not row_open.empty else None + entry_idx_val = row_open["EntryIndex"].iloc[0] if not row_open.empty else np.nan + + # Duration_bars valorizzato + duration_bars = np.nan + if pd.notna(linked_date): + duration_bars = _bars_in_trade( + isin, + pd.to_datetime(linked_date).date(), + ret_wide, + decision_date, + ) + + isins_close_fetch.add(isin) + audit_rows.append({ + "Strategy": strategy.name, + "ISIN": isin, + "Action": "CLOSE", + "TradeDate": next_open_date, + "EntryIndex": entry_idx_val, + "EntryAmount": np.nan, + "SizeWeight": np.nan, + "Price": None, + "PnL_%": np.nan, + "ExitReason": reason, + "LinkedOpenDate": linked_date, + "Duration_bars": duration_bars, + "Notes": f"DecisionDate={decision_date}", + }) + df_open = df_open[df_open["ISIN"] != isin] + current_set.discard(isin) + + # ====== STEP 3: aperture (solo nei giorni di refresh) ====== + if refresh_today: + # Sizing ricalcolato sulla lista target completa (coerente col backtest) + w_dict = compute_sizing(strategy, target_isins, ret_wide, asof_idx) + add_list = [isin for isin in target_isins if isin not in current_set] - # --- aperture per differenza verso target --- - add_list = [isin for isin in target_isins if isin not in current_set] # mantiene l'ordine del ranking - w_dict = sizing_by_strategy[strat] if add_list and w_dict: - cap = compute_current_capital_from_log(strat, ret_wide, decision_date) + cap = compute_current_capital_from_log(strategy.name, ret_wide, decision_date) for isin in add_list: w = float(w_dict.get(isin, 0.0)) if w <= 0: @@ -698,81 +931,146 @@ def update_positions_and_build_orders(universe: pd.DataFrame, notional = max(MIN_TRADE_NOTIONAL, cap * w) entry_idx = asof_idx + 1 - isins_for_open_fetch.add(isin) - audit_rows_all.append({ - "Strategy": strat, + isins_open_fetch.add(isin) + audit_rows.append({ + "Strategy": strategy.name, "ISIN": isin, "Action": "OPEN", "TradeDate": next_open_date, "EntryIndex": entry_idx, "EntryAmount": float(notional), "SizeWeight": float(w), - "Price": None, # riempito dopo fetch + "Price": None, "PnL_%": np.nan, "ExitReason": "", "LinkedOpenDate": "", "Duration_bars": 0, - "Notes": f"DecisionDate={decision_date}" + "Notes": f"DecisionDate={decision_date}", }) - df_open = pd.concat([df_open, pd.DataFrame([{ - "Strategy": strat, - "ISIN": isin, - "EntryDate": decision_date, - "EntryIndex": entry_idx, - "EntryAmount": float(notional), - "SizeWeight": float(w), - "PeakPnL": 0.0, - "WeakDays": 0, - "Notes": "" - }])], ignore_index=True) + df_open = pd.concat([ + df_open, + pd.DataFrame([{ + "Strategy": strategy.name, + "ISIN": isin, + "EntryDate": decision_date, + "EntryIndex": entry_idx, + "EntryAmount": float(notional), + "SizeWeight": float(w), + "PeakPnL": 0.0, + "WeakDays": 0, + "Notes": "", + }]), + ], ignore_index=True) current_set.add(isin) - if asset_name_map is not None: - df_open["AssetName"] = df_open["ISIN"].astype(str).map(asset_name_map).fillna("") - else: - if "AssetName" not in df_open.columns: - df_open["AssetName"] = "" - if "AssetName" in df_open.columns: - cols = list(df_open.columns) - if "ISIN" in cols and "AssetName" in cols: - cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName"))) - df_open = df_open[cols] + # ====== STEP 4: arricchisci con AssetName e salva ====== + if asset_name_map is not None: + df_open["AssetName"] = ( + df_open["ISIN"].astype(str).map(asset_name_map).fillna("") + ) + elif "AssetName" not in df_open.columns: + df_open["AssetName"] = "" - save_open_trades(strat, df_open) - df_open["Strategy"] = strat + if "AssetName" in df_open.columns and "ISIN" in df_open.columns: + cols = list(df_open.columns) + cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName"))) + df_open = df_open[cols] + + save_open_trades(strategy.name, df_open) + df_open["Strategy"] = strategy.name + return df_open, audit_rows, isins_open_fetch, isins_close_fetch + + +def update_positions_and_build_orders( + universe: pd.DataFrame, + returns_long: pd.DataFrame, + signals_today: pd.DataFrame, + today: dt.date, + buy_rank_df: Optional[pd.DataFrame], + allowed_open_isins: Optional[List[str]] = None, + asset_name_map: Optional[pd.Series] = None, +) -> Tuple[pd.DataFrame, List[Dict]]: + """ + Orchestratore della parte decisionale: itera su tutte le strategie attive + in STRATEGIES e raccoglie ordini + audit rows. + """ + ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index() + decision_date: dt.date = ret_wide.index[-1].date() + asof_idx = len(ret_wide.index) - 1 + next_open_date = next_business_day(decision_date) + + # Mappa EstOutcome per WEAK + est_map_all: Optional[pd.Series] = None + if buy_rank_df is not None and not buy_rank_df.empty: + est_map_all = buy_rank_df.set_index("ISIN")["EstOutcome"] + else: + day_df = signals_today.reset_index().query("Date == @decision_date") + if not day_df.empty: + est_map_all = day_df.set_index("ISIN")["EstOutcome"] + + target_isins: List[str] = allowed_open_isins or [] + target_set: Set[str] = set(target_isins) + + open_concat: List[pd.DataFrame] = [] + audit_rows_all: List[Dict] = [] + isins_for_open_fetch: Set[str] = set() + isins_for_close_fetch: Set[str] = set() + + for strategy in STRATEGIES: + df_open, audit_rows, isins_open, isins_close = _process_strategy( + strategy=strategy, + universe=universe, + ret_wide=ret_wide, + target_isins=target_isins, + target_set=target_set, + est_map_all=est_map_all, + decision_date=decision_date, + next_open_date=next_open_date, + asof_idx=asof_idx, + asset_name_map=asset_name_map, + ) open_concat.append(df_open) + audit_rows_all.extend(audit_rows) + isins_for_open_fetch.update(isins_open) + isins_for_close_fetch.update(isins_close) - # ---- FETCH UNA VOLTA (OPEN + CLOSE) ---- + # Fetch prezzi (una volta sola) fetch_isins = sorted(isins_for_open_fetch.union(isins_for_close_fetch)) if fetch_isins: - print(f"[PRICE] fetch open per {len(fetch_isins)} ISIN (open={len(isins_for_open_fetch)}, close={len(isins_for_close_fetch)})", flush=True) + print( + f"[PRICE] fetch open per {len(fetch_isins)} ISIN " + f"(open={len(isins_for_open_fetch)}, close={len(isins_for_close_fetch)})", + flush=True, + ) px_cache = _fetch_open_prices_once(fetch_isins, universe) for r in audit_rows_all: if r.get("Price") is None: r["Price"] = _safe_to_float(px_cache.get(r["ISIN"])) - # ---- SUMMARY ---- - n_open = sum(1 for r in audit_rows_all if r.get("Action") == "OPEN") + n_open = sum(1 for r in audit_rows_all if r.get("Action") == "OPEN") n_close = sum(1 for r in audit_rows_all if r.get("Action") == "CLOSE") - print(f"[SUMMARY] decision_date={decision_date} opens={n_open} closes={n_close} target={len(target_isins)} (cap={MAX_OPEN})", flush=True) + print( + f"[SUMMARY] decision_date={decision_date} opens={n_open} closes={n_close} " + f"target={len(target_isins)} (cap={MAX_OPEN}, strategie={len(STRATEGIES)})", + flush=True, + ) open_all = pd.concat(open_concat, ignore_index=True) if open_concat else pd.DataFrame() return open_all, audit_rows_all -# ========================= -# MAIN RUN -# ========================= -def main_run(run_date: Optional[dt.date] = None): + +# ============================================================================= +# MAIN +# ============================================================================= +def main_run(run_date: Optional[dt.date] = None) -> None: today = run_date or dt.date.today() ensure_dir(OUTPUT_DIR) # 1) Universo universe = load_universe(UNIVERSO_XLSX) - asset_name_col = detect_column(universe, [ - "Nome", "Name", "Asset", "Asset Name", "Descrizione", "Description" - ]) - if not asset_name_col: - print("[WARN] Colonna con il nome dell'asset non trovata nell'universo.") + asset_name_col = detect_column( + universe, ["Nome", "Name", "Asset", "Asset Name", "Descrizione", "Description"] + ) asset_name_map: Optional[pd.Series] = None if asset_name_col: asset_name_map = ( @@ -780,33 +1078,36 @@ def main_run(run_date: Optional[dt.date] = None): .dropna(subset=["ISIN"]) .assign(ISIN=lambda df: df["ISIN"].astype(str).str.strip()) ) - asset_name_map = asset_name_map.set_index("ISIN")[asset_name_col].astype(str).str.strip() + asset_name_map = ( + asset_name_map.set_index("ISIN")[asset_name_col].astype(str).str.strip() + ) + else: + print("[WARN] colonna con il nome dell'asset non trovata nell'universo.") - # 2) Ritorni (DB) + print( + f"[CONFIG] strategie attive ({len(STRATEGIES)}): " + + ", ".join(s.name for s in STRATEGIES) + ) + + # 2) Rendimenti dal DB conn_str = read_connection_txt(CONNECTION_TXT) isins = universe["ISIN"].tolist() returns_long = _db_fetch_returns( - conn_str, - isins, - sp_name=SP_NAME_DEFAULT, - n_bars=SP_N_DEFAULT, - ptf_curr=PTF_CURR_DEFAULT, + conn_str, isins, + sp_name=SP_NAME_DEFAULT, n_bars=SP_N_DEFAULT, ptf_curr=PTF_CURR_DEFAULT, ) if returns_long.empty: raise RuntimeError("Nessun rendimento disponibile dal DB (SP vuota?).") - # 2b) Hurst map per ISIN (stessa logica concettuale del backtest) - hurst_map = build_hurst_map(returns_long, lookback=1260) + # 3) Segnali EOD su D (theta globale, NO Hurst) + sig_df = generate_signals_today(universe, returns_long, today) - # 3) Segnali EOD su D con THETA = Hurst/100 per ISIN - sig_df = generate_signals_today(universe, returns_long, today, hurst_map=hurst_map) - - # 3b) Ranking unico e selezione Top-N + # 3b) Ranking + Top-N decision_date = returns_long["Date"].max().date() - buy_rank_df = _rank_buy(sig_df, decision_date) # tutti i buy ordinati - allowed_open = _select_top_signals(buy_rank_df, MAX_OPEN) # top-N ISIN + buy_rank_df = _rank_buy(sig_df, decision_date) + allowed_open = _select_top_signals(buy_rank_df, MAX_OPEN) - # 4) Posizioni + audit (OPEN/CLOSE) con target Top-N + # 4) Posizioni + audit (tutte le strategie) open_df, audit_rows = update_positions_and_build_orders( universe, returns_long, sig_df, today, buy_rank_df=buy_rank_df, @@ -814,48 +1115,52 @@ def main_run(run_date: Optional[dt.date] = None): asset_name_map=asset_name_map, ) - # 5) Append audit log (TUTTE le strategie operative) + # 5) Append audit log if audit_rows: append_audit_rows(audit_rows) - # 6) Snapshot Excel datato — fogli con nomi completi + # 6) Snapshot Excel ensure_dir(OPEN_TRADES_DIR) signals_path = _dated_signals_filename() signals_sheet = sig_df.reset_index() if asset_name_map is not None: - signals_sheet["AssetName"] = signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("") + signals_sheet["AssetName"] = ( + signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("") + ) else: signals_sheet["AssetName"] = "" - # inserisci la colonna subito dopo l'ISIN if "AssetName" in signals_sheet.columns: cols = list(signals_sheet.columns) cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName"))) signals_sheet = signals_sheet[cols] + sheet_name_overrides = { + "Equal_Weight": "Open_EW", + "Risk_Parity": "Open_RP", + "Equal_Weight_v2": "Open_EW_v2", + "Risk_Parity_v2": "Open_RP_v2", + } with pd.ExcelWriter(signals_path) as xw: signals_sheet.to_excel(xw, sheet_name="Signals", index=False) if not open_df.empty: for strat, g in open_df.groupby("Strategy"): - sheet_name_map = { - "Equal_Weight": "Open_Equal_Weight", - "Risk_Parity": "Open_Risk_Parity", - } - sheet_name = sheet_name_map.get(strat, f"Open_{strat}")[:31] + sheet_name = sheet_name_overrides.get(strat, f"Open_{strat}")[:31] g.to_excel(xw, sheet_name=sheet_name, index=False) copy_to_dropbox(signals_path) - for strat in ["Equal_Weight", "Risk_Parity"]: - csv_path = open_trades_path(strat) + for strategy in STRATEGIES: + csv_path = open_trades_path(strategy.name) if csv_path.exists(): copy_to_dropbox(csv_path) - print(f"✅ Signals generated for {today}. Saved to {signals_path}") + print(f"\u2705 Signals generated for {today}. Saved to {signals_path}") print(f"Open trades saved in {OPEN_TRADES_DIR}") print(f"Audit log updated at {AUDIT_LOG_CSV}") -# ========================= + +# ============================================================================= # ENTRY POINT -# ========================= +# ============================================================================= if __name__ == "__main__": main_run()