# Files
# Trading/Trading Pattern Recon w Hurst - ETF.py
# 2026-05-24 20:46:43 +02:00
#
# 2089 lines
# 81 KiB
# Python
#
# -*- coding: utf-8 -*-
"""
Trading Pattern Recognition + kNN Walk-Forward Backtest (ETF)
==============================================================
Script end-to-end per:
- Caricare universo (Excel) e dati (DB) UNA SOLA VOLTA (refactor v2.1)
- Calcolare Hurst + Pattern signals (solo INFORMATIVI in v2.1, non decisionali)
- Eseguire walk-forward k-NN (solo long) con regole di uscita SL/TP/TRAIL/TIME/FLIP
- Costruire portafogli dinamici (Equal Weight + Risk Parity + varianti v2 Config B)
- Generare metriche finali, equity curves, heatmap mensili, trade report
Modifiche v2.1 (refactor):
- HURST RIMOSSO dalla logica decisionale: theta_entry = THETA globale (PATTERN_CONFIG.theta).
Le funzioni hurst_rs_returns / hurst_dfa_returns restano disponibili e producono
ancora il report hurst_by_isin.xlsx come informazione qualitativa.
- CACHE DATI DB UNIFICATA: una sola lettura per ISIN (era doppia: linee 813 e 1177
dell'originale). Risparmio ~50% sul tempo di esecuzione DB.
- FETCH PREZZI OPEN/CLOSE in BULK: una chiamata per tutto l'universo prima del
loop backtest, invece di una per-ISIN dentro il loop.
- IMPORT INLINE rimossi (5 pandas + 6 numpy + 4 matplotlib ridondanti).
- MAIN() WRAPPER per esecuzione idempotente.
- Logica matematica del kNN e dei portafogli INVARIATA al 100%: stesse formule,
stessi parametri, stessi output. Solo organizzazione del codice migliorata.
Output prodotti (cartella `output/`):
- hurst_by_isin.xlsx (informativo, regime classification)
- pattern_signals.xlsx (segnali + quality scores)
- forward_bt_signals.xlsx (segnali walk-forward per ISIN)
- forward_bt_summary.xlsx (metriche per ISIN)
- trades_report.xlsx (trade-by-trade)
- portfolio_metrics.xlsx (metriche EW, RP, EW_v2, RP_v2)
- daily_from_trades.csv (PnL giornaliero ricostruito)
- weights_daily.xlsx (pesi giornalieri per strategia)
- final_metrics.xlsx (metriche per N=6..20)
- performance_attribution.xlsx
Pipeline:
1) Universo Excel -> meta_df
2) Connessione DB
3) Caricamento serie dati per TUTTI gli ISIN (una volta) -> assets_data
4) Per ogni ISIN: Hurst + Pattern (informativi) -> hurst_rows, pattern_rows
5) Bulk fetch prezzi open/close per esecuzione t+1 -> open_returns_map
6) Per ogni ISIN: walk-forward kNN backtest -> bt_signals, bt_summary
7) Costruzione portafogli dinamici (EW, RP, EW_v2, RP_v2) -> equity, weights
8) Trade report + ricostruzione daily PnL + metriche finali
"""
from __future__ import annotations
import json
import re
import ssl
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sqlalchemy as sa
from scipy import linalg
from sqlalchemy import text
from shared_utils import (
build_pattern_library,
characterize_window,
detect_column,
load_config,
predict_from_library,
read_connection_txt,
require_section,
require_value,
z_norm,
)
# =============================================================================
# PLOT SAVING HELPER (no recursion, evita conflitti import)
# =============================================================================
def savefig_safe(path, dpi: int = 150, bbox_inches: str = "tight") -> None:
    """
    Save the current matplotlib figure to ``path``, creating its folder first.

    If the parent folder cannot be created, a warning is printed and the
    figure is written to the current working directory under its base name.
    ``plt.savefig`` is called directly, so this helper never recurses even
    if ``plt.savefig`` has been wrapped elsewhere.
    """
    import os

    target = str(path)
    parent = os.path.dirname(target) or "."
    try:
        os.makedirs(parent, exist_ok=True)
    except Exception as exc:
        print(f"[WARN] impossibile creare la cartella '{parent}': {exc}. "
              f"Salvo nella directory corrente.")
        # Fall back to the CWD, keeping only the file name.
        target = os.path.basename(target)
    plt.savefig(target, dpi=dpi, bbox_inches=bbox_inches)
# =============================================================================
# CONFIG LOADING
# =============================================================================
# The configuration is loaded once, at import time.
# - Sections read through `require_section` are mandatory. How a missing
#   section is reported is decided in shared_utils.
# - Sections read with `.get(..., {})` are optional and fall back to an
#   empty dict.
CONFIG = load_config()
DB_CONFIG = require_section(CONFIG, "db")
PATTERN_CONFIG = require_section(CONFIG, "pattern")
TAGGING_CONFIG = require_section(CONFIG, "tagging")
RANKING_CONFIG = require_section(CONFIG, "ranking")
PATHS_CONFIG = require_section(CONFIG, "paths")
HURST_CONFIG = CONFIG.get("hurst", {})
RUN_CONFIG = CONFIG.get("run", {})
SIGNALS_CONFIG = CONFIG.get("signals", {})
PRICES_CONFIG = CONFIG.get("prices", {})
STRATEGIES_CONFIG: Dict[str, Any] = CONFIG.get("strategies", {})
# =============================================================================
# PATHS
# =============================================================================
# Output and plot directories come from the `paths` config section (defaults
# "out_etf" / "plot_etf"). They are created at import time if missing.
OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "out_etf"))
PLOT_DIR = Path(PATHS_CONFIG.get("plot_dir", "plot_etf"))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
PLOT_DIR.mkdir(parents=True, exist_ok=True)
# Input universe workbook. Unlike the directories above, it is not wrapped
# in Path().
UNIVERSO_XLSX = PATHS_CONFIG.get("input_universe", "Input/Universo per Trading System.xlsx")
# Output artifacts, all written under OUTPUT_DIR.
OUTPUT_HURST_XLSX = OUTPUT_DIR / "hurst_by_isin.xlsx"
OUTPUT_PATTERN_XLSX = OUTPUT_DIR / "pattern_signals.xlsx"
ERROR_LOG_CSV = OUTPUT_DIR / "errori_isin.csv"
FORWARD_BT_SIGNALS_XLSX = OUTPUT_DIR / "forward_bt_signals.xlsx"
FORWARD_BT_SUMMARY_XLSX = OUTPUT_DIR / "forward_bt_summary.xlsx"
TRADES_REPORT_XLSX = OUTPUT_DIR / "trades_report.xlsx"
PERF_ATTRIB_XLSX = OUTPUT_DIR / "performance_attribution.xlsx"
DAILY_FROM_TRADES_CSV = OUTPUT_DIR / "daily_from_trades.csv"
DAILY_FROM_TRADES_XLSX = OUTPUT_DIR / "daily_from_trades.xlsx"
WEIGHTS_DAILY_XLSX = OUTPUT_DIR / "weights_daily.xlsx"
FINAL_METRICS_XLSX = OUTPUT_DIR / "final_metrics.xlsx"
# =============================================================================
# GLOBAL PARAMETERS
# =============================================================================
# Stored procedure and DB parameters (mandatory keys of the `db` section).
STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db"))
N_BARS = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db"))
# Rolling ranking window in bars; defaults to N_BARS when not configured.
RANKING_WINDOW_BARS = int(RANKING_CONFIG.get("rolling_window_bars", N_BARS))
# Look-back in bars for the risk-parity weights (default 60).
RP_LOOKBACK = int(SIGNALS_CONFIG.get("risk_parity_lookback", 60))
# Open/close prices (euronext-style history API) and on-disk cache settings.
OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get(
    "base_url", "https://fin.scorer.app/finance/etf-inv/history"
))
OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3))
OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1))
OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10))
OPEN_CACHE_DIR = Path(PRICES_CONFIG.get("cache_dir", OUTPUT_DIR / "price_cache"))
# NOTE(review): bool() of a non-empty string such as "false" is True.
# Confirm the config loader yields real booleans for this flag.
RECOMPUTE_PORTF_FROM_OPEN = bool(PRICES_CONFIG.get("recompute_portfolio_open", False))
# Pattern matching (kNN hyper-parameters):
# - WP is the pattern window length (the kNN backtest matches the last WP
#   returns);
# - HA is passed to build_pattern_library (presumably the outcome horizon;
#   confirm in shared_utils);
# - KNN_K is the neighbour count;
# - THETA is the single global entry threshold.
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern"))
EMBARGO_RAW = require_value(PATTERN_CONFIG, "embargo", "pattern")
# Embargo falls back to WP + HA when the configured value is None.
EMBARGO = int(EMBARGO_RAW) if EMBARGO_RAW is not None else (WP + HA)
# Rule-based tagging thresholds (informational only).
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))
# Ranking and selection. RP_MAX_WEIGHT defaults to 2 / TOP_N_MAX (guarded
# against TOP_N_MAX <= 0) when not configured.
TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))
RP_MAX_WEIGHT_RAW = RANKING_CONFIG.get("rp_max_weight")
RP_MAX_WEIGHT = (
    float(RP_MAX_WEIGHT_RAW) if RP_MAX_WEIGHT_RAW is not None else 2.0 / max(TOP_N_MAX, 1)
)
SCORE_VERBOSE = bool(RANKING_CONFIG.get("score_verbose", False))
# Optional {metric: weight} mapping used by _apply_score (None -> built-in defaults).
SCORE_WEIGHTS = RANKING_CONFIG.get("score_weights")
# Hurst estimation parameters, kept for informational use only.
# HURST_WIN_GRID may be None, in which case the Hurst functions use built-in grids.
HURST_MIN_LENGTH = int(HURST_CONFIG.get("min_length", 200))
HURST_WIN_GRID = HURST_CONFIG.get("win_grid")
HURST_MIN_SEGMENTS = int(HURST_CONFIG.get("min_segments", 1))
# Annualisation factor (default 252) and default portfolio size.
DAYS_PER_YEAR = int(RUN_CONFIG.get("days_per_year", 252))
TOP_N = int(RUN_CONFIG.get("top_n_default", TOP_N_MAX))
# =============================================================================
# UTILITA' GENERALI (numeriche, formattazione, ETA)
# =============================================================================
def clamp01(x):
    """Clip ``x`` into the closed interval [0, 1]; non-finite input gives NaN."""
    if np.isfinite(x):
        return min(1.0, max(0.0, float(x)))
    return np.nan
