# -*- coding: utf-8 -*- """ run_optimization.py =================== Script principale per lanciare la grid search sul sistema kNN. Riusa load_config() e read_connection_txt() di shared_utils per leggere i prezzi storici dallo stesso DB usato in produzione. Workflow: 1) Carica universo Excel 2) Per ogni ISIN, scarica la serie dal DB (cached optional) 3) Definisce ParameterGrid e TimeSeriesSplitter 4) Lancia run_grid_search 5) Aggrega e genera report Uso: python run_optimization.py # esegue con grid e splitter di default python run_optimization.py --mode quick # grid ridotta (~50 combo, ~30 minuti) python run_optimization.py --mode full # grid completa (~2000 combo, ~24h) python run_optimization.py --mode multiday # focus su decision_every e tp/sl """ from __future__ import annotations import argparse import json import sys import time from pathlib import Path from typing import Optional # Aggiunge la cartella padre al path Python così trova shared_utils.py # (shared_utils.py sta nella cartella del progetto, accanto a 'backtest_optimizer/') _PARENT_DIR = Path(__file__).resolve().parent.parent if str(_PARENT_DIR) not in sys.path: sys.path.insert(0, str(_PARENT_DIR)) import numpy as np import pandas as pd import sqlalchemy as sa from sqlalchemy import text from shared_utils import ( detect_column, load_config, read_connection_txt, require_section, require_value, ) from grid_search import ParameterGrid, TimeSeriesSplitter, run_grid_search, aggregate_results, best_combo_per_isin from report import write_full_report, summary_text_table # ===================================================== # Preset grid: scegli quello adeguato all'uso # ===================================================== def make_grid_preset(preset: str) -> ParameterGrid: if preset == "quick": # ~36 combo, focus sui parametri "in top of mind" return ParameterGrid( Wp=[60], Ha=[10], k=[25], theta_entry=[0.005], sl_bps=[300.0], tp_bps=[800.0, 1200.0], trail_bps=[200.0, 300.0], time_stop_bars=[20], theta_exit=[0.0], weak_days_exit=[None], decision_every=[1, 3, 5, 10], min_holding_bars=[0, 3, 5], only_first_signal=[False], fee_bps=[10.0], ) if preset == "multiday": # Focus sulla domanda "esteso multi-giorno": ~96 combo return ParameterGrid( Wp=[60], Ha=[10], k=[25], theta_entry=[0.005], sl_bps=[200.0, 300.0, 500.0], tp_bps=[600.0, 800.0, 1200.0, 1500.0], trail_bps=[200.0, 300.0], time_stop_bars=[20], theta_exit=[0.0], weak_days_exit=[None], decision_every=[1, 2, 3, 5, 10, 20], min_holding_bars=[0, 3], only_first_signal=[False], fee_bps=[10.0], ) if preset == "wide": # Esplora anche Wp, Ha, k: ~1500 combo return ParameterGrid( Wp=[40, 60, 80, 120], Ha=[5, 10, 15, 20], k=[15, 25, 35], theta_entry=[0.003, 0.005, 0.01], sl_bps=[200.0, 300.0, 500.0], tp_bps=[600.0, 800.0, 1200.0], trail_bps=[200.0, 300.0], time_stop_bars=[15, 20, 30], theta_exit=[0.0], weak_days_exit=[None], decision_every=[1, 3, 5, 10], min_holding_bars=[0, 3], only_first_signal=[False], fee_bps=[10.0], ) if preset == "full": # Tutto: 5000+ combo, da lanciare su cluster o overnight return ParameterGrid( Wp=[40, 60, 80, 120], Ha=[5, 10, 15, 20], k=[10, 15, 25, 35, 50], theta_entry=[0.0, 0.003, 0.005, 0.01], sl_bps=[200.0, 300.0, 500.0, None], tp_bps=[600.0, 800.0, 1200.0, 1500.0, None], trail_bps=[200.0, 300.0, None], time_stop_bars=[15, 20, 30, None], theta_exit=[0.0, -0.005], weak_days_exit=[None, 3], decision_every=[1, 2, 3, 5, 10], min_holding_bars=[0, 3, 5], only_first_signal=[False, True], fee_bps=[10.0], ) raise ValueError(f"Preset sconosciuto: {preset}") # ===================================================== # Caricamento dati dal DB (riusa stessa logica di produzione) # ===================================================== def load_asset_data( isins: list[str], engine: sa.Engine, stored_proc: str, n_bars: int, ptf_curr: str, *, min_bars: int = 500, cache_dir: Optional[Path] = None, ) -> dict[str, pd.DataFrame]: """ Per ogni ISIN, esegue la SP e ritorna un dict {isin: df}. Cache su disco in formato Parquet per accelerare i run successivi. """ sql = text(f"EXEC {stored_proc} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") out = {} for i, isin in enumerate(isins, 1): # Cache check if cache_dir is not None: cache_dir.mkdir(parents=True, exist_ok=True) cache_file = cache_dir / f"{isin}.parquet" if cache_file.exists(): try: df = pd.read_parquet(cache_file) if len(df) >= min_bars: out[isin] = df if i % 10 == 0: print(f" [DATA] {i}/{len(isins)} (cache hit per {isin})") continue except Exception: pass try: df = pd.read_sql_query(sql, engine, params={"isin": isin, "n": n_bars, "ptf": ptf_curr}) if df.empty or len(df) < min_bars: continue out[isin] = df if cache_dir is not None: try: df.to_parquet(cache_file, index=False) except Exception as e: print(f" [DATA] cache write fallita per {isin}: {e}") if i % 10 == 0: print(f" [DATA] {i}/{len(isins)}") except Exception as e: print(f" [DATA] {isin}: errore {e}") print(f"[DATA] Caricati {len(out)}/{len(isins)} ISIN") return out def detect_data_cols(df: pd.DataFrame) -> tuple[str, str]: col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "pct_chg"]) return col_date, col_ret # ===================================================== # Main # ===================================================== def main(args): cfg = load_config() db_cfg = require_section(cfg, "db") paths_cfg = require_section(cfg, "paths") pattern_cfg = require_section(cfg, "pattern") signals_cfg = cfg.get("signals", {}) # ---- Output dirs ---- output_dir = Path(args.output_dir or "output/optimization") output_dir.mkdir(parents=True, exist_ok=True) cache_dir = Path(args.cache_dir or "output/optimization/asset_cache") # ---- Universo ---- universo_xlsx = paths_cfg.get("input_universe", "Input/Universo per Trading System.xlsx") universo = pd.read_excel(universo_xlsx) col_isin = detect_column(universo, ["ISIN", "isin"]) if col_isin is None: raise ValueError("Colonna ISIN non trovata nell'universo") isins = universo[col_isin].astype(str).str.strip().replace("", pd.NA).dropna().drop_duplicates().tolist() if args.max_isin: isins = isins[:args.max_isin] print(f"[MAIN] Universo: {len(isins)} ISIN") # ---- Connessione DB e caricamento dati ---- if args.skip_db: # Modalità test: usa solo dati cached assets = {} for isin in isins: cf = cache_dir / f"{isin}.parquet" if cf.exists(): try: assets[isin] = pd.read_parquet(cf) except Exception: pass print(f"[MAIN] (skip-db) Caricati {len(assets)} ISIN dalla cache") else: conn_str = read_connection_txt("connection.txt") engine = sa.create_engine(conn_str, fast_executemany=True) print("[MAIN] Connesso al DB") assets = load_asset_data( isins=isins, engine=engine, stored_proc=db_cfg["stored_proc"], n_bars=int(db_cfg["n_bars"]), ptf_curr=str(db_cfg["ptf_curr"]), cache_dir=cache_dir, ) if not assets: print("[MAIN] Nessun asset disponibile, esco.") return # ---- Detect colonne ---- sample_df = next(iter(assets.values())) col_date, col_ret = detect_data_cols(sample_df) if not col_date or not col_ret: raise ValueError(f"Colonne data/ret non riconosciute: {sample_df.columns.tolist()}") print(f"[MAIN] Date col = '{col_date}', Ret col = '{col_ret}'") # ---- Grid e splitter ---- grid = make_grid_preset(args.mode) splitter = TimeSeriesSplitter( n_splits=args.n_splits, train_size=args.train_size, test_size=args.test_size, embargo=args.embargo, ) print(f"[MAIN] Grid preset='{args.mode}' → {grid.size()} combinazioni") print(f"[MAIN] Splitter: {args.n_splits} fold, train={args.train_size}, test={args.test_size}, embargo={args.embargo}") # ---- Esecuzione ---- t_start = time.perf_counter() df_results = run_grid_search( assets=assets, col_date=col_date, col_ret=col_ret, grid=grid, splitter=splitter, verbose=True, n_max_combos=args.max_combos, save_intermediate_to=output_dir / "grid_search_partial.xlsx", ) print(f"[MAIN] Grid search completata in {(time.perf_counter()-t_start)/60:.1f} min") if df_results.empty: print("[MAIN] Nessun risultato — esco.") return # ---- Aggregazione ---- agg = aggregate_results(df_results, by_isin=False, primary_metric=args.primary_metric, min_trades_per_fold=args.min_trades_per_fold) # ---- Baseline corrente di produzione ---- baseline_params = { "Wp": int(pattern_cfg["wp"]), "Ha": int(pattern_cfg["ha"]), "k": int(pattern_cfg["knn_k"]), "theta_entry": float(pattern_cfg["theta"]), "sl_bps": float(signals_cfg.get("sl_bps", 300.0)), "tp_bps": float(signals_cfg.get("tp_bps", 800.0)), "trail_bps": float(signals_cfg.get("trail_bps", 300.0)), "time_stop_bars": int(signals_cfg.get("time_stop_bars", 20)), "decision_every": 1, "min_holding_bars": 0, } # ---- Report ---- paths = write_full_report( df_results=df_results, agg=agg, output_dir=output_dir, primary_metric=args.primary_metric, top_k=args.top_k, baseline_params=baseline_params, ) print("\n" + "=" * 70) print(f"TOP-{args.top_k} COMBINAZIONI ({args.primary_metric}-mean)") print("=" * 70) print(summary_text_table(agg, top_k=args.top_k, primary_metric=args.primary_metric)) print("\nFile generati:") for k, p in paths.items(): print(f" {k}: {p}") def parse_args(): p = argparse.ArgumentParser(description="Grid search per il sistema kNN") p.add_argument("--mode", choices=["quick", "multiday", "wide", "full"], default="quick", help="Preset della grid (default: quick)") p.add_argument("--n-splits", type=int, default=4, help="Numero di fold walk-forward (default: 4)") p.add_argument("--train-size", type=int, default=504, help="Lunghezza fold di train (default: 504)") p.add_argument("--test-size", type=int, default=126, help="Lunghezza fold di test (default: 126)") p.add_argument("--embargo", type=int, default=20, help="Embargo train-test (default: 20)") p.add_argument("--max-isin", type=int, default=None, help="Limita il numero di ISIN (debug)") p.add_argument("--max-combos", type=int, default=None, help="Limita il numero di combo (debug)") p.add_argument("--min-trades-per-fold", type=int, default=5, help="Fold con meno trade vengono scartati (default: 5)") p.add_argument("--primary-metric", type=str, default="Sharpe", help="Metrica di ranking (Sharpe, Calmar, Sortino, CAGR_%%)") p.add_argument("--top-k", type=int, default=25, help="Numero di top combo nel report (default: 25)") p.add_argument("--output-dir", type=str, default=None) p.add_argument("--cache-dir", type=str, default=None) p.add_argument("--skip-db", action="store_true", help="Usa solo dati cached, non interroga il DB") return p.parse_args() if __name__ == "__main__": main(parse_args())