# -*- coding: utf-8 -*-
"""
Created on Mon Oct 27 13:59:10 2025
End-to-end script:
- Loads the universe (Excel) and the data (DB) only once
- Computes Hurst + pattern signals (long-only for trades)
- Runs the walk-forward k-NN backtest (long-only) and saves forward_bt_signals/summary
- Phase 5: dynamic portfolio selection + equity + heatmaps + trade report (long-only)
- Phase 6: final metrics (as in the "optimizer") + charts saved to file
Notes:
- Does not re-read freshly saved files; works on the in-memory DataFrames
- Saves: hurst_by_isin.xlsx, pattern_signals.xlsx, forward_bt_*.xlsx, trades_report.xlsx, final_metrics.xlsx
- Saves PNGs: equity_line_portafogli.png, heatmap_*.png
"""
import pandas as pd
import numpy as np
import sqlalchemy as sa
from sqlalchemy import text
import matplotlib.pyplot as plt
from pathlib import Path
import json
import ssl
import re
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
from shared_utils import (
build_pattern_library,
characterize_window,
detect_column,
load_config,
predict_from_library,
read_connection_txt,
require_section,
require_value,
z_norm,
)
#from math import isfinite
import time
# =============================
# Plot saving helper (non-recursive)
# =============================
try:
import os as _os_sf
import matplotlib.pyplot as _plt_sf
except Exception:
_plt_sf = None
SAVE_PNG = globals().get("SAVE_PNG", True)
def savefig_safe(path, **kwargs):
"""
Save a matplotlib figure to disk safely, honoring SAVE_PNG.
Usage: savefig_safe("plot/myfig.png", dpi=150, bbox_inches="tight")
"""
if not SAVE_PNG or _plt_sf is None:
return
# Ensure directory exists
try:
d = _os_sf.path.dirname(path)
if d and not _os_sf.path.exists(d):
_os_sf.makedirs(d, exist_ok=True)
except Exception as _e:
print(f"[savefig_safe] Directory creation warning: {_e}")
try:
_plt_sf.savefig(path, **kwargs)
except Exception as e:
print(f"[savefig_safe] Warning while saving '{path}': {e}")
# Score computation (reusable for the rolling case as well)
def _apply_score(df_sum: pd.DataFrame) -> pd.DataFrame:
"""Apply the weight calibration to df_sum and add the Score column."""
def _available_cols(df, cols):
return [c for c in cols if (c in df.columns and df[c].notna().sum() > 0)]
primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)]
mm = [(c,gh) for (c,gh) in primary_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]
if len(mm) < 2:
mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]
# If still insufficient, widen to the union of the primary and alternative metric sets
if len(mm) < 2:
union_candidates = list({x[0] for x in primary_cols+alt_cols})
mm = [(c, True) for c in _available_cols(df_sum, union_candidates)]
if len(mm) == 0:
print("[WARN] Nessuna metrica numerica disponibile: uso Score=0 e ordino per ISIN.")
df_sum["Score"] = 0.0
df_sum["Score_mode"] = "degenerate_equal"
return df_sum
# If fixed weights are defined in the config, use them; otherwise calibrate automatically
use_fixed = False
if SCORE_WEIGHTS:
weights_raw = {k: float(v) for k, v in SCORE_WEIGHTS.items() if k in df_sum.columns}
weights_raw = {k: v for k, v in weights_raw.items() if df_sum[k].notna().sum() > 0}
if weights_raw:
use_fixed = True
w = pd.Series(weights_raw)
w = w / w.sum()
X_ranked = df_sum[w.index].rank(pct=True)
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
df_sum["Score_mode"] = "fixed_weights"
if SCORE_VERBOSE:
print("Pesi fissi (config):", w.to_dict())
else:
print("[WARN] score_weights in config non compatibili con le metriche disponibili. Uso calibrazione automatica.")
if not use_fixed:
res = calibrate_score_weights(
df_sum,
metrics_map=mm,
target_col=None
)
X_ranked = res["X_ranked"]
w = res["weights"]
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
df_sum["Score_mode"] = res["mode"]
if SCORE_VERBOSE:
print("Pesi stimati automaticamente (metriche usate):")
print("Disponibilita' metriche (righe non-NaN):",
{c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]})
print(w)
return df_sum
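# Illustrative sketch (not executed): how the percentile-rank weighting used by _apply_score combines
# metrics into a Score. Column names and values below are toy data, not outputs of this script.
def _score_weighting_example():
    demo = pd.DataFrame({"Sharpe": [1.2, 0.4, 0.9], "CAGR_%": [8.0, 3.0, 6.0], "MaxDD_%eq": [-12.0, -5.0, -9.0]})
    weights = pd.Series({"Sharpe": 0.5, "CAGR_%": 0.3, "MaxDD_%eq": 0.2})
    weights = weights / weights.sum()                      # normalize the weights to sum to 1
    ranked = demo[weights.index].rank(pct=True)            # percentile rank of each metric (0..1)
    demo["Score"] = (ranked * weights.values).sum(axis=1)  # weighted sum of the ranks
    return demo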
# =============================
# PRICE FETCH (OPEN/CLOSE) - historical
# =============================
def _build_symbol_euronext(row: pd.Series) -> tuple[str, str]:
isin = str(row.get("ISIN", "")).strip()
venue = str(row.get("Mercato", "")).strip()
tok = str(row.get("TickerOpen", "") or "").strip()
base = OPEN_PRICE_BASE_URL
if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper():
return base, tok
if isin and venue:
return base, f"{isin}-{venue}"
if isin:
return base, f"{isin}-ETFP" # generic fallback for the history endpoint
return base, isin
def fetch_price_history(isins, universe: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame:
"""
Downloads the open/close price history for a list of ISINs using the historical endpoint.
- The API is called one ISIN at a time: https://fin.scorer.app/finance/etf-inv/history/{ticker}?fromDate=YYYYMMDD&toDate=YYYYMMDD
- Local CSV caching reduces requests; if the API fails, the cache is used as a fallback.
- Market fallback: ETFP → XPAR → XAMS. When a series is extended with another market, the splice is applied
  only if the prices at the last point of the previous segment and at the first point of the next one
  differ by less than 2% (to avoid currency/quotation jumps).
Returns a DataFrame with columns: Date (datetime), ISIN, Open, Close.
"""
start_dt = pd.to_datetime(start_date).date()
end_dt = pd.to_datetime(end_date).date()
def _symbol_cache_path(symbol: str) -> Path:
safe = re.sub(r"[^A-Za-z0-9_-]+", "_", str(symbol))
return OPEN_CACHE_DIR / f"{safe}.csv"
def _load_cache(path: Path) -> pd.DataFrame | None:
try:
if path.exists():
dfc = pd.read_csv(path, parse_dates=["Date"])
dfc["ISIN"] = dfc["ISIN"].astype(str)
return dfc
except Exception as e:
print(f"[WARN] Cache prezzi corrotta {path}: {e}")
return None
def _normalize_payload_to_df(payload, isin):
# The new endpoint returns [{"ticker": "...", "data": [ {...}, ... ]}]
data_block = payload
if isinstance(payload, list) and payload:
if isinstance(payload[0], dict) and "data" in payload[0]:
data_block = payload[0].get("data", [])
else:
data_block = payload
if isinstance(payload, dict) and "data" in payload:
data_block = payload.get("data", [])
rows = []
for d in data_block or []:
dt_raw = d.get("date") or d.get("Date") or d.get("data") or d.get("timestamp")
if dt_raw is None:
continue
try:
if isinstance(dt_raw, (int, float)):
dt_parsed = pd.to_datetime(int(dt_raw), unit="ms").tz_localize(None)
else:
dt_parsed = pd.to_datetime(dt_raw).tz_localize(None)
except Exception:
continue
rows.append({
"Date": dt_parsed,
"ISIN": str(isin),
"Open": _to_float_safe(d.get("open")),
"Close": _to_float_safe(d.get("close") or d.get("last"))
})
return pd.DataFrame(rows) if rows else pd.DataFrame(columns=["Date","ISIN","Open","Close"])
def _fetch_symbol(symbol: str, isin: str):
url = f"{OPEN_PRICE_BASE_URL}/{symbol}?fromDate={start_dt.strftime('%Y%m%d')}&toDate={end_dt.strftime('%Y%m%d')}"
cache_path = _symbol_cache_path(symbol)
cache_df = _load_cache(cache_path)
df_api = pd.DataFrame()
ok = False
for attempt in range(1, OPEN_MAX_RETRY + 1):
try:
with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp:
payload = json.loads(resp.read().decode("utf-8"))
df_api = _normalize_payload_to_df(payload, isin)
if df_api.empty:
print(f"[WARN] Nessun dato per {symbol}")
ok = True
break
except (HTTPError, URLError, ssl.SSLError, json.JSONDecodeError) as e:
if attempt < OPEN_MAX_RETRY:
print(f"[WARN] Download {symbol} tentativo {attempt}/{OPEN_MAX_RETRY} fallito: {e}. Retry in {OPEN_SLEEP_SEC}s")
time.sleep(OPEN_SLEEP_SEC)
else:
print(f"[ERROR] Download {symbol} fallito: {e}")
df_use = pd.DataFrame()
if ok and not df_api.empty:
df_api = df_api.sort_values("Date")
if cache_df is not None and not cache_df.empty:
df_use = (
pd.concat([cache_df, df_api], ignore_index=True)
.drop_duplicates(subset=["Date"])
.sort_values("Date")
)
else:
df_use = df_api
try:
OPEN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
df_use.to_csv(cache_path, index=False)
except Exception as e:
print(f"[WARN] Salvataggio cache prezzi fallito ({cache_path}): {e}")
elif cache_df is not None and not cache_df.empty:
df_use = cache_df
print(f"[INFO] Uso cache prezzi per {symbol} (API indisponibile).")
return df_use
def _merge_with_check(df_base: pd.DataFrame, df_add: pd.DataFrame, label_prev: str, label_next: str):
"""
Extends df_base by prepending the part of df_add that precedes the first point of df_base.
Checks the price jump at the junction: if > 2%, does not merge and warns.
"""
if df_base is None or df_base.empty:
return df_add, False
if df_add is None or df_add.empty:
return df_base, False
cutoff = df_base["Date"].min()
prev_part = df_add[df_add["Date"] < cutoff]
if prev_part.empty:
return df_base, False
merged = pd.concat([prev_part, df_base], ignore_index=True)
merged = merged.sort_values("Date").drop_duplicates(subset=["Date"], keep="last")
# jump check: last price of the previous segment vs first price of the next one
prev_last = prev_part.sort_values("Date").iloc[-1]
next_first = df_base[df_base["Date"] >= cutoff].sort_values("Date").iloc[0]
def _price(row):
return _to_float_safe(row.get("Close")) if pd.notna(row.get("Close")) else _to_float_safe(row.get("Open"))
p_prev = _price(prev_last)
p_next = _price(next_first)
if p_prev is None or p_next is None or not np.isfinite(p_prev) or not np.isfinite(p_next) or p_next == 0:
return merged, True
gap = abs(p_prev - p_next) / abs(p_next)
if gap > 0.02:
print(f"[WARN] Salto prezzo >2% tra {label_prev} e {label_next} su {prev_last['Date'].date()} -> {next_first['Date'].date()} (gap {gap:.2%}). Fallback non applicato.")
return df_base, False
return merged, True
records = []
for i, isin in enumerate(isins, 1):
try:
row = universe.loc[universe["ISIN"] == str(isin)].iloc[0]
except Exception:
print(f"[WARN] ISIN {isin} non trovato nell'universo.")
continue
base, symbol = _build_symbol_euronext(row)
df_primary = _fetch_symbol(symbol, isin)
# Additional market fallbacks (XPAR, then XAMS) to extend the series further back
fallback_symbols = []
if "-" in symbol:
root = symbol.rsplit("-", 1)[0]
fallback_symbols.append(f"{root}-XPAR")
fallback_symbols.append(f"{root}-XAMS")
else:
fallback_symbols.append(f"{symbol}-XPAR")
fallback_symbols.append(f"{symbol}-XAMS")
df_use = df_primary
applied_any = False
for fb_sym in fallback_symbols:
# only needed if the series does not start at start_dt
need_fb = df_use.empty or (df_use["Date"].min().date() > start_dt)
if not need_fb:
continue
df_fb = _fetch_symbol(fb_sym, isin)
if df_fb.empty:
print(f"[WARN] Fallback {fb_sym} assente per {isin}")
continue
if df_use.empty:
df_use = df_fb
applied_any = True
print(f"[INFO] Uso fallback {fb_sym} per tutto il periodo.")
else:
merged, merged_ok = _merge_with_check(df_use, df_fb, fb_sym, symbol)
if merged_ok:
df_use = merged
applied_any = True
cutoff = df_use["Date"].min()
print(f"[INFO] Serie estesa con {fb_sym} fino a {cutoff.date()} per {isin}")
else:
print(f"[WARN] Fallback {fb_sym} scartato per gap >2% su {isin}")
if df_use.empty:
print(f"[WARN] Serie open/close non disponibile per {isin}")
continue
# Filter to the requested date range
df_use["Date"] = pd.to_datetime(df_use["Date"])
mask = (df_use["Date"].dt.date >= start_dt) & (df_use["Date"].dt.date <= end_dt)
df_use = df_use.loc[mask]
if df_use.empty:
print(f"[WARN] Nessun dato nel range richiesto per {symbol}")
continue
records.append(df_use)
if not records:
return pd.DataFrame(columns=["Date","ISIN","Open","Close"])
df_px = pd.concat(records, ignore_index=True)
df_px = df_px.sort_values(["ISIN","Date"]).reset_index(drop=True)
return df_px
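# Usage sketch (not executed): fetch_price_history is called further below with the in-memory universe.
# The ISIN here is a placeholder; `universe` only needs an ISIN column (Mercato/TickerOpen are optional).
#   px = fetch_price_history(["XX0000000000"], universe=meta_df,
#                            start_date="2020-01-01", end_date="2020-12-31")
#   px.head()  # -> columns: Date, ISIN, Open, Close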
def save_price_cache_summary(cache_dir: Path, outfile: Path, pattern: str = "*ETFP.csv"):
"""
Saves a summary of the cached price series (no fallback) with min/max dates and row counts.
Default pattern: ETFP symbols only.
"""
try:
if not cache_dir.exists():
print(f"[WARN] Cache prezzi non trovata: {cache_dir}")
return
rows = []
for f in sorted(cache_dir.glob(pattern)):
try:
df = pd.read_csv(f, parse_dates=["Date"])
except Exception as e:
rows.append({"Symbol": f.stem, "Errore": str(e)})
continue
if df.empty:
rows.append({"Symbol": f.stem, "Rows": 0})
continue
rows.append({
"Symbol": f.stem,
"min_date": df["Date"].min().date(),
"max_date": df["Date"].max().date(),
"rows": len(df)
})
if not rows:
print(f"[WARN] Nessun file prezzi in cache ({cache_dir}).")
return
out_df = pd.DataFrame(rows).sort_values("Symbol")
outfile.parent.mkdir(parents=True, exist_ok=True)
out_df.to_excel(outfile, index=False)
print(f"[INFO] Salvato riepilogo prezzi (no fallback) in {outfile} ({len(out_df)} righe)")
except Exception as e:
print(f"[WARN] Impossibile salvare riepilogo prezzi: {e}")
def _to_float_safe(x):
try:
return float(x)
except Exception:
return np.nan
# LEGACY: original block kept but no longer executed (we use _apply_score above)
# =========================================
# GLOBAL PARAMETERS
# =========================================
CONFIG = load_config()
DB_CONFIG = require_section(CONFIG, "db")
PATTERN_CONFIG = require_section(CONFIG, "pattern")
TAGGING_CONFIG = require_section(CONFIG, "tagging")
RANKING_CONFIG = require_section(CONFIG, "ranking")
PATHS_CONFIG = require_section(CONFIG, "paths")
HURST_CONFIG = CONFIG.get("hurst", {})
RUN_CONFIG = CONFIG.get("run", {})
SIGNALS_CONFIG = CONFIG.get("signals", {})
PRICES_CONFIG = CONFIG.get("prices", {})
OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "output"))
PLOT_DIR = Path(PATHS_CONFIG.get("plot_dir", "plot"))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
PLOT_DIR.mkdir(parents=True, exist_ok=True)
UNIVERSO_XLSX = PATHS_CONFIG.get("input_universe", "Input/Universo per Trading System.xlsx")
# Export
OUTPUT_HURST_XLSX = OUTPUT_DIR / "hurst_by_isin.xlsx"
OUTPUT_PATTERN_XLSX = OUTPUT_DIR / "pattern_signals.xlsx"
ERROR_LOG_CSV = OUTPUT_DIR / "errori_isin.csv"
FORWARD_BT_SIGNALS_XLSX = OUTPUT_DIR / "forward_bt_signals.xlsx"
FORWARD_BT_SUMMARY_XLSX = OUTPUT_DIR / "forward_bt_summary.xlsx"
TRADES_REPORT_XLSX = OUTPUT_DIR / "trades_report.xlsx"
PERF_ATTRIB_XLSX = OUTPUT_DIR / "performance_attribution.xlsx"
DAILY_FROM_TRADES_CSV = OUTPUT_DIR / "daily_from_trades.csv"
DAILY_FROM_TRADES_XLSX = OUTPUT_DIR / "daily_from_trades.xlsx"
WEIGHTS_DAILY_XLSX = OUTPUT_DIR / "weights_daily.xlsx"
FINAL_METRICS_XLSX = OUTPUT_DIR / "final_metrics.xlsx"
# Stored procedure & parameters
STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db"))
N_BARS = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db"))
RANKING_WINDOW_BARS = int(RANKING_CONFIG.get("rolling_window_bars", N_BARS))
RP_LOOKBACK = int(SIGNALS_CONFIG.get("risk_parity_lookback", 60))
OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/etf-inv/history"))
OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3))
OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1))
OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10))
OPEN_CACHE_DIR = Path(PRICES_CONFIG.get("cache_dir", OUTPUT_DIR / "price_cache"))
RECOMPUTE_PORTF_FROM_OPEN = bool(PRICES_CONFIG.get("recompute_portfolio_open", False))
# Pattern-matching (hyper-parameters)
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) # pattern window length (bars)
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern")) # outcome horizon (bars)
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern")) # number of neighbours
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern")) # outcome threshold for generating a signal
EMBARGO = require_value(PATTERN_CONFIG, "embargo", "pattern")
if EMBARGO is None:
EMBARGO = WP + HA
else:
EMBARGO = int(EMBARGO)
# Rule-based tagging (thresholds)
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))
TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking")) # maximum number of admitted assets
RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking") # 2 x 1/15 ≈ 0.1333 = 13.33%
if RP_MAX_WEIGHT is None:
RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
else:
RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)
SCORE_VERBOSE = bool(RANKING_CONFIG.get("score_verbose", False))
SCORE_WEIGHTS = RANKING_CONFIG.get("score_weights")
HURST_MIN_LENGTH = int(HURST_CONFIG.get("min_length", 200))
HURST_WIN_GRID = HURST_CONFIG.get("win_grid")
HURST_MIN_SEGMENTS = int(HURST_CONFIG.get("min_segments", 1))
DAYS_PER_YEAR = int(RUN_CONFIG.get("days_per_year", 252))
TOP_N = int(RUN_CONFIG.get("top_n_default", TOP_N_MAX))
# =========================================
# GENERAL UTILS
# =========================================
def clamp01(x):
if not np.isfinite(x):
return np.nan
return float(min(1.0, max(0.0, x)))
def format_eta(seconds):
"""Format a duration (seconds) as Xm Ys or Xh Ym Ys for readability."""
if not np.isfinite(seconds):
return "n/a"
seconds = max(0, int(round(seconds)))
minutes, secs = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h {minutes:02d}m {secs:02d}s"
return f"{minutes}m {secs:02d}s"
# Timer helper for the post-backtest phases
_post_timer = {"t0": None, "tprev": None, "total": None, "done": 0}
def start_post_timer(total_steps: int):
_post_timer["t0"] = time.perf_counter()
_post_timer["tprev"] = _post_timer["t0"]
_post_timer["total"] = total_steps
_post_timer["done"] = 0
def checkpoint_post_timer(label: str):
if _post_timer["t0"] is None or _post_timer["total"] is None:
return
_post_timer["done"] += 1
now = time.perf_counter()
step_dt = now - _post_timer["tprev"]
total_dt = now - _post_timer["t0"]
avg = total_dt / max(_post_timer["done"], 1)
eta = avg * max(_post_timer["total"] - _post_timer["done"], 0)
print(f"[TIMER] post { _post_timer['done']}/{_post_timer['total']} {label} — step {step_dt:.2f}s, total {total_dt:.2f}s, ETA {format_eta(eta)}")
_post_timer["tprev"] = now
# ================= HURST (on RETURNS) =================
def hurst_rs_returns(r, win_grid=None, min_seg=None):
r = pd.Series(r).dropna().astype("float64").values
n = len(r)
seg_min = HURST_MIN_SEGMENTS if min_seg is None else int(min_seg)
if n < HURST_MIN_LENGTH:
return np.nan
if win_grid is None:
base = HURST_WIN_GRID or [16,24,32,48,64,96,128,192,256,384]
base = np.array(base, dtype=int)
win_grid = [w for w in base if w <= n//2]
if len(win_grid) < 4:
max_w = max(16, n//4)
g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)]))
win_grid = [w for w in g if 8 <= w <= max_w]
RS_vals, sizes = [], []
for w in win_grid:
if w < 8 or w > n: continue
m = n//w
if m < seg_min: continue
rs_list = []
for i in range(m):
seg = r[i*w:(i+1)*w]
seg = seg - np.mean(seg)
sd = seg.std(ddof=1)
if sd == 0 or not np.isfinite(sd): continue
y = np.cumsum(seg)
R = np.max(y) - np.min(y)
rs = R/sd
if np.isfinite(rs) and rs > 0: rs_list.append(rs)
if rs_list:
RS_vals.append(np.mean(rs_list)); sizes.append(w)
if len(RS_vals) < 3: return np.nan
sizes = np.array(sizes, float); RS_vals = np.array(RS_vals, float)
mask = np.isfinite(RS_vals) & (RS_vals > 0)
sizes, RS_vals = sizes[mask], RS_vals[mask]
if sizes.size < 3: return np.nan
slope, _ = np.polyfit(np.log(sizes), np.log(RS_vals), 1)
return clamp01(slope)
def hurst_dfa_returns(r, win_grid=None):
r = pd.Series(r).dropna().astype("float64").values
n = len(r)
if n < HURST_MIN_LENGTH:
return np.nan
r_dm = r - np.mean(r)
y = np.cumsum(r_dm)
if win_grid is None:
base = HURST_WIN_GRID or [16,24,32,48,64,96,128,192,256]
base = np.array(base, dtype=int)
win_grid = [w for w in base if w <= n//2]
if len(win_grid) < 4:
max_w = max(16, n//4)
g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)]))
win_grid = [w for w in g if 8 <= w <= max_w]
F_vals, sizes = [], []
for s in win_grid:
if s < 8: continue
m = n//s
if m < 2: continue
rms_list = []
for i in range(m):
seg = y[i*s:(i+1)*s]
t = np.arange(s, dtype=float)
A = np.vstack([t, np.ones(s)]).T
coeff, *_ = np.linalg.lstsq(A, seg, rcond=None)
trend = A @ coeff
detr = seg - trend
rms = np.sqrt(np.mean(detr**2))
if np.isfinite(rms) and rms > 0: rms_list.append(rms)
if rms_list:
F_vals.append(np.mean(rms_list)); sizes.append(s)
if len(F_vals) < 3: return np.nan
sizes = np.array(sizes, float); F_vals = np.array(F_vals, float)
mask = np.isfinite(F_vals) & (F_vals > 0)
sizes, F_vals = sizes[mask], F_vals[mask]
if sizes.size < 3: return np.nan
slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1)
return clamp01(slope)
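# Sanity-check sketch (not executed in the pipeline): on i.i.d. Gaussian returns both estimators should
# land near H ≈ 0.5; further below the script takes the median of the two values per ISIN.
def _hurst_sanity_check(n=2000, seed=0):
    rng = np.random.default_rng(seed)
    white_noise = pd.Series(rng.normal(0.0, 0.01, n))  # synthetic i.i.d. daily returns
    h_rs = hurst_rs_returns(white_noise)               # rescaled-range (R/S) estimate
    h_dfa = hurst_dfa_returns(white_noise)             # DFA estimate
    return h_rs, h_dfa, np.nanmedian([h_rs, h_dfa])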
# ---------------------------------
# R^2 of the equity line (log-equity vs time)
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
eq = (1.0 + r).cumprod().replace(0, np.nan)
y = np.log(eq.dropna())
if len(y) < 10:
return np.nan
x = np.arange(len(y), dtype=float)
x = (x - x.mean()) / (x.std() + 1e-12)
X = np.vstack([np.ones_like(x), x]).T
beta, *_ = np.linalg.lstsq(X, y.values, rcond=None)
y_hat = X @ beta
ss_res = ((y.values - y_hat) ** 2).sum()
ss_tot = ((y.values - y.values.mean()) ** 2).sum()
return float(1 - ss_res / ss_tot) if ss_tot > 0 else np.nan
# ---------------------------------
# Drawdown metrics path-based
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
eq = (1.0 + r).cumprod()
if eq.empty:
return np.nan, np.nan, np.nan
roll_max = eq.cummax()
dd = eq / roll_max - 1.0
maxdd = float(dd.min())
episodes = []
peak_i, peak_val = None, -np.inf
trough_i, trough_val = None, np.inf
in_uw = False
v = eq.values
for i in range(len(v)):
if v[i] >= peak_val:
if in_uw:
episodes.append((peak_i, trough_i, i))
in_uw = False
trough_i, trough_val = None, np.inf
peak_i, peak_val = i, v[i]
else:
if not in_uw:
in_uw = True
trough_i, trough_val = i, v[i]
elif v[i] < trough_val:
trough_i, trough_val = i, v[i]
if in_uw:
episodes.append((peak_i, trough_i, None))
dd_dur_max = np.nan
if episodes:
durs = []
for p, t, rcv in episodes:
if p is None:
continue
end_i = rcv if rcv is not None else len(eq) - 1
durs.append(end_i - p)
if durs:
dd_dur_max = int(max(durs))
ttr = np.nan
if episodes:
mdd_val = 0.0
mdd_ep = None
for p, t, rcv in episodes:
if p is None or t is None:
continue
dd_here = eq.iloc[t] / eq.iloc[p] - 1.0
if dd_here < mdd_val:
mdd_val = dd_here
mdd_ep = (p, t, rcv)
if mdd_ep is not None:
p, t, rcv = mdd_ep
if rcv is not None:
ttr = int(rcv - t)
else:
ttr = sentinel_ttr
return maxdd, dd_dur_max, ttr
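# Note on drawdown_metrics: it returns (max drawdown, longest peak-to-recovery duration in bars,
# time-to-recover of the deepest episode); if the deepest drawdown never recovers within the sample,
# the sentinel value sentinel_ttr (default 1250 bars) is reported instead.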
# ---------------------------------
# Utilities for AAW, AUW and the Heal Index (as in the optimizer)
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
"""
Computes:
- AAW: area above water (run-up vs the cumulative minimum)
- AUW: area under water (drawdown vs the cumulative maximum)
- Heal Index: (AAW - AUW) / AUW
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
run_min = equity.cummin()
ru = equity / run_min - 1.0
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
return AAW, AUW, heal
# ---------------------------------
# Utility for H_min (100% positive windows), as in the optimizer
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
"""
Smallest horizon h_days such that ALL rolling windows of width h_days
have a cumulative return >= 0. Returns (h_days, ceil(h_days/21)).
"""
s = returns.dropna().astype(float)
n = s.size
if n == 0:
return np.nan, np.nan
log1p = np.log1p(s.values)
csum = np.cumsum(log1p)
def rolling_sum_k(k: int):
if k > n:
return np.array([])
head = csum[k - 1:]
tail = np.concatenate(([0.0], csum[:-k]))
return head - tail
for k in range(1, n + 1):
rs = rolling_sum_k(k)
if rs.size == 0:
break
roll_ret = np.exp(rs) - 1.0
if np.all(roll_ret >= 0):
h_days = k
h_months = int(np.ceil(h_days / month_len))
return h_days, h_months
return np.nan, np.nan
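# Worked example for h_min_100 (illustrative values): with daily returns [+1%, -2%, +3%]
#   k=1 -> windows {+1%, -2%, +3%}: the -2% window fails the >= 0 test
#   k=2 -> windows {1.01*0.98-1 ≈ -1.0%, 0.98*1.03-1 ≈ +0.9%}: the first window fails
#   k=3 -> single window 1.01*0.98*1.03-1 ≈ +1.9% >= 0, so h_days = 3 and h_months = ceil(3/21) = 1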
# =========================================
# 1) UNIVERSE: ISINs + METADATA
# =========================================
universo = pd.read_excel(UNIVERSO_XLSX)
col_isin_uni = detect_column(universo, ["ISIN", "isin", "codice isin"])
if col_isin_uni is None:
raise ValueError("Nel file universo non trovo una colonna ISIN.")
col_name_uni = detect_column(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"])
col_cat_uni = detect_column(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"])
col_ac_uni = detect_column(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"])
isins = (
universo[col_isin_uni].astype(str).str.strip()
.replace("", pd.NA).dropna().drop_duplicates().tolist()
)
print(f"[INFO] ISIN totali in universo: {len(isins)}")
meta_df = pd.DataFrame({"ISIN": universo[col_isin_uni].astype(str).str.strip()})
meta_df["Nome"] = universo[col_name_uni] if col_name_uni else None
meta_df["Categoria"] = universo[col_cat_uni] if col_cat_uni else None
meta_df["Asset Class"] = universo[col_ac_uni] if col_ac_uni else None
meta_df = meta_df.drop_duplicates(subset=["ISIN"]).reset_index(drop=True)
# =========================================
# 2) DB CONNECTION
# =========================================
conn_str = read_connection_txt("connection.txt")
engine = sa.create_engine(conn_str, fast_executemany=True)
print("[INFO] Connessione pronta (SQLAlchemy + pyodbc).")
# =========================================
# 3) ISIN LOOP → SP → HURST + PATTERN (LONG ONLY for trades)
# =========================================
errors = []
hurst_rows = []
pattern_rows = []
last_dates = []
sql_sp = text(f"EXEC {STORED_PROC} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
def detect_cols(df0):
col_date = detect_column(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"])
col_ret = detect_column(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"])
col_px = detect_column(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
return col_date, col_ret, col_px
ok_count = 0
first_ok_reported = False
for i, isin in enumerate(isins, 1):
try:
df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR})
if df_isin.empty:
errors.append({"ISIN": isin, "Errore": "SP vuota"})
continue
col_date, col_ret, col_px = detect_cols(df_isin)
if col_date:
df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce")
df_isin = df_isin.sort_values(col_date)
# --- Returns ---
if col_ret and col_ret in df_isin.columns:
r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float).dropna()
elif col_px and col_px in df_isin.columns:
px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan)
r = np.log(px/px.shift(1)).dropna()
else:
errors.append({"ISIN": isin, "Errore": "Né rendimenti né prezzi utilizzabili"})
continue
if len(r) < max(200, WP + HA + 10):
errors.append({"ISIN": isin, "Errore": f"Serie troppo corta ({len(r)} punti)"})
continue
# --- HURST (on returns) ---
h_rs = hurst_rs_returns(r)
h_dfa = hurst_dfa_returns(r)
H = np.nanmedian([h_rs, h_dfa])
H = clamp01(H) if np.isfinite(H) else np.nan
if pd.isna(H):
regime = None
elif H < 0.45:
regime = "mean_reversion"
elif H > 0.55:
regime = "breakout"
else:
regime = "neutral"
# --- Pattern library ---
lib_wins, lib_out = build_pattern_library(r, WP, HA, embargo=EMBARGO)
# Current window
date_last = df_isin[col_date].iloc[-1] if col_date else None
if date_last is not None:
last_dates.append(pd.to_datetime(date_last))
if lib_wins is None or len(r) < WP + HA:
ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
signal = 0
est_out, avg_dist = np.nan, np.nan
else:
curr = r.values[-WP:]
curr_zn = z_norm(curr)
if curr_zn is None:
ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
signal = 0; est_out = np.nan; avg_dist = np.nan
else:
est_out, avg_dist, idx_sel = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
# LONG ONLY: open only if est_out > THETA
signal = 1 if est_out > THETA else 0
ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
# Save results
hurst_rows.append({
"ISIN": isin,
"Hurst": None if pd.isna(H) else round(float(H), 4),
"Regime": regime
})
pattern_rows.append({
"ISIN": isin,
"DateLast": date_last,
"PatternType": ptype,
"Signal": {1:"long",-1:"short",0:"flat"}.get(int(signal), "flat"),
"Confidence": None if pconf is None else round(float(min(1.0, max(0.0, pconf))), 3),
"EstOutcome": None if pd.isna(est_out) else float(est_out),
"AvgDist": None if pd.isna(avg_dist) else float(avg_dist),
"Wp": WP, "Ha": HA, "k": KNN_K
})
ok_count += 1
if not first_ok_reported:
print("[INFO] Colonne riconosciute sul primo ISIN valido:",
"Data:", col_date, "| Rendimenti:", col_ret, "| Prezzo:", col_px,
"| H:", round(H,4) if pd.notna(H) else None, "| Regime:", regime)
first_ok_reported = True
if i % 10 == 0:
print(f"{i}/{len(isins)} ISIN processati (ok finora: {ok_count})")
except Exception as e:
errors.append({"ISIN": isin, "Errore": str(e)})
# =========================================
# 4A) EXPORT: HURST + PATTERN (QualityScore)
# =========================================
hurst_df = pd.DataFrame(hurst_rows) if hurst_rows else pd.DataFrame(
{"ISIN": [], "Hurst": [], "Regime": []}
)
meta_df["ISIN"] = meta_df["ISIN"].astype(str).str.strip()
hurst_df["ISIN"] = hurst_df["ISIN"].astype(str).str.strip()
# Map ISIN -> Hurst (to use H as theta_entry in the backtest)
hurst_map = {
str(row["ISIN"]).strip(): (float(row["Hurst"]) if pd.notna(row["Hurst"]) else np.nan)
for _, row in hurst_df.iterrows()
}
summary_hurst = meta_df.merge(hurst_df, on="ISIN", how="left")
cols_hurst = ["ISIN", "Nome", "Categoria", "Asset Class", "Hurst", "Regime"]
summary_hurst = summary_hurst[[c for c in cols_hurst if c in summary_hurst.columns]]
summary_hurst = summary_hurst.sort_values(["Hurst", "ISIN"], na_position="last").reset_index(drop=True)
summary_hurst.to_excel(OUTPUT_HURST_XLSX, index=False)
pat_df = pd.DataFrame(pattern_rows) if pattern_rows else pd.DataFrame(
{"ISIN": [], "DateLast": [], "PatternType": [], "Signal": [],
"Confidence": [], "EstOutcome": [], "AvgDist": [], "Wp": [], "Ha": [], "k": []}
)
pat_df["ISIN"] = pat_df["ISIN"].astype(str).str.strip()
summary_pattern = (
meta_df
.merge(hurst_df, on="ISIN", how="left")
.merge(pat_df, on="ISIN", how="left")
)
wanted_cols = ["ISIN","Nome","Categoria","Asset Class","Hurst","Regime",
"DateLast","PatternType","Signal","Confidence","EstOutcome","AvgDist","Wp","Ha","k"]
summary_pattern = summary_pattern[[c for c in wanted_cols if c in summary_pattern.columns]]
def _add_quality_scores(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
conf = pd.to_numeric(out.get("Confidence", np.nan), errors="coerce")
est = pd.to_numeric(out.get("EstOutcome", np.nan), errors="coerce")
dist = pd.to_numeric(out.get("AvgDist", np.nan), errors="coerce")
max_abs_est = np.nanmax(np.abs(est)) if np.isfinite(np.nanmax(np.abs(est))) and (np.nanmax(np.abs(est)) > 0) else np.nan
outcome_score = np.where(np.isnan(max_abs_est) | (max_abs_est == 0), np.nan, np.abs(est) / max_abs_est)
similarity_score = 1.0 / (1.0 + dist.astype(float))
confidence_score = conf.astype(float)
quality = confidence_score * similarity_score * outcome_score
out["OutcomeScore"] = np.round(outcome_score, 4)
out["SimilarityScore"] = np.round(similarity_score, 4)
out["QualityScore"] = np.round(quality, 4)
return out
summary_pattern = _add_quality_scores(summary_pattern)
sort_cols = [c for c in ["QualityScore", "Confidence", "OutcomeScore"] if c in summary_pattern.columns]
if sort_cols:
summary_pattern = summary_pattern.sort_values(sort_cols, ascending=[False]*len(sort_cols),
na_position="last").reset_index(drop=True)
if "DateLast" in summary_pattern.columns:
summary_pattern = summary_pattern.sort_values(["QualityScore","DateLast"], ascending=[False, True],
na_position="last").reset_index(drop=True)
summary_pattern.to_excel(OUTPUT_PATTERN_XLSX, index=False)
print(f"[INFO] Salvato: {OUTPUT_HURST_XLSX} (righe: {len(summary_hurst)})")
print(f"[INFO] Salvato: {OUTPUT_PATTERN_XLSX} (righe: {len(summary_pattern)})")
if errors:
pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False)
print(f"[INFO] Log errori: {ERROR_LOG_CSV} (tot: {len(errors)})")
# =========================================
# 4B) FORWARD-BACKTEST (walk-forward), LONG ONLY
# =========================================
def drawdown_stats_simple(ret_series: pd.Series):
eq = (ret_series.fillna(0)).cumsum()
rolling_max = eq.cummax()
dd = eq - rolling_max
maxdd = float(dd.min()) if len(dd) else 0.0
cagr = np.exp(ret_series.mean()*DAYS_PER_YEAR) - 1
annvol = ret_series.std() * np.sqrt(DAYS_PER_YEAR)
sharpe = (ret_series.mean() / (ret_series.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR)
calmar = (cagr / abs(maxdd)) if maxdd < 0 else np.nan
return {
"CAGR_%": round(cagr*100, 2),
"AnnVol_%": round(annvol*100, 2),
"Sharpe": round(float(sharpe), 2),
"MaxDD_%eq": round(float(maxdd*100), 2),
"Calmar": round(float(calmar), 2) if np.isfinite(calmar) else np.nan
}
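# Note on drawdown_stats_simple: the equity line is additive (cumsum of per-step PnL), so MaxDD_%eq is
# expressed in cumulative return points rather than on a compounded equity curve, and CAGR_% treats the
# mean per-step PnL as a log return (np.exp(mean * DAYS_PER_YEAR) - 1).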
def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret: str,
Wp: int, Ha: int, k: int,
theta_entry: float,
exec_ret: pd.Series | None = None,
fee_bps: float = 10,
# --- EXIT PARAMS (all optional) ---
sl_bps: float | None = 300.0, # absolute stop loss (bps on the trade's cumulative PnL)
tp_bps: float | None = 800.0, # absolute take profit (bps)
trail_bps: float | None = 300.0, # trailing stop (drawdown from the peak, bps)
time_stop_bars: int | None = 20, # maximum holding period (bars)
theta_exit: float | None = 0.0, # exit if est_out <= theta_exit (ignored if None)
weak_days_exit: int | None = None # exit if est_out <= theta_exit for N consecutive days
):
"""
LONG-ONLY walk-forward with EXIT rules (SL/TP/TS/time/flip).
Returns (signals_df, summary_metrics_dict).
Note: uses daily data only → the thresholds are evaluated at the end of the day,
and the exit happens on the next bar (conservative model).
"""
r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0 # returns as decimals (close/close)
idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r))
idx = pd.to_datetime(idx).dt.normalize()
if exec_ret is not None:
r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float)
r_exec.index = pd.to_datetime(r_exec.index).normalize()
# robust reindex: follow the order of idx, keep NaN when the date is missing
r_exec = r_exec.reindex(idx)
if len(r_exec) != len(r):
r_exec = pd.Series(r_exec.values, index=idx).reindex(idx)
else:
r_exec = r
fee = fee_bps / 10000.0
# helper to build the library from past data only
def _lib_predict(past_returns: pd.Series, win_last: np.ndarray):
lib_wins, lib_out = build_pattern_library(past_returns, Wp, Ha)
if lib_wins is None:
return np.nan, np.nan
curr_zn = z_norm(win_last)
if curr_zn is None:
return np.nan, np.nan
est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k)
return float(est_out), float(avg_dist)
# Position state
in_pos = False
entry_t = None
trade_pnl = 0.0
trade_peak = 0.0
weak_streak = 0
rows = []
# Note: we iterate from the first bars where a complete window can be computed
for t in range(Wp, len(r) - 1):
past = r.iloc[:t]
# if the past is too short for the library, force flat
if past.dropna().shape[0] < (Wp + Ha):
sig_out, est_out, avg_dist = 0, np.nan, np.nan
# the t+1 PnL is always reported in the Ret+1 column
rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan))
continue
win_last = r.iloc[t-Wp:t].values
est_out, avg_dist = _lib_predict(past, win_last)
# Default: carry the current state forward (sig_out = 1 if in_pos)
sig_out = 1 if in_pos else 0
# --- ENTRY LOGIC ---
if (not in_pos) and (est_out > theta_entry):
# open tomorrow → flag 1 today (so the PnL lands on t+1)
sig_out = 1
in_pos = True
entry_t = t
trade_pnl = 0.0
trade_peak = 0.0
weak_streak = 0
# --- EXIT LOGIC (when in a position) ---
elif in_pos:
# 1) update the trade PnL with the bar return that will be *collected* tomorrow:
#    for EOD consistency, today's reported PnL is r[t+1] when Signal(t)=1.
#    For end-of-day stop checks, we estimate the "PnL if I stay" by accruing r[t+1] ex-ante.
next_ret = r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan # return that will apply if we stay in the position
pnl_if_stay = (1.0 + trade_pnl) * (1.0 + next_ret) - 1.0
# 2) update the hypothetical trailing peak
peak_if_stay = max(trade_peak, pnl_if_stay)
# 3) evaluate the exit conditions on the "if stay" state
exit_reasons = []
# SL
if (sl_bps is not None) and (pnl_if_stay <= -sl_bps/10000.0):
exit_reasons.append("SL")
# TP
if (tp_bps is not None) and (pnl_if_stay >= tp_bps/10000.0):
exit_reasons.append("TP")
# Trailing
if (trail_bps is not None) and (peak_if_stay - pnl_if_stay >= trail_bps/10000.0):
exit_reasons.append("TRAIL")
# Time stop
if (time_stop_bars is not None) and (t - entry_t + 1 >= time_stop_bars):
exit_reasons.append("TIME")
# Flip / persistent weakness
if theta_exit is not None:
if est_out <= theta_exit:
# Weak today → update the streak
weak_streak = weak_streak + 1 if weak_days_exit else weak_streak
# immediate exit if weak_days_exit is not used
if weak_days_exit is None:
exit_reasons.append("FLIP")
else:
if weak_streak >= weak_days_exit:
exit_reasons.append("FLIP_STREAK")
else:
weak_streak = 0
# *** If any condition fires, we exit tomorrow → today we flag 0
if exit_reasons:
sig_out = 0
in_pos = False
entry_t = None
trade_pnl = 0.0
trade_peak = 0.0
weak_streak = 0
else:
# we stay → update the "true" state for the next loop iteration
trade_pnl = pnl_if_stay
trade_peak = peak_if_stay
# Record today's row; the reported PnL is always r[t+1]
rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan))
sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"])
# Costs on position changes
sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0)
trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs()
cost = trade_chg * fee
sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost
sig_df.drop(columns=["Signal_prev"], inplace=True)
# Summary metrics
stats = drawdown_stats_simple(sig_df["PnL"])
stats.update({
"HitRate_%": round(100 * ((sig_df["PnL"] > 0).sum() / max(1, sig_df["PnL"].notna().sum())), 2),
"AvgTradeRet_bps": round(sig_df["PnL"].mean() * 10000, 2),
"Turnover_%/step": round(100 * trade_chg.mean(), 2),
"N_Steps": int(sig_df.shape[0]),
})
# Also log the exit parameters used (useful for grid search/tracing)
stats.update({
"theta_entry": theta_entry,
"theta_exit": (None if theta_exit is None else float(theta_exit)),
"sl_bps": (None if sl_bps is None else float(sl_bps)),
"tp_bps": (None if tp_bps is None else float(tp_bps)),
"trail_bps": (None if trail_bps is None else float(trail_bps)),
"time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)),
"weak_days_exit": (None if weak_days_exit is None else int(weak_days_exit))
})
return sig_df, stats
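# Usage sketch (not executed): the minimal input contract of knn_forward_backtest_one_asset, assuming a
# toy frame with a date column and returns expressed in percent (the function divides them by 100).
def _knn_backtest_example():
    rng = np.random.default_rng(1)
    demo = pd.DataFrame({
        "Date": pd.bdate_range("2020-01-01", periods=400),
        "Ret": rng.normal(0.02, 1.0, 400),  # synthetic daily returns in percent, as the stored procedure would deliver
    })
    sig_df, stats = knn_forward_backtest_one_asset(
        df_isin=demo, col_date="Date", col_ret="Ret",
        Wp=WP, Ha=HA, k=KNN_K, theta_entry=THETA, fee_bps=10,
    )
    return sig_df, stats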
# ========= BACKTEST EXECUTION FOR ALL ISINs =========
bt_signals = []
bt_summary = []
total_t = 0.0
start_all = time.perf_counter()
for i, isin in enumerate(isins, 1):
t0 = time.perf_counter() # ---- START PER-ASSET TIMER ----
try:
df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR})
if df_isin.empty:
errors.append({"ISIN": isin, "Errore": "SP vuota (BT)"})
continue
col_date, col_ret, col_px = detect_cols(df_isin)
if col_date:
df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce")
df_isin = df_isin.sort_values(col_date)
if col_ret and col_ret in df_isin.columns:
df_isin[col_ret] = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) # % raw
elif col_px and col_px in df_isin.columns:
px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan)
df_isin[col_ret] = np.log(px / px.shift(1)) * 100.0 # %
else:
errors.append({"ISIN": isin, "Errore": "Né rendimenti né prezzi (BT)"})
continue
if df_isin[col_ret].dropna().shape[0] < max(200, WP + HA + 10):
errors.append({"ISIN": isin, "Errore": f"Serie troppo corta (BT) ({df_isin[col_ret].dropna().shape[0]} punti)"})
continue
# --- Fetch open/close to compute execution returns (open->open) ---
try:
date_min = df_isin[col_date].min().date() if col_date else None
date_max = df_isin[col_date].max().date() if col_date else None
if date_min and date_max:
px_hist_one = fetch_price_history(
isins=[isin],
universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
start_date=date_min.isoformat(),
end_date=date_max.isoformat()
)
px_hist_one = px_hist_one.sort_values("Date")
open_series = px_hist_one[["Date","Open"]].dropna()
open_series["Date"] = pd.to_datetime(open_series["Date"]).dt.normalize()
open_series = open_series.drop_duplicates(subset=["Date"]).set_index("Date")["Open"]
open_ret = open_series.pct_change()
# re-align on the same date sequence as df_isin
idx_dates = pd.to_datetime(df_isin[col_date]).dt.normalize()
exec_ret = open_ret.reindex(idx_dates)
exec_ret.index = idx_dates
else:
exec_ret = None
except Exception as e:
print(f"[WARN] Fetch open/close fallito per {isin}: {e}")
exec_ret = None
# ============================
# THETA = HURST AS A PERCENTAGE
# H = 0.50 -> theta_entry = 0.005 (0.5%)
# ============================
isin_str = str(isin).strip()
H_val = hurst_map.get(isin_str, np.nan)
if H_val is None or pd.isna(H_val):
theta_entry = THETA # fallback when H is missing
else:
theta_entry = float(H_val) / 100.0
sig_df, stats = knn_forward_backtest_one_asset(
df_isin=df_isin,
col_date=(col_date if col_date else df_isin.index.name or "idx"),
col_ret=col_ret,
Wp=WP,
Ha=HA,
k=KNN_K,
theta_entry=theta_entry,
exec_ret=exec_ret,
fee_bps=10,
)
name = meta_df.loc[meta_df["ISIN"]==isin, "Nome"].iloc[0] if (meta_df["ISIN"]==isin).any() else None
cat = meta_df.loc[meta_df["ISIN"]==isin, "Categoria"].iloc[0] if (meta_df["ISIN"]==isin).any() else None
ac = meta_df.loc[meta_df["ISIN"]==isin, "Asset Class"].iloc[0] if (meta_df["ISIN"]==isin).any() else None
tmp = sig_df.copy()
tmp.insert(0, "ISIN", isin)
tmp.insert(1, "Nome", name)
tmp.insert(2, "Categoria", cat)
tmp.insert(3, "Asset Class", ac)
tmp["Wp"] = WP; tmp["Ha"] = HA; tmp["k"] = KNN_K; tmp["Theta"] = theta_entry
bt_signals.append(tmp)
stats_row = {"ISIN": isin, "Nome": name, "Categoria": cat, "Asset Class": ac}
stats_row.update(stats)
bt_summary.append(stats_row)
except Exception as e:
errors.append({"ISIN": isin, "Errore": f"Backtest: {str(e)}"})
# ---- END PER-ASSET TIMER ----
dt = time.perf_counter() - t0
total_t += dt
avg_t = total_t / i
eta = avg_t * (len(isins) - i)
print(
f"… backtest {i}/{len(isins)} completati — {dt:.2f} sec "
f"(avg {avg_t:.2f}s, ETA {format_eta(eta)} rimanenti)"
)
# ---- FINAL TIMER ----
end_all = time.perf_counter()
total_elapsed = end_all - start_all
avg_elapsed = total_elapsed / max(len(isins), 1)
print(f"[INFO] Tempo totale: {format_eta(total_elapsed)} ({total_elapsed:.2f} sec)")
print(f"[INFO] Tempo medio per asset: {format_eta(avg_elapsed)} ({avg_elapsed:.2f} sec)")
bt_signals_df = pd.concat(bt_signals, ignore_index=True) if bt_signals else pd.DataFrame(
columns=["ISIN","Nome","Categoria","Asset Class","Date","Signal","EstOutcome","AvgDist","Ret+1","PnL","Wp","Ha","k","Theta"]
)
bt_summary_df = pd.DataFrame(bt_summary) if bt_summary else pd.DataFrame(
columns=["ISIN","Nome","Categoria","Asset Class","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar","HitRate_%","AvgTradeRet_bps","Turnover_%/step","N_Steps"]
)
bt_signals_df.to_excel(FORWARD_BT_SIGNALS_XLSX, index=False)
bt_summary_df.to_excel(FORWARD_BT_SUMMARY_XLSX, index=False)
print(f"[INFO] Salvato: {FORWARD_BT_SIGNALS_XLSX} ({len(bt_signals_df):,} righe)")
print(f"[INFO] Salvato: {FORWARD_BT_SUMMARY_XLSX} ({len(bt_summary_df):,} righe)")
if errors:
pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False)
print(f"[INFO] Log errori aggiornato: {ERROR_LOG_CSV} (tot: {len(errors)})")
# Save the price summary (primary symbols only, no fallback) on every run
try:
save_price_cache_summary(OPEN_CACHE_DIR, OPEN_CACHE_DIR / "prezzi_summary_no_fallback.xlsx")
except Exception as e:
print(f"[WARN] Riepilogo prezzi non creato: {e}")
# Timer for the post-backtest phases (section 5 onwards)
start_post_timer(total_steps=4)
# ======================================================================
# 5) DYNAMIC PORTFOLIO STRATEGIES + EQUITY + HEATMAPS + TRADE REPORT
# ======================================================================
def _ensure_bt_summary(meta_df: pd.DataFrame, bt_summary_df: pd.DataFrame) -> pd.DataFrame:
if bt_summary_df is not None and not bt_summary_df.empty:
out = bt_summary_df.copy()
else:
out = pd.DataFrame({"ISIN": meta_df["ISIN"].copy()})
for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]:
out[c] = np.nan
out = out.merge(meta_df[["ISIN","Nome","Categoria","Asset Class"]], on="ISIN", how="left")
out["ISIN"] = out["ISIN"].astype(str).str.strip()
return out
def _ensure_bt_signals(meta_df: pd.DataFrame, bt_signals_df: pd.DataFrame, last_dates_hint: list) -> pd.DataFrame:
if bt_signals_df is not None and not bt_signals_df.empty:
fbsig = bt_signals_df.copy()
fbsig["Date"] = pd.to_datetime(fbsig["Date"])
fbsig["ISIN"] = fbsig["ISIN"].astype(str).str.strip()
fbsig["Signal"] = pd.to_numeric(fbsig["Signal"], errors="coerce").fillna(0).astype(int)
fbsig["PnL"] = pd.to_numeric(fbsig["PnL"], errors="coerce").fillna(0.0)
return fbsig
end_date = (max(last_dates_hint) if last_dates_hint else pd.Timestamp.today()).normalize()
dates = pd.bdate_range(end=end_date, periods=120, freq="C")
isins_small = meta_df["ISIN"].astype(str).str.strip().tolist()[:12]
rows = []
for isin in isins_small:
for dt in dates:
rows.append({"Date": dt, "ISIN": isin, "Signal": 0, "PnL": 0.0})
return pd.DataFrame(rows)
forward_bt_summary = _ensure_bt_summary(meta_df, bt_summary_df)
forward_bt_signals = _ensure_bt_signals(meta_df, bt_signals_df, last_dates)
# -----------------------------
# 5.1 Support functions (charts and weights)
# -----------------------------
def equity_from_returns(r: pd.Series) -> pd.Series:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
return (1 + r).cumprod() * 100
def monthly_returns(r: pd.Series) -> pd.Series:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
if not isinstance(r.index, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)):
try:
r.index = pd.to_datetime(r.index)
except Exception:
return pd.Series(dtype=float)
return (1 + r).resample("M").prod() - 1
def plot_heatmap_monthly(r: pd.Series, title: str, save_path: str = None):
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
m = monthly_returns(r)
df = m.to_frame("ret")
df["Year"], df["Month"] = df.index.year, df.index.month
pv = df.pivot(index="Year", columns="Month", values="ret")
fig, ax = plt.subplots(figsize=(10,6))
im = ax.imshow(pv.fillna(0)*100, cmap="RdYlGn", vmin=-3, vmax=5, aspect="auto")
for i in range(pv.shape[0]):
for j in range(pv.shape[1]):
val = pv.iloc[i,j]
if not np.isnan(val):
ax.text(j, i, f"{val*100:.1f}", ha="center", va="center", fontsize=8)
ax.set_title(title); ax.set_xlabel("Mese"); ax.set_ylabel("Anno")
ax.set_xticks(range(12)); ax.set_xticklabels(range(1,13))
fig.colorbar(im, ax=ax, label="%")
plt.tight_layout()
if save_path:
savefig_safe(save_path, dpi=150)
plt.close(fig)
# Do not show the plot during execution
def inverse_vol_weights(df, window=60, max_weight=None):
vol = df.rolling(window).std()
inv = 1 / vol.replace(0, np.nan)
w = inv.div(inv.sum(axis=1), axis=0)
w = w.ffill().fillna(1 / max(1, df.shape[1]))
if max_weight is not None:
w = w.clip(upper=max_weight)
return w
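# Note on inverse_vol_weights: rows are normalized to sum to 1 *before* the cap, so when max_weight clips
# one or more assets the row total can fall below 1 (the residual is implicitly left uninvested).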
def portfolio_metrics(r: pd.Series):
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
if len(r) == 0:
return dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan)
ann = DAYS_PER_YEAR
eq = equity_from_returns(r)
cagr = (eq.iloc[-1]/eq.iloc[0])**(ann/max(1, len(r))) - 1
vol = r.std() * np.sqrt(ann)
sharpe = (r.mean()/r.std()) * np.sqrt(ann) if r.std() > 0 else np.nan
mdd = (eq/eq.cummax() - 1).min()
return dict(CAGR=cagr, Vol=vol, Sharpe=sharpe, MaxDD=mdd)
# -----------------------------
# 5.2 Dynamic ISIN selection (ranking)
# -----------------------------
df_sum = forward_bt_summary.copy()
for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]:
if c in df_sum.columns:
df_sum[c] = pd.to_numeric(df_sum[c], errors="coerce")
def _safe_rank(s: pd.Series):
s = pd.to_numeric(s, errors="coerce")
if s.notna().sum() == 0:
return pd.Series(np.zeros(len(s)), index=s.index)
s_filled = s.fillna(s.median())
return s_filled.rank(method="average")
import numpy as np
import pandas as pd
from scipy import linalg
# ----------------------------
# Utils
# ----------------------------
def _safe_rank_ser(s: pd.Series) -> pd.Series:
"""Rank robusto (0..1), gestisce NaN."""
s = s.copy()
return s.rank(method="average", na_option="keep") / s.notna().sum()
def _winsorize(s: pd.Series, p=0.005):
lo, hi = s.quantile(p), s.quantile(1-p)
return s.clip(lo, hi)
def _pos_normalize(w: np.ndarray, eps=1e-12):
w = np.where(w < 0, 0, w)
s = w.sum()
return (w / (s + eps)) if s > eps else np.ones_like(w)/len(w)
def _corr_shrink(C: np.ndarray, alpha=0.10):
"""Shrink verso I per stabilità numerica."""
k = C.shape[0]
return (1-alpha)*C + alpha*np.eye(k)
def _time_blocks_index(idx: pd.Index, k_folds=5):
"""Split temporale semplice in k blocchi contigui."""
n = len(idx)
fold_sizes = np.full(k_folds, n // k_folds, dtype=int)
fold_sizes[: n % k_folds] += 1
splits, start = [], 0
for fs in fold_sizes:
end = start + fs
splits.append(idx[start:end])
start = end
return splits
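# Illustrative example for _time_blocks_index: with 10 labels and k_folds=3 the contiguous blocks have
# sizes [4, 3, 3] (the remainder is spread over the first folds), i.e. index slices covering
# positions 0-3, 4-6 and 7-9 in order.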
# ----------------------------
# Weight calibration
# ----------------------------
def calibrate_score_weights(
df_sum: pd.DataFrame,
metrics_map=None,
target_col: str | None = None,
k_folds: int = 5,
shrink_equal: float = 0.25, # shrink towards equal weights for stability
corr_shrink: float = 0.10 # shrinkage of the correlation matrix
):
"""
metrics_map: list of tuples (colname, good_is_high)
    good_is_high=True if higher values are better.
    Example: [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
target_col: name of the OOS target column (e.g. 'FWD_CAGR_%', 'FWD_Sharpe', etc.)
    If None => uses unsupervised ERC.
Returns: dict with 'weights' (pd.Series), 'X_ranked' (DataFrame) and 'mode'
"""
if metrics_map is None:
metrics_map = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
# Build the metric matrix X (rank 0..1)
X_cols, X_list = [], []
for col, good_high in metrics_map:
s = df_sum.get(col, pd.Series(index=df_sum.index, dtype=float)).astype(float)
s = _winsorize(s)
if not good_high:
s = -s # flip the sign if "lower is better"
X_cols.append(col)
X_list.append(_safe_rank_ser(s))
X = pd.concat(X_list, axis=1)
X.columns = X_cols
X = X.loc[:, X.columns[X.notna().sum(0) > 0]] # drop all-NaN columns
k = X.shape[1]
if k == 0:
raise ValueError("Nessuna metrica valida per la calibrazione.")
# No target available: unsupervised ERC
if target_col is None or target_col not in df_sum.columns:
# covariance on the ranked features
C = np.cov(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False)
# stabilize
C = _corr_shrink(C, alpha=corr_shrink)
# ERC: proposed weights ~ inverse vol in a decorrelated space
# practical approximation: w ∝ diag(C)^(-1/2), then normalize, dampening the effect of correlations via Σ^-1/2
vol = np.sqrt(np.clip(np.diag(C), 1e-12, None))
w0 = 1.0 / vol
w = _pos_normalize(w0)
return {
"mode": "unsupervised_erc",
"weights": pd.Series(w, index=X.columns, name="weight"),
"X_ranked": X
}
# Supervised: IC-Σ^-1
y = df_sum[target_col].astype(float).copy()
y = _winsorize(y)
y_rank = _safe_rank_ser(y) # rank the target to align with Spearman
# Drop rows with no target or with all-NaN metrics
mask = y_rank.notna() & X.notna().any(1)
Xf, yf = X[mask].copy(), y_rank[mask].copy()
if len(Xf) < max(30, k*5):
# fallback when data are too scarce
C = np.corrcoef(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False)
C = _corr_shrink(C, alpha=corr_shrink)
ic = np.array([
pd.Series(X.iloc[:, j]).corr(y_rank, method="spearman")
for j in range(k)
])
ic = np.nan_to_num(ic, nan=0.0)
w = linalg.solve(C + 1e-6*np.eye(k), ic, assume_a='sym')
w = (1-shrink_equal)*_pos_normalize(w) + shrink_equal*np.ones(k)/k
return {
"mode": "supervised_icSigmaInv_fallback",
"weights": pd.Series(w, index=X.columns, name="weight"),
"X_ranked": X
}
# Time-block CV (coarse walk-forward)
# Temporal order = index of df_sum (assumed sorted)
folds = _time_blocks_index(Xf.index, k_folds=k_folds)
W_list = []
for t in range(1, len(folds)):
train_idx = pd.Index([])
for i in range(t): # use all previous blocks as the training set
train_idx = train_idx.append(folds[i])
valid_idx = folds[t]
Xt, yt = Xf.loc[train_idx], yf.loc[train_idx]
Xv, yv = Xf.loc[valid_idx], yf.loc[valid_idx]
# per-column IC using the training set only
ic = np.array([Xt.iloc[:, j].corr(yt, method="spearman") for j in range(Xt.shape[1])])
ic = np.nan_to_num(ic, nan=0.0)
# Corr(X) on the training set
C = np.corrcoef(np.nan_to_num(Xt.values, nan=np.nanmean(Xt.values)), rowvar=False)
C = _corr_shrink(C, alpha=corr_shrink)
try:
w_raw = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym')
except Exception:
w_raw = ic.copy()
w_fold = _pos_normalize(w_raw)
# shrink towards equal weights
w_fold = (1-shrink_equal)*w_fold + shrink_equal*np.ones_like(w_fold)/len(w_fold)
W_list.append(w_fold)
# average the weights across folds
if not W_list:
# in edge cases (too little data), fall back
ic = np.array([Xf.iloc[:, j].corr(yf, method="spearman") for j in range(Xf.shape[1])])
ic = np.nan_to_num(ic, nan=0.0)
C = np.corrcoef(np.nan_to_num(Xf.values, nan=np.nanmean(Xf.values)), rowvar=False)
C = _corr_shrink(C, alpha=corr_shrink)
w = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym')
else:
w = np.mean(np.vstack(W_list), axis=0)
w = _pos_normalize(w)
return {
"mode": "supervised_icSigmaInv_cv",
"weights": pd.Series(w, index=X.columns, name="weight"),
"X_ranked": X
}
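# Usage sketch (not executed): unsupervised ERC calibration on a toy summary frame; with no target_col the
# function only needs the metric columns listed in metrics_map. All values below are illustrative only.
def _calibration_example():
    demo = pd.DataFrame({
        "Sharpe": [1.1, 0.2, 0.7, 1.4],
        "CAGR_%": [9.0, 2.0, 5.0, 11.0],
        "MaxDD_%eq": [-14.0, -6.0, -10.0, -18.0],
    })
    res = calibrate_score_weights(demo, metrics_map=[("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)])
    return res["mode"], res["weights"]  # -> "unsupervised_erc" plus one weight per metric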
# --- PRE-FLIGHT METRICS GUARD ---------------------------------------------
def _coerce_num(s: pd.Series) -> pd.Series:
return pd.to_numeric(s, errors="coerce").replace([np.inf, -np.inf], np.nan)
# Make sure df_sum exists and has the base columns
df_sum = forward_bt_summary.copy()
for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%",
"QualityScore","Confidence","OutcomeScore"]:
if c in df_sum.columns:
df_sum[c] = _coerce_num(df_sum[c])
# If Sharpe/CAGR/MaxDD are missing or all NaN, try to rebuild them from the signals
need_rebuild = (
("Sharpe" not in df_sum.columns or df_sum["Sharpe"].notna().sum()==0) or
("CAGR_%" not in df_sum.columns) or
("MaxDD_%eq" not in df_sum.columns)
)
if need_rebuild:
try:
# Recompute minimal per-ISIN metrics from bt_signals_df (if available)
tmp = bt_signals_df.copy()
if not tmp.empty:
tmp["PnL"] = pd.to_numeric(tmp["PnL"], errors="coerce").fillna(0.0)
agg_rows = []
for isin, g in tmp.groupby("ISIN"):
stats = drawdown_stats_simple(g["PnL"])
stats["ISIN"] = str(isin)
agg_rows.append(stats)
rebuilt = pd.DataFrame(agg_rows)
# Coerce to numeric and merge into df_sum
for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]:
if c in rebuilt.columns:
rebuilt[c] = _coerce_num(rebuilt[c])
df_sum = (
df_sum.drop(columns=[c for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"] if c in df_sum.columns])
.merge(rebuilt[["ISIN","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]], on="ISIN", how="left")
)
except Exception as e:
print(f"[WARN] Ricostruzione metriche fallita: {e}")
df_sum = _apply_score(df_sum)
TOP_N = 15
base_isins = (
df_sum
.sort_values("Score", ascending=False)
.head(TOP_N)["ISIN"].astype(str).str.strip().tolist()
)
# No separate crypto strategy: cryptocurrencies are treated like any other asset
print(f"[INFO] Ranking full-sample (solo debug, i portafogli usano ranking rolling): {base_isins}")
# -----------------------------
# 5.3 Portfolio construction
# (Equal Weight + Risk Parity with cap)
# -----------------------------
bt = forward_bt_signals.copy()
bt["Date"] = pd.to_datetime(bt["Date"])
bt["ISIN"] = bt["ISIN"].astype(str).str.strip()
bt = bt.sort_values(["Date", "ISIN"])
wide_pnl = (
bt.pivot_table(index="Date", columns="ISIN", values="PnL", aggfunc="sum")
.fillna(0.0)
)
wide_sig = (
bt.pivot_table(index="Date", columns="ISIN", values="Signal", aggfunc="last")
.fillna(0)
.astype(int)
)
wide_est = (
bt.pivot_table(index="Date", columns="ISIN", values="EstOutcome", aggfunc="last")
.sort_index()
)
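# --- Hedged example of the long -> wide pivot above --------------------------
# Illustrative toy data only (hypothetical ISINs): each (Date, ISIN) row of the
# long backtest frame becomes one cell of a Date x ISIN matrix; combinations
# with no row end up as 0 PnL. Gated by the hypothetical RUN_DOC_EXAMPLES flag.
if globals().get("RUN_DOC_EXAMPLES", False):
    _bt_toy = pd.DataFrame({
        "Date": pd.to_datetime(["2024-01-02", "2024-01-02", "2024-01-03"]),
        "ISIN": ["AAA", "BBB", "AAA"],
        "PnL": [0.01, -0.02, 0.005],
    })
    _wide_toy = _bt_toy.pivot_table(index="Date", columns="ISIN", values="PnL", aggfunc="sum").fillna(0.0)
    print("[EXAMPLE] wide PnL matrix:\n", _wide_toy)  # BBB has 0.0 on 2024-01-03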
# (Opzionale) ricostruzione PnL portafoglio con open->open: disattivata di default perché il PnL
# viene già calcolato a livello di singolo asset usando gli open.
if globals().get("RECOMPUTE_PORTF_FROM_OPEN", False):
try:
date_min = (bt["Date"].min() - pd.Timedelta(days=5)).date()
date_max = (bt["Date"].max() + pd.Timedelta(days=5)).date()
px_hist = fetch_price_history(
isins=bt["ISIN"].unique(),
universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
start_date=date_min.isoformat(),
end_date=date_max.isoformat()
)
open_pivot = (
px_hist.pivot(index="Date", columns="ISIN", values="Open")
.sort_index()
)
open_ret = open_pivot.pct_change()
# segnale su giorno t, esecuzione a open t+1
wide_pnl = wide_sig * open_ret.shift(-1)
common_idx = wide_sig.index.intersection(wide_pnl.index)
common_idx = pd.to_datetime(common_idx)
wide_sig = wide_sig.reindex(common_idx).fillna(0).astype(int)
wide_pnl = wide_pnl.reindex(common_idx).fillna(0.0)
wide_sig.index = pd.to_datetime(wide_sig.index)
wide_pnl.index = pd.to_datetime(wide_pnl.index)
print(f"[INFO] PnL ricostruito su open->open per {len(open_pivot.columns)} ISIN.")
except Exception as e:
print(f"[WARN] Ricostruzione PnL open->open fallita, uso PnL originale: {e}")
# I portafogli verranno costruiti piu' sotto con ranking rolling (vedi _build_dynamic_portfolio_returns).
def plot_portfolio_composition(weights: pd.DataFrame,
title: str,
save_path: str | None = None,
max_legend: int = 12):
"""
Stacked area dei pesi per ISIN nel tempo (un colore per asset).
- Accetta un DataFrame 'weights' indicizzato per Date, colonne = ISIN, valori = peso (0..1).
- Normalizza le righe per sicurezza.
- Raggruppa la coda lunga in 'Altri' se gli asset superano 'max_legend'.
- Salva su 'save_path' se fornito; se è solo un filename, salva nella cartella corrente.
- Non lancia eccezioni inutili: stampa messaggi SKIP quando i dati non sono plottabili.
Esempio:
plot_portfolio_composition(w_eq, "Equal Weight", "composition_equal_weight.png")
plot_portfolio_composition(w_rp, "Risk Parity", "composition_risk_parity.png")
"""
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# ---- Guard basi ----
if weights is None or getattr(weights, "empty", True):
print(f"[SKIP] Nessun peso disponibile per: {title}")
return
# Copia e sanificazione
W = weights.copy()
# Force a unique, sorted index (if there are duplicate dates, keep the last one)
if W.index.has_duplicates:
W = W[~W.index.duplicated(keep="last")]
W = W.sort_index()
# Coerce numerico e sostituisci NaN con 0
W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0)
# Droppa colonne totalmente nulle
keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0]
if not keep_cols:
print(f"[SKIP] Tutte le colonne hanno peso zero per: {title}")
return
W = W[keep_cols]
# Normalizzazione riga (se somma=0, lascia 0)
row_sum = W.sum(axis=1)
with np.errstate(invalid="ignore", divide="ignore"):
W = W.div(row_sum.replace(0.0, np.nan), axis=0).fillna(0.0).clip(lower=0.0)
# Se dopo la pulizia resta vuoto o una sola riga, non ha senso il plot area
if len(W.index) < 2 or W.shape[1] == 0:
print(f"[SKIP] Serie troppo corta o senza colonne valide per: {title}")
return
# Ordina gli asset per peso medio decrescente (più leggibile)
avg_w = W.mean(axis=0).sort_values(ascending=False)
ordered = avg_w.index.tolist()
# Raggruppa coda lunga in "Altri" se troppi asset
if len(ordered) > max_legend:
head, tail = ordered[:max_legend], ordered[max_legend:]
W_show = W[head].copy()
W_show["Altri"] = W[tail].sum(axis=1)
ordered = head + ["Altri"]
else:
W_show = W[ordered].copy()
# Palette (tab20 riciclata se necessario)
cmap = plt.colormaps.get_cmap("tab20")
colors = [cmap(i % cmap.N) for i in range(len(ordered))]
# Plot
fig, ax = plt.subplots(figsize=(11, 6))
ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors)
ax.set_title(f"Composizione portafoglio nel tempo - {title}")
ax.set_ylim(0, 1)
ax.grid(True, alpha=0.3)
ax.set_ylabel("Peso")
# Y-axis labels as percentages (fix the tick positions first, as required by set_yticklabels)
yticks = ax.get_yticks()
ax.set_yticks(yticks)
ax.set_yticklabels([f"{y*100:.0f}%" for y in yticks])
# Legenda compatta
ncol = 2 if len(ordered) > 10 else 1
ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN")
fig.tight_layout()
# Salvataggio robusto (gestisce filename senza cartella)
if save_path:
folder = os.path.dirname(save_path) or "."
try:
os.makedirs(folder, exist_ok=True)
except Exception as e:
print(f"[WARN] Impossibile creare la cartella '{folder}': {e}. Provo a salvare nella dir corrente.")
save_path = os.path.basename(save_path)
fig.savefig(save_path, dpi=150, bbox_inches="tight")
try:
full = os.path.abspath(save_path)
except Exception:
full = save_path
print(f"[INFO] Salvato: {full}")
# Plot salvato senza visualizzazione interattiva
def make_active_weights(w_base: pd.DataFrame,
sig: pd.DataFrame,
renorm_to_1: bool = False, # True: rialloca tra gli attivi; False: lascia quota in Cash
add_cash: bool = True,
cash_label: str = "Cash") -> pd.DataFrame:
"""
w_base : theoretical weights (rows = dates, columns = ISIN)
sig    : Signal matrix (0/1), aligned with w_base (rows = dates, columns = ISIN)
"""
import numpy as np
import pandas as pd
if w_base is None or w_base.empty:
return pd.DataFrame(index=sig.index, columns=[])
W = w_base.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0)
S = sig.reindex_like(W).fillna(0).astype(int) # allinea e riempi
# pesa solo gli attivi
W_active = W * (S > 0)
row_sum = W_active.sum(axis=1)
if renorm_to_1:
# rialloca sui soli attivi: nessuna quota in cash
W_active = W_active.div(row_sum.replace(0, np.nan), axis=0).fillna(0.0)
if add_cash:
# se renorm True, cash è zero
W_active[cash_label] = 0.0
else:
# non renorm: lasciamo la parte non investita in cash
if add_cash:
cash = (1.0 - row_sum).clip(lower=0.0, upper=1.0)
W_active[cash_label] = cash
# rimuovi colonne sempre zero
keep = [c for c in W_active.columns if W_active[c].abs().sum() > 0]
return W_active[keep]
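# --- Hedged usage example for make_active_weights ----------------------------
# Toy sketch (hypothetical data, gated by the RUN_DOC_EXAMPLES flag): with equal
# base weights on two ISINs and only one signal active on the first day,
# renorm_to_1=False leaves the uninvested half in the 'Cash' column.
if globals().get("RUN_DOC_EXAMPLES", False):
    _idx_ex = pd.to_datetime(["2024-01-02", "2024-01-03"])
    _wbase_ex = pd.DataFrame({"AAA": [0.5, 0.5], "BBB": [0.5, 0.5]}, index=_idx_ex)
    _sigm_ex = pd.DataFrame({"AAA": [1, 1], "BBB": [0, 1]}, index=_idx_ex)
    print("[EXAMPLE] active weights:\n", make_active_weights(_wbase_ex, _sigm_ex, renorm_to_1=False))
    # Day 1: AAA=0.5, BBB=0.0, Cash=0.5 ; Day 2: AAA=0.5, BBB=0.5, Cash=0.0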
# -----------------------------
# Portafogli dinamici con ranking rolling
# -----------------------------
_dynamic_portfolio_cache: dict[int, dict] = {}
def _build_dynamic_portfolio_returns(
wide_pnl: pd.DataFrame,
wide_sig: pd.DataFrame,
wide_est: pd.DataFrame,
top_n: int,
window_bars: int = RANKING_WINDOW_BARS,
rp_lookback: int = RP_LOOKBACK
) -> dict:
if wide_pnl is None or wide_pnl.empty:
idx = pd.Index([])
empty_w = pd.DataFrame(index=idx, columns=[])
return {
"ret_eq": pd.Series(dtype=float),
"ret_rp": pd.Series(dtype=float),
"w_eq": empty_w,
"w_rp": empty_w,
"w_eq_act": empty_w,
"w_rp_act": empty_w,
"selection": {}
}
dates = wide_pnl.index.sort_values()
all_cols = wide_pnl.columns.tolist()
w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols)
w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols)
selection = {}
for dt in dates:
# Considera solo gli ISIN con segnale attivo oggi
sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float)
on_cols = [c for c in all_cols if sig_row.get(c, 0) == 1]
if not on_cols:
selection[dt] = []
continue
window_est = wide_est.loc[:dt].tail(window_bars) if not wide_est.empty else pd.DataFrame()
scores = []
for c in on_cols:
s = pd.to_numeric(window_est[c], errors="coerce") if c in window_est.columns else pd.Series(dtype=float)
est_score = s.mean(skipna=True)
if pd.isna(est_score):
continue
scores.append((c, est_score))
if not scores:
selection[dt] = []
continue
scores_sorted = sorted(scores, key=lambda x: x[1], reverse=True)
base_isins_dt = [c for c, _ in scores_sorted[:top_n]]
selection[dt] = base_isins_dt
if not base_isins_dt:
continue
w_eq.loc[dt, base_isins_dt] = 1 / len(base_isins_dt)
window_pnl = wide_pnl.loc[:dt].tail(window_bars)
rp_hist = window_pnl[base_isins_dt]
rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT)
if not rp_w.empty:
last = rp_w.iloc[-1].fillna(0.0)
last_sum = float(last.sum())
if last_sum > 0:
last = last / last_sum
w_rp.loc[dt, last.index] = last.values
w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
return {
"ret_eq": ret_eq,
"ret_rp": ret_rp,
"w_eq": w_eq,
"w_rp": w_rp,
"w_eq_act": w_eq_act,
"w_rp_act": w_rp_act,
"selection": selection
}
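# --- Hedged example of the per-date rolling ranking used above ---------------
# Illustrative only (toy EstOutcome window, hypothetical ISINs, gated by the
# RUN_DOC_EXAMPLES flag): among the ISINs whose signal is ON today, rank by the
# mean EstOutcome over the trailing window and keep the top_n; ISINs with no
# estimate are dropped.
if globals().get("RUN_DOC_EXAMPLES", False):
    _est_win_ex = pd.DataFrame({"AAA": [0.01, 0.02], "BBB": [0.03, 0.01], "CCC": [np.nan, np.nan]})
    _rank_ex = _est_win_ex.mean(skipna=True).dropna().sort_values(ascending=False)
    print("[EXAMPLE] rolling ranking:", list(_rank_ex.index))  # ['BBB', 'AAA'], CCC dropped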
def _get_dynamic_portfolio(top_n: int) -> dict:
if top_n not in _dynamic_portfolio_cache:
_dynamic_portfolio_cache[top_n] = _build_dynamic_portfolio_returns(
wide_pnl=wide_pnl,
wide_sig=wide_sig,
wide_est=wide_est,
top_n=top_n,
window_bars=RANKING_WINDOW_BARS,
rp_lookback=RP_LOOKBACK
)
return _dynamic_portfolio_cache[top_n]
# Portafoglio principale (Top_N di default) calcolato in modo rolling
_main_port = _get_dynamic_portfolio(TOP_N)
ret_eq = _main_port["ret_eq"]
ret_rp = _main_port["ret_rp"]
w_eq = _main_port["w_eq"]
w_rp = _main_port["w_rp"]
w_eq_act = _main_port["w_eq_act"]
w_rp_act = _main_port["w_rp_act"]
selection_by_date = _main_port["selection"]
weights_rp = w_rp.copy()
print(f"[INFO] Portafoglio rolling calcolato (TopN={TOP_N}, finestra={RANKING_WINDOW_BARS} barre, rp_lookback={RP_LOOKBACK}).")
checkpoint_post_timer("Portafoglio rolling")
# -----------------------------
# 5.4 Equity line + Heatmap (salva PNG)
# -----------------------------
# (DISATTIVATO) Equity line/heatmap teoriche su ret_eq/ret_rp
# eq_eq, eq_rp = map(equity_from_returns, [ret_eq, ret_rp])
# plt.figure(figsize=(10,6))
# plt.plot(eq_eq, label="Equal Weight")
# plt.plot(eq_rp, label="Risk Parity")
# plt.legend(); plt.grid(); plt.title("Equity line - Selezione dinamica (Top N)")
# plt.tight_layout()
# savefig_safe(str(PLOT_DIR / "equity_line_portafogli.png"), dpi=150)
# for name, r, path in [
# ("Equal Weight", ret_eq, PLOT_DIR / "heatmap_equal_weight.png"),
# ("Risk Parity", ret_rp, PLOT_DIR / "heatmap_risk_parity.png"),
# ]:
# m = portfolio_metrics(r)
# print(f"{name:22s} → CAGR {m['CAGR']*100:5.2f}% | Vol {m['Vol']*100:5.2f}% | Sharpe {m['Sharpe'] if m['Sharpe']==m['Sharpe'] else float('nan'):4.2f} | MaxDD {m['MaxDD']*100:5.2f}%")
# plot_heatmap_monthly(r, f"Heatmap mensile {name}", save_path=path)
# =============================
# 5.4bis Composizione nel tempo (ATTIVI vs CASH)
# =============================
import os
import numpy as np
import matplotlib.pyplot as plt
def plot_portfolio_composition_fixed(weights: pd.DataFrame,
title: str,
save_path: str | None = None,
max_legend: int = 20):
"""
Stacked area dei pesi nel tempo.
'weights' deve essere già quello ATTIVO (già mascherato con i Signal)
e può includere una colonna 'Cash'.
- NON ri-normalizza le righe: mostra la quota di Cash reale.
- Se vuoi forzare sempre 100% investito, passa prima da renorm_to_1=True in make_active_weights.
"""
if weights is None or getattr(weights, "empty", True):
print(f"[SKIP] Nessun peso per: {title}")
return
W = weights.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0)
if W.index.has_duplicates:
W = W[~W.index.duplicated(keep="last")]
W = W.sort_index()
# drop colonne sempre zero
keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0]
if not keep_cols:
print(f"[SKIP] Tutti i pesi sono zero per: {title}")
return
W = W[keep_cols]
if len(W.index) < 2:
print(f"[SKIP] Serie troppo corta per: {title}")
return
# Ordina per peso medio (Cash in coda così resta sopra nello stack)
avg_w = W.mean(0).sort_values(ascending=False)
ordered = avg_w.index.tolist()
if "Cash" in ordered:
ordered = [c for c in ordered if c != "Cash"] + ["Cash"]
# Raggruppa coda lunga in "Altri"
if len(ordered) > max_legend:
head = ordered[:max_legend]
# Garantisce che 'Cash' resti in legenda anche oltre il cap
if "Cash" not in head and "Cash" in ordered:
head = head[:-1] + ["Cash"]
tail = [c for c in ordered if c not in head]
W_show = W[head].copy()
if tail:
W_show["Altri"] = W[tail].sum(1)
ordered = head + ["Altri"]
else:
ordered = head
else:
W_show = W[ordered].copy()
cmap = plt.colormaps.get_cmap("tab20")
colors = [cmap(i % cmap.N) for i in range(len(ordered))]
fig, ax = plt.subplots(figsize=(11, 6))
ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors)
ax.set_title(f"Composizione portafoglio nel tempo - {title}")
# limite Y = somma massima osservata (<= 1 se pesi + Cash corretti)
ymax = float(np.nanmax(W_show.sum(1).values))
if not np.isfinite(ymax) or ymax <= 0:
ymax = 1.0
ax.set_ylim(0, max(1.0, ymax))
ax.grid(True, alpha=0.3)
ax.set_ylabel("Peso")
ax.set_yticks(ax.get_yticks())
ax.set_yticklabels([f"{y*100:.0f}%" for y in ax.get_yticks()])
ncol = 2 if len(ordered) > 10 else 1
ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN")
fig.tight_layout()
if save_path:
folder = os.path.dirname(save_path) or "."
try:
os.makedirs(folder, exist_ok=True)
except Exception as e:
print(f"[WARN] mkdir '{folder}': {e} -> salvo in cwd")
save_path = os.path.basename(save_path)
fig.savefig(save_path, dpi=150, bbox_inches="tight")
print(f"[INFO] Salvato: {os.path.abspath(save_path)}")
# Plot salvato senza visualizzazione interattiva
# --- 1) Pesi teorici dei portafogli (già costruiti sopra) ---
# w_eq : equal weight su 'cols'
# w_rp : risk parity (weights_rp)
def _sanitize_weights(W: pd.DataFrame, index_like: pd.Index) -> pd.DataFrame:
if W is None or W.empty:
return pd.DataFrame(index=index_like, columns=[])
W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0)
# normalizziamo a somma 1 (pesi TEORICI)
rs = W.sum(1).replace(0, np.nan)
return W.div(rs, axis=0).fillna(0.0).clip(lower=0.0)
# ricostruisco coerentemente nel caso non fossero gia definiti
if 'w_eq' not in globals():
w_eq = pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
if 'w_rp' not in globals():
w_rp = weights_rp.copy() if isinstance(weights_rp, pd.DataFrame) else pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
w_eq = _sanitize_weights(w_eq, wide_pnl.index)
w_rp = _sanitize_weights(w_rp, wide_pnl.index)
# --- 2) Pesi ATTIVI (mascherati con i Signal) ---
# renorm_to_1=False → lascia la quota NON investita in 'Cash'
w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
# Export pesi giornalieri (Equal/Risk Parity) con cash normalizzato a 100%
def _export_weights_daily(w_eq_act_df: pd.DataFrame, w_rp_act_df: pd.DataFrame, path=WEIGHTS_DAILY_XLSX):
try:
def _prep(df: pd.DataFrame) -> pd.DataFrame:
if df is None or df.empty:
return pd.DataFrame()
out = df.copy()
if "Cash" not in out.columns:
out["Cash"] = 0.0
out = out.apply(pd.to_numeric, errors="coerce").fillna(0.0)
if out.index.has_duplicates:
out = out[~out.index.duplicated(keep="last")]
out = out.sort_index()
row_sum = out.sum(axis=1).replace(0, np.nan)
out = out.div(row_sum, axis=0).fillna(0.0)
cols = [c for c in out.columns if c != "Cash"] + ["Cash"]
return out[cols]
w_eq_x = _prep(w_eq_act_df)
w_rp_x = _prep(w_rp_act_df)
if w_eq_x.empty and w_rp_x.empty:
print("[INFO] Nessun peso da esportare (weights_daily.xlsx non creato).")
return
with pd.ExcelWriter(path) as xw:
if not w_eq_x.empty:
w_eq_x.to_excel(xw, "Equal_Weight", index=True)
if not w_rp_x.empty:
w_rp_x.to_excel(xw, "Risk_Parity", index=True)
print(f"[INFO] Salvato: {path}")
except Exception as e:
print(f"[WARN] Export weights_daily.xlsx fallito: {e}")
_export_weights_daily(w_eq_act, w_rp_act, path=WEIGHTS_DAILY_XLSX)
# --- 3) Plot + salvataggio ---
plot_portfolio_composition_fixed(w_eq_act, "Equal Weight (attivi + Cash)", str(PLOT_DIR / "composition_equal_weight_active.png"))
plot_portfolio_composition_fixed(w_rp_act, "Risk Parity (attivi + Cash)", str(PLOT_DIR / "composition_risk_parity_active.png"))
checkpoint_post_timer("Pesi/plot portafogli")
# -----------------------------
# 5.5 Report trades — SOLO LONG
# -----------------------------
def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFrame, name: str) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
LONG-ONLY trade report, consistent with the EOD convention:
- the signal is lagged by 1 bar (a position opens the day after the first 1)
- PnL is indexed by the signal date, so r = pnl.shift(1) re-aligns it to the exposure day
- a trade closes on the first 0 (CloseDate = dt); at end of series CloseDate = last bar + BDay(1)
- Duration_bars = number of bars accumulated in the trade PnL
Returns (df_trades, df_daily), where df_daily holds the day-by-day PnL of every valid trade.
"""
from pandas.tseries.offsets import BDay
# Allinea indici
common_idx = sig.index.intersection(pnl.index)
if not weights.empty:
common_idx = common_idx.intersection(weights.index)
sig = sig.loc[common_idx].copy()
pnl = pnl.loc[common_idx].copy()
weights = weights.loc[common_idx].copy() if not weights.empty else pd.DataFrame(index=common_idx)
# Sanitizza
sig = sig.fillna(0).astype(int).clip(lower=0) # solo long
pnl = pnl.apply(pd.to_numeric, errors="coerce") # mantieni NaN per buchi open
rows = []
daily_rows = []
for isin in sig.columns:
# 1) EOD signal (lag 1): exposure on day t follows the signal fired on day t-1
s = sig[isin].fillna(0).astype(int).shift(1).fillna(0).astype(int)
# 2) PnL is indexed by the signal date, so shift(1) re-aligns it to the exposure day
r = pnl[isin].shift(1)
# 3) Pesi (se disponibili)
w = (weights[isin].fillna(0.0) if (isin in weights.columns) else pd.Series(0.0, index=s.index))
in_pos, start, acc, acc_dates = False, None, [], []
for dt in s.index:
sig_t = int(s.at[dt])
# CHIUSURA: primo 0 dopo un periodo in posizione → chiudi oggi (dt)
if in_pos and (sig_t == 0):
if any(pd.isna(acc)):
print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{dt.date()}")
else:
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
w_start = float(w.get(start, 0.0))
rows.append(dict(
Strategy=name,
ISIN=isin,
OpenDate=start,
CloseDate=dt,
Direction="long",
Size=w_start,
Duration_bars=len(acc),
**{"PnL_%": pnl_val * 100.0}
))
# salva contributo giornaliero reale (senza spalmare)
for dd, rv in zip(acc_dates, acc):
daily_rows.append({
"Strategy": name,
"ISIN": isin,
"Date": dd,
"Size": w_start,
"PnL_day": float(rv)
})
in_pos, start, acc = False, None, []
acc_dates = []
# APERTURA: primo 1 (laggato) quando non in posizione
if (not in_pos) and (sig_t == 1):
in_pos, start, acc, acc_dates = True, dt, [], []
# ACCUMULO: PnL del giorno di esposizione
if in_pos:
acc.append(r.at[dt])
acc_dates.append(dt)
# CHIUSURA A FINE SERIE → prossimo business day
if in_pos:
close_dt = s.index[-1] + BDay(1)
if any(pd.isna(acc)):
print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{close_dt.date()}")
else:
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
w_start = float(w.get(start, 0.0))
rows.append(dict(
Strategy=name,
ISIN=isin,
OpenDate=start,
CloseDate=close_dt,
Direction="long",
Size=w_start,
Duration_bars=len(acc),
**{"PnL_%": pnl_val * 100.0}
))
for dd, rv in zip(acc_dates, acc):
daily_rows.append({
"Strategy": name,
"ISIN": isin,
"Date": dd,
"Size": w_start,
"PnL_day": float(rv)
})
# Ordina colonne
cols = ["Strategy","ISIN","OpenDate","CloseDate","Direction","Size","Duration_bars","PnL_%"]
out = pd.DataFrame(rows)
out = out[[c for c in cols if c in out.columns] + [c for c in out.columns if c not in cols]]
daily_df = pd.DataFrame(daily_rows)
if not daily_df.empty:
daily_df["Date"] = pd.to_datetime(daily_df["Date"])
return out, daily_df
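# --- Hedged worked example of the EOD convention in make_trades_report -------
# Illustrative sketch (hypothetical data, gated by the RUN_DOC_EXAMPLES flag):
# a raw signal of [0, 1, 1, 0] on days d1..d4 becomes [0, 0, 1, 1] after the
# 1-bar lag, so the trade opens on d3, accumulates the re-aligned PnL of d3 and
# d4 (0.01 and 0.02 -> ~+3.02%), and closes at d4 + 1 business day.
if globals().get("RUN_DOC_EXAMPLES", False):
    _dates_ex = pd.bdate_range("2024-01-01", periods=4)
    _sig_ex = pd.DataFrame({"AAA": [0, 1, 1, 0]}, index=_dates_ex)
    _pnl_ex = pd.DataFrame({"AAA": [0.0, 0.01, 0.02, -0.005]}, index=_dates_ex)
    _trades_ex, _daily_ex = make_trades_report(_sig_ex, _pnl_ex, pd.DataFrame(index=_dates_ex), "Example")
    print("[EXAMPLE] trades:\n", _trades_ex[["ISIN", "OpenDate", "CloseDate", "Duration_bars", "PnL_%"]])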
# Colonne asset effettivamente usate nel portafoglio principale
asset_cols = [c for c in w_eq.columns if float(pd.to_numeric(w_eq[c], errors="coerce").abs().sum()) > 0.0]
if not asset_cols:
asset_cols = list(wide_pnl.columns)
rep_eq, daily_eq = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]],
wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]],
w_eq_act, "Equal Weight")
rep_rp, daily_rp = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]],
wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]],
w_rp_act, "Risk Parity")
with pd.ExcelWriter(TRADES_REPORT_XLSX) as xw:
rep_eq.to_excel(xw, "Equal_Weight", index=False)
rep_rp.to_excel(xw, "Risk_Parity", index=False)
checkpoint_post_timer("Report trades")
# Performance attribution per ISIN
def _build_performance_attribution(trades_df: pd.DataFrame, meta_df: pd.DataFrame | None) -> pd.DataFrame:
if trades_df is None or trades_df.empty:
return pd.DataFrame(columns=["ISIN","Nome","Tot_Trades","Positivi","Negativi","Positivi_%","Negativi_%","PnL_Cum_%"])
df = trades_df.copy()
df["PnL_%"] = pd.to_numeric(df["PnL_%"], errors="coerce")
rows = []
for isin, g in df.groupby("ISIN"):
tot = len(g)
pos = int((g["PnL_%"] > 0).sum())
neg = int((g["PnL_%"] < 0).sum())
rows.append({
"ISIN": str(isin),
"Tot_Trades": tot,
"Positivi": pos,
"Negativi": neg,
"Positivi_%": (pos / tot * 100.0) if tot > 0 else np.nan,
"Negativi_%": (neg / tot * 100.0) if tot > 0 else np.nan,
"PnL_Cum_%": float(g["PnL_%"].sum())
})
out = pd.DataFrame(rows)
if meta_df is not None and "ISIN" in meta_df.columns:
meta_cols = [c for c in ["ISIN","Nome","Descrizione","Categoria","Asset Class"] if c in meta_df.columns]
if meta_cols:
out = out.merge(meta_df[meta_cols].drop_duplicates("ISIN"), on="ISIN", how="left")
# ordina colonne con Nome subito dopo ISIN
cols = [c for c in ["ISIN","Nome"] if c in out.columns] + [c for c in out.columns if c not in ["ISIN","Nome"]]
return out[cols]
perf_attr_df = _build_performance_attribution(pd.concat([rep_eq, rep_rp], ignore_index=True), df_sum if 'df_sum' in globals() else None)
perf_attr_df.to_excel(PERF_ATTRIB_XLSX, index=False)
print(f"[INFO] Performance attribution salvata in {PERF_ATTRIB_XLSX}")
print(f"[INFO] Report trades salvato in {TRADES_REPORT_XLSX}")
# ============================================================
# 5.6 Rebuild DAILY PnL from trades_report (calendarized)
# → per rendere coerente il compounding dei trade con equity/heatmap
# ============================================================
import pandas as pd
import numpy as np
def rebuild_daily_from_trades_dict(trades_dict):
"""
trades_dict: {'Equal_Weight': df, 'Risk_Parity': df}
Each df must contain: OpenDate, CloseDate, Size, Duration_bars, PnL_%
Rule: the trade PnL is spread over each day of the holding period with a
constant daily return r such that (1+r)^D - 1 = PnL.
Active days are [OpenDate, CloseDate) on the business calendar.
The trade's contribution on each day is Size * r.
Returns: DataFrame indexed by Date (business days) with one column per strategy.
"""
# Costruisci indice calendario minimo→massimo
min_dt, max_dt = None, None
for df in trades_dict.values():
if df is None or df.empty:
continue
df = df.copy()
df["OpenDate"] = pd.to_datetime(df["OpenDate"])
df["CloseDate"] = pd.to_datetime(df["CloseDate"])
lo, hi = df["OpenDate"].min(), df["CloseDate"].max()
if pd.notna(lo): min_dt = lo if (min_dt is None or lo < min_dt) else min_dt
if pd.notna(hi): max_dt = hi if (max_dt is None or hi > max_dt) else max_dt
if min_dt is None or max_dt is None:
return pd.DataFrame()
cal = pd.bdate_range(start=min_dt, end=max_dt, freq="C") # business calendar
daily = pd.DataFrame(0.0, index=cal, columns=list(trades_dict.keys()))
for strat, df in trades_dict.items():
if df is None or df.empty:
continue
d = df.copy()
d["OpenDate"] = pd.to_datetime(d["OpenDate"])
d["CloseDate"] = pd.to_datetime(d["CloseDate"])
d["Size"] = pd.to_numeric(d.get("Size", 0.0), errors="coerce").fillna(0.0)
d["Duration_bars"] = pd.to_numeric(d.get("Duration_bars", 0), errors="coerce").fillna(0).astype(int)
d["PnL_%"] = pd.to_numeric(d.get("PnL_%", 0.0), errors="coerce").fillna(0.0)
# Costruisci serie giornaliera sommando i contributi dei singoli trade
s = pd.Series(0.0, index=cal)
for _, row in d.iterrows():
D = int(row["Duration_bars"]) if pd.notna(row["Duration_bars"]) else 0
if D <= 0: # trade di durata nulla → salta
continue
pnl = float(row["PnL_%"]) / 100.0
try:
r_daily = (1.0 + pnl) ** (1.0 / D) - 1.0
except Exception:
r_daily = pnl / max(1, D)
size = float(row["Size"]) if pd.notna(row["Size"]) else 0.0
# Giorni attivi: [OpenDate, CloseDate) sul calendario business
rng = pd.bdate_range(start=row["OpenDate"], end=row["CloseDate"] - pd.tseries.offsets.BDay(1), freq="C")
if len(rng) != D:
# se per qualche motivo differisce (festivi ecc.) ricalibra usando la lunghezza effettiva
if len(rng) <= 0:
continue
try:
r_daily = (1.0 + pnl) ** (1.0 / len(rng)) - 1.0
except Exception:
r_daily = pnl / len(rng)
s[rng] = s[rng] + size * r_daily
daily[strat] = s
return daily
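# --- Hedged worked example of the constant-rate spreading rule ---------------
# Illustrative only (gated by the hypothetical RUN_DOC_EXAMPLES flag): a trade
# with PnL = +5% over D = 10 business days is spread at
# r = (1.05)**(1/10) - 1 ~= 0.489% per day, so that (1+r)**10 - 1 = 5%.
if globals().get("RUN_DOC_EXAMPLES", False):
    _pnl_trade_ex, _D_ex = 0.05, 10
    _r_daily_ex = (1.0 + _pnl_trade_ex) ** (1.0 / _D_ex) - 1.0
    print(f"[EXAMPLE] r_daily={_r_daily_ex:.6f}, recompounded={(1.0 + _r_daily_ex) ** _D_ex - 1.0:.6f}")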
# Costruisci il dict dai DataFrame di trade appena creati
trades_dict = {
"Equal_Weight": rep_eq if 'rep_eq' in globals() else pd.DataFrame(),
"Risk_Parity": rep_rp if 'rep_rp' in globals() else pd.DataFrame(),
}
# Ricostruzione daily dai PnL giornalieri reali dei trade (senza spalmare)
daily_detail = pd.concat([daily_eq, daily_rp], ignore_index=True)
if not daily_detail.empty:
daily_detail["Date"] = pd.to_datetime(daily_detail["Date"])
daily_detail["PnL_day"] = pd.to_numeric(daily_detail["PnL_day"], errors="coerce")
daily_detail["Size"] = pd.to_numeric(daily_detail["Size"], errors="coerce").fillna(0.0)
daily_detail = daily_detail.dropna(subset=["Date", "PnL_day"])
daily_detail["Pnl_contrib"] = daily_detail["PnL_day"] * daily_detail["Size"]
daily_from_trades = (
daily_detail.pivot_table(index="Date", columns="Strategy", values="Pnl_contrib", aggfunc="sum")
.sort_index()
.fillna(0.0)
)
else:
daily_from_trades = pd.DataFrame()
# Salva su disco (CSV + XLSX) per ispezione
if not daily_from_trades.empty:
daily_from_trades.to_csv(DAILY_FROM_TRADES_CSV, index_label="Date")
# Plot equity & heatmap basati sui DAILY da trade (coerenti col compounding)
import matplotlib.pyplot as plt
col_map = [
("Equal Weight", ["Equal Weight", "Equal_Weight"]),
("Risk Parity", ["Risk Parity", "Risk_Parity"]),
]
def _find_col(df, aliases):
for c in aliases:
if c in df.columns:
return c
return None
fig, ax = plt.subplots(figsize=(10, 6))
for lab, aliases in col_map:
col = _find_col(daily_from_trades, aliases)
if col:
eq = (1.0 + daily_from_trades[col].fillna(0.0)).cumprod() * 100
eq.plot(ax=ax, label=lab)
ax.legend()
ax.grid(True)
ax.set_title("Equity line ricostruita dai trades (calendarizzata)")
fig.tight_layout()
fig.savefig(PLOT_DIR / "equity_from_trades.png", dpi=150)
plt.close(fig)
print(f"[INFO] Salvato: {PLOT_DIR / 'equity_from_trades.png'}")
# Heatmap per ciascuna strategia
for lab, aliases, fname in [
("Equal Weight", ["Equal Weight", "Equal_Weight"], "heatmap_equal_from_trades.png"),
("Risk Parity", ["Risk Parity", "Risk_Parity"], "heatmap_rp_from_trades.png"),
]:
col = _find_col(daily_from_trades, aliases)
if col:
try:
plot_heatmap_monthly(
daily_from_trades[col],
f"Heatmap mensile - {lab} (da trades)",
save_path=PLOT_DIR / fname,
)
except Exception as e:
print(f"[WARN] Heatmap {lab} da trades: {e}")
else:
print("[INFO] daily_from_trades risulta vuoto: nessun plot/CSV generato.")
checkpoint_post_timer("Ricostruzione daily/plot")
# ============================================================
# METRICS UTILS (guard) — richieste da _calc_all_metrics_...
# ============================================================
import numpy as np
import pandas as pd
# r2 della equity line vs trend line
if "r2_equity_line" not in globals():
def r2_equity_line(returns: pd.Series) -> float:
r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
if r.size == 0:
return np.nan
eq = (1.0 + r).cumprod()
t = np.arange(eq.size, dtype=float)
X = np.vstack([t, np.ones_like(t)]).T
beta, alpha = np.linalg.lstsq(X, eq.values, rcond=None)[0]
yhat = alpha + beta * t
ss_res = np.sum((eq.values - yhat) ** 2)
ss_tot = np.sum((eq.values - eq.values.mean()) ** 2)
return float(1 - ss_res/ss_tot) if ss_tot > 0 else np.nan
# metriche drawdown: MaxDD, durata massima, time-to-recovery
if "drawdown_metrics" not in globals():
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
if r.size == 0:
return np.nan, np.nan, np.nan
eq = (1.0 + r).cumprod()
peak = eq.cummax()
dd = eq / peak - 1.0
maxdd = float(dd.min()) if dd.size else np.nan
# durata massima di drawdown (numero di barre consecutive sotto il picco)
dd_mask = dd < 0
max_dur, cur = 0, 0
for flag in dd_mask:
cur = cur + 1 if flag else 0
if cur > max_dur:
max_dur = cur
# time-to-recovery dal minimo assoluto
ttr = np.nan
if dd.size:
idx_min = dd.idxmin()
sub_eq = eq.loc[idx_min:]
rec_idx = sub_eq[sub_eq >= peak.loc[idx_min]].index.min()
if pd.notna(rec_idx):
# se indice datetime, durata in giorni; se indice numerico, in barre
if isinstance(rec_idx, pd.Timestamp):
ttr = (rec_idx - idx_min).days
else:
ttr = int(sub_eq.index.get_loc(rec_idx))
else:
ttr = sentinel_ttr
return maxdd, int(max_dur), ttr
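# --- Hedged worked example for drawdown_metrics -------------------------------
# Illustrative only (gated by RUN_DOC_EXAMPLES): returns [+10%, -20%, +30%] give
# an equity of 1.10 -> 0.88 -> 1.144, so MaxDD = 0.88/1.10 - 1 = -20%, the
# drawdown lasts 1 bar, and the time-to-recovery from the trough is 1 bar
# (integer index -> measured in bars rather than days).
if globals().get("RUN_DOC_EXAMPLES", False):
    _mdd_ex, _dur_ex, _ttr_ex = drawdown_metrics(pd.Series([0.10, -0.20, 0.30]))
    print(f"[EXAMPLE] MaxDD={_mdd_ex:.2%}, DD duration={_dur_ex} bars, TTR={_ttr_ex} bars")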
# AAW/AUW/Heal Index (validated version)
if "heal_index_metrics" not in globals():
def heal_index_metrics(returns: pd.Series):
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
run_min = equity.cummin()
ru = equity / run_min - 1.0
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
return AAW, AUW, heal
# H_min (shortest window with 100% positive rolling returns), in days/months (validated version)
if "h_min_100" not in globals():
def h_min_100(returns: pd.Series, month_len: int = 21):
s = returns.dropna().astype(float)
n = s.size
if n == 0:
return np.nan, np.nan
log1p = np.log1p(s.values)
csum = np.cumsum(log1p)
def rolling_sum_k(k: int):
if k > n:
return np.array([])
head = csum[k - 1:]
tail = np.concatenate(([0.0], csum[:-k]))
return head - tail
for k in range(1, n + 1):
rs = rolling_sum_k(k)
if rs.size == 0:
break
roll_ret = np.exp(rs) - 1.0
if np.all(roll_ret >= 0):
h_days = k
h_months = int(np.ceil(h_days / month_len))
return h_days, h_months
return np.nan, np.nan
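# --- Hedged worked example for h_min_100 --------------------------------------
# Illustrative only (gated by RUN_DOC_EXAMPLES): h_min_100 looks for the
# shortest window length k such that EVERY rolling k-bar compounded return is
# >= 0. For the toy series below, 1-bar windows include -1%, but every 2-bar
# window compounds to >= 0, so the answer is 2 bars -> ceil(2/21) = 1 month.
if globals().get("RUN_DOC_EXAMPLES", False):
    _h_days_ex, _h_months_ex = h_min_100(pd.Series([0.02, -0.01, 0.02, -0.01, 0.02]), month_len=21)
    print(f"[EXAMPLE] H_min = {_h_days_ex} bars ~= {_h_months_ex} month(s)")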
# costante se non già definita
try:
DAYS_PER_YEAR
except NameError:
DAYS_PER_YEAR = 252
# ========= FUNZIONE RICHIESTA DAL LOOP =========
def _calc_all_metrics_from_returns(r: pd.Series) -> dict:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
n = len(r)
if n == 0:
return {k: np.nan for k in [
"Rendimento_Ann","Volatilita_Ann","CAGR","R2_Equity",
"MaxDD","DD_Duration_Max","TTR_from_MDD","AAW","AUW","Heal_Index","H_min_100m_5Y"
]}
rendimento_ann = float(r.mean() * DAYS_PER_YEAR)
volatilita_ann = float(r.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)) if n > 1 else np.nan
eq = (1.0 + r).cumprod()
years_elapsed = n / DAYS_PER_YEAR if n > 0 else np.nan
if years_elapsed and years_elapsed > 0 and eq.iloc[0] > 0:
cagr = float(eq.iloc[-1] ** (1.0 / years_elapsed) - 1.0)
else:
cagr = np.nan
r2 = r2_equity_line(r)
maxdd, dddur, ttr = drawdown_metrics(r, sentinel_ttr=1250)
aaw, auw, heal = heal_index_metrics(r)
# 5 anni o meno se la serie è più corta
lookback = min(n, DAYS_PER_YEAR * 5)
r5 = r.iloc[-lookback:]
_, hmin_months = h_min_100(r5, month_len=21)
return {
"Rendimento_Ann": rendimento_ann,
"Volatilita_Ann": volatilita_ann,
"CAGR": cagr,
"R2_Equity": r2,
"MaxDD": maxdd,
"DD_Duration_Max": dddur,
"TTR_from_MDD": ttr,
"AAW": aaw,
"AUW": auw,
"Heal_Index": heal,
"H_min_100m_5Y": hmin_months
}
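# --- Hedged usage example for _calc_all_metrics_from_returns ------------------
# Illustrative only (hypothetical random-walk returns, gated by the
# RUN_DOC_EXAMPLES flag): the helper turns a daily return series into the full
# metric dict that is written to final_metrics.xlsx below.
if globals().get("RUN_DOC_EXAMPLES", False):
    _r_ex = pd.Series(np.random.default_rng(1).normal(0.0004, 0.01, size=252))
    _metrics_ex = _calc_all_metrics_from_returns(_r_ex)
    print("[EXAMPLE]", {k: (round(v, 4) if isinstance(v, float) else v) for k, v in _metrics_ex.items()})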
# ======================================================================
# 6) LOOP Top-N: metriche per N = 6..20 → final_metrics.xlsx
# ======================================================================
# Safety: DAYS_PER_YEAR se non definito
try:
DAYS_PER_YEAR
except NameError:
DAYS_PER_YEAR = 252
def _select_isins_for_topN(df_sum: pd.DataFrame, top_n: int):
"""Seleziona i migliori 'top_n' ISIN in base allo Score."""
df_sum_loc = df_sum.copy()
base_isins_N = (
df_sum_loc
.sort_values("Score", ascending=False)
.head(top_n)["ISIN"].astype(str).str.strip().tolist()
)
return base_isins_N
def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl):
"""
Costruisce i rendimenti di portafoglio Equal Weight e Risk Parity
per l'insieme di ISIN in base_isins_N.
Ritorna:
ret_eq_N : pd.Series
ret_rp_N : pd.Series
"""
# Colonne effettivamente disponibili
colsN = [c for c in base_isins_N if c in wide_pnl.columns]
if len(colsN) == 0:
# Nessun ISIN valido → portafogli in cash (linea piatta)
idx = wide_pnl.index
ret_eq_N = pd.Series(0.0, index=idx, name="Ret_EqW_N")
ret_rp_N = pd.Series(0.0, index=idx, name="Ret_RP_N")
return ret_eq_N, ret_rp_N
# -------- Equal Weight --------
ret_eq_N = wide_pnl[colsN].mean(axis=1)
# -------- Risk Parity con cap --------
weights_rp_N = inverse_vol_weights(
wide_pnl[colsN],
window=60,
max_weight=RP_MAX_WEIGHT # es. RP_MAX_WEIGHT = 2 / TOP_N_MAX = 0.1333
)
ret_rp_N = (wide_pnl[colsN] * weights_rp_N).sum(axis=1)
return ret_eq_N, ret_rp_N
# ==============================
# Metriche portafoglio (TOP_N corrente) → Excel
# ==============================
metrics_rows = []
for strategy_name, rser in [
("Equal_Weight", ret_eq),
("Risk_Parity", ret_rp),
]:
m = _calc_all_metrics_from_returns(rser)
m["TopN"] = TOP_N
m["Strategy"] = strategy_name
metrics_rows.append(m)
df_metrics = pd.DataFrame(metrics_rows)[[
"TopN", "Strategy",
"Rendimento_Ann", "Volatilita_Ann", "CAGR", "R2_Equity",
"MaxDD", "DD_Duration_Max", "TTR_from_MDD",
"AAW", "AUW", "Heal_Index", "H_min_100m_5Y",
]]
try:
with pd.ExcelWriter(FINAL_METRICS_XLSX, engine="openpyxl", mode="a", if_sheet_exists="replace") as xw:
df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False)
except Exception:
with pd.ExcelWriter(FINAL_METRICS_XLSX) as xw:
df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False)
print(f"[INFO] Salvato: {FINAL_METRICS_XLSX} (Portfolio_Metrics)")