def format_eta(seconds: float) -> str:
    """
    Render a duration as "H:MM:SS" (at least one hour) or "MM:SS".

    None, non-finite or negative inputs are shown as "--:--".
    """
    invalid = seconds is None or not np.isfinite(seconds) or seconds < 0
    if invalid:
        return "--:--"
    hours, remainder = divmod(int(round(seconds)), 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours:d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"
def _to_float_safe(x) -> float:
    """Return ``float(x)``, or NaN if ``x`` has the wrong type or an unparsable value."""
    result = np.nan
    try:
        result = float(x)
    except (TypeError, ValueError):
        pass
    return result
def _coerce_num(s: pd.Series) -> pd.Series:
    """
    Robustly convert a Series to numeric, tolerating Italian/European formats.

    Already-numeric Series are returned unchanged (same object). Otherwise
    each value is stringified and stripped, and "%" signs are removed (the
    value is NOT divided by 100). Blank, "nan" and "None" become NaN.

    Decimal/thousands separators are normalised before parsing:
      - "1,5"      -> 1.5     (comma decimal)
      - "1.234,56" -> 1234.56 (dot thousands, comma decimal)
      - "1,234.56" -> 1234.56 (comma thousands, dot decimal)
    When both separators appear, the rightmost one is taken as the decimal
    separator. Anything still unparsable becomes NaN.
    """
    if pd.api.types.is_numeric_dtype(s):
        return s
    txt = s.astype(str).str.strip().str.replace("%", "", regex=False)
    txt = txt.replace({"": np.nan, "nan": np.nan, "None": np.nan})

    def _normalize(value):
        if not isinstance(value, str):
            return value  # NaN placeholders pass through untouched
        if "," in value and "." in value:
            if value.rfind(",") > value.rfind("."):
                value = value.replace(".", "")  # "1.234,56" -> "1234,56"
            else:
                value = value.replace(",", "")  # "1,234.56" -> "1234.56"
        return value.replace(",", ".")

    return pd.to_numeric(txt.map(_normalize), errors="coerce")
# =============================================================================
# POST-EXECUTION TIMER (per le fasi dopo il backtest)
# =============================================================================
# Module-level timer state shared by start_post_timer / checkpoint_post_timer.
# Keys: "t0" (perf_counter start), "total_steps", "done".
_post_timer_state: Dict[str, float] = {}


def start_post_timer(total_steps: int = 1) -> None:
    """(Re)start the post-backtest timer, expecting ``total_steps`` checkpoints (min 1)."""
    _post_timer_state.update(
        t0=time.perf_counter(),
        total_steps=max(1, total_steps),
        done=0,
    )


def checkpoint_post_timer(label: str) -> None:
    """
    Mark one step as completed and print elapsed time plus an ETA.

    The ETA assumes every remaining step takes the average duration of the
    steps completed so far. Silently does nothing if ``start_post_timer``
    has not been called yet.
    """
    state = _post_timer_state
    if "t0" not in state:
        return
    elapsed = time.perf_counter() - state["t0"]
    state["done"] += 1
    done, total = state["done"], state["total_steps"]
    eta = (elapsed / max(done, 1)) * max(0, total - done)
    print(f"[POST-TIMER] {label}: elapsed {format_eta(elapsed)} "
          f"({done}/{total}, ETA {format_eta(eta)})")
# =============================================================================
# HURST (su rendimenti) - MANTENUTE PER USO INFORMATIVO
# =============================================================================
# NOTA v2.1: queste funzioni restano disponibili per la generazione del file
# hurst_by_isin.xlsx (classificazione regime: mean_reversion / breakout / neutral).
# Il valore di Hurst NON viene piu' usato come soglia di entrata theta_entry.
# Tutti gli ISIN usano la stessa theta = THETA globale dal config.
def hurst_rs_returns(r, win_grid=None, min_seg=None) -> float:
    """
    Estimate the Hurst exponent of a return series via Rescaled-Range (R/S).

    NaNs are dropped first. The result is informational only; it is not
    used as an entry threshold.

    Args:
        r: Return series (anything accepted by ``pd.Series``).
        win_grid: Window sizes. When None, HURST_WIN_GRID (or the built-in
            default grid) is filtered to sizes <= n // 2. If fewer than 4
            sizes survive, a fallback grid round(n / 2**k), k = 3..7, is
            used, bounded to [8, max(16, n // 4)].
        min_seg: Minimum number of non-overlapping segments a window size
            must yield (default HURST_MIN_SEGMENTS).

    Returns:
        Slope of log(mean R/S) vs log(window size), clamped to [0, 1] by
        ``clamp01``. NaN when fewer than HURST_MIN_LENGTH observations
        remain or fewer than 3 window sizes give a valid R/S value.
    """
    r = pd.Series(r).dropna().astype("float64").values
    n = len(r)
    seg_min = HURST_MIN_SEGMENTS if min_seg is None else int(min_seg)
    if n < HURST_MIN_LENGTH:
        return np.nan
    if win_grid is None:
        base = HURST_WIN_GRID or [16, 24, 32, 48, 64, 96, 128, 192, 256, 384]
        base = np.array(base, dtype=int)
        win_grid = [w for w in base if w <= n // 2]
        # Too few usable sizes for a regression: derive a grid from n itself.
        if len(win_grid) < 4:
            max_w = max(16, n // 4)
            g = sorted({int(max(8, round((n / (2 ** k))))) for k in range(3, 8)})
            win_grid = [w for w in g if 8 <= w <= max_w]
    RS_vals, sizes = [], []
    for w in win_grid:
        if w < 8 or w > n:
            continue
        m = n // w  # number of full, non-overlapping segments of length w
        if m < seg_min:
            continue
        rs_list = []
        for i in range(m):
            seg = r[i * w:(i + 1) * w]
            seg = seg - np.mean(seg)
            sd = seg.std(ddof=1)
            # Flat or degenerate segment: R/S undefined, skip it.
            if sd == 0 or not np.isfinite(sd):
                continue
            y = np.cumsum(seg)  # cumulative deviation profile
            rs = (np.max(y) - np.min(y)) / sd  # range rescaled by std
            if np.isfinite(rs) and rs > 0:
                rs_list.append(rs)
        if rs_list:
            RS_vals.append(np.mean(rs_list))
            sizes.append(w)
    if len(RS_vals) < 3:
        return np.nan
    sizes = np.array(sizes, float)
    RS_vals = np.array(RS_vals, float)
    mask = np.isfinite(RS_vals) & (RS_vals > 0)
    sizes, RS_vals = sizes[mask], RS_vals[mask]
    if sizes.size < 3:
        return np.nan
    # Hurst exponent = slope of the log-log fit.
    slope, _ = np.polyfit(np.log(sizes), np.log(RS_vals), 1)
    return clamp01(slope)
def hurst_dfa_returns(r, win_grid=None) -> float:
    """
    Estimate the Hurst exponent via Detrended Fluctuation Analysis (DFA).

    NaNs are dropped and the series is demeaned. Its cumulative sum (the
    "profile") is cut into non-overlapping segments of each scale s. A
    linear trend is removed from every segment by least squares, and the
    mean RMS of the residuals F(s) is regressed against s on a log-log
    scale. The result is informational only.

    Args:
        r: Return series (anything accepted by ``pd.Series``).
        win_grid: Scales. When None, HURST_WIN_GRID (or the built-in default
            grid) is filtered to sizes <= n // 2. If fewer than 4 sizes
            survive, a fallback grid round(n / 2**k), k = 3..7, is used,
            bounded to [8, max(16, n // 4)]. Scales < 8 or yielding fewer
            than 2 segments are skipped.

    Returns:
        Slope of log F(s) vs log s, clamped to [0, 1] by ``clamp01``. NaN
        when fewer than HURST_MIN_LENGTH observations remain or fewer than
        3 scales give a valid fluctuation value.
    """
    r = pd.Series(r).dropna().astype("float64").values
    n = len(r)
    if n < HURST_MIN_LENGTH:
        return np.nan
    r_dm = r - np.mean(r)
    y = np.cumsum(r_dm)  # DFA profile
    if win_grid is None:
        base = HURST_WIN_GRID or [16, 24, 32, 48, 64, 96, 128, 192, 256]
        base = np.array(base, dtype=int)
        win_grid = [w for w in base if w <= n // 2]
        if len(win_grid) < 4:
            max_w = max(16, n // 4)
            g = sorted({int(max(8, round((n / (2 ** k))))) for k in range(3, 8)})
            win_grid = [w for w in g if 8 <= w <= max_w]
    F_vals, sizes = [], []
    for s in win_grid:
        if s < 8:
            continue
        m = n // s
        if m < 2:
            continue
        # The linear-trend design matrix depends only on s, so build it once
        # per scale instead of once per segment (same numerical result).
        t = np.arange(s, dtype=float)
        A = np.vstack([t, np.ones(s)]).T
        rms_list = []
        for i in range(m):
            seg = y[i * s:(i + 1) * s]
            coeff, *_ = np.linalg.lstsq(A, seg, rcond=None)
            detr = seg - A @ coeff
            rms = np.sqrt(np.mean(detr ** 2))
            if np.isfinite(rms) and rms > 0:
                rms_list.append(rms)
        if rms_list:
            F_vals.append(np.mean(rms_list))
            sizes.append(s)
    if len(F_vals) < 3:
        return np.nan
    sizes = np.array(sizes, float)
    F_vals = np.array(F_vals, float)
    mask = np.isfinite(F_vals) & (F_vals > 0)
    sizes, F_vals = sizes[mask], F_vals[mask]
    if sizes.size < 3:
        return np.nan
    slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1)
    return clamp01(slope)
# =============================================================================
# METRICHE DI EQUITY (R^2, drawdown, heal index, h_min_100)
# =============================================================================
def r2_equity_line(returns: pd.Series) -> float:
    """
    R^2 of a linear fit to the log equity curve (a smoothness measure).

    The equity is the compounded product of (1 + r); non-numeric/missing
    returns count as 0. Only finite, strictly positive equity points are
    used, so a wiped-out or negative equity (returns <= -100%) cannot
    produce NaN logs. Those used to make ``np.polyfit`` fail, or yield a
    spurious R^2 of 1.

    Returns:
        R^2 in (-inf, 1]. NaN with fewer than 5 usable points or a constant
        log equity.
    """
    r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    eq = (1.0 + r).cumprod().to_numpy(dtype=float)
    eq = eq[np.isfinite(eq) & (eq > 0)]
    if eq.size < 5:
        return np.nan
    y = np.log(eq)
    x = np.arange(y.size, dtype=float)
    a, b = np.polyfit(x, y, 1)
    y_hat = a * x + b
    ss_res = float(np.sum((y - y_hat) ** 2))
    ss_tot = float(np.sum((y - y.mean()) ** 2))
    if ss_tot <= 0:
        return np.nan
    return float(1.0 - ss_res / ss_tot)
def drawdown_metrics(returns: pd.Series) -> Dict[str, float]:
    """
    Drawdown depth, duration and recovery statistics of a return series.

    Equity is the compounded (1 + r) curve, with non-numeric/missing values
    counted as 0. A drawdown period is a maximal run of bars below the
    running peak.

    Returns a dict with:
        MaxDD_%:         deepest drawdown, in %.
        AvgDD_%:         mean of the per-period deepest points, in %.
        AvgDD_len:       mean period length in bars.
        MaxDD_len:       longest period length in bars.
        RecoverDays_avg: mean number of bars from a period's trough to the
                         bar where equity regains its peak. Only periods
                         that actually recovered are counted; NaN if none
                         did. (This key previously just duplicated
                         AvgDD_len.)
    All values are NaN for fewer than 2 observations; when there is no
    drawdown, every statistic except MaxDD_% is 0.
    """
    r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    if len(r) < 2:
        return {
            "MaxDD_%": np.nan, "AvgDD_%": np.nan,
            "AvgDD_len": np.nan, "MaxDD_len": np.nan,
            "RecoverDays_avg": np.nan,
        }
    eq = (1.0 + r).cumprod()
    peak = eq.cummax()
    dd = eq / peak - 1.0
    max_dd = float(dd.min())  # Series.min skips NaN (e.g. 0/0 after a wipe-out)
    dd_vals = dd.to_numpy()
    in_dd = dd_vals < 0
    if not in_dd.any():
        return {
            "MaxDD_%": round(max_dd * 100, 2),
            "AvgDD_%": 0.0, "AvgDD_len": 0.0, "MaxDD_len": 0.0,
            "RecoverDays_avg": 0.0,
        }
    n = len(dd_vals)
    lengths, depths, recoveries = [], [], []
    start = None
    # Iterate one step past the end so a drawdown still open at the last
    # bar is closed by the sentinel.
    for i in range(n + 1):
        flag = i < n and bool(in_dd[i])
        if flag and start is None:
            start = i
        elif not flag and start is not None:
            seg = dd_vals[start:i]
            lengths.append(i - start)
            depths.append(seg.min())
            if i < n:  # equity back at its peak on bar i -> recovered
                trough = start + int(np.argmin(seg))
                recoveries.append(i - trough)
            start = None
    return {
        "MaxDD_%": round(max_dd * 100, 2),
        "AvgDD_%": round(float(np.mean(depths)) * 100, 2),
        "AvgDD_len": round(float(np.mean(lengths)), 1),
        "MaxDD_len": int(np.max(lengths)),
        "RecoverDays_avg": (
            round(float(np.mean(recoveries)), 1) if recoveries else np.nan
        ),
    }
def heal_index_metrics(returns: pd.Series) -> Dict[str, float]:
    """
    Heal Index: how little time/depth the equity spends under water.

    With dd_t = equity_t / running_peak_t - 1 (<= 0) over n bars:
        AreaUnderWater = -sum(dd_t)
        AreaAboveWater = n - AreaUnderWater
        HealIndex      = AreaAboveWater / n = 1 - AreaUnderWater / n
    Values near 1 mean shallow/short drawdowns; lower values mean deep or
    prolonged drawdowns. Non-numeric/missing returns count as 0. All values
    are NaN for fewer than 2 observations.
    """
    r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    if len(r) < 2:
        return {"HealIndex": np.nan, "AreaAboveWater": np.nan, "AreaUnderWater": np.nan}
    eq = (1.0 + r).cumprod()
    peak = eq.cummax()
    dd = eq / peak - 1.0  # <= 0
    area_under = float(-dd.sum())  # >= 0
    area_total = float(len(r))
    area_above = area_total - area_under
    heal = area_above / area_total if area_total > 0 else np.nan
    return {
        "HealIndex": round(heal, 4) if np.isfinite(heal) else np.nan,
        "AreaAboveWater": round(area_above, 2),
        "AreaUnderWater": round(area_under, 2),
    }
def h_min_100(returns: pd.Series, window: int = 100) -> float:
    """
    Worst rolling range ratio of the compounded equity curve.

    For every trailing window of ``window`` bars (default 100) computes
    ``rolling_min(equity) / rolling_max(equity) - 1`` and returns the most
    negative value. Min and max are taken independently of their order
    inside the window, so this is a range-based local tail-risk indicator
    rather than a strict peak-to-trough drawdown. Non-numeric/missing
    returns count as 0.

    Args:
        returns: Per-bar simple returns.
        window: Rolling window length in bars (>= 1).

    Returns:
        A value <= 0, or NaN when there are fewer than ``window`` observations.

    Raises:
        ValueError: if ``window`` < 1.
    """
    window = int(window)
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")
    r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    if len(r) < window:
        return np.nan
    eq = (1.0 + r).cumprod()
    rolling = eq.rolling(window)
    roll_low = rolling.min() / rolling.max() - 1.0
    return float(roll_low.min()) if roll_low.notna().any() else np.nan
def drawdown_stats_simple(ret_series: pd.Series) -> Dict[str, float]:
    """
    Minimal per-ISIN statistics: CAGR, annualised vol, Sharpe, MaxDD, Calmar.

    The series is handled additively (as log-returns):
    - the equity line is the cumulative sum of returns (NaN -> 0);
    - CAGR is ``exp(mean * DAYS_PER_YEAR) - 1``;
    - max drawdown is the deepest gap between the cumulative sum and its
      running maximum, reported in %.
    Calmar is NaN when there is no drawdown.
    """
    annualiser = np.sqrt(DAYS_PER_YEAR)
    mu = ret_series.mean()
    sigma = ret_series.std()

    cum = ret_series.fillna(0).cumsum()
    underwater = cum - cum.cummax()
    maxdd = float(underwater.min()) if len(underwater) else 0.0

    cagr = np.exp(mu * DAYS_PER_YEAR) - 1
    annvol = sigma * annualiser
    sharpe = mu / (sigma + 1e-12) * annualiser
    calmar = np.nan
    if maxdd < 0:
        calmar = cagr / abs(maxdd)

    calmar_out = round(float(calmar), 2) if np.isfinite(calmar) else np.nan
    return {
        "CAGR_%": round(cagr * 100, 2),
        "AnnVol_%": round(annvol * 100, 2),
        "Sharpe": round(float(sharpe), 2),
        "MaxDD_%eq": round(float(maxdd * 100), 2),
        "Calmar": calmar_out,
    }
def portfolio_metric_row(label: str, returns: pd.Series) -> Dict[str, Any]:
    """
    Build one row of aggregate metrics for portfolio_metrics.xlsx.

    ``returns`` are per-bar simple returns; non-numeric/missing values count
    as 0. The equity curve starts from an initial capital of 1.0, so CAGR is
    ``final_equity ** (DAYS_PER_YEAR / n) - 1`` over all n returns. The
    previous formula divided by the equity *after* the first return, which
    dropped that return while still annualising over n bars. CAGR is NaN if
    the equity ends negative.

    Returns:
        {"Portfolio": label} for an empty series. Otherwise a dict with
        Portfolio, CAGR_%, AnnVol_%, Sharpe, Sortino, MaxDD_%, Calmar,
        HealIndex and R2_equity. Ratios are NaN when their denominator is
        zero or undefined.
    """
    r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    if r.empty:
        return {"Portfolio": label}
    eq = (1.0 + r).cumprod()
    n_obs = len(r)
    final_eq = float(eq.iloc[-1])
    # Guard: a negative base with a fractional exponent has no real result.
    cagr = final_eq ** (DAYS_PER_YEAR / n_obs) - 1.0 if final_eq >= 0 else np.nan
    std = r.std()
    vol = std * np.sqrt(DAYS_PER_YEAR)
    sharpe = r.mean() / std * np.sqrt(DAYS_PER_YEAR) if std > 0 else np.nan
    downside = r[r < 0]
    down_std = downside.std()
    sortino = (
        r.mean() / down_std * np.sqrt(DAYS_PER_YEAR)
        if len(downside) > 1 and down_std > 0
        else np.nan
    )
    dd = eq / eq.cummax() - 1.0
    mdd = float(dd.min())
    calmar = cagr / abs(mdd) if mdd < 0 else np.nan
    heal = heal_index_metrics(r)
    r2 = r2_equity_line(r)  # computed once (it used to be evaluated twice)
    return {
        "Portfolio": label,
        "CAGR_%": round(cagr * 100, 2),
        "AnnVol_%": round(vol * 100, 2),
        "Sharpe": round(float(sharpe), 2) if np.isfinite(sharpe) else np.nan,
        "Sortino": round(float(sortino), 2) if np.isfinite(sortino) else np.nan,
        "MaxDD_%": round(mdd * 100, 2),
        "Calmar": round(float(calmar), 2) if np.isfinite(calmar) else np.nan,
        "HealIndex": heal.get("HealIndex", np.nan),
        "R2_equity": round(r2, 4) if np.isfinite(r2) else np.nan,
    }
# =============================================================================
# PRICE FETCH (OPEN/CLOSE) - prezzi storici per esecuzione t+1 open
# =============================================================================
def _build_symbol_euronext(row: pd.Series) -> Tuple[str, str]:
    """
    Build ``(base_url, symbol)`` for the prices API from a universe row.

    Symbol precedence:
      1. ``TickerOpen`` when it looks like "<ISIN>-<venue>" for this row's
         ISIN (case-insensitive ISIN comparison);
      2. "<ISIN>-<Mercato>" when both ISIN and Mercato are present;
      3. the bare ISIN.
    Missing cells (None/NaN, as produced by blank Excel cells) are treated
    as empty strings. Previously they were stringified to "nan", producing
    symbols such as "IE00...-nan".
    """
    def _text(key: str) -> str:
        value = row.get(key, "")
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return ""
        return str(value).strip()

    isin = _text("ISIN")
    venue = _text("Mercato")
    tok = _text("TickerOpen")
    base = OPEN_PRICE_BASE_URL
    if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper():
        return base, tok
    if isin and venue:
        return base, f"{isin}-{venue}"
    return base, isin
def fetch_price_history(
    isins: List[str],
    universe: pd.DataFrame,
    start_date: str,
    end_date: str,
) -> pd.DataFrame:
    """
    Download open/close price history for ``isins`` from the prices API.

    Behaviour:
    - The API symbol of each ISIN is built from its (first) row in
      ``universe`` via ``_build_symbol_euronext``.
    - Responses are cached on disk as one JSON file per symbol in
      OPEN_CACHE_DIR, and a cached file is reused as-is. The cache is NOT
      keyed on start_date/end_date.
    - Each download is tried up to OPEN_MAX_RETRY times, sleeping
      OPEN_SLEEP_SEC between attempts.
    - Network errors, timeouts and malformed JSON are retried and, if
      persistent, reported and skipped instead of aborting the whole run.
    - Only list payloads are cached, so an error response is not reused on
      later runs.

    Returns:
        Long DataFrame with columns Date | ISIN | Open | Close, sorted by
        ISIN then Date. Rows whose date cannot be parsed are dropped. When
        nothing is found, an empty DataFrame with those columns.
    """
    OPEN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    ssl_ctx = ssl.create_default_context()  # one TLS context for all requests
    frames: List[pd.DataFrame] = []
    for isin in isins:
        try:
            row = universe.loc[universe["ISIN"] == str(isin)].iloc[0]
        except (KeyError, IndexError):
            print(f"[WARN] ISIN {isin} non trovato nell'universo")
            continue
        base, symbol = _build_symbol_euronext(row)
        cache_file = OPEN_CACHE_DIR / f"{symbol}.json"
        data = None
        # Cache check
        if cache_file.exists():
            try:
                data = json.loads(cache_file.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                data = None  # unreadable/corrupt cache: fall through to download
        if data is None:
            url = f"{base}/{symbol}?from={start_date}&to={end_date}"
            for attempt in range(1, OPEN_MAX_RETRY + 1):
                try:
                    with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl_ctx) as resp:
                        data = json.loads(resp.read().decode("utf-8"))
                except (OSError, ValueError) as exc:
                    # HTTPError, URLError, ssl.SSLError and socket timeouts are
                    # all OSError subclasses; ValueError covers bad UTF-8/JSON.
                    data = None
                    if attempt < OPEN_MAX_RETRY:
                        time.sleep(OPEN_SLEEP_SEC)
                    else:
                        print(f"[ERROR] fetch {symbol}: {exc}")
                    continue
                if isinstance(data, list):
                    try:
                        cache_file.write_text(json.dumps(data), encoding="utf-8")
                    except OSError as exc:
                        # Caching is best-effort: warn but keep the data.
                        print(f"[WARN] cache non scritta per {symbol}: {exc}")
                break
        if not data or not isinstance(data, list):
            continue
        rows = []
        for item in data:
            if not isinstance(item, dict):
                continue  # None / malformed entries used to raise here
            payload = item.get("data") or {}
            if not isinstance(payload, dict):
                payload = {}
            dt_str = item.get("date") or item.get("dt")
            if not dt_str:
                continue
            rows.append({
                "Date": pd.to_datetime(dt_str, errors="coerce"),
                "ISIN": isin,
                "Open": _to_float_safe(payload.get("open")),
                "Close": _to_float_safe(payload.get("close")),
            })
        if rows:
            frames.append(pd.DataFrame(rows))
    if not frames:
        return pd.DataFrame(columns=["Date", "ISIN", "Open", "Close"])
    out = pd.concat(frames, ignore_index=True).dropna(subset=["Date"])
    return out.sort_values(["ISIN", "Date"]).reset_index(drop=True)
def save_price_cache_summary(cache_dir: Path, out_file: Path) -> None:
    """
    Write a debug Excel summary of the on-disk price cache.

    One row per ``*.json`` file in ``cache_dir`` (sorted by name), with
    columns symbol, n_records (list length, 0 for non-list payloads) and
    size_kb. Unreadable files are listed with n_records=0 and size_kb=NaN.
    Nothing is written if the folder is missing or holds no JSON files.
    """
    if not cache_dir.exists():
        return
    records = []
    for json_path in sorted(cache_dir.glob("*.json")):
        try:
            payload = json.loads(json_path.read_text())
            entry = {
                "symbol": json_path.stem,
                "n_records": len(payload) if isinstance(payload, list) else 0,
                "size_kb": json_path.stat().st_size / 1024,
            }
        except Exception:
            entry = {"symbol": json_path.stem, "n_records": 0, "size_kb": np.nan}
        records.append(entry)
    if records:
        pd.DataFrame(records).to_excel(out_file, index=False)
# =============================================================================
# DATA LOADING - una sola lettura DB per ISIN, cache in memoria
# =============================================================================
def detect_cols(df0: pd.DataFrame) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """
    Detect the (date, return, price) column names of a raw DB frame.

    Each role is resolved by ``detect_column`` against its candidate list
    below, in this order: date, return, price. A role with no match gets
    whatever ``detect_column`` returns for a miss (presumably None; see
    shared_utils).
    """
    candidates = (
        ["Date", "Data", "Datetime", "Timestamp", "Time"],
        ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"],
        ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"],
    )
    col_date, col_ret, col_px = (detect_column(df0, names) for names in candidates)
    return col_date, col_ret, col_px
def load_all_assets(
    isins: List[str],
    engine: sa.Engine,
    sql_sp,
    n_bars: int,
    ptf_curr: str,
) -> Tuple[Dict[str, pd.DataFrame], List[Dict[str, str]]]:
    """
    Load the time series of every ISIN from the DB exactly once.

    For each ISIN:
    - ``sql_sp`` is executed with the parameters ``isin``, ``n`` (= n_bars)
      and ``ptf`` (= ptf_curr);
    - the Date/Ret/Price columns are detected with ``detect_cols``;
    - the date column (if any) is parsed and used to sort the rows;
    - the detected names are stored in ``df.attrs`` ("col_date", "col_ret",
      "col_px") so later steps need not detect them again.

    An ISIN is rejected, and recorded in ``errors``, when:
    - the query returns no rows;
    - neither a return nor a price column is usable;
    - it has fewer than ``max(200, WP + HA + 10)`` valid observations;
    - any exception is raised while loading it.

    Progress is printed every 10 processed ISINs, including rejected ones.

    Returns:
        (assets_data, errors): ``{isin: DataFrame}`` for accepted ISINs, and
        ``{"ISIN": ..., "Errore": ...}`` records for rejected ones.
    """
    assets_data: Dict[str, pd.DataFrame] = {}
    errors: List[Dict[str, str]] = []
    total = len(isins)
    for i, isin in enumerate(isins, 1):
        try:
            df_isin = pd.read_sql_query(
                sql_sp, engine,
                params={"isin": isin, "n": n_bars, "ptf": ptf_curr},
            )
            if df_isin.empty:
                errors.append({"ISIN": isin, "Errore": "SP vuota"})
                continue
            col_date, col_ret, col_px = detect_cols(df_isin)
            if col_date:
                df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce")
                df_isin = df_isin.sort_values(col_date)
            # Minimum-coverage check on returns (preferred) or prices.
            if col_ret and col_ret in df_isin.columns:
                n_valid = pd.to_numeric(df_isin[col_ret], errors="coerce").dropna().shape[0]
            elif col_px and col_px in df_isin.columns:
                n_valid = pd.to_numeric(df_isin[col_px], errors="coerce").dropna().shape[0]
            else:
                errors.append({"ISIN": isin, "Errore": "Ne rendimenti ne prezzi utilizzabili"})
                continue
            if n_valid < max(200, WP + HA + 10):
                errors.append({
                    "ISIN": isin,
                    "Errore": f"Serie troppo corta ({n_valid} punti)",
                })
                continue
            # Cache the detected column names on the frame itself.
            df_isin.attrs["col_date"] = col_date
            df_isin.attrs["col_ret"] = col_ret
            df_isin.attrs["col_px"] = col_px
            assets_data[isin] = df_isin
        except Exception as exc:
            errors.append({"ISIN": isin, "Errore": str(exc)})
        finally:
            # In `finally` so `continue`/exceptions above no longer skip the
            # progress report.
            if i % 10 == 0:
                print(f"[DATA] {i}/{total} ISIN caricati")
    print(f"[DATA] Caricati {len(assets_data)}/{len(isins)} ISIN dal DB")
    return assets_data, errors
def compute_returns_decimal(df_isin: pd.DataFrame) -> pd.Series:
    """
    Return the asset's return series in DECIMAL units, NaNs dropped.

    Uses the column names cached in ``df_isin.attrs``:
    - Return column: coerced to float. If the median absolute value exceeds
      1.5, the series is taken to be in percent and divided by 100.
    - Otherwise, the price column: log returns ln(P_t / P_{t-1}), with zero
      prices treated as missing.
    - Neither available: an empty float Series.

    NOTE(review): percent-quoted daily ETF returns usually have a median
    |r| well below 1.5, so they would NOT be rescaled by this heuristic.
    Confirm the units of the DB return column.
    """
    ret_name = df_isin.attrs.get("col_ret")
    px_name = df_isin.attrs.get("col_px")
    if ret_name and ret_name in df_isin.columns:
        rets = pd.to_numeric(df_isin[ret_name], errors="coerce").astype(float).dropna()
        median_abs = rets.abs().median()
        looks_like_percent = pd.notnull(median_abs) and median_abs > 1.5
        return rets / 100.0 if looks_like_percent else rets
    if px_name and px_name in df_isin.columns:
        prices = pd.to_numeric(df_isin[px_name], errors="coerce").astype(float)
        prices = prices.replace(0, np.nan)
        return np.log(prices / prices.shift(1)).dropna()
    return pd.Series(dtype=float)
def compute_returns_percent(df_isin: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
    """
    Prepare a copy of ``df_isin`` whose return column is in raw percent.

    Returns (frame, column_name):
    - Return column cached in attrs: coerced to float in the copy and
      returned as-is (no unit conversion).
    - Otherwise, the price column: a new "_RetPct_" column with log returns
      times 100 (zero prices treated as missing).
    - Neither available: (copy, "").
    The input frame is never modified. Intended for
    knn_forward_backtest_one_asset, which divides the column by 100.
    """
    work = df_isin.copy()
    ret_name = work.attrs.get("col_ret")
    px_name = work.attrs.get("col_px")
    if ret_name and ret_name in work.columns:
        work[ret_name] = pd.to_numeric(work[ret_name], errors="coerce").astype(float)
        return work, ret_name
    if not (px_name and px_name in work.columns):
        return work, ""
    prices = pd.to_numeric(work[px_name], errors="coerce").astype(float).replace(0, np.nan)
    work["_RetPct_"] = np.log(prices / prices.shift(1)) * 100.0
    return work, "_RetPct_"
# =============================================================================
# kNN WALK-FORWARD BACKTEST - LOGICA MATEMATICA INVARIATA
# =============================================================================
def knn_forward_backtest_one_asset(
    df_isin: pd.DataFrame,
    col_date: str,
    col_ret: str,
    Wp: int,
    Ha: int,
    k: int,
    theta_entry: float,
    exec_ret: Optional[pd.Series] = None,
    fee_bps: float = 10,
    sl_bps: Optional[float] = 300.0,
    tp_bps: Optional[float] = 800.0,
    trail_bps: Optional[float] = 300.0,
    time_stop_bars: Optional[int] = 20,
    theta_exit: Optional[float] = 0.0,
    weak_days_exit: Optional[int] = None,
) -> Tuple[pd.DataFrame, Dict]:
    """
    Walk-forward, long-only kNN backtest with SL/TP/TRAIL/TIME/FLIP exits.

    Signal model:
    - ``df_isin[col_ret]`` is in percent and is divided by 100.
    - ``exec_ret``, if given, is used as-is (decimal). It is aligned on the
      normalised dates of ``col_date``, or positionally when that column is
      absent.
    - At each bar t >= Wp, the pattern library is built from returns
      strictly before t (``r.iloc[:t]``) and the last Wp returns are matched
      against it.
    - Signal[t] is the exposure held over t -> t+1. It earns ``Ret+1``, the
      execution return at position t+1.
    - PnL = Signal * Ret+1 - fee on every signal change.

    Exit rules (evaluated only while in a position):
    - SL, TP and TRAIL are checked against the trade P&L already *realised*
      by bar t (compounded, including the entry bar's return). If any rule
      fires, the position is flat from t. Thresholds are thus evaluated at
      the end of the day and the exit takes effect on the next bar.
    - The previous implementation compounded the *next* bar's return into
      this check, i.e. it used Ret+1 to decide whether to hold over that
      same bar. That look-ahead let SL/TRAIL dodge the very loss that
      triggered them. It also left the entry bar's return out of the trade
      P&L.
    - TIME: exit when ``t - entry_t + 1 >= time_stop_bars``.
    - FLIP: exit when ``EstOutcome <= theta_exit``. If ``weak_days_exit`` is
      set, this must happen on that many consecutive bars (FLIP_STREAK).
    - A NaN execution return counts as 0 for trade tracking, so it cannot
      permanently disable the SL/TP/TRAIL checks.

    NOTE(review): the realised-P&L logic assumes the exec return stored at
    position t+1 is known by the decision at bar t+1. This holds for
    close-to-close returns (the default, exec_ret=None). Confirm it for
    open-based exec_ret series.

    Returns:
        (signals_df, summary_metrics_dict). signals_df has the columns
        Date, Signal, EstOutcome, AvgDist, Ret+1, PnL.
    """
    r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0
    n = len(r)
    has_dates = col_date in df_isin.columns
    if has_dates:
        dates = pd.to_datetime(df_isin[col_date]).dt.normalize()
    else:
        # No date column: use positional labels (this path used to crash).
        dates = pd.Series(np.arange(n), index=df_isin.index)

    if exec_ret is None:
        r_exec = r
    else:
        r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float)
        if has_dates:
            r_exec.index = pd.to_datetime(r_exec.index).normalize()
            r_exec = r_exec.reindex(dates)
        else:
            r_exec = r_exec.reset_index(drop=True).reindex(np.arange(n))

    fee = fee_bps / 10000.0
    sl = None if sl_bps is None else sl_bps / 10000.0
    tp = None if tp_bps is None else tp_bps / 10000.0
    trail = None if trail_bps is None else trail_bps / 10000.0

    def _lib_predict(past_returns: pd.Series, win_last: np.ndarray):
        """kNN estimate (outcome, mean distance) for ``win_last``; NaNs if unavailable."""
        lib_wins, lib_out = build_pattern_library(past_returns, Wp, Ha)
        if lib_wins is None:
            return np.nan, np.nan
        curr_zn = z_norm(win_last)
        if curr_zn is None:
            return np.nan, np.nan
        est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k)
        return float(est_out), float(avg_dist)

    in_pos = False
    entry_t = None
    trade_pnl = 0.0   # realised compounded P&L of the open trade
    trade_peak = 0.0  # best realised P&L of the open trade (entry = 0)
    weak_streak = 0
    rows = []
    for t in range(Wp, n - 1):
        next_ret = r_exec.iloc[t + 1] if t + 1 < len(r_exec) else np.nan
        step_ret = next_ret if np.isfinite(next_ret) else 0.0
        past = r.iloc[:t]
        if past.dropna().shape[0] < (Wp + Ha):
            rows.append((dates.iloc[t], 0, np.nan, np.nan, next_ret))
            continue
        win_last = r.iloc[t - Wp:t].values
        est_out, avg_dist = _lib_predict(past, win_last)
        sig_out = 1 if in_pos else 0
        # === ENTRY ===
        if (not in_pos) and (est_out > theta_entry):
            sig_out = 1
            in_pos = True
            entry_t = t
            # Holding over t -> t+1 realises next_ret; later decisions see it.
            trade_pnl = step_ret
            trade_peak = max(0.0, trade_pnl)
            weak_streak = 0
        # === EXIT ===
        elif in_pos:
            exit_reasons = []
            if (sl is not None) and (trade_pnl <= -sl):
                exit_reasons.append("SL")
            if (tp is not None) and (trade_pnl >= tp):
                exit_reasons.append("TP")
            if (trail is not None) and (trade_peak - trade_pnl >= trail):
                exit_reasons.append("TRAIL")
            if (time_stop_bars is not None) and (t - entry_t + 1 >= time_stop_bars):
                exit_reasons.append("TIME")
            if theta_exit is not None:
                if est_out <= theta_exit:
                    weak_streak = weak_streak + 1 if weak_days_exit else weak_streak
                    if weak_days_exit is None:
                        exit_reasons.append("FLIP")
                    elif weak_streak >= weak_days_exit:
                        exit_reasons.append("FLIP_STREAK")
                else:
                    weak_streak = 0
            if exit_reasons:
                sig_out = 0
                in_pos = False
                entry_t = None
                trade_pnl = 0.0
                trade_peak = 0.0
                weak_streak = 0
            else:
                # Stay in: the t -> t+1 return becomes realised for bar t+1.
                trade_pnl = (1.0 + trade_pnl) * (1.0 + step_ret) - 1.0
                trade_peak = max(trade_peak, trade_pnl)
        rows.append((dates.iloc[t], sig_out, est_out, avg_dist, next_ret))

    sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"])
    sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0)
    trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs()
    cost = trade_chg * fee
    sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost
    sig_df.drop(columns=["Signal_prev"], inplace=True)
    stats = drawdown_stats_simple(sig_df["PnL"])
    stats.update({
        "HitRate_%": round(
            100 * ((sig_df["PnL"] > 0).sum() / max(1, sig_df["PnL"].notna().sum())), 2
        ),
        "AvgTradeRet_bps": round(sig_df["PnL"].mean() * 10000, 2),
        "Turnover_%/step": round(100 * trade_chg.mean(), 2),
        "N_Steps": int(sig_df.shape[0]),
        "theta_entry": theta_entry,
        "theta_exit": (None if theta_exit is None else float(theta_exit)),
        "sl_bps": (None if sl_bps is None else float(sl_bps)),
        "tp_bps": (None if tp_bps is None else float(tp_bps)),
        "trail_bps": (None if trail_bps is None else float(trail_bps)),
        "time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)),
        "weak_days_exit": (None if weak_days_exit is None else int(weak_days_exit)),
    })
    return sig_df, stats
# =============================================================================
# QUALITY SCORING (per pattern_signals.xlsx)
# =============================================================================
def _add_quality_scores(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with OutcomeScore, SimilarityScore, QualityScore.

    Scores:
        OutcomeScore    = |EstOutcome| / max|EstOutcome| (NaN when that max is
                          0, non-finite or undefined)
        SimilarityScore = 1 / (1 + AvgDist)
        QualityScore    = Confidence * SimilarityScore * OutcomeScore
    All three are rounded to 4 decimals. Missing or non-numeric inputs
    propagate as NaN. Empty frames and all-NaN columns are handled without
    errors or warnings; ``np.nanmax`` used to raise on an empty frame.
    """
    out = df.copy()

    def _num(col: str) -> pd.Series:
        # Missing column -> all-NaN Series aligned with the frame.
        if col in out.columns:
            return pd.to_numeric(out[col], errors="coerce").astype(float)
        return pd.Series(np.nan, index=out.index, dtype=float)

    conf = _num("Confidence")
    est = _num("EstOutcome")
    dist = _num("AvgDist")
    max_abs_est = est.abs().max()  # skips NaN; NaN for empty/all-NaN input
    if np.isfinite(max_abs_est) and max_abs_est > 0:
        outcome_score = est.abs() / max_abs_est
    else:
        outcome_score = pd.Series(np.nan, index=out.index, dtype=float)
    similarity_score = 1.0 / (1.0 + dist)
    quality = conf * similarity_score * outcome_score
    out["OutcomeScore"] = outcome_score.round(4)
    out["SimilarityScore"] = similarity_score.round(4)
    out["QualityScore"] = quality.round(4)
    return out
# =============================================================================
# SCORING / RANKING (utility per scelta ISIN)
# =============================================================================
def _safe_rank(s: pd.Series) -> pd.Series:
    """
    Average-method rank of ``s`` after numeric coercion.

    NaNs are replaced by the median of the valid values before ranking. If
    no value is numeric, a Series of zeros with the same index is returned.
    """
    values = pd.to_numeric(s, errors="coerce")
    if not values.notna().any():
        return pd.Series(np.zeros(len(values)), index=values.index)
    return values.fillna(values.median()).rank(method="average")
def _apply_score(df: pd.DataFrame, weights: Optional[Dict[str, float]] = None) -> pd.DataFrame:
    """
    Return a copy of *df* with an aggregate ``Score`` column for ranking ISINs.

    Score = sum over metrics of weight * rank(metric), with ranks from
    ``_safe_rank`` (higher value -> higher rank, missing values at the
    median). *weights* defaults to SCORE_WEIGHTS or, when that is empty, to
    Sharpe 0.30 / CAGR_% 0.20 / Calmar 0.20 / HitRate_% 0.15 /
    QualityScore 0.15. Metrics absent from *df*, weighted 0, or without any
    numeric value are skipped; if nothing is left, Score is NaN.
    """
    scored = df.copy()
    metric_weights = weights
    if metric_weights is None:
        metric_weights = SCORE_WEIGHTS or {
            "Sharpe": 0.30, "CAGR_%": 0.20, "Calmar": 0.20,
            "HitRate_%": 0.15, "QualityScore": 0.15,
        }
    weighted_ranks = {}
    for metric, weight in metric_weights.items():
        if metric not in scored.columns or weight == 0:
            continue
        values = _coerce_num(scored[metric])
        if not values.notna().any():
            continue
        weighted_ranks[metric] = _safe_rank(values) * float(weight)
    scored["Score"] = pd.DataFrame(weighted_ranks).sum(axis=1) if weighted_ranks else np.nan
    return scored
# =============================================================================
# PORTFOLIO BUILDING - EW, RP, EW_v2, RP_v2 (Config B)
# =============================================================================
def equity_from_returns(r: pd.Series) -> pd.Series:
    """Compound the return series *r* into an equity curve starting at base 100.

    Non-numeric or missing returns count as 0 (flat day).
    """
    clean = pd.to_numeric(r, errors="coerce").fillna(0.0)
    growth = clean.add(1).cumprod()
    return growth * 100
def monthly_returns(r: pd.Series) -> pd.Series:
    """Compound a return series into calendar-month returns.

    Missing/non-numeric returns count as 0. A non time-based index is parsed
    with ``pd.to_datetime``; if that fails an empty Series is returned.
    Months inside the covered span with no observations compound to 0.
    """
    r = pd.to_numeric(r, errors="coerce").fillna(0.0)
    if not isinstance(r.index, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)):
        try:
            r.index = pd.to_datetime(r.index)
        except Exception:
            return pd.Series(dtype=float)
    growth = 1 + r
    try:
        # "ME" (month end) is the pandas >= 2.2 alias; "M" is deprecated there.
        # Older pandas (and PeriodIndex data) only accept "M".
        resampled = growth.resample("ME")
    except ValueError:
        resampled = growth.resample("M")
    return resampled.prod() - 1
def plot_heatmap_monthly(r: pd.Series, title: str, save_path: Optional[str] = None) -> None:
    """Heatmap of compounded monthly returns: one row per year, one column per month.

    *r* is a return series in decimal form (NaN treated as 0). It is
    compounded per calendar month by ``monthly_returns`` and drawn in percent
    on a fixed -3%..+5% RdYlGn scale, each cell annotated with its value.
    All 12 month columns are always laid out, so column j really is month
    j+1; months outside the covered period are left blank rather than painted
    as 0%. Rows are labelled with the year. Nothing is drawn when no monthly
    return can be computed. The figure is saved via ``savefig_safe`` when
    *save_path* is given and is always closed.
    """
    r = pd.to_numeric(r, errors="coerce").fillna(0.0)
    m = monthly_returns(r)
    if m.empty:
        return
    df = m.to_frame("ret")
    df["Year"], df["Month"] = df.index.year, df.index.month
    # Force the full Jan..Dec grid so tick labels match the columns even when
    # the history does not cover every calendar month.
    pv = df.pivot(index="Year", columns="Month", values="ret").reindex(columns=range(1, 13))
    values_pct = pv.to_numpy(dtype=float) * 100
    fig, ax = plt.subplots(figsize=(10, 6))
    # Masked (NaN) cells are not painted, distinguishing "no data" from 0%.
    im = ax.imshow(np.ma.masked_invalid(values_pct), cmap="RdYlGn",
                   vmin=-3, vmax=5, aspect="auto")
    for i in range(values_pct.shape[0]):
        for j in range(values_pct.shape[1]):
            val = values_pct[i, j]
            if not np.isnan(val):
                ax.text(j, i, f"{val:.1f}", ha="center", va="center", fontsize=8)
    ax.set_title(title)
    ax.set_xlabel("Mese")
    ax.set_ylabel("Anno")
    ax.set_xticks(range(12))
    ax.set_xticklabels(range(1, 13))
    ax.set_yticks(range(len(pv.index)))
    ax.set_yticklabels(pv.index)
    fig.colorbar(im, ax=ax, label="%")
    fig.tight_layout()
    if save_path:
        savefig_safe(save_path, dpi=150)
    plt.close(fig)
def inverse_vol_weights(
    df: pd.DataFrame, window: int = 60, max_weight: Optional[float] = None
) -> pd.DataFrame:
    """Rolling inverse-volatility weights with an optional per-asset cap.

    Each column's weight is 1/sigma normalised across columns, where sigma is
    the rolling standard deviation over *window* rows; zero volatility is
    treated as missing. Cells without a weight are forward-filled per column,
    and anything still missing (e.g. the warm-up rows) becomes 1/n_columns.
    If *max_weight* is given, weights are clipped at it WITHOUT
    re-normalising, so a row can then sum to less than 1.

    NOTE(review): because the fill is per cell, a row where only some assets
    have a missing/zero volatility mixes fresh weights (already summing to 1
    among themselves) with filled ones, so such a row can sum to more than 1.
    """
    sigma = df.rolling(window).std()
    inv_sigma = 1 / sigma.replace(0, np.nan)
    weights = inv_sigma.div(inv_sigma.sum(axis=1), axis=0)
    equal_share = 1 / max(1, df.shape[1])
    weights = weights.ffill().fillna(equal_share)
    if max_weight is None:
        return weights
    return weights.clip(upper=max_weight)
def make_active_weights(
    w_base: pd.DataFrame,
    sig: pd.DataFrame,
    renorm_to_1: bool = False,
    add_cash: bool = True,
    cash_label: str = "Cash",
) -> pd.DataFrame:
    """
    Keep the weights only on ISINs whose signal is active (> 0) on that day.

    *sig* is aligned to *w_base* (missing dates/ISINs count as inactive);
    non-numeric weights become 0.

    - renorm_to_1=False: inactive weight is not redistributed; with
      ``add_cash`` the ``cash_label`` column holds 1 - sum(active weights),
      clipped to [0, 1].
    - renorm_to_1=True: each day's active weights are rescaled to sum to 1;
      with ``add_cash`` the cash column is 1 on days with no active weight
      (there is nothing to rescale, so the whole portfolio is cash) and 0
      otherwise.

    Columns that are zero on every day (including an all-zero cash column)
    are dropped. An empty/None *w_base* yields an empty frame on sig's index.
    """
    if w_base is None or w_base.empty:
        return pd.DataFrame(index=sig.index, columns=[])
    W = w_base.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0)
    S = sig.reindex_like(W).fillna(0).astype(int)
    W_active = W * (S > 0)
    row_sum = W_active.sum(axis=1)
    if renorm_to_1:
        W_active = W_active.div(row_sum.replace(0, np.nan), axis=0).fillna(0.0)
        if add_cash:
            # Rows with zero active weight stay at 0 after renormalisation:
            # their entire allocation is cash.
            W_active[cash_label] = row_sum.eq(0).astype(float)
    elif add_cash:
        W_active[cash_label] = (1.0 - row_sum).clip(lower=0.0, upper=1.0)
    keep = [c for c in W_active.columns if W_active[c].abs().sum() > 0]
    return W_active[keep]
def _sanitize_weights(w: pd.DataFrame) -> pd.DataFrame:
    """Clean a weight matrix and rescale each row to sum to 1.

    Non-numeric, NaN and +/-inf entries become 0 and negatives are clipped
    to 0; rows that end up all-zero stay all-zero. ``None`` or an empty frame
    is returned unchanged (same object).
    """
    if w is None or w.empty:
        return w
    cleaned = (
        w.copy()
        .apply(pd.to_numeric, errors="coerce")
        .fillna(0.0)
        .replace([np.inf, -np.inf], 0.0)
        .clip(lower=0.0)
    )
    totals = cleaned.sum(axis=1).replace(0, np.nan)
    return cleaned.div(totals, axis=0).fillna(0.0)
def _excel_sheet_name(name: Any, used: set) -> str:
    """Return an Excel-valid sheet name for *name* not yet in *used* (and record it)."""
    # Excel forbids these characters in sheet names.
    forbidden = set('[]:*?/\\')
    base = "".join("_" if ch in forbidden else ch for ch in str(name)) or "Sheet"
    sheet = base[:31]  # Excel's sheet-name length limit
    counter = 2
    # Excel compares sheet names case-insensitively.
    while sheet.lower() in used:
        suffix = f"_{counter}"
        sheet = base[:31 - len(suffix)] + suffix
        counter += 1
    used.add(sheet.lower())
    return sheet


def _export_weights_daily(
    weights_dict: Dict[str, pd.DataFrame],
    out_xlsx: Path,
) -> None:
    """Export the daily weights to a multi-sheet Excel workbook (one sheet per strategy).

    ``None``/empty frames are skipped; if nothing is left no file is created
    (an Excel workbook with zero sheets is invalid). Sheet names are made
    Excel-safe: forbidden characters become ``_``, names are cut to 31
    characters and de-duplicated with a numeric suffix so that strategies
    whose names collide after truncation do not overwrite each other.
    Export failures are reported as a warning, not raised.
    """
    if not weights_dict:
        return
    frames = [(name, w) for name, w in weights_dict.items() if w is not None and not w.empty]
    if not frames:
        return
    try:
        with pd.ExcelWriter(out_xlsx) as xw:
            used_names: set = set()
            for name, w in frames:
                w.to_excel(xw, sheet_name=_excel_sheet_name(name, used_names))
        print(f"[INFO] Salvato: {out_xlsx}")
    except Exception as exc:
        print(f"[WARN] Export weights_daily fallito: {exc}")
# =============================================================================
# DYNAMIC PORTFOLIO BUILDER (ranking rolling)
# =============================================================================
# Memo of _build_dynamic_portfolio_returns results keyed by TopN
# (filled by _get_dynamic_portfolio).
_dynamic_portfolio_cache: Dict[int, Dict] = dict()
def _build_dynamic_portfolio_returns(
    wide_pnl: pd.DataFrame,
    wide_sig: pd.DataFrame,
    wide_est: pd.DataFrame,
    top_n: int,
    window_bars: Optional[int] = None,
    rp_lookback: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Build the dynamic Equal-Weight and Risk-Parity portfolios with a rolling ranking.

    For every date of ``wide_pnl`` (sorted), the ISINs whose signal equals 1
    on that date are ranked by their mean EstOutcome over the last
    ``window_bars`` rows of ``wide_est`` up to and including that date (NaNs
    skipped; ISINs with no score are dropped). The best ``top_n`` are
    selected; ties keep the column order of ``wide_pnl`` (``sorted`` is
    stable).

    SIZING RULES (fixed in v2.1.1):
    - Equal Weight: every selected ISIN gets 1/top_n, REGARDLESS of how many
      ISINs were actually selected. With fewer than top_n candidates the
      unallocated share stays in CASH, so the early, data-poor period never
      puts 100% on a single ISIN.
    - Risk Parity: inverse-volatility weights (``inverse_vol_weights`` over
      the last ``window_bars`` rows of ``wide_pnl``, rolling window
      ``rp_lookback``, capped at RP_MAX_WEIGHT), taken from the last row and
      rescaled so they sum to len(selected)/top_n (same gross exposure as
      EW), then clipped at RP_MAX_WEIGHT again. The re-clip can leave the
      sum below that target; the remainder stays in cash.

    Args:
        wide_pnl: PnL per date (rows) and ISIN (columns).
        wide_sig: signal per date/ISIN; only the value 1 counts as "on".
        wide_est: EstOutcome per date/ISIN used for the ranking.
        top_n: number of slots in the portfolio.
        window_bars: ranking/volatility window in rows; falsy -> RANKING_WINDOW_BARS.
        rp_lookback: rolling std window for RP; falsy -> RP_LOOKBACK.

    Returns:
        dict with ``ret_eq`` / ``ret_rp`` (per-date sum of PnL x active
        weight; cash earns nothing), ``w_eq`` / ``w_rp`` (target weights),
        ``w_eq_act`` / ``w_rp_act`` (target weights filtered by
        ``make_active_weights``, with a Cash column) and ``selection``
        (date -> list of selected ISINs). Empty containers when ``wide_pnl``
        is None or empty.

    NOTE(review): the RP volatility window uses ``wide_pnl.loc[:dt]``, i.e. it
    includes the PnL of the very date the weights are applied to. Whether
    this is look-ahead depends on how the backtest timestamps PnL (not
    visible here) -- confirm.
    """
    window_bars = window_bars or RANKING_WINDOW_BARS
    rp_lookback = rp_lookback or RP_LOOKBACK
    if wide_pnl is None or wide_pnl.empty:
        idx = pd.Index([])
        empty_w = pd.DataFrame(index=idx, columns=[])
        return {
            "ret_eq": pd.Series(dtype=float),
            "ret_rp": pd.Series(dtype=float),
            "w_eq": empty_w, "w_rp": empty_w,
            "w_eq_act": empty_w, "w_rp_act": empty_w,
            "selection": {},
        }
    dates = wide_pnl.index.sort_values()
    all_cols = wide_pnl.columns.tolist()
    # Target weights start at 0 and are filled date by date below.
    w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols)
    w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols)
    selection = {}
    # Fixed EW weight per slot: 1/top_n. Even with a single signal, that
    # ISIN gets 1/top_n and the rest goes to cash.
    ew_per_slot = 1.0 / max(1, top_n)
    for dt in dates:
        sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float)
        on_cols = [c for c in all_cols if sig_row.get(c, 0) == 1]
        if not on_cols:
            selection[dt] = []
            continue
        window_est = wide_est.loc[:dt].tail(window_bars) if not wide_est.empty else pd.DataFrame()
        scores = []
        for c in on_cols:
            s = pd.to_numeric(window_est[c], errors="coerce") if c in window_est.columns else pd.Series(dtype=float)
            est_score = s.mean(skipna=True)
            if pd.isna(est_score):
                continue
            scores.append((c, est_score))
        if not scores:
            selection[dt] = []
            continue
        scores_sorted = sorted(scores, key=lambda x: x[1], reverse=True)
        base_isins_dt = [c for c, _ in scores_sorted[:top_n]]
        selection[dt] = base_isins_dt
        if not base_isins_dt:
            continue
        # --- Equal Weight ---
        # FIX v2.1.1: weight = 1/top_n per candidate, NOT 1/len(candidates).
        # This keeps every asset's weight <= 1/top_n and avoids a 100%
        # concentration when there are only a few candidates.
        w_eq.loc[dt, base_isins_dt] = ew_per_slot
        # --- Risk Parity ---
        # Inverse-vol weights capped at RP_MAX_WEIGHT, target sum =
        # N_candidates/top_n (i.e. the same total exposure as EW: 100% with
        # top_n candidates, a smaller fraction with fewer).
        window_pnl = wide_pnl.loc[:dt].tail(window_bars)
        rp_hist = window_pnl[base_isins_dt]
        rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT)
        if not rp_w.empty:
            last = rp_w.iloc[-1].fillna(0.0)
            last_sum = float(last.sum())
            if last_sum > 0:
                # Target total exposure: N_candidates/top_n (= 1 when candidates == top_n)
                target_total = len(base_isins_dt) / max(1, top_n)
                # Scale to target_total, BUT keep the cap
                scaled = last * (target_total / last_sum)
                # Re-apply the cap after scaling
                scaled = scaled.clip(upper=RP_MAX_WEIGHT)
                w_rp.loc[dt, scaled.index] = scaled.values
    w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
    w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
    # Cash is excluded from the products: it earns nothing.
    ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
    ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
    return {
        "ret_eq": ret_eq, "ret_rp": ret_rp,
        "w_eq": w_eq, "w_rp": w_rp,
        "w_eq_act": w_eq_act, "w_rp_act": w_rp_act,
        "selection": selection,
    }
# Input frames behind each cached _get_dynamic_portfolio result, keyed like
# _dynamic_portfolio_cache. Holding the objects keeps their identities from
# being recycled, so an `is` check reliably means "same inputs".
_dynamic_portfolio_cache_inputs: Dict[int, tuple] = {}


def _get_dynamic_portfolio(
    top_n: int, wide_pnl: pd.DataFrame, wide_sig: pd.DataFrame, wide_est: pd.DataFrame,
) -> Dict[str, Any]:
    """Cache-aware getter for the dynamic portfolio of a given TopN.

    Results are memoised in ``_dynamic_portfolio_cache`` per ``top_n``. A
    cached entry is reused only when it was built from the very same
    ``wide_pnl`` / ``wide_sig`` / ``wide_est`` objects; otherwise (e.g. the
    same TopN requested for another strategy's data) it is rebuilt and
    replaces the old entry. Frames mutated in place after caching are not
    detected. Uses the module-level RANKING_WINDOW_BARS and RP_LOOKBACK.
    """
    inputs = (wide_pnl, wide_sig, wide_est)
    cached_inputs = _dynamic_portfolio_cache_inputs.get(top_n)
    same_inputs = cached_inputs is not None and all(
        old is new for old, new in zip(cached_inputs, inputs)
    )
    if top_n not in _dynamic_portfolio_cache or not same_inputs:
        _dynamic_portfolio_cache[top_n] = _build_dynamic_portfolio_returns(
            wide_pnl=wide_pnl, wide_sig=wide_sig, wide_est=wide_est, top_n=top_n,
            window_bars=RANKING_WINDOW_BARS, rp_lookback=RP_LOOKBACK,
        )
        _dynamic_portfolio_cache_inputs[top_n] = inputs
    return _dynamic_portfolio_cache[top_n]
# =============================================================================
# COMPOSITION PLOTS
# =============================================================================
def plot_portfolio_composition(
    weights: pd.DataFrame,
    title: str,
    save_path: Optional[str] = None,
    max_legend: int = 12,
) -> None:
    """Stacked-area chart of portfolio weights per ISIN over time.

    Duplicate dates keep the last row and dates are sorted. Columns that are
    zero on every date are dropped and each row is re-normalised to sum to 1
    (a 'Cash' column, if present, is simply one more band). When more than
    *max_legend* columns remain, those with the smallest average weight are
    merged into an 'Altri' band. Nothing is drawn (a [SKIP] line is printed)
    for empty input, all-zero weights or fewer than two dates. The figure is
    saved via ``savefig_safe`` when *save_path* is given and always closed.
    """
    if weights is None or weights.empty:
        print(f"[SKIP] Nessun peso disponibile per: {title}")
        return
    W = weights.copy()
    if W.index.has_duplicates:
        W = W[~W.index.duplicated(keep="last")]
    W = W.sort_index()
    W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0)
    keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0]
    if not keep_cols:
        print(f"[SKIP] Tutte le colonne hanno peso zero per: {title}")
        return
    W = W[keep_cols]
    row_sum = W.sum(axis=1)
    with np.errstate(invalid="ignore", divide="ignore"):
        W = W.div(row_sum.replace(0.0, np.nan), axis=0).fillna(0.0).clip(lower=0.0)
    if len(W.index) < 2 or W.shape[1] == 0:
        print(f"[SKIP] Serie troppo corta per: {title}")
        return
    avg_w = W.mean(axis=0).sort_values(ascending=False)
    ordered = avg_w.index.tolist()
    if len(ordered) > max_legend:
        head, tail = ordered[:max_legend], ordered[max_legend:]
        W_show = W[head].copy()
        W_show["Altri"] = W[tail].sum(axis=1)
        ordered = head + ["Altri"]
    else:
        W_show = W[ordered].copy()
    cmap = plt.colormaps.get_cmap("tab20")
    palette = [cmap(i % cmap.N) for i in range(len(ordered))]
    fig, ax = plt.subplots(figsize=(11, 6))
    ax.stackplot(W_show.index, [W_show[c].values for c in ordered],
                 labels=ordered, colors=palette)
    ax.set_title(f"Composizione portafoglio nel tempo - {title}")
    ax.set_ylim(0, 1)
    ax.grid(True, alpha=0.3)
    ax.set_ylabel("Peso")
    # Pin the ticks before labelling them: labelling auto-located ticks makes
    # matplotlib warn (FixedFormatter without FixedLocator) and the labels can
    # drift from their ticks if the view changes.
    yticks = np.linspace(0.0, 1.0, 6)
    ax.set_yticks(yticks)
    ax.set_yticklabels([f"{y * 100:.0f}%" for y in yticks])
    ncol = 2 if len(ordered) > 10 else 1
    ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1),
              frameon=False, ncol=ncol, title="ISIN")
    fig.tight_layout()
    if save_path:
        savefig_safe(save_path, dpi=150)
    plt.close(fig)
def plot_portfolio_composition_fixed(
    weights: pd.DataFrame,
    title: str,
    save_path: Optional[str] = None,
) -> None:
    """
    Invested-vs-cash view of a weight matrix (two stacked bands up to 1).

    Useful to see when the system is under-invested. A 'Cash' column, if
    present, is used as is; otherwise cash is 1 - sum(weights) clipped to
    [0, 1]. As in ``plot_portfolio_composition``, duplicate dates keep the
    last row and dates are sorted, so ``fill_between`` gets a monotonic time
    axis. Empty/None input draws nothing. The figure is saved via
    ``savefig_safe`` when *save_path* is given and always closed.
    """
    if weights is None or weights.empty:
        return
    W = weights.copy()
    if W.index.has_duplicates:
        W = W[~W.index.duplicated(keep="last")]
    W = W.sort_index().apply(pd.to_numeric, errors="coerce").fillna(0.0)
    if "Cash" not in W.columns:
        W["Cash"] = (1.0 - W.sum(axis=1)).clip(lower=0.0, upper=1.0)
    invested = 1.0 - W["Cash"]
    fig, ax = plt.subplots(figsize=(11, 4))
    ax.fill_between(W.index, 0, invested, color="#2A4D7A",
                    alpha=0.7, label="Investito")
    ax.fill_between(W.index, invested, 1.0, color="#D3D1C7",
                    alpha=0.7, label="Cash")
    ax.set_ylim(0, 1)
    ax.set_title(f"Composizione Attivi vs Cash - {title}")
    ax.set_ylabel("Quota portafoglio")
    ax.legend(loc="upper right")
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    if save_path:
        savefig_safe(save_path, dpi=150)
    plt.close(fig)
# =============================================================================
# TRADES REPORT (round-trip OPEN/CLOSE)
# =============================================================================
def _round_trips_by_isin(bt: pd.DataFrame) -> List[Dict[str, Any]]:
    """Detect long round-trips per ISIN (Signal 1 opens, a later Signal 0 closes).

    *bt* needs ISIN, Date, Signal and Ret+1 columns. Trades come back in ISIN
    (groupby) order, then chronologically, as dicts with ``isin``,
    ``entry_d`` / ``exit_d`` (``exit_d`` is None for a trade still open on
    the last row) and ``pnl``: the compounded Ret+1 from the entry row up to,
    but excluding, the exit row.
    """
    trips: List[Dict[str, Any]] = []
    for isin, g in bt.groupby("ISIN"):
        g = g.sort_values("Date")
        sig = g["Signal"].values
        dates = g["Date"].values
        ret_next = g["Ret+1"].fillna(0.0).values
        entry_idx: Optional[int] = None  # None <=> currently flat
        for i, s in enumerate(sig):
            if s == 1 and entry_idx is None:
                entry_idx = i
            elif s == 0 and entry_idx is not None:
                # Trade PnL: product of (1+r), excluding the closing day.
                trade_rets = ret_next[entry_idx:i]
                pnl = float(np.prod(1 + trade_rets) - 1) if len(trade_rets) > 0 else 0.0
                trips.append({"isin": isin, "entry_d": dates[entry_idx],
                              "exit_d": dates[i], "pnl": pnl})
                entry_idx = None
        if entry_idx is not None:
            # Trade still open at the end of the period.
            trade_rets = ret_next[entry_idx:]
            pnl = float(np.prod(1 + trade_rets) - 1) if len(trade_rets) > 0 else 0.0
            trips.append({"isin": isin, "entry_d": dates[entry_idx],
                          "exit_d": None, "pnl": pnl})
    return trips


def _avg_weight_during(w_clean: pd.DataFrame, isin: str, entry_d: Any, exit_d: Any) -> float:
    """Mean weight of *isin* in *w_clean* over [entry_d, exit_d]; NaN if unavailable."""
    if isin not in w_clean.columns:
        return np.nan
    try:
        mask = (w_clean.index >= entry_d) & (w_clean.index <= exit_d)
        return float(w_clean.loc[mask, isin].mean())
    except Exception:
        # Best effort (e.g. an index not comparable with dates): weight unknown.
        return np.nan


def make_trades_report(
    bt_signals_df: pd.DataFrame,
    meta_df: pd.DataFrame,
    weights_dict: Dict[str, pd.DataFrame],
) -> Dict[str, pd.DataFrame]:
    """
    Build a trade report per strategy: one row per round-trip OPEN -> CLOSE.

    Round-trips are detected on *bt_signals_df* (ISIN, Date, Signal, Ret+1):
    a trade opens on a row with Signal == 1 and closes on the next row with
    Signal == 0; PnL compounds Ret+1 from the entry row up to, but excluding,
    the exit row. A trade still open on the last row is reported with
    ExitDate None and no weight. The same round-trips are reported for every
    strategy in *weights_dict* (None/empty frames skipped); per strategy only
    AvgWeight (mean weight of the ISIN between entry and exit, inclusive) and
    Contrib_% (PnL_% x AvgWeight) differ. Nome / Asset Class / Categoria come
    from *meta_df*, whose ISINs must be unique.

    Trade logic is unchanged from the original; the detection, which does not
    depend on the strategy, now runs once instead of once per strategy.
    """
    out: Dict[str, pd.DataFrame] = {}
    if bt_signals_df is None or bt_signals_df.empty:
        return out
    bt = bt_signals_df.copy()
    bt["Date"] = pd.to_datetime(bt["Date"])
    bt["ISIN"] = bt["ISIN"].astype(str).str.strip()
    bt["Signal"] = pd.to_numeric(bt["Signal"], errors="coerce").fillna(0).astype(int)
    bt = bt.sort_values(["ISIN", "Date"])
    isin_names = meta_df.set_index("ISIN")[["Nome", "Asset Class", "Categoria"]].to_dict("index")
    # Computed lazily on the first non-empty strategy, then reused.
    round_trips: Optional[List[Dict[str, Any]]] = None
    for strat_name, w_df in weights_dict.items():
        if w_df is None or w_df.empty:
            continue
        if round_trips is None:
            round_trips = _round_trips_by_isin(bt)
        w_clean = w_df.drop(columns=["Cash"], errors="ignore")
        trades = []
        for trip in round_trips:
            isin, entry_d, exit_d, pnl = trip["isin"], trip["entry_d"], trip["exit_d"], trip["pnl"]
            info = isin_names.get(isin, {})
            if exit_d is None:
                trades.append({
                    "ISIN": isin,
                    "Nome": info.get("Nome"),
                    "Asset Class": info.get("Asset Class"),
                    "Categoria": info.get("Categoria"),
                    "EntryDate": pd.Timestamp(entry_d).date(),
                    "ExitDate": None,
                    "Duration_days": np.nan,
                    "PnL_%": round(pnl * 100, 4),
                    "AvgWeight": np.nan,
                    "Contrib_%": np.nan,
                })
                continue
            duration = (pd.Timestamp(exit_d) - pd.Timestamp(entry_d)).days
            avg_w = _avg_weight_during(w_clean, isin, entry_d, exit_d)
            trades.append({
                "ISIN": isin,
                "Nome": info.get("Nome"),
                "Asset Class": info.get("Asset Class"),
                "Categoria": info.get("Categoria"),
                "EntryDate": pd.Timestamp(entry_d).date(),
                "ExitDate": pd.Timestamp(exit_d).date(),
                "Duration_days": int(duration),
                "PnL_%": round(pnl * 100, 4),
                "AvgWeight": round(avg_w, 4) if np.isfinite(avg_w) else np.nan,
                "Contrib_%": round(pnl * avg_w * 100, 4) if np.isfinite(avg_w) else np.nan,
            })
        if trades:
            out[strat_name] = pd.DataFrame(trades)
    return out
def _build_performance_attribution(
    trades_dict: Dict[str, pd.DataFrame],
) -> pd.DataFrame:
    """Attribution table: trade count, PnL and contribution per strategy and asset class.

    One row per (strategy, asset class) with N_Trades, AvgPnL_% (3 dp),
    SumPnL_% (2 dp) and Contrib_% (sum of per-trade contributions, 3 dp, NaN
    when the trade frame has no Contrib_% column). Trades without an asset
    class form their own NaN group instead of being silently dropped by
    ``groupby``. None/empty trade frames are skipped.
    """
    rows = []
    for strat, df in trades_dict.items():
        if df is None or df.empty:
            continue
        has_contrib = "Contrib_%" in df.columns
        # dropna=False keeps trades whose Asset Class is missing.
        for ac, g in df.groupby("Asset Class", dropna=False):
            rows.append({
                "Strategy": strat,
                "Asset Class": ac,
                "N_Trades": len(g),
                "AvgPnL_%": round(g["PnL_%"].mean(), 3),
                "SumPnL_%": round(g["PnL_%"].sum(), 2),
                "Contrib_%": round(g["Contrib_%"].sum(), 3) if has_contrib else np.nan,
            })
    return pd.DataFrame(rows)
def rebuild_daily_from_trades_dict(
    trades_dict: Dict[str, pd.DataFrame],
    date_index: pd.DatetimeIndex,
) -> pd.DataFrame:
    """
    Rebuild a daily PnL series per strategy from its list of trades.

    Each trade's contribution (``Contrib_%``, or ``PnL_%`` when that column
    is absent; both in percent) is spread evenly (not compounded) over the
    dates of *date_index* strictly after EntryDate and up to ExitDate
    inclusive. Open trades (no ExitDate) run to the last date. Trades without
    EntryDate or with no date in that window are ignored. A missing, NaN or
    non-numeric contribution counts as 0; previously a NaN (as written for
    open trades or ISINs without weight) turned the whole span into NaN.

    Returns a DataFrame indexed by *date_index*, one column per strategy,
    in decimal returns.
    """
    if not trades_dict:
        return pd.DataFrame(index=date_index)
    daily = pd.DataFrame(0.0, index=date_index, columns=list(trades_dict.keys()))
    if len(date_index) == 0:
        return daily
    last_date = date_index[-1]
    for strat, df in trades_dict.items():
        if df is None or df.empty:
            continue
        for _, t in df.iterrows():
            entry_raw = t.get("EntryDate")
            if not pd.notnull(entry_raw):
                continue
            entry = pd.Timestamp(entry_raw)
            exit_raw = t.get("ExitDate")
            exit_d = pd.Timestamp(exit_raw) if pd.notnull(exit_raw) else last_date
            mask = (date_index > entry) & (date_index <= exit_d)
            n_days = int(mask.sum())
            if n_days == 0:
                continue
            raw = t.get("Contrib_%", t.get("PnL_%", 0.0))
            try:
                contrib = float(raw)
            except (TypeError, ValueError):
                contrib = 0.0
            if not np.isfinite(contrib):
                # Unknown contribution (NaN/inf): add nothing instead of NaN.
                contrib = 0.0
            daily.loc[mask, strat] += contrib / 100.0 / n_days
    return daily
# =============================================================================
# METRICHE FINALI PER TOP-N SWEEP (sezione 6 originale)
# =============================================================================
def _calc_all_metrics_from_returns(returns: pd.Series, label: str = "") -> Dict[str, Any]:
    """Full metric bundle for one return series, labelled *label*.

    NaN/non-numeric returns count as 0. An empty series yields only
    ``{"Portfolio": label}``. Otherwise the row produced by
    ``portfolio_metric_row`` is extended with drawdown stats (AvgDD_%,
    MaxDD_len, RecoverDays_avg), the heal-index areas (AreaAboveWater,
    AreaUnderWater) and h_min_100 expressed in percent (2 dp, NaN when not
    finite).
    """
    clean = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    if clean.empty:
        return {"Portfolio": label}
    metrics = portfolio_metric_row(label, clean)
    dd_stats = drawdown_metrics(clean)
    heal_stats = heal_index_metrics(clean)
    h_min_value = h_min_100(clean)
    h_min_pct = round(h_min_value * 100, 2) if np.isfinite(h_min_value) else np.nan
    metrics.update({
        "AvgDD_%": dd_stats.get("AvgDD_%"),
        "MaxDD_len": dd_stats.get("MaxDD_len"),
        "RecoverDays_avg": dd_stats.get("RecoverDays_avg"),
        "AreaAboveWater": heal_stats.get("AreaAboveWater"),
        "AreaUnderWater": heal_stats.get("AreaUnderWater"),
        "h_min_100": h_min_pct,
    })
    return metrics
def _select_isins_for_topN(df_sum: pd.DataFrame, top_n: int) -> List[str]:
    """Full-sample (legacy) selection: the *top_n* ISINs with the highest Score.

    ISIN codes are returned as stripped strings; an empty list is returned
    when *df_sum* is None, empty or has no Score column.
    """
    if df_sum is None or df_sum.empty or "Score" not in df_sum.columns:
        return []
    ranked = df_sum.sort_values("Score", ascending=False)
    best = ranked.head(top_n)["ISIN"]
    return best.astype(str).str.strip().tolist()
def _build_portfolio_returns_for_isins(
    isins: List[str],
    wide_pnl: pd.DataFrame,
    wide_sig: pd.DataFrame,
    sizing: str = "equal_weight",
) -> pd.Series:
    """Daily returns of a static portfolio over *isins* (legacy full-sample selection).

    Only ISINs present in ``wide_pnl`` are used (duplicates counted once).
    Weights are 1/N for ``sizing="equal_weight"`` or capped inverse
    volatility for ``sizing="risk_parity"`` (``inverse_vol_weights`` with
    RP_LOOKBACK / RP_MAX_WEIGHT). They are then zeroed on days the ISIN's
    signal is not active (``make_active_weights``, inactive share left in
    cash, no re-normalisation). An ISIN missing from ``wide_sig`` (or a None
    ``wide_sig``) is treated as always active, while the real signals of the
    other ISINs are still honoured; previously one missing column made every
    ISIN always active.

    Returns a Series indexed like ``wide_pnl``; an empty-valued (NaN) Series
    when no usable ISIN is given.
    """
    if not isins:
        return pd.Series(dtype=float, index=wide_pnl.index if wide_pnl is not None else None)
    cols = list(dict.fromkeys(c for c in isins if c in wide_pnl.columns))
    if not cols:
        return pd.Series(dtype=float, index=wide_pnl.index)
    sub_pnl = wide_pnl[cols]
    sig_cols = wide_sig.columns if wide_sig is not None else ()
    # Per-ISIN signal aligned to the PnL dates; missing ISINs default to 1.
    sub_sig = pd.DataFrame(
        {c: (wide_sig[c].reindex(sub_pnl.index) if c in sig_cols else 1) for c in cols},
        index=sub_pnl.index,
    )
    if sizing == "risk_parity":
        w = inverse_vol_weights(sub_pnl, window=RP_LOOKBACK, max_weight=RP_MAX_WEIGHT)
    else:
        w = pd.DataFrame(1.0 / len(cols), index=sub_pnl.index, columns=cols)
    w_act = make_active_weights(w, sub_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
    return (sub_pnl * w_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
# =============================================================================
# STRATEGY CONFIGURATIONS (4 strategie operative: EW, RP, EW_v2, RP_v2)
# =============================================================================
# Legacy pair: exit rules come from the 'signals' config (SIGNALS_CONFIG)
# with hard-coded fallbacks; the two strategies differ only in sizing.
STRATEGY_PARAMS: Dict[str, Dict[str, Any]] = {
    strat_name: {
        "sizing": sizing,
        "sl_bps": SIGNALS_CONFIG.get("sl_bps", 300.0),
        "tp_bps": SIGNALS_CONFIG.get("tp_bps", 800.0),
        "trail_bps": SIGNALS_CONFIG.get("trail_bps", 300.0),
        "time_stop_bars": SIGNALS_CONFIG.get("time_stop_bars", 20),
    }
    for strat_name, sizing in (
        ("Equal_Weight", "equal_weight"),
        ("Risk_Parity", "risk_parity"),
    )
}
# The _v2 strategies implement Config B (out-of-sample grid search). Note:
# min_holding_bars is NOT implemented in knn_forward_backtest_one_asset (that
# logic only fits the production engine), so in the backtest the _v2 variants
# use a wider TP and a tighter trailing stop than the legacy defaults, with the
# same entry/exit scheme as the legacy strategies.
STRATEGY_PARAMS.update({
    strat_name: {
        "sizing": sizing,
        "sl_bps": 300.0,
        "tp_bps": 1200.0,
        "trail_bps": 200.0,
        "time_stop_bars": 20,
    }
    for strat_name, sizing in (
        ("Equal_Weight_v2", "equal_weight"),
        ("Risk_Parity_v2", "risk_parity"),
    )
})
def _override_strategy_params_from_config() -> None:
    """Merge the optional 'strategies' config section into STRATEGY_PARAMS (in place).

    Entries whose name starts with '_' or whose spec is not a dict are
    ignored. A strategy unknown to STRATEGY_PARAMS is registered with its
    'sizing' (default 'equal_weight'). Every key of spec['params'] then
    overwrites the matching entry, and an explicit spec['sizing'] wins last.
    """
    if not STRATEGIES_CONFIG:
        return
    for strat_name, spec in STRATEGIES_CONFIG.items():
        if strat_name.startswith("_") or not isinstance(spec, dict):
            continue
        target = STRATEGY_PARAMS.setdefault(
            strat_name, {"sizing": spec.get("sizing", "equal_weight")}
        )
        overrides = spec.get("params", {}) or {}
        for key, value in overrides.items():
            target[key] = value
        if "sizing" in spec:
            target["sizing"] = spec["sizing"]


_override_strategy_params_from_config()
# =============================================================================
# MAIN PIPELINE
# =============================================================================
def main() -> None:
"""
Esegue la pipeline end-to-end. Wrapped in main() per:
- poter essere importato senza side effect
- permettere try/finally puliti sull'engine DB
- facilitare il testing
"""
start_all = time.perf_counter()
# ---- 1) UNIVERSO ----
print(f"[INFO] Caricamento universo da {UNIVERSO_XLSX}")
universe = pd.read_excel(UNIVERSO_XLSX)
col_isin_uni = detect_column(universe, ["ISIN", "isin", "codice isin"])
if col_isin_uni is None:
raise ValueError("Nel file universo non trovo una colonna ISIN.")
col_name_uni = detect_column(universe, [
"Nome", "Name", "Descrizione", "Description",
"Security Name", "Instrument Name",
])
col_cat_uni = detect_column(universe, ["Categoria", "Category", "Classe", "Linea", "Tipo"])
col_ac_uni = detect_column(universe, [
"Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class",
])
isins = (
universe[col_isin_uni].astype(str).str.strip()
.replace("", pd.NA).dropna().drop_duplicates().tolist()
)
print(f"[INFO] ISIN totali in universo: {len(isins)}")
meta_df = pd.DataFrame({"ISIN": universe[col_isin_uni].astype(str).str.strip()})
meta_df["Nome"] = universe[col_name_uni] if col_name_uni else None
meta_df["Categoria"] = universe[col_cat_uni] if col_cat_uni else None
meta_df["Asset Class"] = universe[col_ac_uni] if col_ac_uni else None
meta_df = meta_df.drop_duplicates(subset=["ISIN"]).reset_index(drop=True)
# ---- 2) CONNESSIONE DB ----
conn_str = read_connection_txt("connection.txt")
engine = sa.create_engine(conn_str, fast_executemany=True)
print("[INFO] Connessione pronta (SQLAlchemy + pyodbc).")
sql_sp = text(f"EXEC {STORED_PROC} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
# ---- 3) CARICAMENTO DATI (una sola volta per ISIN) ----
print("[INFO] Caricamento dati per tutti gli ISIN (singola passata)...")
t_data = time.perf_counter()
assets_data, load_errors = load_all_assets(isins, engine, sql_sp, N_BARS, PTF_CURR)
errors: List[Dict[str, str]] = list(load_errors)
print(f"[TIMER] Caricamento DB: {format_eta(time.perf_counter() - t_data)}")
# ---- 4) HURST + PATTERN ANALYSIS (informativo) ----
print("[INFO] Calcolo Hurst + Pattern recognition (informativi)...")
t_hp = time.perf_counter()
hurst_rows: List[Dict[str, Any]] = []
pattern_rows: List[Dict[str, Any]] = []
last_dates: List[pd.Timestamp] = []
for i, (isin, df_isin) in enumerate(assets_data.items(), 1):
try:
col_date = df_isin.attrs.get("col_date")
r = compute_returns_decimal(df_isin)
if r.empty or len(r) < max(200, WP + HA + 10):
continue
# Hurst (solo informativo - regime classification)
h_rs = hurst_rs_returns(r)
h_dfa = hurst_dfa_returns(r)
H = np.nanmedian([h_rs, h_dfa])
H = clamp01(H) if np.isfinite(H) else np.nan
if pd.isna(H):
regime = None
elif H < 0.45:
regime = "mean_reversion"
elif H > 0.55:
regime = "breakout"
else:
regime = "neutral"
# Pattern characterization (informativo)
lib_wins, lib_out = build_pattern_library(r, WP, HA, embargo=EMBARGO)
date_last = df_isin[col_date].iloc[-1] if col_date else None
if date_last is not None:
last_dates.append(pd.to_datetime(date_last))
if lib_wins is None or len(r) < WP + HA:
ptype, pconf = characterize_window(
r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT
)
signal, est_out, avg_dist = 0, np.nan, np.nan
else:
curr_zn = z_norm(r.values[-WP:])
if curr_zn is None:
ptype, pconf = characterize_window(
r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT
)
signal, est_out, avg_dist = 0, np.nan, np.nan
else:
est_out, avg_dist, _ = predict_from_library(
curr_zn, lib_wins, lib_out, k=KNN_K
)
# NOTA v2.1: usa THETA globale, NON H/100
signal = 1 if est_out > THETA else 0
ptype, pconf = characterize_window(
r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT
)
hurst_rows.append({
"ISIN": isin,
"Hurst": None if pd.isna(H) else round(float(H), 4),
"Regime": regime,
})
pattern_rows.append({
"ISIN": isin, "DateLast": date_last,
"PatternType": ptype,
"Signal": {1: "long", -1: "short", 0: "flat"}.get(int(signal), "flat"),
"Confidence": None if pconf is None else round(float(min(1.0, max(0.0, pconf))), 3),
"EstOutcome": None if pd.isna(est_out) else float(est_out),
"AvgDist": None if pd.isna(avg_dist) else float(avg_dist),
"Wp": WP, "Ha": HA, "k": KNN_K,
})
if i % 10 == 0:
print(f" [HP] {i}/{len(assets_data)} ISIN analizzati")
except Exception as exc:
errors.append({"ISIN": isin, "Errore": f"Hurst/Pattern: {exc}"})
print(f"[TIMER] Hurst+Pattern: {format_eta(time.perf_counter() - t_hp)}")
# ---- 4A) EXPORT HURST + PATTERN ----
hurst_df = pd.DataFrame(hurst_rows) if hurst_rows else pd.DataFrame(
{"ISIN": [], "Hurst": [], "Regime": []}
)
hurst_df["ISIN"] = hurst_df["ISIN"].astype(str).str.strip()
meta_df["ISIN"] = meta_df["ISIN"].astype(str).str.strip()
summary_hurst = meta_df.merge(hurst_df, on="ISIN", how="left")
cols_hurst = ["ISIN", "Nome", "Categoria", "Asset Class", "Hurst", "Regime"]
summary_hurst = summary_hurst[[c for c in cols_hurst if c in summary_hurst.columns]]
summary_hurst = summary_hurst.sort_values(
["Hurst", "ISIN"], na_position="last"
).reset_index(drop=True)
summary_hurst.to_excel(OUTPUT_HURST_XLSX, index=False)
pat_df = pd.DataFrame(pattern_rows) if pattern_rows else pd.DataFrame()
if not pat_df.empty:
pat_df["ISIN"] = pat_df["ISIN"].astype(str).str.strip()
summary_pattern = (
meta_df.merge(hurst_df, on="ISIN", how="left").merge(pat_df, on="ISIN", how="left")
)
wanted = ["ISIN", "Nome", "Categoria", "Asset Class", "Hurst", "Regime",
"DateLast", "PatternType", "Signal", "Confidence",
"EstOutcome", "AvgDist", "Wp", "Ha", "k"]
summary_pattern = summary_pattern[[c for c in wanted if c in summary_pattern.columns]]
summary_pattern = _add_quality_scores(summary_pattern)
sort_cols = [c for c in ["QualityScore", "Confidence", "OutcomeScore"]
if c in summary_pattern.columns]
if sort_cols:
summary_pattern = summary_pattern.sort_values(
sort_cols, ascending=[False] * len(sort_cols), na_position="last",
).reset_index(drop=True)
summary_pattern.to_excel(OUTPUT_PATTERN_XLSX, index=False)
print(f"[INFO] Salvato: {OUTPUT_HURST_XLSX} ({len(summary_hurst)} righe)")
print(f"[INFO] Salvato: {OUTPUT_PATTERN_XLSX} ({len(summary_pattern)} righe)")
# ---- 4B) BULK FETCH PREZZI OPEN PER ESECUZIONE t+1 ----
print("[INFO] Bulk fetch prezzi open/close per esecuzione t+1...")
t_px = time.perf_counter()
exec_rets_map: Dict[str, pd.Series] = {}
if assets_data:
# Trova range di date globale
all_dates = []
for df_isin in assets_data.values():
cd = df_isin.attrs.get("col_date")
if cd and cd in df_isin.columns:
d_min = pd.to_datetime(df_isin[cd]).min()
d_max = pd.to_datetime(df_isin[cd]).max()
if pd.notnull(d_min) and pd.notnull(d_max):
all_dates.append((d_min, d_max))
if all_dates:
date_min = min(d[0] for d in all_dates).date().isoformat()
date_max = max(d[1] for d in all_dates).date().isoformat()
print(f" range fetch: {date_min} -> {date_max}, {len(assets_data)} ISIN")
try:
px_hist_all = fetch_price_history(
isins=list(assets_data.keys()),
universe=meta_df,
start_date=date_min, end_date=date_max,
)
if not px_hist_all.empty:
for isin, g in px_hist_all.groupby("ISIN"):
g = g.sort_values("Date")
open_series = g[["Date", "Open"]].dropna()
open_series["Date"] = pd.to_datetime(open_series["Date"]).dt.normalize()
open_series = (
open_series.drop_duplicates(subset=["Date"])
.set_index("Date")["Open"]
)
exec_rets_map[isin] = open_series.pct_change()
except Exception as exc:
print(f"[WARN] Bulk fetch prezzi fallito: {exc}")
print(f"[TIMER] Bulk fetch: {format_eta(time.perf_counter() - t_px)}")
# ---- 4C) FORWARD-BACKTEST (riusa assets_data) ----
print("[INFO] Walk-forward kNN backtest per tutte le strategie...")
t_bt = time.perf_counter()
# Per ogni strategia, mantieni i risultati separati
bt_signals_by_strat: Dict[str, List[pd.DataFrame]] = {s: [] for s in STRATEGY_PARAMS}
bt_summary_by_strat: Dict[str, List[Dict]] = {s: [] for s in STRATEGY_PARAMS}
total_t = 0.0
for i, (isin, df_isin) in enumerate(assets_data.items(), 1):
t0 = time.perf_counter()
try:
df_pct, col_ret_pct = compute_returns_percent(df_isin)
col_date = df_isin.attrs.get("col_date")
if not col_ret_pct or df_pct[col_ret_pct].dropna().shape[0] < max(200, WP + HA + 10):
errors.append({"ISIN": isin, "Errore": "Serie troppo corta (BT)"})
continue
exec_ret = exec_rets_map.get(isin)
if exec_ret is not None and col_date and col_date in df_pct.columns:
idx_dates = pd.to_datetime(df_pct[col_date]).dt.normalize()
exec_ret = exec_ret.reindex(idx_dates)
exec_ret.index = idx_dates
# theta_entry = THETA globale (NO HURST v2.1)
theta_entry = THETA
# Esegui backtest per ogni strategia
for strat_name, strat_cfg in STRATEGY_PARAMS.items():
sig_df, stats = knn_forward_backtest_one_asset(
df_isin=df_pct,
col_date=col_date if col_date else "Date",
col_ret=col_ret_pct,
Wp=WP, Ha=HA, k=KNN_K,
theta_entry=theta_entry,
exec_ret=exec_ret,
fee_bps=10,
sl_bps=strat_cfg.get("sl_bps"),
tp_bps=strat_cfg.get("tp_bps"),
trail_bps=strat_cfg.get("trail_bps"),
time_stop_bars=strat_cfg.get("time_stop_bars"),
)
name = meta_df.loc[meta_df["ISIN"] == isin, "Nome"].iloc[0] if (
meta_df["ISIN"] == isin
).any() else None
cat = meta_df.loc[meta_df["ISIN"] == isin, "Categoria"].iloc[0] if (
meta_df["ISIN"] == isin
).any() else None
ac = meta_df.loc[meta_df["ISIN"] == isin, "Asset Class"].iloc[0] if (
meta_df["ISIN"] == isin
).any() else None
tmp = sig_df.copy()
tmp.insert(0, "ISIN", isin)
tmp.insert(1, "Nome", name)
tmp.insert(2, "Categoria", cat)
tmp.insert(3, "Asset Class", ac)
tmp["Wp"] = WP
tmp["Ha"] = HA
tmp["k"] = KNN_K
tmp["Theta"] = theta_entry
bt_signals_by_strat[strat_name].append(tmp)
stats_row = {"ISIN": isin, "Nome": name, "Categoria": cat, "Asset Class": ac}
stats_row.update(stats)
bt_summary_by_strat[strat_name].append(stats_row)
except Exception as exc:
errors.append({"ISIN": isin, "Errore": f"Backtest: {exc}"})
dt = time.perf_counter() - t0
total_t += dt
if i % 10 == 0 or i == len(assets_data):
avg_t = total_t / i
eta = avg_t * (len(assets_data) - i)
print(f" [BT] {i}/{len(assets_data)} ISIN "
f"(avg {avg_t:.2f}s, ETA {format_eta(eta)})")
print(f"[TIMER] Backtest totale: {format_eta(time.perf_counter() - t_bt)}")
# Aggrega risultati per strategia (usa la prima come "base" per coerenza)
# con il flusso originale che lavorava su EW only)
base_strat = "Equal_Weight"
bt_signals_df = (
pd.concat(bt_signals_by_strat[base_strat], ignore_index=True)
if bt_signals_by_strat[base_strat] else pd.DataFrame()
)
bt_summary_df = (
pd.DataFrame(bt_summary_by_strat[base_strat])
if bt_summary_by_strat[base_strat] else pd.DataFrame()
)
if not bt_signals_df.empty:
bt_signals_df.to_excel(FORWARD_BT_SIGNALS_XLSX, index=False)
bt_summary_df.to_excel(FORWARD_BT_SUMMARY_XLSX, index=False)
print(f"[INFO] Salvato: {FORWARD_BT_SIGNALS_XLSX} ({len(bt_signals_df):,} righe)")
print(f"[INFO] Salvato: {FORWARD_BT_SUMMARY_XLSX} ({len(bt_summary_df):,} righe)")
if errors:
pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False)
print(f"[INFO] Log errori: {ERROR_LOG_CSV} (tot: {len(errors)})")
try:
save_price_cache_summary(OPEN_CACHE_DIR, OPEN_CACHE_DIR / "prezzi_summary.xlsx")
except Exception as exc:
print(f"[WARN] Riepilogo prezzi non creato: {exc}")
# Timer per fasi post-backtest
start_post_timer(total_steps=4)
# ---- 5) STRATEGIE PORTAFOGLIO DINAMICHE ----
print("[INFO] Costruzione portafogli dinamici per le strategie attive...")
all_portfolios: Dict[str, Dict[str, Any]] = {}
all_returns: Dict[str, pd.Series] = {}
all_weights: Dict[str, pd.DataFrame] = {}
for strat_name, signals_list in bt_signals_by_strat.items():
if not signals_list:
continue
sigs = pd.concat(signals_list, ignore_index=True)
if sigs.empty:
continue
sigs["Date"] = pd.to_datetime(sigs["Date"])
sigs["ISIN"] = sigs["ISIN"].astype(str).str.strip()
sigs["Signal"] = pd.to_numeric(sigs["Signal"], errors="coerce").fillna(0).astype(int)
sigs["PnL"] = pd.to_numeric(sigs["PnL"], errors="coerce").fillna(0.0)
wide_pnl = sigs.pivot_table(
index="Date", columns="ISIN", values="PnL", aggfunc="sum"
).fillna(0.0)
wide_sig = sigs.pivot_table(
index="Date", columns="ISIN", values="Signal", aggfunc="last"
).fillna(0).astype(int)
wide_est = sigs.pivot_table(
index="Date", columns="ISIN", values="EstOutcome", aggfunc="last"
).sort_index()
port = _build_dynamic_portfolio_returns(
wide_pnl=wide_pnl, wide_sig=wide_sig, wide_est=wide_est,
top_n=TOP_N,
window_bars=RANKING_WINDOW_BARS, rp_lookback=RP_LOOKBACK,
)
sizing = STRATEGY_PARAMS[strat_name].get("sizing", "equal_weight")
ret_key = "ret_rp" if sizing == "risk_parity" else "ret_eq"
w_key = "w_rp_act" if sizing == "risk_parity" else "w_eq_act"
all_portfolios[strat_name] = port
all_returns[strat_name] = port[ret_key]
all_weights[strat_name] = port[w_key]
checkpoint_post_timer("Portafogli dinamici")
# ---- 5.4 EQUITY + HEATMAP per ogni strategia ----
print("[INFO] Equity curves e heatmap mensili...")
if all_returns:
plt.figure(figsize=(11, 6))
for name, r in all_returns.items():
eq = equity_from_returns(r)
plt.plot(eq.index, eq.values, label=name, linewidth=1.5)
plt.title(f"Equity Curve - Selezione dinamica Top{TOP_N}")
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
savefig_safe(str(PLOT_DIR / "equity_line_portafogli.png"), dpi=150)
plt.close()
for name, r in all_returns.items():
plot_heatmap_monthly(
r, f"Heatmap mensile - {name}",
save_path=str(PLOT_DIR / f"heatmap_{name}.png"),
)
# Salva metriche portafogli
port_metrics_rows = [portfolio_metric_row(name, r) for name, r in all_returns.items()]
if port_metrics_rows:
pd.DataFrame(port_metrics_rows).to_excel(
OUTPUT_DIR / "portfolio_metrics.xlsx", index=False
)
print(f"[INFO] Salvato: {OUTPUT_DIR / 'portfolio_metrics.xlsx'}")
checkpoint_post_timer("Equity + heatmap")
# Composition plots
for name, w in all_weights.items():
plot_portfolio_composition(
w, f"{name} (Top{TOP_N})",
save_path=str(PLOT_DIR / f"composition_{name}.png"),
)
plot_portfolio_composition_fixed(
w, f"{name} (Top{TOP_N})",
save_path=str(PLOT_DIR / f"composition_cash_{name}.png"),
)
# Export pesi
_export_weights_daily(all_weights, WEIGHTS_DAILY_XLSX)
checkpoint_post_timer("Composition + weights")
# ---- 5.5 TRADE REPORT ----
print("[INFO] Trade report e performance attribution...")
if bt_signals_by_strat[base_strat]:
sigs_all = pd.concat(bt_signals_by_strat[base_strat], ignore_index=True)
trades_dict = make_trades_report(sigs_all, meta_df, all_weights)
if trades_dict:
try:
with pd.ExcelWriter(TRADES_REPORT_XLSX) as xw:
for name, df in trades_dict.items():
df.to_excel(xw, sheet_name=name[:31], index=False)
print(f"[INFO] Salvato: {TRADES_REPORT_XLSX}")
except Exception as exc:
print(f"[WARN] Export trades_report fallito: {exc}")
attr_df = _build_performance_attribution(trades_dict)
if not attr_df.empty:
attr_df.to_excel(PERF_ATTRIB_XLSX, index=False)
print(f"[INFO] Salvato: {PERF_ATTRIB_XLSX}")
# Ricostruzione daily da trades
if all_returns:
date_idx = next(iter(all_returns.values())).index
daily_from_trades = rebuild_daily_from_trades_dict(trades_dict, date_idx)
if not daily_from_trades.empty:
daily_from_trades.to_csv(DAILY_FROM_TRADES_CSV, index_label="Date")
daily_from_trades.to_excel(DAILY_FROM_TRADES_XLSX, index_label="Date")
print(f"[INFO] Salvato: {DAILY_FROM_TRADES_CSV}")
checkpoint_post_timer("Trade report")
# ---- 6) LOOP TOP-N SWEEP (metriche per N=6..20) ----
print("[INFO] Calcolo metriche finali per N=6..20...")
final_rows = []
if bt_signals_by_strat[base_strat]:
sigs_base = pd.concat(bt_signals_by_strat[base_strat], ignore_index=True)
sigs_base["Date"] = pd.to_datetime(sigs_base["Date"])
sigs_base["ISIN"] = sigs_base["ISIN"].astype(str).str.strip()
sigs_base["Signal"] = pd.to_numeric(sigs_base["Signal"], errors="coerce").fillna(0).astype(int)
sigs_base["PnL"] = pd.to_numeric(sigs_base["PnL"], errors="coerce").fillna(0.0)
wide_pnl_b = sigs_base.pivot_table(
index="Date", columns="ISIN", values="PnL", aggfunc="sum"
).fillna(0.0)
wide_sig_b = sigs_base.pivot_table(
index="Date", columns="ISIN", values="Signal", aggfunc="last"
).fillna(0).astype(int)
wide_est_b = sigs_base.pivot_table(
index="Date", columns="ISIN", values="EstOutcome", aggfunc="last"
).sort_index()
for n in range(6, 21):
port_n = _build_dynamic_portfolio_returns(
wide_pnl=wide_pnl_b, wide_sig=wide_sig_b, wide_est=wide_est_b,
top_n=n,
)
r_eq = port_n["ret_eq"]
r_rp = port_n["ret_rp"]
row_eq = _calc_all_metrics_from_returns(r_eq, f"EW_Top{n}")
row_rp = _calc_all_metrics_from_returns(r_rp, f"RP_Top{n}")
row_eq["TopN"] = n
row_rp["TopN"] = n
final_rows.extend([row_eq, row_rp])
if final_rows:
pd.DataFrame(final_rows).to_excel(FINAL_METRICS_XLSX, index=False)
print(f"[INFO] Salvato: {FINAL_METRICS_XLSX}")
# ---- WRAP UP ----
elapsed = time.perf_counter() - start_all
print(f"\n[DONE] Pipeline completata in {format_eta(elapsed)}")
print(f" Output in: {OUTPUT_DIR}")
print(f" Plot in: {PLOT_DIR}")
# =============================================================================
# ENTRY POINT
# =============================================================================
# Run the full pipeline only when this file is executed directly as a script.
# Importing the module does not execute main(), because the guard below is
# false on import.
if __name__ == "__main__":
    main()