# -*- coding: utf-8 -*-
"""
Created on Mon Oct 27 13:59:10 2025

End-to-end script:
- Loads the universe (Excel) and the data (DB) only once
- Computes Hurst + pattern signals (long-only for trades)
- Runs the walk-forward k-NN backtest (long-only) and saves forward_bt_signals/summary
- Phase 5: dynamic portfolio selection + equity + heatmaps + trade report (long-only)
- Phase 6: final metrics (as in the "optimizer") + charts saved to file

Notes:
- Does not re-read files it has just saved; it uses the in-memory DataFrames
- Saves: hurst_by_isin.xlsx, pattern_signals.xlsx, forward_bt_*.xlsx, trades_report.xlsx, final_metrics.xlsx
- Saves PNG: equity_line_portafogli.png, heatmap_*.png
"""

import pandas as pd
import numpy as np
import sqlalchemy as sa
from sqlalchemy import text
import matplotlib.pyplot as plt
from pathlib import Path
import json
import ssl
import re
from urllib.request import urlopen
from urllib.error import URLError, HTTPError

from shared_utils import (
    build_pattern_library,
    characterize_window,
    detect_column,
    load_config,
    predict_from_library,
    read_connection_txt,
    require_section,
    require_value,
    z_norm,
)
#from math import isfinite
import time

# =============================
# Plot saving helper (non-recursive)
# =============================
try:
    import os as _os_sf
    import matplotlib.pyplot as _plt_sf
except Exception:
    _plt_sf = None

SAVE_PNG = globals().get("SAVE_PNG", True)

def savefig_safe(path, **kwargs):
    """
    Save a matplotlib figure to disk safely, honoring SAVE_PNG.
    Usage: savefig_safe("plot/myfig.png", dpi=150, bbox_inches="tight")
    """
    if not SAVE_PNG or _plt_sf is None:
        return
    # Ensure directory exists
    try:
        d = _os_sf.path.dirname(path)
        if d and not _os_sf.path.exists(d):
            _os_sf.makedirs(d, exist_ok=True)
    except Exception as _e:
        print(f"[savefig_safe] Directory creation warning: {_e}")
    try:
        _plt_sf.savefig(path, **kwargs)
    except Exception as e:
        print(f"[savefig_safe] Warning while saving '{path}': {e}")

# Score computation (also reusable on a rolling basis)
def _apply_score(df_sum: pd.DataFrame) -> pd.DataFrame:
    """Applies the weight calibration to df_sum and adds the Score column."""
    def _available_cols(df, cols):
        return [c for c in cols if (c in df.columns and df[c].notna().sum() > 0)]

    primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
    alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)]

    mm = [(c,gh) for (c,gh) in primary_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]
    if len(mm) < 2:
        mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]

    # If still insufficient, widen to the union of both sets
    if len(mm) < 2:
        union_candidates = list({x[0] for x in primary_cols+alt_cols})
        mm = [(c, True) for c in _available_cols(df_sum, union_candidates)]

    if len(mm) == 0:
        print("[WARN] No numeric metric available: using Score=0 and sorting by ISIN.")
        df_sum["Score"] = 0.0
        df_sum["Score_mode"] = "degenerate_equal"
        return df_sum

    # If fixed weights are defined in the config use them, otherwise calibrate automatically
    use_fixed = False
    if SCORE_WEIGHTS:
        weights_raw = {k: float(v) for k, v in SCORE_WEIGHTS.items() if k in df_sum.columns}
        weights_raw = {k: v for k, v in weights_raw.items() if df_sum[k].notna().sum() > 0}
        if weights_raw:
            use_fixed = True
            w = pd.Series(weights_raw)
            w = w / w.sum()
            X_ranked = df_sum[w.index].rank(pct=True)
            df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
            df_sum["Score_mode"] = "fixed_weights"
            if SCORE_VERBOSE:
                print("Fixed weights (config):", w.to_dict())
        else:
            print("[WARN] score_weights in config are not compatible with the available metrics. Falling back to automatic calibration.")

    if not use_fixed:
        res = calibrate_score_weights(
            df_sum,
            metrics_map=mm,
            target_col=None
        )
        X_ranked = res["X_ranked"]
        w = res["weights"]
        df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
        df_sum["Score_mode"] = res["mode"]
        if SCORE_VERBOSE:
            print("Automatically estimated weights (metrics used):")
            print("Metric availability (non-NaN rows):",
                  {c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]})
            print(w)
    return df_sum

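# Illustrative sketch (not executed) of how _apply_score combines metrics when fixed
# weights are configured; the numbers are invented for the example. Assuming
# score_weights = {"Sharpe": 2, "CAGR_%": 1}, the weights are normalised to
# {"Sharpe": 2/3, "CAGR_%": 1/3}, each metric is turned into a percentile rank and
# Score is the weighted sum of those ranks:
#
#   demo = pd.DataFrame({"Sharpe": [1.2, 0.4, 0.9], "CAGR_%": [8.0, 3.0, 5.0]})
#   ranks = demo.rank(pct=True)                      # percentile rank per column
#   score = ranks["Sharpe"] * (2/3) + ranks["CAGR_%"] * (1/3)
#   # the row that is best on both metrics gets Score = 1.0
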
# =============================
# PRICE FETCH (OPEN/CLOSE) - historical
# =============================
def _build_symbol_euronext(row: pd.Series) -> tuple[str, str]:
    isin = str(row.get("ISIN", "")).strip()
    venue = str(row.get("Mercato", "")).strip()
    tok = str(row.get("TickerOpen", "") or "").strip()
    base = OPEN_PRICE_BASE_URL
    if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper():
        return base, tok
    if isin and venue:
        return base, f"{isin}-{venue}"
    if isin:
        return base, f"{isin}-ETFP"  # generic fallback for the history endpoint
    return base, isin

def fetch_price_history(isins, universe: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame:
    """
    Downloads the historical open/close series for a list of ISINs using the history endpoint.
    - API called one ISIN at a time: https://fin.scorer.app/finance/etf-inv/history/{ticker}?fromDate=YYYYMMDD&toDate=YYYYMMDD
    - Local CSV caching to reduce requests; if the API fails, it tries to use the cache.
    - Venue fallback: ETFP → XPAR → XAMS. When a series is extended with another venue,
      the splice is applied only if the prices at the last point of the earlier segment and
      at the first point of the later one differ by < 2% (to avoid currency/quotation jumps).
    Returns a DataFrame with columns: Date (datetime), ISIN, Open, Close.
    """
    start_dt = pd.to_datetime(start_date).date()
    end_dt = pd.to_datetime(end_date).date()

    def _symbol_cache_path(symbol: str) -> Path:
        safe = re.sub(r"[^A-Za-z0-9_-]+", "_", str(symbol))
        return OPEN_CACHE_DIR / f"{safe}.csv"

    def _load_cache(path: Path) -> pd.DataFrame | None:
        try:
            if path.exists():
                dfc = pd.read_csv(path, parse_dates=["Date"])
                dfc["ISIN"] = dfc["ISIN"].astype(str)
                return dfc
        except Exception as e:
            print(f"[WARN] Corrupted price cache {path}: {e}")
        return None

    def _normalize_payload_to_df(payload, isin):
        # The new endpoint returns [{"ticker": "...", "data": [ {...}, ... ]}]
        data_block = payload
        if isinstance(payload, list) and payload:
            if isinstance(payload[0], dict) and "data" in payload[0]:
                data_block = payload[0].get("data", [])
            else:
                data_block = payload
        if isinstance(payload, dict) and "data" in payload:
            data_block = payload.get("data", [])
        rows = []
        for d in data_block or []:
            dt_raw = d.get("date") or d.get("Date") or d.get("data") or d.get("timestamp")
            if dt_raw is None:
                continue
            try:
                if isinstance(dt_raw, (int, float)):
                    dt_parsed = pd.to_datetime(int(dt_raw), unit="ms").tz_localize(None)
                else:
                    dt_parsed = pd.to_datetime(dt_raw).tz_localize(None)
            except Exception:
                continue
            rows.append({
                "Date": dt_parsed,
                "ISIN": str(isin),
                "Open": _to_float_safe(d.get("open")),
                "Close": _to_float_safe(d.get("close") or d.get("last"))
            })
        return pd.DataFrame(rows) if rows else pd.DataFrame(columns=["Date","ISIN","Open","Close"])

    def _fetch_symbol(symbol: str, isin: str):
        url = f"{OPEN_PRICE_BASE_URL}/{symbol}?fromDate={start_dt.strftime('%Y%m%d')}&toDate={end_dt.strftime('%Y%m%d')}"
        cache_path = _symbol_cache_path(symbol)
        cache_df = _load_cache(cache_path)

        df_api = pd.DataFrame()
        ok = False
        for attempt in range(1, OPEN_MAX_RETRY + 1):
            try:
                with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp:
                    payload = json.loads(resp.read().decode("utf-8"))
                df_api = _normalize_payload_to_df(payload, isin)
                if df_api.empty:
                    print(f"[WARN] No data for {symbol}")
                ok = True
                break
            except (HTTPError, URLError, ssl.SSLError, json.JSONDecodeError) as e:
                if attempt < OPEN_MAX_RETRY:
                    print(f"[WARN] Download {symbol} attempt {attempt}/{OPEN_MAX_RETRY} failed: {e}. Retrying in {OPEN_SLEEP_SEC}s")
                    time.sleep(OPEN_SLEEP_SEC)
                else:
                    print(f"[ERROR] Download {symbol} failed: {e}")

        df_use = pd.DataFrame()
        if ok and not df_api.empty:
            df_api = df_api.sort_values("Date")
            if cache_df is not None and not cache_df.empty:
                df_use = (
                    pd.concat([cache_df, df_api], ignore_index=True)
                    .drop_duplicates(subset=["Date"])
                    .sort_values("Date")
                )
            else:
                df_use = df_api
            try:
                OPEN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
                df_use.to_csv(cache_path, index=False)
            except Exception as e:
                print(f"[WARN] Saving price cache failed ({cache_path}): {e}")
        elif cache_df is not None and not cache_df.empty:
            df_use = cache_df
            print(f"[INFO] Using cached prices for {symbol} (API unavailable).")
        return df_use

    def _merge_with_check(df_base: pd.DataFrame, df_add: pd.DataFrame, label_prev: str, label_next: str):
        """
        Extends df_base by prepending the portion of df_add that precedes the first point of df_base.
        Checks the price jump at the junction: if > 2%, it does not merge and warns.
        """
        if df_base is None or df_base.empty:
            return df_add, False
        if df_add is None or df_add.empty:
            return df_base, False
        cutoff = df_base["Date"].min()
        prev_part = df_add[df_add["Date"] < cutoff]
        if prev_part.empty:
            return df_base, False
        merged = pd.concat([prev_part, df_base], ignore_index=True)
        merged = merged.sort_values("Date").drop_duplicates(subset=["Date"], keep="last")
        # jump check: last price of the earlier segment vs first price of the later one
        prev_last = prev_part.sort_values("Date").iloc[-1]
        next_first = df_base[df_base["Date"] >= cutoff].sort_values("Date").iloc[0]
        def _price(row):
            return _to_float_safe(row.get("Close")) if pd.notna(row.get("Close")) else _to_float_safe(row.get("Open"))
        p_prev = _price(prev_last)
        p_next = _price(next_first)
        if p_prev is None or p_next is None or not np.isfinite(p_prev) or not np.isfinite(p_next) or p_next == 0:
            return merged, True
        gap = abs(p_prev - p_next) / abs(p_next)
        if gap > 0.02:
            print(f"[WARN] Price jump >2% between {label_prev} and {label_next} on {prev_last['Date'].date()} -> {next_first['Date'].date()} (gap {gap:.2%}). Fallback not applied.")
            return df_base, False
        return merged, True

    records = []
    for i, isin in enumerate(isins, 1):
        try:
            row = universe.loc[universe["ISIN"] == str(isin)].iloc[0]
        except Exception:
            print(f"[WARN] ISIN {isin} not found in the universe.")
            continue
        base, symbol = _build_symbol_euronext(row)
        df_primary = _fetch_symbol(symbol, isin)

        # Additional venue fallbacks (XPAR, then XAMS) to extend the series backwards
        fallback_symbols = []
        if "-" in symbol:
            root = symbol.rsplit("-", 1)[0]
            fallback_symbols.append(f"{root}-XPAR")
            fallback_symbols.append(f"{root}-XAMS")
        else:
            fallback_symbols.append(f"{symbol}-XPAR")
            fallback_symbols.append(f"{symbol}-XAMS")

        df_use = df_primary
        applied_any = False
        for fb_sym in fallback_symbols:
            # only needed if the series does not start at start_dt
            need_fb = df_use.empty or (df_use["Date"].min().date() > start_dt)
            if not need_fb:
                continue
            df_fb = _fetch_symbol(fb_sym, isin)
            if df_fb.empty:
                print(f"[WARN] Fallback {fb_sym} missing for {isin}")
                continue
            if df_use.empty:
                df_use = df_fb
                applied_any = True
                print(f"[INFO] Using fallback {fb_sym} for the whole period.")
            else:
                merged, merged_ok = _merge_with_check(df_use, df_fb, fb_sym, symbol)
                if merged_ok:
                    df_use = merged
                    applied_any = True
                    cutoff = df_use["Date"].min()
                    print(f"[INFO] Series extended with {fb_sym} back to {cutoff.date()} for {isin}")
                else:
                    print(f"[WARN] Fallback {fb_sym} discarded due to >2% gap for {isin}")

        if df_use.empty:
            print(f"[WARN] Open/close series not available for {isin}")
            continue

        # Filter to the requested range
        df_use["Date"] = pd.to_datetime(df_use["Date"])
        mask = (df_use["Date"].dt.date >= start_dt) & (df_use["Date"].dt.date <= end_dt)
        df_use = df_use.loc[mask]
        if df_use.empty:
            print(f"[WARN] No data in the requested range for {symbol}")
            continue
        records.append(df_use)

    if not records:
        return pd.DataFrame(columns=["Date","ISIN","Open","Close"])
    df_px = pd.concat(records, ignore_index=True)
    df_px = df_px.sort_values(["ISIN","Date"]).reset_index(drop=True)
    return df_px

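# Illustrative usage sketch (not executed). It assumes the universe DataFrame has
# already been loaded and exposes the ISIN/Mercato/TickerOpen columns used by
# _build_symbol_euronext, and that the history endpoint is reachable; the ISIN below
# is only an example. Cached files land in OPEN_CACHE_DIR, one CSV per symbol, with
# columns Date, ISIN, Open, Close:
#
#   px = fetch_price_history(
#       isins=["IE00B4L5Y983"],
#       universe=universo,
#       start_date="2020-01-01",
#       end_date="2020-12-31",
#   )
#   print(px.head())   # -> Date, ISIN, Open, Close
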
def save_price_cache_summary(cache_dir: Path, outfile: Path, pattern: str = "*ETFP.csv"):
    """
    Saves a summary of the cached price series (no fallback) with min/max dates and row counts.
    Default pattern: ETFP symbols only.
    """
    try:
        if not cache_dir.exists():
            print(f"[WARN] Price cache not found: {cache_dir}")
            return
        rows = []
        for f in sorted(cache_dir.glob(pattern)):
            try:
                df = pd.read_csv(f, parse_dates=["Date"])
            except Exception as e:
                rows.append({"Symbol": f.stem, "Errore": str(e)})
                continue
            if df.empty:
                rows.append({"Symbol": f.stem, "Rows": 0})
                continue
            rows.append({
                "Symbol": f.stem,
                "min_date": df["Date"].min().date(),
                "max_date": df["Date"].max().date(),
                "rows": len(df)
            })
        if not rows:
            print(f"[WARN] No price files in cache ({cache_dir}).")
            return
        out_df = pd.DataFrame(rows).sort_values("Symbol")
        outfile.parent.mkdir(parents=True, exist_ok=True)
        out_df.to_excel(outfile, index=False)
        print(f"[INFO] Saved price summary (no fallback) to {outfile} ({len(out_df)} rows)")
    except Exception as e:
        print(f"[WARN] Could not save price summary: {e}")

def _to_float_safe(x):
    try:
        return float(x)
    except Exception:
        return np.nan

# LEGACY: original block kept but not executed (we use _apply_score above)
# =========================================
# GLOBAL PARAMETERS
# =========================================
CONFIG = load_config()
DB_CONFIG = require_section(CONFIG, "db")
PATTERN_CONFIG = require_section(CONFIG, "pattern")
TAGGING_CONFIG = require_section(CONFIG, "tagging")
RANKING_CONFIG = require_section(CONFIG, "ranking")
PATHS_CONFIG = require_section(CONFIG, "paths")
HURST_CONFIG = CONFIG.get("hurst", {})
RUN_CONFIG = CONFIG.get("run", {})
SIGNALS_CONFIG = CONFIG.get("signals", {})
PRICES_CONFIG = CONFIG.get("prices", {})

OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "output"))
PLOT_DIR = Path(PATHS_CONFIG.get("plot_dir", "plot"))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
PLOT_DIR.mkdir(parents=True, exist_ok=True)

UNIVERSO_XLSX = PATHS_CONFIG.get("input_universe", "Input/Universo per Trading System.xlsx")

# Export
OUTPUT_HURST_XLSX = OUTPUT_DIR / "hurst_by_isin.xlsx"
OUTPUT_PATTERN_XLSX = OUTPUT_DIR / "pattern_signals.xlsx"
ERROR_LOG_CSV = OUTPUT_DIR / "errori_isin.csv"
FORWARD_BT_SIGNALS_XLSX = OUTPUT_DIR / "forward_bt_signals.xlsx"
FORWARD_BT_SUMMARY_XLSX = OUTPUT_DIR / "forward_bt_summary.xlsx"
TRADES_REPORT_XLSX = OUTPUT_DIR / "trades_report.xlsx"
PERF_ATTRIB_XLSX = OUTPUT_DIR / "performance_attribution.xlsx"
DAILY_FROM_TRADES_CSV = OUTPUT_DIR / "daily_from_trades.csv"
DAILY_FROM_TRADES_XLSX = OUTPUT_DIR / "daily_from_trades.xlsx"
WEIGHTS_DAILY_XLSX = OUTPUT_DIR / "weights_daily.xlsx"
FINAL_METRICS_XLSX = OUTPUT_DIR / "final_metrics.xlsx"

# Stored procedure & parameters
STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db"))
N_BARS = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db"))
RANKING_WINDOW_BARS = int(RANKING_CONFIG.get("rolling_window_bars", N_BARS))
RP_LOOKBACK = int(SIGNALS_CONFIG.get("risk_parity_lookback", 60))
OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/etf-inv/history"))
OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3))
OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1))
OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10))
OPEN_CACHE_DIR = Path(PRICES_CONFIG.get("cache_dir", OUTPUT_DIR / "price_cache"))
RECOMPUTE_PORTF_FROM_OPEN = bool(PRICES_CONFIG.get("recompute_portfolio_open", False))

# Pattern matching (hyper-parameters)
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))          # pattern window length (bars)
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))          # outcome horizon (bars)
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))    # number of neighbours
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern"))  # outcome threshold to generate a signal
EMBARGO = require_value(PATTERN_CONFIG, "embargo", "pattern")
if EMBARGO is None:
    EMBARGO = WP + HA
else:
    EMBARGO = int(EMBARGO)

# Rule-based tagging (thresholds)
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))

TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))     # maximum number of admitted assets
RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking")  # 2 x 1/15 ≈ 0.1333 = 13.33%
if RP_MAX_WEIGHT is None:
    RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
else:
    RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)
SCORE_VERBOSE = bool(RANKING_CONFIG.get("score_verbose", False))
SCORE_WEIGHTS = RANKING_CONFIG.get("score_weights")
HURST_MIN_LENGTH = int(HURST_CONFIG.get("min_length", 200))
HURST_WIN_GRID = HURST_CONFIG.get("win_grid")
HURST_MIN_SEGMENTS = int(HURST_CONFIG.get("min_segments", 1))

DAYS_PER_YEAR = int(RUN_CONFIG.get("days_per_year", 252))
TOP_N = int(RUN_CONFIG.get("top_n_default", TOP_N_MAX))

# =========================================
# GENERAL UTILS
# =========================================
def clamp01(x):
    if not np.isfinite(x):
        return np.nan
    return float(min(1.0, max(0.0, x)))


def format_eta(seconds):
    """Format a duration (seconds) as Xm Ys or Xh Ym Ys for readability."""
    if not np.isfinite(seconds):
        return "n/a"
    seconds = max(0, int(round(seconds)))
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}h {minutes:02d}m {secs:02d}s"
    return f"{minutes}m {secs:02d}s"

# Timer helper for the post-backtest phases
_post_timer = {"t0": None, "tprev": None, "total": None, "done": 0}
def start_post_timer(total_steps: int):
    _post_timer["t0"] = time.perf_counter()
    _post_timer["tprev"] = _post_timer["t0"]
    _post_timer["total"] = total_steps
    _post_timer["done"] = 0

def checkpoint_post_timer(label: str):
    if _post_timer["t0"] is None or _post_timer["total"] is None:
        return
    _post_timer["done"] += 1
    now = time.perf_counter()
    step_dt = now - _post_timer["tprev"]
    total_dt = now - _post_timer["t0"]
    avg = total_dt / max(_post_timer["done"], 1)
    eta = avg * max(_post_timer["total"] - _post_timer["done"], 0)
    print(f"[TIMER] post {_post_timer['done']}/{_post_timer['total']} {label} — step {step_dt:.2f}s, total {total_dt:.2f}s, ETA {format_eta(eta)}")
    _post_timer["tprev"] = now

# ================= HURST (on RETURNS) =================
def hurst_rs_returns(r, win_grid=None, min_seg=None):
    r = pd.Series(r).dropna().astype("float64").values
    n = len(r)
    seg_min = HURST_MIN_SEGMENTS if min_seg is None else int(min_seg)
    if n < HURST_MIN_LENGTH:
        return np.nan
    if win_grid is None:
        base = HURST_WIN_GRID or [16,24,32,48,64,96,128,192,256,384]
        base = np.array(base, dtype=int)
        win_grid = [w for w in base if w <= n//2]
        if len(win_grid) < 4:
            max_w = max(16, n//4)
            g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)]))
            win_grid = [w for w in g if 8 <= w <= max_w]

    RS_vals, sizes = [], []
    for w in win_grid:
        if w < 8 or w > n: continue
        m = n//w
        if m < seg_min: continue
        rs_list = []
        for i in range(m):
            seg = r[i*w:(i+1)*w]
            seg = seg - np.mean(seg)
            sd = seg.std(ddof=1)
            if sd == 0 or not np.isfinite(sd): continue
            y = np.cumsum(seg)
            R = np.max(y) - np.min(y)
            rs = R/sd
            if np.isfinite(rs) and rs > 0: rs_list.append(rs)
        if rs_list:
            RS_vals.append(np.mean(rs_list)); sizes.append(w)

    if len(RS_vals) < 3: return np.nan
    sizes = np.array(sizes, float); RS_vals = np.array(RS_vals, float)
    mask = np.isfinite(RS_vals) & (RS_vals > 0)
    sizes, RS_vals = sizes[mask], RS_vals[mask]
    if sizes.size < 3: return np.nan
    slope, _ = np.polyfit(np.log(sizes), np.log(RS_vals), 1)
    return clamp01(slope)

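# Quick intuition for the R/S estimator above (an illustrative sketch, not executed;
# it assumes HURST_MIN_LENGTH does not exceed the sample size used here). H is the
# slope of log(mean R/S) versus log(window size): i.i.d. returns should give an
# estimate near 0.5 (the finite-sample R/S bias tends to push it a bit higher), while
# positively autocorrelated returns usually push the estimate above 0.5:
#
#   rng = np.random.default_rng(0)
#   white = rng.normal(size=2000)
#   print(hurst_rs_returns(white))        # expected to land in the ~0.5-0.6 area
#   ar = np.zeros(2000)
#   for t in range(1, 2000):
#       ar[t] = 0.5 * ar[t-1] + rng.normal()
#   print(hurst_rs_returns(ar))           # typically higher than the white-noise value
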
def hurst_dfa_returns(r, win_grid=None):
    r = pd.Series(r).dropna().astype("float64").values
    n = len(r)
    if n < HURST_MIN_LENGTH:
        return np.nan
    r_dm = r - np.mean(r)
    y = np.cumsum(r_dm)
    if win_grid is None:
        base = HURST_WIN_GRID or [16,24,32,48,64,96,128,192,256]
        base = np.array(base, dtype=int)
        win_grid = [w for w in base if w <= n//2]
        if len(win_grid) < 4:
            max_w = max(16, n//4)
            g = sorted(set([int(max(8, round((n/(2**k))))) for k in range(3,8)]))
            win_grid = [w for w in g if 8 <= w <= max_w]

    F_vals, sizes = [], []
    for s in win_grid:
        if s < 8: continue
        m = n//s
        if m < 2: continue
        rms_list = []
        for i in range(m):
            seg = y[i*s:(i+1)*s]
            t = np.arange(s, dtype=float)
            A = np.vstack([t, np.ones(s)]).T
            coeff, *_ = np.linalg.lstsq(A, seg, rcond=None)
            trend = A @ coeff
            detr = seg - trend
            rms = np.sqrt(np.mean(detr**2))
            if np.isfinite(rms) and rms > 0: rms_list.append(rms)
        if rms_list:
            F_vals.append(np.mean(rms_list)); sizes.append(s)

    if len(F_vals) < 3: return np.nan
    sizes = np.array(sizes, float); F_vals = np.array(F_vals, float)
    mask = np.isfinite(F_vals) & (F_vals > 0)
    sizes, F_vals = sizes[mask], F_vals[mask]
    if sizes.size < 3: return np.nan
    slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1)
    return clamp01(slope)

# ---------------------------------
# R^2 of the equity line (log-equity vs time)
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
    r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    eq = (1.0 + r).cumprod().replace(0, np.nan)
    y = np.log(eq.dropna())
    if len(y) < 10:
        return np.nan
    x = np.arange(len(y), dtype=float)
    x = (x - x.mean()) / (x.std() + 1e-12)
    X = np.vstack([np.ones_like(x), x]).T
    beta, *_ = np.linalg.lstsq(X, y.values, rcond=None)
    y_hat = X @ beta
    ss_res = ((y.values - y_hat) ** 2).sum()
    ss_tot = ((y.values - y.values.mean()) ** 2).sum()
    return float(1 - ss_res / ss_tot) if ss_tot > 0 else np.nan

# ---------------------------------
# Drawdown metrics path-based
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
    r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
    eq = (1.0 + r).cumprod()
    if eq.empty:
        return np.nan, np.nan, np.nan

    roll_max = eq.cummax()
    dd = eq / roll_max - 1.0
    maxdd = float(dd.min())

    episodes = []
    peak_i, peak_val = None, -np.inf
    trough_i, trough_val = None, np.inf
    in_uw = False
    v = eq.values
    for i in range(len(v)):
        if v[i] >= peak_val:
            if in_uw:
                episodes.append((peak_i, trough_i, i))
                in_uw = False
                trough_i, trough_val = None, np.inf
            peak_i, peak_val = i, v[i]
        else:
            if not in_uw:
                in_uw = True
                trough_i, trough_val = i, v[i]
            elif v[i] < trough_val:
                trough_i, trough_val = i, v[i]
    if in_uw:
        episodes.append((peak_i, trough_i, None))

    dd_dur_max = np.nan
    if episodes:
        durs = []
        for p, t, rcv in episodes:
            if p is None:
                continue
            end_i = rcv if rcv is not None else len(eq) - 1
            durs.append(end_i - p)
        if durs:
            dd_dur_max = int(max(durs))

    ttr = np.nan
    if episodes:
        mdd_val = 0.0
        mdd_ep = None
        for p, t, rcv in episodes:
            if p is None or t is None:
                continue
            dd_here = eq.iloc[t] / eq.iloc[p] - 1.0
            if dd_here < mdd_val:
                mdd_val = dd_here
                mdd_ep = (p, t, rcv)
        if mdd_ep is not None:
            p, t, rcv = mdd_ep
            if rcv is not None:
                ttr = int(rcv - t)
            else:
                ttr = sentinel_ttr
    return maxdd, dd_dur_max, ttr

# ---------------------------------
# Utilities for AAW, AUW and the Heal Index (as in the optimizer)
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
    """
    Computes:
    - AAW: area above water (run-up vs the cumulative minimum)
    - AUW: area under water (drawdown vs the cumulative maximum)
    - Heal Index: (AAW - AUW) / AUW
    """
    s = returns.fillna(0.0).astype(float)
    if s.size == 0:
        return np.nan, np.nan, np.nan

    equity = (1.0 + s).cumprod()
    if equity.size == 0:
        return np.nan, np.nan, np.nan

    run_max = equity.cummax()
    dd = equity / run_max - 1.0
    AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan

    run_min = equity.cummin()
    ru = equity / run_min - 1.0
    AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan

    heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
    return AAW, AUW, heal

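# Worked example for the Heal Index (illustrative only, not executed). With daily
# returns [+10%, -10%, +10%] the equity path is 1.100, 0.990, 1.089:
#   - drawdowns vs the running max:  0, -0.10, -0.01  -> AUW = 0.11
#   - run-ups vs the running min:    0,  0,     0.10  -> AAW = 0.10
#   - Heal Index = (AAW - AUW) / AUW = (0.10 - 0.11) / 0.11 ≈ -0.09
#
#   aaw, auw, heal = heal_index_metrics(pd.Series([0.10, -0.10, 0.10]))
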
# ---------------------------------
# Utility for H_min (100% positive windows) — as in the optimizer
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
    """
    Minimum horizon h_days such that ALL rolling windows of width h_days
    have a cumulative return >= 0. Returns (h_days, ceil(h_days/21)).
    """
    s = returns.dropna().astype(float)
    n = s.size
    if n == 0:
        return np.nan, np.nan

    log1p = np.log1p(s.values)
    csum = np.cumsum(log1p)

    def rolling_sum_k(k: int):
        if k > n:
            return np.array([])
        head = csum[k - 1:]
        tail = np.concatenate(([0.0], csum[:-k]))
        return head - tail

    for k in range(1, n + 1):
        rs = rolling_sum_k(k)
        if rs.size == 0:
            break
        roll_ret = np.exp(rs) - 1.0
        if np.all(roll_ret >= 0):
            h_days = k
            h_months = int(np.ceil(h_days / month_len))
            return h_days, h_months

    return np.nan, np.nan

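# Worked example for h_min_100 (illustrative only, not executed). With returns
# [-1%, +2%, -1%, +2%] every 2-day window has a cumulative return >= 0
# (0.99 * 1.02 ≈ 1.0098), but some 1-day windows are negative, so h_days = 2
# and h_months = ceil(2/21) = 1:
#
#   h_days, h_months = h_min_100(pd.Series([-0.01, 0.02, -0.01, 0.02]))
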
# =========================================
# 1) UNIVERSE: ISIN + METADATA
# =========================================
universo = pd.read_excel(UNIVERSO_XLSX)

col_isin_uni = detect_column(universo, ["ISIN", "isin", "codice isin"])
if col_isin_uni is None:
    raise ValueError("No ISIN column found in the universe file.")

col_name_uni = detect_column(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"])
col_cat_uni = detect_column(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"])
col_ac_uni = detect_column(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"])

isins = (
    universo[col_isin_uni].astype(str).str.strip()
    .replace("", pd.NA).dropna().drop_duplicates().tolist()
)
print(f"[INFO] Total ISINs in universe: {len(isins)}")

meta_df = pd.DataFrame({"ISIN": universo[col_isin_uni].astype(str).str.strip()})
meta_df["Nome"] = universo[col_name_uni] if col_name_uni else None
meta_df["Categoria"] = universo[col_cat_uni] if col_cat_uni else None
meta_df["Asset Class"] = universo[col_ac_uni] if col_ac_uni else None
meta_df = meta_df.drop_duplicates(subset=["ISIN"]).reset_index(drop=True)

# =========================================
# 2) DB CONNECTION
# =========================================
conn_str = read_connection_txt("connection.txt")
engine = sa.create_engine(conn_str, fast_executemany=True)
print("[INFO] Connection ready (SQLAlchemy + pyodbc).")

# =========================================
# 3) ISIN LOOP → SP → HURST + PATTERN (LONG-ONLY for trades)
# =========================================
errors = []
hurst_rows = []
pattern_rows = []
last_dates = []

sql_sp = text(f"EXEC {STORED_PROC} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")

def detect_cols(df0):
    col_date = detect_column(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"])
    col_ret = detect_column(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"])
    col_px = detect_column(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
    return col_date, col_ret, col_px

ok_count = 0
first_ok_reported = False

for i, isin in enumerate(isins, 1):
    try:
        df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR})
        if df_isin.empty:
            errors.append({"ISIN": isin, "Errore": "Empty SP result"})
            continue

        col_date, col_ret, col_px = detect_cols(df_isin)
        if col_date:
            df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce")
            df_isin = df_isin.sort_values(col_date)

        # --- Returns ---
        if col_ret and col_ret in df_isin.columns:
            r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float).dropna()
        elif col_px and col_px in df_isin.columns:
            px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan)
            r = np.log(px/px.shift(1)).dropna()
        else:
            errors.append({"ISIN": isin, "Errore": "Neither returns nor prices usable"})
            continue

        if len(r) < max(200, WP + HA + 10):
            errors.append({"ISIN": isin, "Errore": f"Series too short ({len(r)} points)"})
            continue

        # --- HURST (on returns) ---
        h_rs = hurst_rs_returns(r)
        h_dfa = hurst_dfa_returns(r)
        H = np.nanmedian([h_rs, h_dfa])
        H = clamp01(H) if np.isfinite(H) else np.nan

        if pd.isna(H):
            regime = None
        elif H < 0.45:
            regime = "mean_reversion"
        elif H > 0.55:
            regime = "breakout"
        else:
            regime = "neutral"

        # --- Pattern library ---
        lib_wins, lib_out = build_pattern_library(r, WP, HA, embargo=EMBARGO)

        # Current window
        date_last = df_isin[col_date].iloc[-1] if col_date else None
        if date_last is not None:
            last_dates.append(pd.to_datetime(date_last))

        if lib_wins is None or len(r) < WP + HA:
            ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
            signal = 0
            est_out, avg_dist = np.nan, np.nan
        else:
            curr = r.values[-WP:]
            curr_zn = z_norm(curr)
            if curr_zn is None:
                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
                signal = 0; est_out = np.nan; avg_dist = np.nan
            else:
                est_out, avg_dist, idx_sel = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
                # LONG ONLY: open only if est_out > THETA
                signal = 1 if est_out > THETA else 0
                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)

        # Save results
        hurst_rows.append({
            "ISIN": isin,
            "Hurst": None if pd.isna(H) else round(float(H), 4),
            "Regime": regime
        })
        pattern_rows.append({
            "ISIN": isin,
            "DateLast": date_last,
            "PatternType": ptype,
            "Signal": {1:"long",-1:"short",0:"flat"}.get(int(signal), "flat"),
            "Confidence": None if pconf is None else round(float(min(1.0, max(0.0, pconf))), 3),
            "EstOutcome": None if pd.isna(est_out) else float(est_out),
            "AvgDist": None if pd.isna(avg_dist) else float(avg_dist),
            "Wp": WP, "Ha": HA, "k": KNN_K
        })
        ok_count += 1

        if not first_ok_reported:
            print("[INFO] Columns detected on the first valid ISIN:",
                  "Date:", col_date, "| Returns:", col_ret, "| Price:", col_px,
                  "| H:", round(H,4) if pd.notna(H) else None, "| Regime:", regime)
            first_ok_reported = True

        if i % 10 == 0:
            print(f"… {i}/{len(isins)} ISINs processed (ok so far: {ok_count})")

    except Exception as e:
        errors.append({"ISIN": isin, "Errore": str(e)})

# =========================================
# 4A) EXPORT: HURST + PATTERN (QualityScore)
# =========================================
hurst_df = pd.DataFrame(hurst_rows) if hurst_rows else pd.DataFrame(
    {"ISIN": [], "Hurst": [], "Regime": []}
)
meta_df["ISIN"] = meta_df["ISIN"].astype(str).str.strip()
hurst_df["ISIN"] = hurst_df["ISIN"].astype(str).str.strip()

# ISIN -> Hurst map (to use H as theta_entry in the backtest)
hurst_map = {
    str(row["ISIN"]).strip(): (float(row["Hurst"]) if pd.notna(row["Hurst"]) else np.nan)
    for _, row in hurst_df.iterrows()
}

summary_hurst = meta_df.merge(hurst_df, on="ISIN", how="left")
cols_hurst = ["ISIN", "Nome", "Categoria", "Asset Class", "Hurst", "Regime"]
summary_hurst = summary_hurst[[c for c in cols_hurst if c in summary_hurst.columns]]
summary_hurst = summary_hurst.sort_values(["Hurst", "ISIN"], na_position="last").reset_index(drop=True)
summary_hurst.to_excel(OUTPUT_HURST_XLSX, index=False)

pat_df = pd.DataFrame(pattern_rows) if pattern_rows else pd.DataFrame(
    {"ISIN": [], "DateLast": [], "PatternType": [], "Signal": [],
     "Confidence": [], "EstOutcome": [], "AvgDist": [], "Wp": [], "Ha": [], "k": []}
)
pat_df["ISIN"] = pat_df["ISIN"].astype(str).str.strip()

summary_pattern = (
    meta_df
    .merge(hurst_df, on="ISIN", how="left")
    .merge(pat_df, on="ISIN", how="left")
)
wanted_cols = ["ISIN","Nome","Categoria","Asset Class","Hurst","Regime",
               "DateLast","PatternType","Signal","Confidence","EstOutcome","AvgDist","Wp","Ha","k"]
summary_pattern = summary_pattern[[c for c in wanted_cols if c in summary_pattern.columns]]

def _add_quality_scores(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    conf = pd.to_numeric(out.get("Confidence", np.nan), errors="coerce")
    est = pd.to_numeric(out.get("EstOutcome", np.nan), errors="coerce")
    dist = pd.to_numeric(out.get("AvgDist", np.nan), errors="coerce")

    max_abs_est = np.nanmax(np.abs(est)) if np.isfinite(np.nanmax(np.abs(est))) and (np.nanmax(np.abs(est)) > 0) else np.nan
    outcome_score = np.where(np.isnan(max_abs_est) | (max_abs_est == 0), np.nan, np.abs(est) / max_abs_est)
    similarity_score = 1.0 / (1.0 + dist.astype(float))
    confidence_score = conf.astype(float)
    quality = confidence_score * similarity_score * outcome_score

    out["OutcomeScore"] = np.round(outcome_score, 4)
    out["SimilarityScore"] = np.round(similarity_score, 4)
    out["QualityScore"] = np.round(quality, 4)
    return out

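# Worked example for the quality scores (illustrative only, not executed). For a row
# with Confidence = 0.8, AvgDist = 1.0 and |EstOutcome| equal to half of the column maximum:
#   OutcomeScore    = 0.5
#   SimilarityScore = 1 / (1 + 1.0) = 0.5
#   QualityScore    = 0.8 * 0.5 * 0.5 = 0.2
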
summary_pattern = _add_quality_scores(summary_pattern)

sort_cols = [c for c in ["QualityScore", "Confidence", "OutcomeScore"] if c in summary_pattern.columns]
if sort_cols:
    summary_pattern = summary_pattern.sort_values(sort_cols, ascending=[False]*len(sort_cols),
                                                  na_position="last").reset_index(drop=True)
if "DateLast" in summary_pattern.columns:
    summary_pattern = summary_pattern.sort_values(["QualityScore","DateLast"], ascending=[False, True],
                                                  na_position="last").reset_index(drop=True)

summary_pattern.to_excel(OUTPUT_PATTERN_XLSX, index=False)

print(f"[INFO] Saved: {OUTPUT_HURST_XLSX} (rows: {len(summary_hurst)})")
print(f"[INFO] Saved: {OUTPUT_PATTERN_XLSX} (rows: {len(summary_pattern)})")

if errors:
    pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False)
    print(f"[INFO] Error log: {ERROR_LOG_CSV} (total: {len(errors)})")

# =========================================
# 4B) FORWARD BACKTEST (walk-forward) — LONG ONLY
# =========================================
def drawdown_stats_simple(ret_series: pd.Series):
    eq = (ret_series.fillna(0)).cumsum()
    rolling_max = eq.cummax()
    dd = eq - rolling_max
    maxdd = float(dd.min()) if len(dd) else 0.0
    cagr = np.exp(ret_series.mean()*DAYS_PER_YEAR) - 1
    annvol = ret_series.std() * np.sqrt(DAYS_PER_YEAR)
    sharpe = (ret_series.mean() / (ret_series.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR)
    calmar = (cagr / abs(maxdd)) if maxdd < 0 else np.nan
    return {
        "CAGR_%": round(cagr*100, 2),
        "AnnVol_%": round(annvol*100, 2),
        "Sharpe": round(float(sharpe), 2),
        "MaxDD_%eq": round(float(maxdd*100), 2),
        "Calmar": round(float(calmar), 2) if np.isfinite(calmar) else np.nan
    }

def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret: str,
                                   Wp: int, Ha: int, k: int,
                                   theta_entry: float,
                                   exec_ret: pd.Series | None = None,
                                   fee_bps: float = 10,
                                   # --- EXIT PARAMS (all optional) ---
                                   sl_bps: float | None = 300.0,     # absolute stop loss (bps on the trade's cumulative PnL)
                                   tp_bps: float | None = 800.0,     # absolute take profit (bps)
                                   trail_bps: float | None = 300.0,  # trailing stop (drawdown from peak, bps)
                                   time_stop_bars: int | None = 20,  # maximum holding period
                                   theta_exit: float | None = 0.0,   # exit if est_out <= theta_exit (ignored if None)
                                   weak_days_exit: int | None = None # exit if est_out <= theta_exit for N days
                                   ):
    """
    Walk-forward LONG-ONLY backtest with EXIT rules (SL/TP/TS/time/flip).
    Returns (signals_df, summary_metrics_dict).
    Note: it uses daily data only → thresholds are evaluated at the end of the day
    and the exit happens on the next bar (conservative model).
    """
    r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0  # returns in decimals (close/close)
    idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r))
    idx = pd.to_datetime(idx).dt.normalize()
    if exec_ret is not None:
        r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float)
        r_exec.index = pd.to_datetime(r_exec.index).normalize()
        # robust reindex: follow the order of idx, keep NaN when a date is missing
        r_exec = r_exec.reindex(idx)
        if len(r_exec) != len(r):
            r_exec = pd.Series(r_exec.values, index=idx).reindex(idx)
    else:
        r_exec = r
    fee = fee_bps / 10000.0

    # helper to build the library from past data only
    def _lib_predict(past_returns: pd.Series, win_last: np.ndarray):
        lib_wins, lib_out = build_pattern_library(past_returns, Wp, Ha)
        if lib_wins is None:
            return np.nan, np.nan
        curr_zn = z_norm(win_last)
        if curr_zn is None:
            return np.nan, np.nan
        est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=k)
        return float(est_out), float(avg_dist)

    # Position state
    in_pos = False
    entry_t = None
    trade_pnl = 0.0
    trade_peak = 0.0
    weak_streak = 0

    rows = []

    # Note: we iterate from the first bar where a full window can be computed
    for t in range(Wp, len(r) - 1):
        past = r.iloc[:t]
        # if the past is not enough for the library, force flat
        if past.dropna().shape[0] < (Wp + Ha):
            sig_out, est_out, avg_dist = 0, np.nan, np.nan
            # PnL at t+1 is always reported in the Ret+1 column
            rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan))
            continue

        win_last = r.iloc[t-Wp:t].values
        est_out, avg_dist = _lib_predict(past, win_last)

        # Default: carry the current state forward (sig_out = 1 if in_pos)
        sig_out = 1 if in_pos else 0

        # --- ENTRY LOGIC ---
        if (not in_pos) and (est_out > theta_entry):
            # open tomorrow → flag 1 today (so the PnL accrues on t+1)
            sig_out = 1
            in_pos = True
            entry_t = t
            trade_pnl = 0.0
            trade_peak = 0.0
            weak_streak = 0

        # --- EXIT LOGIC (while in position) ---
        elif in_pos:
            # 1) update the trade PnL with the bar return that will be *earned* tomorrow:
            #    for EOD consistency, today's reported PnL is on r[t+1] when Signal(t)=1.
            #    For end-of-day stop checks we estimate the "PnL if I stay" by accruing r[t+1] ex-ante.
            next_ret = r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan  # return that will apply if the position is kept
            pnl_if_stay = (1.0 + trade_pnl) * (1.0 + next_ret) - 1.0

            # 2) update the hypothetical trailing peak
            peak_if_stay = max(trade_peak, pnl_if_stay)

            # 3) evaluate the exit conditions on the "if stay" state
            exit_reasons = []

            # SL
            if (sl_bps is not None) and (pnl_if_stay <= -sl_bps/10000.0):
                exit_reasons.append("SL")

            # TP
            if (tp_bps is not None) and (pnl_if_stay >= tp_bps/10000.0):
                exit_reasons.append("TP")

            # Trailing
            if (trail_bps is not None) and (peak_if_stay - pnl_if_stay >= trail_bps/10000.0):
                exit_reasons.append("TRAIL")

            # Time stop
            if (time_stop_bars is not None) and (t - entry_t + 1 >= time_stop_bars):
                exit_reasons.append("TIME")

            # Flip / persistent weakness
            if theta_exit is not None:
                if est_out <= theta_exit:
                    # weak today → update the streak
                    weak_streak = weak_streak + 1 if weak_days_exit else weak_streak
                    # immediate exit if weak_days_exit is not used
                    if weak_days_exit is None:
                        exit_reasons.append("FLIP")
                    else:
                        if weak_streak >= weak_days_exit:
                            exit_reasons.append("FLIP_STREAK")
                else:
                    weak_streak = 0

            # *** If any condition triggers, we exit tomorrow → set 0 today
            if exit_reasons:
                sig_out = 0
                in_pos = False
                entry_t = None
                trade_pnl = 0.0
                trade_peak = 0.0
                weak_streak = 0
            else:
                # we stay → update the "true" state for the next loop iteration
                trade_pnl = pnl_if_stay
                trade_peak = peak_if_stay

        # Record today's row; the reported PnL is always r[t+1]
        rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan))

    sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"])

    # Costs on position changes
    sig_df["Signal_prev"] = sig_df["Signal"].shift(1).fillna(0)
    trade_chg = (sig_df["Signal"] - sig_df["Signal_prev"]).abs()
    cost = trade_chg * fee

    sig_df["PnL"] = sig_df["Signal"] * sig_df["Ret+1"] - cost
    sig_df.drop(columns=["Signal_prev"], inplace=True)

    # Summary metrics
    stats = drawdown_stats_simple(sig_df["PnL"])
    stats.update({
        "HitRate_%": round(100 * ((sig_df["PnL"] > 0).sum() / max(1, sig_df["PnL"].notna().sum())), 2),
        "AvgTradeRet_bps": round(sig_df["PnL"].mean() * 10000, 2),
        "Turnover_%/step": round(100 * trade_chg.mean(), 2),
        "N_Steps": int(sig_df.shape[0]),
    })
    # Also record the exit parameters used (useful for grid search / tracing)
    stats.update({
        "theta_entry": theta_entry,
        "theta_exit": (None if theta_exit is None else float(theta_exit)),
        "sl_bps": (None if sl_bps is None else float(sl_bps)),
        "tp_bps": (None if tp_bps is None else float(tp_bps)),
        "trail_bps": (None if trail_bps is None else float(trail_bps)),
        "time_stop_bars": (None if time_stop_bars is None else int(time_stop_bars)),
        "weak_days_exit": (None if weak_days_exit is None else int(weak_days_exit))
    })
    return sig_df, stats

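# Illustrative call (not executed). The column names and the random returns below are
# made up, so the output is meaningless except for its shape; the point is the expected
# input layout (a date column plus a returns column expressed in percent) and the
# (signals_df, stats) return value:
#
#   demo = pd.DataFrame({
#       "Data": pd.bdate_range("2020-01-02", periods=400),
#       "Rendimento": np.random.default_rng(0).normal(0.02, 1.0, 400),  # daily % returns
#   })
#   demo_sig, demo_stats = knn_forward_backtest_one_asset(
#       demo, col_date="Data", col_ret="Rendimento",
#       Wp=WP, Ha=HA, k=KNN_K, theta_entry=THETA,
#   )
#   print(demo_stats["CAGR_%"], demo_stats["Sharpe"])
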
# ========= BACKTEST EXECUTION FOR ALL ISINs =========
bt_signals = []
bt_summary = []

total_t = 0.0
start_all = time.perf_counter()

for i, isin in enumerate(isins, 1):
    t0 = time.perf_counter()   # ---- START SINGLE-ITERATION TIMER ----

    try:
        df_isin = pd.read_sql_query(sql_sp, engine, params={"isin": isin, "n": N_BARS, "ptf": PTF_CURR})
        if df_isin.empty:
            errors.append({"ISIN": isin, "Errore": "Empty SP result (BT)"})
            continue

        col_date, col_ret, col_px = detect_cols(df_isin)
        if col_date:
            df_isin[col_date] = pd.to_datetime(df_isin[col_date], errors="coerce")
            df_isin = df_isin.sort_values(col_date)

        if col_ret and col_ret in df_isin.columns:
            df_isin[col_ret] = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float)  # % raw
        elif col_px and col_px in df_isin.columns:
            px = pd.to_numeric(df_isin[col_px], errors="coerce").astype(float).replace(0, np.nan)
            df_isin[col_ret] = np.log(px / px.shift(1)) * 100.0  # %
        else:
            errors.append({"ISIN": isin, "Errore": "Neither returns nor prices (BT)"})
            continue

        if df_isin[col_ret].dropna().shape[0] < max(200, WP + HA + 10):
            errors.append({"ISIN": isin, "Errore": f"Series too short (BT) ({df_isin[col_ret].dropna().shape[0]} points)"})
            continue

        # --- Fetch open/close to compute execution returns (open->open) ---
        try:
            date_min = df_isin[col_date].min().date() if col_date else None
            date_max = df_isin[col_date].max().date() if col_date else None
            if date_min and date_max:
                px_hist_one = fetch_price_history(
                    isins=[isin],
                    universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
                    start_date=date_min.isoformat(),
                    end_date=date_max.isoformat()
                )
                px_hist_one = px_hist_one.sort_values("Date")
                open_series = px_hist_one[["Date","Open"]].dropna()
                open_series["Date"] = pd.to_datetime(open_series["Date"]).dt.normalize()
                open_series = open_series.drop_duplicates(subset=["Date"]).set_index("Date")["Open"]
                open_ret = open_series.pct_change()
                # realign on the same date sequence as df_isin
                idx_dates = pd.to_datetime(df_isin[col_date]).dt.normalize()
                exec_ret = open_ret.reindex(idx_dates)
                exec_ret.index = idx_dates
            else:
                exec_ret = None
        except Exception as e:
            print(f"[WARN] Open/close fetch failed for {isin}: {e}")
            exec_ret = None

        # ============================
        # THETA = HURST AS A PERCENTAGE
        # H = 0.50 -> theta_entry = 0.005 (0.5%)
        # ============================
        isin_str = str(isin).strip()
        H_val = hurst_map.get(isin_str, np.nan)
        if H_val is None or pd.isna(H_val):
            theta_entry = THETA  # fallback when H is missing
        else:
            theta_entry = float(H_val) / 100.0

        sig_df, stats = knn_forward_backtest_one_asset(
            df_isin=df_isin,
            col_date=(col_date if col_date else df_isin.index.name or "idx"),
            col_ret=col_ret,
            Wp=WP,
            Ha=HA,
            k=KNN_K,
            theta_entry=theta_entry,
            exec_ret=exec_ret,
            fee_bps=10,
        )

        name = meta_df.loc[meta_df["ISIN"]==isin, "Nome"].iloc[0] if (meta_df["ISIN"]==isin).any() else None
        cat = meta_df.loc[meta_df["ISIN"]==isin, "Categoria"].iloc[0] if (meta_df["ISIN"]==isin).any() else None
        ac = meta_df.loc[meta_df["ISIN"]==isin, "Asset Class"].iloc[0] if (meta_df["ISIN"]==isin).any() else None

        tmp = sig_df.copy()
        tmp.insert(0, "ISIN", isin)
        tmp.insert(1, "Nome", name)
        tmp.insert(2, "Categoria", cat)
        tmp.insert(3, "Asset Class", ac)
        tmp["Wp"] = WP; tmp["Ha"] = HA; tmp["k"] = KNN_K; tmp["Theta"] = theta_entry
        bt_signals.append(tmp)

        stats_row = {"ISIN": isin, "Nome": name, "Categoria": cat, "Asset Class": ac}
        stats_row.update(stats)
        bt_summary.append(stats_row)

    except Exception as e:
        errors.append({"ISIN": isin, "Errore": f"Backtest: {str(e)}"})

    # ---- END SINGLE-ITERATION TIMER ----
    dt = time.perf_counter() - t0
    total_t += dt

    avg_t = total_t / i
    eta = avg_t * (len(isins) - i)

    print(
        f"… backtest {i}/{len(isins)} completed — {dt:.2f} sec "
        f"(avg {avg_t:.2f}s, ETA {format_eta(eta)} remaining)"
    )

# ---- FINAL TIMER ----
end_all = time.perf_counter()
total_elapsed = end_all - start_all
avg_elapsed = total_elapsed / max(len(isins), 1)
print(f"[INFO] Total time: {format_eta(total_elapsed)} ({total_elapsed:.2f} sec)")
print(f"[INFO] Average time per asset: {format_eta(avg_elapsed)} ({avg_elapsed:.2f} sec)")


bt_signals_df = pd.concat(bt_signals, ignore_index=True) if bt_signals else pd.DataFrame(
    columns=["ISIN","Nome","Categoria","Asset Class","Date","Signal","EstOutcome","AvgDist","Ret+1","PnL","Wp","Ha","k","Theta"]
)
bt_summary_df = pd.DataFrame(bt_summary) if bt_summary else pd.DataFrame(
    columns=["ISIN","Nome","Categoria","Asset Class","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar","HitRate_%","AvgTradeRet_bps","Turnover_%/step","N_Steps"]
)

bt_signals_df.to_excel(FORWARD_BT_SIGNALS_XLSX, index=False)
bt_summary_df.to_excel(FORWARD_BT_SUMMARY_XLSX, index=False)
print(f"[INFO] Saved: {FORWARD_BT_SIGNALS_XLSX} ({len(bt_signals_df):,} rows)")
print(f"[INFO] Saved: {FORWARD_BT_SUMMARY_XLSX} ({len(bt_summary_df):,} rows)")

if errors:
    pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False)
    print(f"[INFO] Error log updated: {ERROR_LOG_CSV} (total: {len(errors)})")

# Save the price summary (primary symbols only, no fallback) on every run
try:
    save_price_cache_summary(OPEN_CACHE_DIR, OPEN_CACHE_DIR / "prezzi_summary_no_fallback.xlsx")
except Exception as e:
    print(f"[WARN] Price summary not created: {e}")

# Timer for the post-backtest phases (section 5 onwards)
start_post_timer(total_steps=4)

# ======================================================================
# 5) DYNAMIC PORTFOLIO STRATEGIES + EQUITY + HEATMAP + TRADE REPORT
# ======================================================================
def _ensure_bt_summary(meta_df: pd.DataFrame, bt_summary_df: pd.DataFrame) -> pd.DataFrame:
    if bt_summary_df is not None and not bt_summary_df.empty:
        out = bt_summary_df.copy()
    else:
        out = pd.DataFrame({"ISIN": meta_df["ISIN"].copy()})
        for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]:
            out[c] = np.nan
    out = out.merge(meta_df[["ISIN","Nome","Categoria","Asset Class"]], on="ISIN", how="left")
    out["ISIN"] = out["ISIN"].astype(str).str.strip()
    return out

def _ensure_bt_signals(meta_df: pd.DataFrame, bt_signals_df: pd.DataFrame, last_dates_hint: list) -> pd.DataFrame:
    if bt_signals_df is not None and not bt_signals_df.empty:
        fbsig = bt_signals_df.copy()
        fbsig["Date"] = pd.to_datetime(fbsig["Date"])
        fbsig["ISIN"] = fbsig["ISIN"].astype(str).str.strip()
        fbsig["Signal"] = pd.to_numeric(fbsig["Signal"], errors="coerce").fillna(0).astype(int)
        fbsig["PnL"] = pd.to_numeric(fbsig["PnL"], errors="coerce").fillna(0.0)
        return fbsig

    end_date = (max(last_dates_hint) if last_dates_hint else pd.Timestamp.today()).normalize()
    dates = pd.bdate_range(end=end_date, periods=120, freq="C")
    isins_small = meta_df["ISIN"].astype(str).str.strip().tolist()[:12]
    rows = []
    for isin in isins_small:
        for dt in dates:
            rows.append({"Date": dt, "ISIN": isin, "Signal": 0, "PnL": 0.0})
    return pd.DataFrame(rows)

forward_bt_summary = _ensure_bt_summary(meta_df, bt_summary_df)
forward_bt_signals = _ensure_bt_signals(meta_df, bt_signals_df, last_dates)

# -----------------------------
# 5.1 Support functions (charts and weights)
# -----------------------------
def equity_from_returns(r: pd.Series) -> pd.Series:
    r = pd.to_numeric(r, errors="coerce").fillna(0.0)
    return (1 + r).cumprod() * 100

def monthly_returns(r: pd.Series) -> pd.Series:
    r = pd.to_numeric(r, errors="coerce").fillna(0.0)
    if not isinstance(r.index, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)):
        try:
            r.index = pd.to_datetime(r.index)
        except Exception:
            return pd.Series(dtype=float)
    return (1 + r).resample("M").prod() - 1

def plot_heatmap_monthly(r: pd.Series, title: str, save_path: str = None):
    r = pd.to_numeric(r, errors="coerce").fillna(0.0)
    m = monthly_returns(r)
    df = m.to_frame("ret")
    df["Year"], df["Month"] = df.index.year, df.index.month
    pv = df.pivot(index="Year", columns="Month", values="ret")
    fig, ax = plt.subplots(figsize=(10,6))
    im = ax.imshow(pv.fillna(0)*100, cmap="RdYlGn", vmin=-3, vmax=5, aspect="auto")
    for i in range(pv.shape[0]):
        for j in range(pv.shape[1]):
            val = pv.iloc[i,j]
            if not np.isnan(val):
                ax.text(j, i, f"{val*100:.1f}", ha="center", va="center", fontsize=8)
    ax.set_title(title); ax.set_xlabel("Month"); ax.set_ylabel("Year")
    ax.set_xticks(range(12)); ax.set_xticklabels(range(1,13))
    fig.colorbar(im, ax=ax, label="%")
    plt.tight_layout()
    if save_path:
        savefig_safe(save_path, dpi=150)
    plt.close(fig)
    # Do not show the plot during execution

def inverse_vol_weights(df, window=60, max_weight=None):
    vol = df.rolling(window).std()
    inv = 1 / vol.replace(0, np.nan)
    w = inv.div(inv.sum(axis=1), axis=0)
    w = w.ffill().fillna(1 / max(1, df.shape[1]))

    if max_weight is not None:
        w = w.clip(upper=max_weight)

    return w

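# Note on inverse_vol_weights (an illustrative sketch, not executed): weights are
# proportional to 1/rolling-vol and normalised row by row; when max_weight caps some
# columns, the row sum can fall below 1 (the excess is left unallocated rather than
# redistributed). The data below are random and only show the mechanics:
#
#   demo_ret = pd.DataFrame(np.random.default_rng(0).normal(0, [0.01, 0.03], (200, 2)),
#                           columns=["low_vol", "high_vol"])
#   demo_w = inverse_vol_weights(demo_ret, window=60, max_weight=0.6)
#   print(demo_w.tail(1))   # the low-vol column gets the larger (capped) weight
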
def portfolio_metrics(r: pd.Series):
    r = pd.to_numeric(r, errors="coerce").fillna(0.0)
    if len(r) == 0:
        return dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan)
    ann = DAYS_PER_YEAR
    eq = equity_from_returns(r)
    cagr = (eq.iloc[-1]/eq.iloc[0])**(ann/max(1, len(r))) - 1
    vol = r.std() * np.sqrt(ann)
    sharpe = (r.mean()/r.std()) * np.sqrt(ann) if r.std() > 0 else np.nan
    mdd = (eq/eq.cummax() - 1).min()
    return dict(CAGR=cagr, Vol=vol, Sharpe=sharpe, MaxDD=mdd)

# -----------------------------
# 5.2 Dynamic ISIN selection (ranking)
# -----------------------------
df_sum = forward_bt_summary.copy()
for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%"]:
    if c in df_sum.columns:
        df_sum[c] = pd.to_numeric(df_sum[c], errors="coerce")

def _safe_rank(s: pd.Series):
    s = pd.to_numeric(s, errors="coerce")
    if s.notna().sum() == 0:
        return pd.Series(np.zeros(len(s)), index=s.index)
    s_filled = s.fillna(s.median())
    return s_filled.rank(method="average")

# (re-)imports for the calibration block below; scipy.linalg is used by calibrate_score_weights
import numpy as np
import pandas as pd
from scipy import linalg

# ----------------------------
# Utils
# ----------------------------
def _safe_rank_ser(s: pd.Series) -> pd.Series:
    """Rank robusto (0..1), gestisce NaN."""
    s = s.copy()
    n_valid = int(s.notna().sum())
    if n_valid == 0:
        return pd.Series(np.nan, index=s.index)
    return s.rank(method="average", na_option="keep") / n_valid


def _winsorize(s: pd.Series, p=0.005):
    lo, hi = s.quantile(p), s.quantile(1 - p)
    return s.clip(lo, hi)


def _pos_normalize(w: np.ndarray, eps=1e-12):
    w = np.where(w < 0, 0, w)
    s = w.sum()
    return (w / (s + eps)) if s > eps else np.ones_like(w) / len(w)


def _corr_shrink(C: np.ndarray, alpha=0.10):
    """Shrink verso I per stabilità numerica."""
    k = C.shape[0]
    return (1 - alpha) * C + alpha * np.eye(k)


def _time_blocks_index(idx: pd.Index, k_folds=5):
    """Split temporale semplice in k blocchi contigui."""
    n = len(idx)
    fold_sizes = np.full(k_folds, n // k_folds, dtype=int)
    fold_sizes[: n % k_folds] += 1
    splits, start = [], 0
    for fs in fold_sizes:
        end = start + fs
        splits.append(idx[start:end])
        start = end
    return splits

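
# Esempio rapido (illustrativo, non eseguito): i blocchi temporali sono contigui e coprono
# tutto l'indice; sono la base della cross-validation walk-forward usata più sotto.
if globals().get("RUN_EXAMPLES", False):
    for _blk in _time_blocks_index(pd.RangeIndex(10), k_folds=3):
        print(list(_blk))   # [0, 1, 2, 3], [4, 5, 6], [7, 8, 9]
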
# ----------------------------
|
||
# Calibrazione pesi
|
||
# ----------------------------
|
||
def calibrate_score_weights(
|
||
df_sum: pd.DataFrame,
|
||
metrics_map=None,
|
||
target_col: str | None = None,
|
||
k_folds: int = 5,
|
||
shrink_equal: float = 0.25, # shrink verso equal weight per stabilità
|
||
corr_shrink: float = 0.10 # shrink della matrice di correlazione
|
||
):
|
||
"""
|
||
metrics_map: lista di tuple (colname, good_is_high)
|
||
good_is_high=True se valori alti sono migliori.
|
||
Esempio: [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
|
||
target_col: nome colonna target OOS (es. 'FWD_CAGR_%', 'FWD_Sharpe', ecc.)
|
||
Se None => usa ERC non supervisionato.
|
||
Ritorna: dict con 'weights' (pd.Series), 'X_ranked' (DataFrame) e 'mode'
|
||
"""
|
||
if metrics_map is None:
|
||
metrics_map = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
|
||
|
||
# Costruisci matrice delle metriche X (rank 0..1)
|
||
X_cols, X_list = [], []
|
||
for col, good_high in metrics_map:
|
||
s = df_sum.get(col, pd.Series(index=df_sum.index, dtype=float)).astype(float)
|
||
s = _winsorize(s)
|
||
if not good_high:
|
||
s = -s # inverti segno se "meno è meglio"
|
||
X_cols.append(col)
|
||
X_list.append(_safe_rank_ser(s))
|
||
|
||
X = pd.concat(X_list, axis=1)
|
||
X.columns = X_cols
|
||
X = X.loc[:, X.columns[X.notna().sum(0) > 0]] # droppa colonne tutte NaN
|
||
k = X.shape[1]
|
||
if k == 0:
|
||
raise ValueError("Nessuna metrica valida per la calibrazione.")
|
||
|
||
# Se non hai target: ERC non supervisionato
|
||
if target_col is None or target_col not in df_sum.columns:
|
||
# cov su features rankate
|
||
C = np.cov(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False)
|
||
# stabilizza
|
||
C = _corr_shrink(C, alpha=corr_shrink)
|
||
# ERC: pesi proposti ~ inverse vol in spazio decorrelato
|
||
# approssimazione pratica: w ∝ diag(C)^(-1/2), poi normalizza riducendo l'effetto di correlazioni con Σ^-1/2
|
||
vol = np.sqrt(np.clip(np.diag(C), 1e-12, None))
|
||
w0 = 1.0 / vol
|
||
w = _pos_normalize(w0)
|
||
return {
|
||
"mode": "unsupervised_erc",
|
||
"weights": pd.Series(w, index=X.columns, name="weight"),
|
||
"X_ranked": X
|
||
}
|
||
|
||
# Supervisionato: IC-Σ^-1
|
||
y = df_sum[target_col].astype(float).copy()
|
||
y = _winsorize(y)
|
||
y_rank = _safe_rank_ser(y) # target in rank per allineare a Spearman
|
||
|
||
# Drop righe senza target o tutte NaN nelle metriche
|
||
mask = y_rank.notna() & X.notna().any(1)
|
||
Xf, yf = X[mask].copy(), y_rank[mask].copy()
|
||
if len(Xf) < max(30, k*5):
|
||
# fallback se troppo pochi dati
|
||
C = np.corrcoef(np.nan_to_num(X.values, nan=np.nanmean(X.values)), rowvar=False)
|
||
C = _corr_shrink(C, alpha=corr_shrink)
|
||
ic = np.array([
|
||
pd.Series(X.iloc[:, j]).corr(y_rank, method="spearman")
|
||
for j in range(k)
|
||
])
|
||
ic = np.nan_to_num(ic, nan=0.0)
|
||
w = linalg.solve(C + 1e-6*np.eye(k), ic, assume_a='sym')
|
||
w = (1-shrink_equal)*_pos_normalize(w) + shrink_equal*np.ones(k)/k
|
||
return {
|
||
"mode": "supervised_icSigmaInv_fallback",
|
||
"weights": pd.Series(w, index=X.columns, name="weight"),
|
||
"X_ranked": X
|
||
}
|
||
|
||
# CV a blocchi temporali (walk-forward grossolano)
|
||
# Ordine temporale = index di df_sum (assumo ordinato)
|
||
folds = _time_blocks_index(Xf.index, k_folds=k_folds)
|
||
W_list = []
|
||
    for t in range(1, len(folds)):
        # usa tutti i blocchi precedenti come train
        train_idx = folds[0]
        for i in range(1, t):
            train_idx = train_idx.append(folds[i])
        valid_idx = folds[t]
|
||
|
||
Xt, yt = Xf.loc[train_idx], yf.loc[train_idx]
|
||
Xv, yv = Xf.loc[valid_idx], yf.loc[valid_idx]
|
||
|
||
# IC per colonna usando solo train
|
||
ic = np.array([Xt.iloc[:, j].corr(yt, method="spearman") for j in range(Xt.shape[1])])
|
||
ic = np.nan_to_num(ic, nan=0.0)
|
||
|
||
# Corr(X) su train
|
||
C = np.corrcoef(np.nan_to_num(Xt.values, nan=np.nanmean(Xt.values)), rowvar=False)
|
||
C = _corr_shrink(C, alpha=corr_shrink)
|
||
|
||
try:
|
||
w_raw = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym')
|
||
except Exception:
|
||
w_raw = ic.copy()
|
||
|
||
w_fold = _pos_normalize(w_raw)
|
||
# shrink verso equal
|
||
w_fold = (1-shrink_equal)*w_fold + shrink_equal*np.ones_like(w_fold)/len(w_fold)
|
||
W_list.append(w_fold)
|
||
|
||
# media pesi sui fold
|
||
if not W_list:
|
||
# in casi limite (pochi dati), ripiega
|
||
ic = np.array([Xf.iloc[:, j].corr(yf, method="spearman") for j in range(Xf.shape[1])])
|
||
ic = np.nan_to_num(ic, nan=0.0)
|
||
C = np.corrcoef(np.nan_to_num(Xf.values, nan=np.nanmean(Xf.values)), rowvar=False)
|
||
C = _corr_shrink(C, alpha=corr_shrink)
|
||
w = linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic, assume_a='sym')
|
||
else:
|
||
w = np.mean(np.vstack(W_list), axis=0)
|
||
|
||
w = _pos_normalize(w)
|
||
return {
|
||
"mode": "supervised_icSigmaInv_cv",
|
||
"weights": pd.Series(w, index=X.columns, name="weight"),
|
||
"X_ranked": X
|
||
}
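
# Sketch d'uso (illustrativo, non eseguito): calibrazione non supervisionata (ERC) e Score
# come combinazione delle metriche rankate. Il flusso reale passa da _apply_score.
if globals().get("RUN_EXAMPLES", False):
    _cal = calibrate_score_weights(
        df_sum,
        metrics_map=[("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)],
        target_col=None,   # None -> ERC non supervisionato
    )
    _score_demo = (_cal["X_ranked"].fillna(0.0) * _cal["weights"]).sum(axis=1)
    print(_cal["mode"], _cal["weights"].round(3).to_dict())
    print(_score_demo.sort_values(ascending=False).head())
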
|
||
|
||
# --- PRE-FLIGHT METRICS GUARD ---------------------------------------------
|
||
def _coerce_num(s: pd.Series) -> pd.Series:
|
||
return pd.to_numeric(s, errors="coerce").replace([np.inf, -np.inf], np.nan)
|
||
|
||
# Assicurati che df_sum esista ed abbia le colonne base
|
||
df_sum = forward_bt_summary.copy()
|
||
for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%",
|
||
"QualityScore","Confidence","OutcomeScore"]:
|
||
if c in df_sum.columns:
|
||
df_sum[c] = _coerce_num(df_sum[c])
|
||
|
||
# Se Sharpe/CAGR/MaxDD mancano o sono tutti NaN, prova a ricostruirli dai segnali
|
||
need_rebuild = (
|
||
("Sharpe" not in df_sum.columns or df_sum["Sharpe"].notna().sum()==0) or
|
||
("CAGR_%" not in df_sum.columns) or
|
||
("MaxDD_%eq" not in df_sum.columns)
|
||
)
|
||
|
||
if need_rebuild:
|
||
try:
|
||
# Recompute metriche minime per ISIN da bt_signals_df (se presente)
|
||
tmp = bt_signals_df.copy()
|
||
if not tmp.empty:
|
||
tmp["PnL"] = pd.to_numeric(tmp["PnL"], errors="coerce").fillna(0.0)
|
||
agg_rows = []
|
||
for isin, g in tmp.groupby("ISIN"):
|
||
stats = drawdown_stats_simple(g["PnL"])
|
||
stats["ISIN"] = str(isin)
|
||
agg_rows.append(stats)
|
||
rebuilt = pd.DataFrame(agg_rows)
|
||
|
||
# Coerce numerico e merge su df_sum
|
||
for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]:
|
||
if c in rebuilt.columns:
|
||
rebuilt[c] = _coerce_num(rebuilt[c])
|
||
|
||
df_sum = (
|
||
df_sum.drop(columns=[c for c in ["CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"] if c in df_sum.columns])
|
||
.merge(rebuilt[["ISIN","CAGR_%","AnnVol_%","Sharpe","MaxDD_%eq","Calmar"]], on="ISIN", how="left")
|
||
)
|
||
except Exception as e:
|
||
print(f"[WARN] Ricostruzione metriche fallita: {e}")
|
||
|
||
df_sum = _apply_score(df_sum)
|
||
|
||
TOP_N = 15
|
||
base_isins = (
|
||
df_sum
|
||
.sort_values("Score", ascending=False)
|
||
.head(TOP_N)["ISIN"].astype(str).str.strip().tolist()
|
||
)
|
||
|
||
# Nessuna strategia cripto separata: le criptovalute sono trattate come gli altri asset
|
||
print(f"[INFO] Ranking full-sample (solo debug, i portafogli usano ranking rolling): {base_isins}")
|
||
|
||
# -----------------------------
|
||
# 5.3 Costruzione portafogli
|
||
# (Equal Weight + Risk Parity con cap)
|
||
# -----------------------------
|
||
bt = forward_bt_signals.copy()
|
||
bt["Date"] = pd.to_datetime(bt["Date"])
|
||
bt["ISIN"] = bt["ISIN"].astype(str).str.strip()
|
||
bt = bt.sort_values(["Date", "ISIN"])
|
||
|
||
wide_pnl = (
|
||
bt.pivot_table(index="Date", columns="ISIN", values="PnL", aggfunc="sum")
|
||
.fillna(0.0)
|
||
)
|
||
wide_sig = (
|
||
bt.pivot_table(index="Date", columns="ISIN", values="Signal", aggfunc="last")
|
||
.fillna(0)
|
||
.astype(int)
|
||
)
|
||
wide_est = (
|
||
bt.pivot_table(index="Date", columns="ISIN", values="EstOutcome", aggfunc="last")
|
||
.sort_index()
|
||
)
|
||
|
||
# (Opzionale) ricostruzione PnL portafoglio con open->open: disattivata di default perché il PnL
|
||
# viene già calcolato a livello di singolo asset usando gli open.
|
||
if globals().get("RECOMPUTE_PORTF_FROM_OPEN", False):
|
||
try:
|
||
date_min = (bt["Date"].min() - pd.Timedelta(days=5)).date()
|
||
date_max = (bt["Date"].max() + pd.Timedelta(days=5)).date()
|
||
px_hist = fetch_price_history(
|
||
isins=bt["ISIN"].unique(),
|
||
universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
|
||
start_date=date_min.isoformat(),
|
||
end_date=date_max.isoformat()
|
||
)
|
||
open_pivot = (
|
||
px_hist.pivot(index="Date", columns="ISIN", values="Open")
|
||
.sort_index()
|
||
)
|
||
open_ret = open_pivot.pct_change()
|
||
# segnale su giorno t, esecuzione a open t+1
|
||
wide_pnl = wide_sig * open_ret.shift(-1)
|
||
common_idx = wide_sig.index.intersection(wide_pnl.index)
|
||
common_idx = pd.to_datetime(common_idx)
|
||
wide_sig = wide_sig.reindex(common_idx).fillna(0).astype(int)
|
||
wide_pnl = wide_pnl.reindex(common_idx).fillna(0.0)
|
||
wide_sig.index = pd.to_datetime(wide_sig.index)
|
||
wide_pnl.index = pd.to_datetime(wide_pnl.index)
|
||
print(f"[INFO] PnL ricostruito su open->open per {len(open_pivot.columns)} ISIN.")
|
||
except Exception as e:
|
||
print(f"[WARN] Ricostruzione PnL open->open fallita, uso PnL originale: {e}")
|
||
|
||
# I portafogli verranno costruiti piu' sotto con ranking rolling (vedi _build_dynamic_portfolio_returns).
|
||
|
||
def plot_portfolio_composition(weights: pd.DataFrame,
|
||
title: str,
|
||
save_path: str | None = None,
|
||
max_legend: int = 12):
|
||
"""
|
||
Stacked area dei pesi per ISIN nel tempo (un colore per asset).
|
||
- Accetta un DataFrame 'weights' indicizzato per Date, colonne = ISIN, valori = peso (0..1).
|
||
- Normalizza le righe per sicurezza.
|
||
- Raggruppa la coda lunga in 'Altri' se gli asset superano 'max_legend'.
|
||
- Salva su 'save_path' se fornito; se è solo un filename, salva nella cartella corrente.
|
||
- Non lancia eccezioni inutili: stampa messaggi SKIP quando i dati non sono plottabili.
|
||
|
||
Esempio:
|
||
plot_portfolio_composition(w_eq, "Equal Weight", "composition_equal_weight.png")
|
||
plot_portfolio_composition(w_rp, "Risk Parity", "composition_risk_parity.png")
|
||
"""
|
||
import os
|
||
import numpy as np
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
|
||
# ---- Guard basi ----
|
||
if weights is None or getattr(weights, "empty", True):
|
||
print(f"[SKIP] Nessun peso disponibile per: {title}")
|
||
return
|
||
|
||
# Copia e sanificazione
|
||
W = weights.copy()
|
||
|
||
# Forza indice univoco e ordinato (se Date duplicati, tieni l’ultimo)
|
||
if W.index.has_duplicates:
|
||
W = W[~W.index.duplicated(keep="last")]
|
||
W = W.sort_index()
|
||
|
||
# Coerce numerico e sostituisci NaN con 0
|
||
W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0)
|
||
|
||
# Droppa colonne totalmente nulle
|
||
keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0]
|
||
if not keep_cols:
|
||
print(f"[SKIP] Tutte le colonne hanno peso zero per: {title}")
|
||
return
|
||
W = W[keep_cols]
|
||
|
||
# Normalizzazione riga (se somma=0, lascia 0)
|
||
row_sum = W.sum(axis=1)
|
||
with np.errstate(invalid="ignore", divide="ignore"):
|
||
W = W.div(row_sum.replace(0.0, np.nan), axis=0).fillna(0.0).clip(lower=0.0)
|
||
|
||
# Se dopo la pulizia resta vuoto o una sola riga, non ha senso il plot area
|
||
if len(W.index) < 2 or W.shape[1] == 0:
|
||
print(f"[SKIP] Serie troppo corta o senza colonne valide per: {title}")
|
||
return
|
||
|
||
# Ordina gli asset per peso medio decrescente (più leggibile)
|
||
avg_w = W.mean(axis=0).sort_values(ascending=False)
|
||
ordered = avg_w.index.tolist()
|
||
|
||
# Raggruppa coda lunga in "Altri" se troppi asset
|
||
if len(ordered) > max_legend:
|
||
head, tail = ordered[:max_legend], ordered[max_legend:]
|
||
W_show = W[head].copy()
|
||
W_show["Altri"] = W[tail].sum(axis=1)
|
||
ordered = head + ["Altri"]
|
||
else:
|
||
W_show = W[ordered].copy()
|
||
|
||
# Palette (tab20 riciclata se necessario)
|
||
cmap = plt.colormaps.get_cmap("tab20")
|
||
colors = [cmap(i % cmap.N) for i in range(len(ordered))]
|
||
|
||
# Plot
|
||
fig, ax = plt.subplots(figsize=(11, 6))
|
||
ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors)
|
||
ax.set_title(f"Composizione portafoglio nel tempo - {title}")
|
||
ax.set_ylim(0, 1)
|
||
ax.grid(True, alpha=0.3)
|
||
ax.set_ylabel("Peso")
|
||
|
||
    # Etichette Y in percentuale (fissa prima i tick per evitare il warning di matplotlib)
    yticks = ax.get_yticks()
    ax.set_yticks(yticks)
    ax.set_yticklabels([f"{y*100:.0f}%" for y in yticks])
|
||
|
||
# Legenda compatta
|
||
ncol = 2 if len(ordered) > 10 else 1
|
||
ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN")
|
||
|
||
fig.tight_layout()
|
||
|
||
# Salvataggio robusto (gestisce filename senza cartella)
|
||
if save_path:
|
||
folder = os.path.dirname(save_path) or "."
|
||
try:
|
||
os.makedirs(folder, exist_ok=True)
|
||
except Exception as e:
|
||
print(f"[WARN] Impossibile creare la cartella '{folder}': {e}. Provo a salvare nella dir corrente.")
|
||
save_path = os.path.basename(save_path)
|
||
fig.savefig(save_path, dpi=150, bbox_inches="tight")
|
||
try:
|
||
full = os.path.abspath(save_path)
|
||
except Exception:
|
||
full = save_path
|
||
print(f"[INFO] Salvato: {full}")
|
||
|
||
# Plot salvato senza visualizzazione interattiva
|
||
|
||
def make_active_weights(w_base: pd.DataFrame,
|
||
sig: pd.DataFrame,
|
||
renorm_to_1: bool = False, # True: rialloca tra gli attivi; False: lascia quota in Cash
|
||
add_cash: bool = True,
|
||
cash_label: str = "Cash") -> pd.DataFrame:
|
||
"""
|
||
w_base : pesi teorici (righe=date, colonne=ISIN)
|
||
sig : matrice Signal (0/1) allineata (righe=date, colonne=ISIN)
|
||
"""
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
if w_base is None or w_base.empty:
|
||
return pd.DataFrame(index=sig.index, columns=[])
|
||
|
||
W = w_base.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0)
|
||
S = sig.reindex_like(W).fillna(0).astype(int) # allinea e riempi
|
||
# pesa solo gli attivi
|
||
W_active = W * (S > 0)
|
||
|
||
row_sum = W_active.sum(axis=1)
|
||
if renorm_to_1:
|
||
# rialloca sui soli attivi: nessuna quota in cash
|
||
W_active = W_active.div(row_sum.replace(0, np.nan), axis=0).fillna(0.0)
|
||
if add_cash:
|
||
# se renorm True, cash è zero
|
||
W_active[cash_label] = 0.0
|
||
else:
|
||
# non renorm: lasciamo la parte non investita in cash
|
||
if add_cash:
|
||
cash = (1.0 - row_sum).clip(lower=0.0, upper=1.0)
|
||
W_active[cash_label] = cash
|
||
|
||
# rimuovi colonne sempre zero
|
||
keep = [c for c in W_active.columns if W_active[c].abs().sum() > 0]
|
||
return W_active[keep]
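
# Mini-esempio (illustrativo, non eseguito): con renorm_to_1=False la quota dei segnali
# spenti resta nella colonna 'Cash' invece di essere riallocata sugli asset attivi.
if globals().get("RUN_EXAMPLES", False):
    _d = pd.date_range("2024-01-01", periods=3)
    _wb = pd.DataFrame({"A": [0.5, 0.5, 0.5], "B": [0.5, 0.5, 0.5]}, index=_d)
    _sg = pd.DataFrame({"A": [1, 1, 0], "B": [0, 1, 0]}, index=_d)
    print(make_active_weights(_wb, _sg, renorm_to_1=False, add_cash=True))
    # giorno 1: A=0.5, Cash=0.5 | giorno 2: A=0.5, B=0.5, Cash=0 | giorno 3: tutto Cash
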
|
||
|
||
# -----------------------------
|
||
# Portafogli dinamici con ranking rolling
|
||
# -----------------------------
|
||
_dynamic_portfolio_cache: dict[int, dict] = {}
|
||
|
||
def _build_dynamic_portfolio_returns(
|
||
wide_pnl: pd.DataFrame,
|
||
wide_sig: pd.DataFrame,
|
||
wide_est: pd.DataFrame,
|
||
top_n: int,
|
||
window_bars: int = RANKING_WINDOW_BARS,
|
||
rp_lookback: int = RP_LOOKBACK
|
||
) -> dict:
|
||
if wide_pnl is None or wide_pnl.empty:
|
||
idx = pd.Index([])
|
||
empty_w = pd.DataFrame(index=idx, columns=[])
|
||
return {
|
||
"ret_eq": pd.Series(dtype=float),
|
||
"ret_rp": pd.Series(dtype=float),
|
||
"w_eq": empty_w,
|
||
"w_rp": empty_w,
|
||
"w_eq_act": empty_w,
|
||
"w_rp_act": empty_w,
|
||
"selection": {}
|
||
}
|
||
|
||
dates = wide_pnl.index.sort_values()
|
||
all_cols = wide_pnl.columns.tolist()
|
||
|
||
w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols)
|
||
w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols)
|
||
selection = {}
|
||
|
||
for dt in dates:
|
||
# Considera solo gli ISIN con segnale attivo oggi
|
||
sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float)
|
||
on_cols = [c for c in all_cols if sig_row.get(c, 0) == 1]
|
||
if not on_cols:
|
||
selection[dt] = []
|
||
continue
|
||
|
||
window_est = wide_est.loc[:dt].tail(window_bars) if not wide_est.empty else pd.DataFrame()
|
||
scores = []
|
||
for c in on_cols:
|
||
s = pd.to_numeric(window_est[c], errors="coerce") if c in window_est.columns else pd.Series(dtype=float)
|
||
est_score = s.mean(skipna=True)
|
||
if pd.isna(est_score):
|
||
continue
|
||
scores.append((c, est_score))
|
||
|
||
if not scores:
|
||
selection[dt] = []
|
||
continue
|
||
|
||
scores_sorted = sorted(scores, key=lambda x: x[1], reverse=True)
|
||
base_isins_dt = [c for c, _ in scores_sorted[:top_n]]
|
||
selection[dt] = base_isins_dt
|
||
if not base_isins_dt:
|
||
continue
|
||
|
||
w_eq.loc[dt, base_isins_dt] = 1 / len(base_isins_dt)
|
||
|
||
window_pnl = wide_pnl.loc[:dt].tail(window_bars)
|
||
rp_hist = window_pnl[base_isins_dt]
|
||
rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT)
|
||
if not rp_w.empty:
|
||
last = rp_w.iloc[-1].fillna(0.0)
|
||
last_sum = float(last.sum())
|
||
if last_sum > 0:
|
||
last = last / last_sum
|
||
w_rp.loc[dt, last.index] = last.values
|
||
|
||
w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||
w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||
|
||
ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
|
||
ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
|
||
|
||
return {
|
||
"ret_eq": ret_eq,
|
||
"ret_rp": ret_rp,
|
||
"w_eq": w_eq,
|
||
"w_rp": w_rp,
|
||
"w_eq_act": w_eq_act,
|
||
"w_rp_act": w_rp_act,
|
||
"selection": selection
|
||
}
|
||
|
||
def _get_dynamic_portfolio(top_n: int) -> dict:
|
||
if top_n not in _dynamic_portfolio_cache:
|
||
_dynamic_portfolio_cache[top_n] = _build_dynamic_portfolio_returns(
|
||
wide_pnl=wide_pnl,
|
||
wide_sig=wide_sig,
|
||
wide_est=wide_est,
|
||
top_n=top_n,
|
||
window_bars=RANKING_WINDOW_BARS,
|
||
rp_lookback=RP_LOOKBACK
|
||
)
|
||
return _dynamic_portfolio_cache[top_n]
|
||
|
||
# Portafoglio principale (Top_N di default) calcolato in modo rolling
|
||
_main_port = _get_dynamic_portfolio(TOP_N)
|
||
ret_eq = _main_port["ret_eq"]
|
||
ret_rp = _main_port["ret_rp"]
|
||
w_eq = _main_port["w_eq"]
|
||
w_rp = _main_port["w_rp"]
|
||
w_eq_act = _main_port["w_eq_act"]
|
||
w_rp_act = _main_port["w_rp_act"]
|
||
selection_by_date = _main_port["selection"]
|
||
weights_rp = w_rp.copy()
|
||
print(f"[INFO] Portafoglio rolling calcolato (TopN={TOP_N}, finestra={RANKING_WINDOW_BARS} barre, rp_lookback={RP_LOOKBACK}).")
|
||
checkpoint_post_timer("Portafoglio rolling")
|
||
|
||
# -----------------------------
|
||
# 5.4 Equity line + Heatmap (salva PNG)
|
||
# -----------------------------
|
||
# (DISATTIVATO) Equity line/heatmap teoriche su ret_eq/ret_rp
|
||
# eq_eq, eq_rp = map(equity_from_returns, [ret_eq, ret_rp])
|
||
# plt.figure(figsize=(10,6))
|
||
# plt.plot(eq_eq, label="Equal Weight")
|
||
# plt.plot(eq_rp, label="Risk Parity")
|
||
# plt.legend(); plt.grid(); plt.title("Equity line - Selezione dinamica (Top N)")
|
||
# plt.tight_layout()
|
||
# savefig_safe(str(PLOT_DIR / "equity_line_portafogli.png"), dpi=150)
|
||
# for name, r, path in [
|
||
# ("Equal Weight", ret_eq, PLOT_DIR / "heatmap_equal_weight.png"),
|
||
# ("Risk Parity", ret_rp, PLOT_DIR / "heatmap_risk_parity.png"),
|
||
# ]:
|
||
# m = portfolio_metrics(r)
|
||
# print(f"{name:22s} → CAGR {m['CAGR']*100:5.2f}% | Vol {m['Vol']*100:5.2f}% | Sharpe {m['Sharpe'] if m['Sharpe']==m['Sharpe'] else float('nan'):4.2f} | MaxDD {m['MaxDD']*100:5.2f}%")
|
||
# plot_heatmap_monthly(r, f"Heatmap mensile – {name}", save_path=path)
|
||
|
||
# =============================
|
||
# 5.4bis Composizione nel tempo (ATTIVI vs CASH)
|
||
# =============================
|
||
import os
|
||
import numpy as np
|
||
import matplotlib.pyplot as plt
|
||
|
||
def plot_portfolio_composition_fixed(weights: pd.DataFrame,
|
||
title: str,
|
||
save_path: str | None = None,
|
||
max_legend: int = 20):
|
||
"""
|
||
Stacked area dei pesi nel tempo.
|
||
'weights' deve essere già quello ATTIVO (già mascherato con i Signal)
|
||
e può includere una colonna 'Cash'.
|
||
- NON ri-normalizza le righe: mostra la quota di Cash reale.
|
||
- Se vuoi forzare sempre 100% investito, passa prima da renorm_to_1=True in make_active_weights.
|
||
"""
|
||
if weights is None or getattr(weights, "empty", True):
|
||
print(f"[SKIP] Nessun peso per: {title}")
|
||
return
|
||
|
||
W = weights.copy().apply(pd.to_numeric, errors="coerce").fillna(0.0)
|
||
if W.index.has_duplicates:
|
||
W = W[~W.index.duplicated(keep="last")]
|
||
W = W.sort_index()
|
||
|
||
# drop colonne sempre zero
|
||
keep_cols = [c for c in W.columns if float(np.abs(W[c]).sum()) > 0.0]
|
||
if not keep_cols:
|
||
print(f"[SKIP] Tutti i pesi sono zero per: {title}")
|
||
return
|
||
W = W[keep_cols]
|
||
|
||
if len(W.index) < 2:
|
||
print(f"[SKIP] Serie troppo corta per: {title}")
|
||
return
|
||
|
||
# Ordina per peso medio (Cash in coda così resta sopra nello stack)
|
||
avg_w = W.mean(0).sort_values(ascending=False)
|
||
ordered = avg_w.index.tolist()
|
||
if "Cash" in ordered:
|
||
ordered = [c for c in ordered if c != "Cash"] + ["Cash"]
|
||
|
||
# Raggruppa coda lunga in "Altri"
|
||
if len(ordered) > max_legend:
|
||
head = ordered[:max_legend]
|
||
# Garantisce che 'Cash' resti in legenda anche oltre il cap
|
||
if "Cash" not in head and "Cash" in ordered:
|
||
head = head[:-1] + ["Cash"]
|
||
tail = [c for c in ordered if c not in head]
|
||
W_show = W[head].copy()
|
||
if tail:
|
||
W_show["Altri"] = W[tail].sum(1)
|
||
ordered = head + ["Altri"]
|
||
else:
|
||
ordered = head
|
||
else:
|
||
W_show = W[ordered].copy()
|
||
|
||
cmap = plt.colormaps.get_cmap("tab20")
|
||
colors = [cmap(i % cmap.N) for i in range(len(ordered))]
|
||
|
||
fig, ax = plt.subplots(figsize=(11, 6))
|
||
ax.stackplot(W_show.index, [W_show[c].values for c in ordered], labels=ordered, colors=colors)
|
||
ax.set_title(f"Composizione portafoglio nel tempo - {title}")
|
||
# limite Y = somma massima osservata (<= 1 se pesi + Cash corretti)
|
||
ymax = float(np.nanmax(W_show.sum(1).values))
|
||
if not np.isfinite(ymax) or ymax <= 0:
|
||
ymax = 1.0
|
||
ax.set_ylim(0, max(1.0, ymax))
|
||
ax.grid(True, alpha=0.3)
|
||
ax.set_ylabel("Peso")
|
||
ax.set_yticks(ax.get_yticks())
|
||
ax.set_yticklabels([f"{y*100:.0f}%" for y in ax.get_yticks()])
|
||
|
||
ncol = 2 if len(ordered) > 10 else 1
|
||
ax.legend(loc="upper left", bbox_to_anchor=(1.01, 1), frameon=False, ncol=ncol, title="ISIN")
|
||
fig.tight_layout()
|
||
|
||
if save_path:
|
||
folder = os.path.dirname(save_path) or "."
|
||
try:
|
||
os.makedirs(folder, exist_ok=True)
|
||
except Exception as e:
|
||
print(f"[WARN] mkdir '{folder}': {e} -> salvo in cwd")
|
||
save_path = os.path.basename(save_path)
|
||
fig.savefig(save_path, dpi=150, bbox_inches="tight")
|
||
print(f"[INFO] Salvato: {os.path.abspath(save_path)}")
|
||
|
||
# Plot salvato senza visualizzazione interattiva
|
||
|
||
# --- 1) Pesi teorici dei portafogli (già costruiti sopra) ---
|
||
# w_eq : equal weight su 'cols'
|
||
# w_rp : risk parity (weights_rp)
|
||
|
||
def _sanitize_weights(W: pd.DataFrame, index_like: pd.Index) -> pd.DataFrame:
|
||
if W is None or W.empty:
|
||
return pd.DataFrame(index=index_like, columns=[])
|
||
W = W.apply(pd.to_numeric, errors="coerce").fillna(0.0)
|
||
# normalizziamo a somma 1 (pesi TEORICI)
|
||
rs = W.sum(1).replace(0, np.nan)
|
||
return W.div(rs, axis=0).fillna(0.0).clip(lower=0.0)
|
||
|
||
# ricostruisco coerentemente nel caso non fossero gia definiti
|
||
if 'w_eq' not in globals():
|
||
w_eq = pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
|
||
if 'w_rp' not in globals():
|
||
w_rp = weights_rp.copy() if isinstance(weights_rp, pd.DataFrame) else pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
|
||
|
||
w_eq = _sanitize_weights(w_eq, wide_pnl.index)
|
||
w_rp = _sanitize_weights(w_rp, wide_pnl.index)
|
||
|
||
# --- 2) Pesi ATTIVI (mascherati con i Signal) ---
|
||
# renorm_to_1=False → lascia la quota NON investita in 'Cash'
|
||
w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||
w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||
|
||
# Export pesi giornalieri (Equal/Risk Parity) con cash normalizzato a 100%
|
||
def _export_weights_daily(w_eq_act_df: pd.DataFrame, w_rp_act_df: pd.DataFrame, path=WEIGHTS_DAILY_XLSX):
|
||
try:
|
||
def _prep(df: pd.DataFrame) -> pd.DataFrame:
|
||
if df is None or df.empty:
|
||
return pd.DataFrame()
|
||
out = df.copy()
|
||
if "Cash" not in out.columns:
|
||
out["Cash"] = 0.0
|
||
out = out.apply(pd.to_numeric, errors="coerce").fillna(0.0)
|
||
if out.index.has_duplicates:
|
||
out = out[~out.index.duplicated(keep="last")]
|
||
out = out.sort_index()
|
||
row_sum = out.sum(axis=1).replace(0, np.nan)
|
||
out = out.div(row_sum, axis=0).fillna(0.0)
|
||
cols = [c for c in out.columns if c != "Cash"] + ["Cash"]
|
||
return out[cols]
|
||
|
||
w_eq_x = _prep(w_eq_act_df)
|
||
w_rp_x = _prep(w_rp_act_df)
|
||
if w_eq_x.empty and w_rp_x.empty:
|
||
print("[INFO] Nessun peso da esportare (weights_daily.xlsx non creato).")
|
||
return
|
||
with pd.ExcelWriter(path) as xw:
|
||
if not w_eq_x.empty:
|
||
w_eq_x.to_excel(xw, "Equal_Weight", index=True)
|
||
if not w_rp_x.empty:
|
||
w_rp_x.to_excel(xw, "Risk_Parity", index=True)
|
||
print(f"[INFO] Salvato: {path}")
|
||
except Exception as e:
|
||
print(f"[WARN] Export weights_daily.xlsx fallito: {e}")
|
||
|
||
_export_weights_daily(w_eq_act, w_rp_act, path=WEIGHTS_DAILY_XLSX)
|
||
|
||
# --- 3) Plot + salvataggio ---
|
||
plot_portfolio_composition_fixed(w_eq_act, "Equal Weight (attivi + Cash)", str(PLOT_DIR / "composition_equal_weight_active.png"))
|
||
plot_portfolio_composition_fixed(w_rp_act, "Risk Parity (attivi + Cash)", str(PLOT_DIR / "composition_risk_parity_active.png"))
|
||
checkpoint_post_timer("Pesi/plot portafogli")
|
||
|
||
|
||
# -----------------------------
|
||
# 5.5 Report trades — SOLO LONG
|
||
# -----------------------------
|
||
def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFrame, name: str) -> tuple[pd.DataFrame, pd.DataFrame]:
|
||
"""
|
||
Report trade SOLO LONG coerente EOD:
|
||
- segnale laggato di 1 barra (apertura dal giorno successivo al primo 1)
|
||
- PnL allineato al giorno di esposizione: r = pnl.shift(-1)
|
||
- chiusura al primo 0 (CloseDate = dt), e a fine serie CloseDate = ultimo + BDay(1)
|
||
- Duration_bars = numero di barre accumulate nel PnL del trade
|
||
Ritorna (df_trades, df_daily) dove df_daily contiene PnL giorno per giorno per ogni trade valido.
|
||
"""
|
||
from pandas.tseries.offsets import BDay
|
||
|
||
# Allinea indici
|
||
common_idx = sig.index.intersection(pnl.index)
|
||
if not weights.empty:
|
||
common_idx = common_idx.intersection(weights.index)
|
||
sig = sig.loc[common_idx].copy()
|
||
pnl = pnl.loc[common_idx].copy()
|
||
weights = weights.loc[common_idx].copy() if not weights.empty else pd.DataFrame(index=common_idx)
|
||
|
||
# Sanitizza
|
||
sig = sig.fillna(0).astype(int).clip(lower=0) # solo long
|
||
pnl = pnl.apply(pd.to_numeric, errors="coerce") # mantieni NaN per buchi open
|
||
|
||
rows = []
|
||
daily_rows = []
|
||
|
||
for isin in sig.columns:
|
||
# 1) Segnale EOD (lag 1)
|
||
s = sig[isin].fillna(0).astype(int).shift(1).fillna(0).astype(int)
|
||
|
||
        # 2) PnL con lo stesso lag del segnale (EOD): pnl.shift(1), coerente con s = sig.shift(1)
        r = pnl[isin].shift(1)
|
||
|
||
# 3) Pesi (se disponibili)
|
||
w = (weights[isin].fillna(0.0) if (isin in weights.columns) else pd.Series(0.0, index=s.index))
|
||
|
||
in_pos, start, acc, acc_dates = False, None, [], []
|
||
|
||
for dt in s.index:
|
||
sig_t = int(s.at[dt])
|
||
|
||
# CHIUSURA: primo 0 dopo un periodo in posizione → chiudi oggi (dt)
|
||
if in_pos and (sig_t == 0):
|
||
if any(pd.isna(acc)):
|
||
print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{dt.date()}")
|
||
else:
|
||
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
|
||
w_start = float(w.get(start, 0.0))
|
||
rows.append(dict(
|
||
Strategy=name,
|
||
ISIN=isin,
|
||
OpenDate=start,
|
||
CloseDate=dt,
|
||
Direction="long",
|
||
Size=w_start,
|
||
Duration_bars=len(acc),
|
||
**{"PnL_%": pnl_val * 100.0}
|
||
))
|
||
# salva contributo giornaliero reale (senza spalmare)
|
||
for dd, rv in zip(acc_dates, acc):
|
||
daily_rows.append({
|
||
"Strategy": name,
|
||
"ISIN": isin,
|
||
"Date": dd,
|
||
"Size": w_start,
|
||
"PnL_day": float(rv)
|
||
})
|
||
in_pos, start, acc = False, None, []
|
||
acc_dates = []
|
||
|
||
# APERTURA: primo 1 (laggato) quando non in posizione
|
||
if (not in_pos) and (sig_t == 1):
|
||
in_pos, start, acc, acc_dates = True, dt, [], []
|
||
|
||
# ACCUMULO: PnL del giorno di esposizione
|
||
if in_pos:
|
||
acc.append(r.at[dt])
|
||
acc_dates.append(dt)
|
||
|
||
# CHIUSURA A FINE SERIE → prossimo business day
|
||
if in_pos:
|
||
close_dt = s.index[-1] + BDay(1)
|
||
if any(pd.isna(acc)):
|
||
print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{close_dt.date()}")
|
||
else:
|
||
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
|
||
w_start = float(w.get(start, 0.0))
|
||
rows.append(dict(
|
||
Strategy=name,
|
||
ISIN=isin,
|
||
OpenDate=start,
|
||
CloseDate=close_dt,
|
||
Direction="long",
|
||
Size=w_start,
|
||
Duration_bars=len(acc),
|
||
**{"PnL_%": pnl_val * 100.0}
|
||
))
|
||
for dd, rv in zip(acc_dates, acc):
|
||
daily_rows.append({
|
||
"Strategy": name,
|
||
"ISIN": isin,
|
||
"Date": dd,
|
||
"Size": w_start,
|
||
"PnL_day": float(rv)
|
||
})
|
||
|
||
# Ordina colonne
|
||
cols = ["Strategy","ISIN","OpenDate","CloseDate","Direction","Size","Duration_bars","PnL_%"]
|
||
out = pd.DataFrame(rows)
|
||
out = out[[c for c in cols if c in out.columns] + [c for c in out.columns if c not in cols]]
|
||
daily_df = pd.DataFrame(daily_rows)
|
||
if not daily_df.empty:
|
||
daily_df["Date"] = pd.to_datetime(daily_df["Date"])
|
||
return out, daily_df
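
# Controllo di coerenza opzionale (sketch illustrativo; la funzione è un'aggiunta, non parte
# del flusso): per ogni trade il compounding dei PnL giornalieri di df_daily deve riprodurre
# il PnL_% riportato in df_trades.
def _check_trades_consistency(df_trades: pd.DataFrame, df_daily: pd.DataFrame, tol: float = 1e-6) -> bool:
    if df_trades is None or df_daily is None or df_trades.empty or df_daily.empty:
        return True
    ok = True
    for _, tr in df_trades.iterrows():
        mask = (
            (df_daily["Strategy"] == tr["Strategy"])
            & (df_daily["ISIN"] == tr["ISIN"])
            & (df_daily["Date"] >= tr["OpenDate"])
            & (df_daily["Date"] < tr["CloseDate"])
        )
        pnl_rebuilt = (1.0 + df_daily.loc[mask, "PnL_day"]).prod() - 1.0
        ok = ok and abs(pnl_rebuilt * 100.0 - float(tr["PnL_%"])) < tol
    return bool(ok)
# Esempio (dopo aver generato i report): _check_trades_consistency(rep_eq, daily_eq)
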
|
||
|
||
# Colonne asset effettivamente usate nel portafoglio principale
|
||
asset_cols = [c for c in w_eq.columns if float(pd.to_numeric(w_eq[c], errors="coerce").abs().sum()) > 0.0]
|
||
if not asset_cols:
|
||
asset_cols = list(wide_pnl.columns)
|
||
|
||
rep_eq, daily_eq = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]],
|
||
wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]],
|
||
w_eq_act, "Equal Weight")
|
||
rep_rp, daily_rp = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]],
|
||
wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]],
|
||
w_rp_act, "Risk Parity")
|
||
|
||
with pd.ExcelWriter(TRADES_REPORT_XLSX) as xw:
|
||
rep_eq.to_excel(xw, "Equal_Weight", index=False)
|
||
rep_rp.to_excel(xw, "Risk_Parity", index=False)
|
||
checkpoint_post_timer("Report trades")
|
||
|
||
# Performance attribution per ISIN
|
||
def _build_performance_attribution(trades_df: pd.DataFrame, meta_df: pd.DataFrame | None) -> pd.DataFrame:
|
||
if trades_df is None or trades_df.empty:
|
||
return pd.DataFrame(columns=["ISIN","Nome","Tot_Trades","Positivi","Negativi","Positivi_%","Negativi_%","PnL_Cum_%"])
|
||
df = trades_df.copy()
|
||
df["PnL_%"] = pd.to_numeric(df["PnL_%"], errors="coerce")
|
||
rows = []
|
||
for isin, g in df.groupby("ISIN"):
|
||
tot = len(g)
|
||
pos = int((g["PnL_%"] > 0).sum())
|
||
neg = int((g["PnL_%"] < 0).sum())
|
||
rows.append({
|
||
"ISIN": str(isin),
|
||
"Tot_Trades": tot,
|
||
"Positivi": pos,
|
||
"Negativi": neg,
|
||
"Positivi_%": (pos / tot * 100.0) if tot > 0 else np.nan,
|
||
"Negativi_%": (neg / tot * 100.0) if tot > 0 else np.nan,
|
||
"PnL_Cum_%": float(g["PnL_%"].sum())
|
||
})
|
||
out = pd.DataFrame(rows)
|
||
if meta_df is not None and "ISIN" in meta_df.columns:
|
||
meta_cols = [c for c in ["ISIN","Nome","Descrizione","Categoria","Asset Class"] if c in meta_df.columns]
|
||
if meta_cols:
|
||
out = out.merge(meta_df[meta_cols].drop_duplicates("ISIN"), on="ISIN", how="left")
|
||
# ordina colonne con Nome subito dopo ISIN
|
||
cols = [c for c in ["ISIN","Nome"] if c in out.columns] + [c for c in out.columns if c not in ["ISIN","Nome"]]
|
||
return out[cols]
|
||
|
||
perf_attr_df = _build_performance_attribution(pd.concat([rep_eq, rep_rp], ignore_index=True), df_sum if 'df_sum' in globals() else None)
|
||
perf_attr_df.to_excel(PERF_ATTRIB_XLSX, index=False)
|
||
print(f"[INFO] Performance attribution salvata in {PERF_ATTRIB_XLSX}")
|
||
|
||
print(f"[INFO] Report trades salvato in {TRADES_REPORT_XLSX}")
|
||
# ============================================================
|
||
# 5.6 Rebuild DAILY PnL from trades_report (calendarized)
|
||
# → per rendere coerente il compounding dei trade con equity/heatmap
|
||
# ============================================================
|
||
import pandas as pd
|
||
import numpy as np
|
||
|
||
def rebuild_daily_from_trades_dict(trades_dict):
|
||
"""
|
||
trades_dict: {'Equal_Weight': df, 'Risk_Parity': df}
|
||
Ogni df deve avere: OpenDate, CloseDate, Size, Duration_bars, PnL_%
|
||
Regola: distribuiamo il PnL del trade su ciascun giorno di durata con
|
||
un rendimento giornaliero costante r tale che (1+r)^D - 1 = PnL.
|
||
I giorni attivi sono [OpenDate, CloseDate) sul calendario business.
|
||
Il contributo del trade ogni giorno è Size * r.
|
||
Ritorna: DataFrame con indice Date (business days) e colonne per strategia.
|
||
"""
|
||
# Costruisci indice calendario minimo→massimo
|
||
min_dt, max_dt = None, None
|
||
for df in trades_dict.values():
|
||
if df is None or df.empty:
|
||
continue
|
||
df = df.copy()
|
||
df["OpenDate"] = pd.to_datetime(df["OpenDate"])
|
||
df["CloseDate"] = pd.to_datetime(df["CloseDate"])
|
||
lo, hi = df["OpenDate"].min(), df["CloseDate"].max()
|
||
if pd.notna(lo): min_dt = lo if (min_dt is None or lo < min_dt) else min_dt
|
||
if pd.notna(hi): max_dt = hi if (max_dt is None or hi > max_dt) else max_dt
|
||
if min_dt is None or max_dt is None:
|
||
return pd.DataFrame()
|
||
|
||
cal = pd.bdate_range(start=min_dt, end=max_dt, freq="C") # business calendar
|
||
daily = pd.DataFrame(0.0, index=cal, columns=list(trades_dict.keys()))
|
||
|
||
for strat, df in trades_dict.items():
|
||
if df is None or df.empty:
|
||
continue
|
||
d = df.copy()
|
||
d["OpenDate"] = pd.to_datetime(d["OpenDate"])
|
||
d["CloseDate"] = pd.to_datetime(d["CloseDate"])
|
||
d["Size"] = pd.to_numeric(d.get("Size", 0.0), errors="coerce").fillna(0.0)
|
||
d["Duration_bars"] = pd.to_numeric(d.get("Duration_bars", 0), errors="coerce").fillna(0).astype(int)
|
||
d["PnL_%"] = pd.to_numeric(d.get("PnL_%", 0.0), errors="coerce").fillna(0.0)
|
||
|
||
# Costruisci serie giornaliera sommando i contributi dei singoli trade
|
||
s = pd.Series(0.0, index=cal)
|
||
for _, row in d.iterrows():
|
||
D = int(row["Duration_bars"]) if pd.notna(row["Duration_bars"]) else 0
|
||
if D <= 0: # trade di durata nulla → salta
|
||
continue
|
||
pnl = float(row["PnL_%"]) / 100.0
|
||
try:
|
||
r_daily = (1.0 + pnl) ** (1.0 / D) - 1.0
|
||
except Exception:
|
||
r_daily = pnl / max(1, D)
|
||
size = float(row["Size"]) if pd.notna(row["Size"]) else 0.0
|
||
|
||
# Giorni attivi: [OpenDate, CloseDate) sul calendario business
|
||
rng = pd.bdate_range(start=row["OpenDate"], end=row["CloseDate"] - pd.tseries.offsets.BDay(1), freq="C")
|
||
if len(rng) != D:
|
||
# se per qualche motivo differisce (festivi ecc.) ricalibra usando la lunghezza effettiva
|
||
if len(rng) <= 0:
|
||
continue
|
||
try:
|
||
r_daily = (1.0 + pnl) ** (1.0 / len(rng)) - 1.0
|
||
except Exception:
|
||
r_daily = pnl / len(rng)
|
||
s[rng] = s[rng] + size * r_daily
|
||
daily[strat] = s
|
||
|
||
return daily
|
||
|
||
# Costruisci il dict dai DataFrame di trade appena creati
|
||
trades_dict = {
|
||
"Equal_Weight": rep_eq if 'rep_eq' in globals() else pd.DataFrame(),
|
||
"Risk_Parity": rep_rp if 'rep_rp' in globals() else pd.DataFrame(),
|
||
}
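
# Percorso alternativo "calendarizzato" (facoltativo): spalma il PnL di ciascun trade sui giorni
# di durata tramite rebuild_daily_from_trades_dict. Non è usato nel flusso principale, che sotto
# ricostruisce il daily dai PnL giornalieri reali; il flag USE_CALENDARIZED_DAILY è un'assunzione.
if globals().get("USE_CALENDARIZED_DAILY", False):
    _daily_cal = rebuild_daily_from_trades_dict(trades_dict)
    print(f"[INFO] Daily calendarizzato ricostruito: {_daily_cal.shape}")
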
|
||
|
||
# Ricostruzione daily dai PnL giornalieri reali dei trade (senza spalmare)
|
||
daily_detail = pd.concat([daily_eq, daily_rp], ignore_index=True)
|
||
if not daily_detail.empty:
|
||
daily_detail["Date"] = pd.to_datetime(daily_detail["Date"])
|
||
daily_detail["PnL_day"] = pd.to_numeric(daily_detail["PnL_day"], errors="coerce")
|
||
daily_detail["Size"] = pd.to_numeric(daily_detail["Size"], errors="coerce").fillna(0.0)
|
||
daily_detail = daily_detail.dropna(subset=["Date", "PnL_day"])
|
||
daily_detail["Pnl_contrib"] = daily_detail["PnL_day"] * daily_detail["Size"]
|
||
daily_from_trades = (
|
||
daily_detail.pivot_table(index="Date", columns="Strategy", values="Pnl_contrib", aggfunc="sum")
|
||
.sort_index()
|
||
.fillna(0.0)
|
||
)
|
||
else:
|
||
daily_from_trades = pd.DataFrame()
|
||
|
||
# Salva su disco (CSV + XLSX) per ispezione
|
||
if not daily_from_trades.empty:
|
||
daily_from_trades.to_csv(DAILY_FROM_TRADES_CSV, index_label="Date")
|
||
|
||
# Plot equity & heatmap basati sui DAILY da trade (coerenti col compounding)
|
||
import matplotlib.pyplot as plt
|
||
|
||
col_map = [
|
||
("Equal Weight", ["Equal Weight", "Equal_Weight"]),
|
||
("Risk Parity", ["Risk Parity", "Risk_Parity"]),
|
||
]
|
||
|
||
def _find_col(df, aliases):
|
||
for c in aliases:
|
||
if c in df.columns:
|
||
return c
|
||
return None
|
||
|
||
fig, ax = plt.subplots(figsize=(10, 6))
|
||
for lab, aliases in col_map:
|
||
col = _find_col(daily_from_trades, aliases)
|
||
if col:
|
||
eq = (1.0 + daily_from_trades[col].fillna(0.0)).cumprod() * 100
|
||
eq.plot(ax=ax, label=lab)
|
||
ax.legend()
|
||
ax.grid(True)
|
||
ax.set_title("Equity line ricostruita dai trades (calendarizzata)")
|
||
fig.tight_layout()
|
||
fig.savefig(PLOT_DIR / "equity_from_trades.png", dpi=150)
|
||
plt.close(fig)
|
||
print(f"[INFO] Salvato: {PLOT_DIR / 'equity_from_trades.png'}")
|
||
|
||
# Heatmap per ciascuna strategia
|
||
for lab, aliases, fname in [
|
||
("Equal Weight", ["Equal Weight", "Equal_Weight"], "heatmap_equal_from_trades.png"),
|
||
("Risk Parity", ["Risk Parity", "Risk_Parity"], "heatmap_rp_from_trades.png"),
|
||
]:
|
||
col = _find_col(daily_from_trades, aliases)
|
||
if col:
|
||
try:
|
||
plot_heatmap_monthly(
|
||
daily_from_trades[col],
|
||
f"Heatmap mensile - {lab} (da trades)",
|
||
save_path=PLOT_DIR / fname,
|
||
)
|
||
except Exception as e:
|
||
print(f"[WARN] Heatmap {lab} da trades: {e}")
|
||
else:
|
||
print("[INFO] daily_from_trades risulta vuoto: nessun plot/CSV generato.")
|
||
|
||
checkpoint_post_timer("Ricostruzione daily/plot")
|
||
|
||
|
||
# ============================================================
|
||
# METRICS UTILS (guard) — richieste da _calc_all_metrics_...
|
||
# ============================================================
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
# r2 della equity line vs trend line
|
||
if "r2_equity_line" not in globals():
|
||
def r2_equity_line(returns: pd.Series) -> float:
|
||
r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
|
||
if r.size == 0:
|
||
return np.nan
|
||
eq = (1.0 + r).cumprod()
|
||
t = np.arange(eq.size, dtype=float)
|
||
X = np.vstack([t, np.ones_like(t)]).T
|
||
beta, alpha = np.linalg.lstsq(X, eq.values, rcond=None)[0]
|
||
yhat = alpha + beta * t
|
||
ss_res = np.sum((eq.values - yhat) ** 2)
|
||
ss_tot = np.sum((eq.values - eq.values.mean()) ** 2)
|
||
return float(1 - ss_res/ss_tot) if ss_tot > 0 else np.nan
|
||
|
||
# metriche drawdown: MaxDD, durata massima, time-to-recovery
|
||
if "drawdown_metrics" not in globals():
|
||
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
|
||
r = pd.to_numeric(returns, errors="coerce").fillna(0.0)
|
||
if r.size == 0:
|
||
return np.nan, np.nan, np.nan
|
||
eq = (1.0 + r).cumprod()
|
||
peak = eq.cummax()
|
||
dd = eq / peak - 1.0
|
||
maxdd = float(dd.min()) if dd.size else np.nan
|
||
|
||
# durata massima di drawdown (numero di barre consecutive sotto il picco)
|
||
dd_mask = dd < 0
|
||
max_dur, cur = 0, 0
|
||
for flag in dd_mask:
|
||
cur = cur + 1 if flag else 0
|
||
if cur > max_dur:
|
||
max_dur = cur
|
||
|
||
# time-to-recovery dal minimo assoluto
|
||
ttr = np.nan
|
||
if dd.size:
|
||
idx_min = dd.idxmin()
|
||
sub_eq = eq.loc[idx_min:]
|
||
rec_idx = sub_eq[sub_eq >= peak.loc[idx_min]].index.min()
|
||
if pd.notna(rec_idx):
|
||
# se indice datetime, durata in giorni; se indice numerico, in barre
|
||
if isinstance(rec_idx, pd.Timestamp):
|
||
ttr = (rec_idx - idx_min).days
|
||
else:
|
||
ttr = int(sub_eq.index.get_loc(rec_idx))
|
||
else:
|
||
ttr = sentinel_ttr
|
||
return maxdd, int(max_dur), ttr
|
||
|
||
# AAW/AUW/Heal Index (versione validata che mi hai passato)
|
||
if "heal_index_metrics" not in globals():
|
||
def heal_index_metrics(returns: pd.Series):
|
||
s = returns.fillna(0.0).astype(float)
|
||
if s.size == 0:
|
||
return np.nan, np.nan, np.nan
|
||
equity = (1.0 + s).cumprod()
|
||
if equity.size == 0:
|
||
return np.nan, np.nan, np.nan
|
||
|
||
run_max = equity.cummax()
|
||
dd = equity / run_max - 1.0
|
||
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
|
||
|
||
run_min = equity.cummin()
|
||
ru = equity / run_min - 1.0
|
||
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
|
||
|
||
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
|
||
return AAW, AUW, heal
|
||
|
||
# H_min (100% finestre positive) in giorni/mesi (versione validata)
|
||
if "h_min_100" not in globals():
|
||
def h_min_100(returns: pd.Series, month_len: int = 21):
|
||
s = returns.dropna().astype(float)
|
||
n = s.size
|
||
if n == 0:
|
||
return np.nan, np.nan
|
||
log1p = np.log1p(s.values)
|
||
csum = np.cumsum(log1p)
|
||
|
||
def rolling_sum_k(k: int):
|
||
if k > n:
|
||
return np.array([])
|
||
head = csum[k - 1:]
|
||
tail = np.concatenate(([0.0], csum[:-k]))
|
||
return head - tail
|
||
|
||
for k in range(1, n + 1):
|
||
rs = rolling_sum_k(k)
|
||
if rs.size == 0:
|
||
break
|
||
roll_ret = np.exp(rs) - 1.0
|
||
if np.all(roll_ret >= 0):
|
||
h_days = k
|
||
h_months = int(np.ceil(h_days / month_len))
|
||
return h_days, h_months
|
||
return np.nan, np.nan
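
# Verifica rapida (illustrativa, non eseguita): su una serie di rendimenti sempre positivi
# H_min vale 1 giorno (1 mese arrotondato) e il drawdown è nullo.
if globals().get("RUN_EXAMPLES", False):
    _r_pos = pd.Series(0.001, index=pd.bdate_range("2024-01-01", periods=63))
    print(h_min_100(_r_pos, month_len=21))   # atteso: (1, 1)
    print(drawdown_metrics(_r_pos))          # atteso: (0.0, 0, 0)
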
|
||
|
||
# costante se non già definita
|
||
try:
|
||
DAYS_PER_YEAR
|
||
except NameError:
|
||
DAYS_PER_YEAR = 252
|
||
|
||
# ========= FUNZIONE RICHIESTA DAL LOOP =========
|
||
def _calc_all_metrics_from_returns(r: pd.Series) -> dict:
|
||
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
|
||
n = len(r)
|
||
if n == 0:
|
||
return {k: np.nan for k in [
|
||
"Rendimento_Ann","Volatilita_Ann","CAGR","R2_Equity",
|
||
"MaxDD","DD_Duration_Max","TTR_from_MDD","AAW","AUW","Heal_Index","H_min_100m_5Y"
|
||
]}
|
||
|
||
rendimento_ann = float(r.mean() * DAYS_PER_YEAR)
|
||
volatilita_ann = float(r.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)) if n > 1 else np.nan
|
||
|
||
eq = (1.0 + r).cumprod()
|
||
years_elapsed = n / DAYS_PER_YEAR if n > 0 else np.nan
|
||
if years_elapsed and years_elapsed > 0 and eq.iloc[0] > 0:
|
||
cagr = float(eq.iloc[-1] ** (1.0 / years_elapsed) - 1.0)
|
||
else:
|
||
cagr = np.nan
|
||
|
||
r2 = r2_equity_line(r)
|
||
maxdd, dddur, ttr = drawdown_metrics(r, sentinel_ttr=1250)
|
||
aaw, auw, heal = heal_index_metrics(r)
|
||
|
||
# 5 anni o meno se la serie è più corta
|
||
lookback = min(n, DAYS_PER_YEAR * 5)
|
||
r5 = r.iloc[-lookback:]
|
||
_, hmin_months = h_min_100(r5, month_len=21)
|
||
|
||
return {
|
||
"Rendimento_Ann": rendimento_ann,
|
||
"Volatilita_Ann": volatilita_ann,
|
||
"CAGR": cagr,
|
||
"R2_Equity": r2,
|
||
"MaxDD": maxdd,
|
||
"DD_Duration_Max": dddur,
|
||
"TTR_from_MDD": ttr,
|
||
"AAW": aaw,
|
||
"AUW": auw,
|
||
"Heal_Index": heal,
|
||
"H_min_100m_5Y": hmin_months
|
||
}
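
# Sanity check (illustrativo, non eseguito): su una serie piatta le metriche sono nulle o
# degeneri (R2_Equity = NaN perché l'equity non ha varianza).
if globals().get("RUN_EXAMPLES", False):
    _flat = pd.Series(0.0, index=pd.bdate_range("2024-01-01", periods=252))
    print(_calc_all_metrics_from_returns(_flat))
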
|
||
|
||
|
||
# ======================================================================
|
||
# 6) LOOP Top-N: metriche per N = 6..20 → final_metrics.xlsx
|
||
# ======================================================================
|
||
|
||
# Safety: DAYS_PER_YEAR se non definito
|
||
try:
|
||
DAYS_PER_YEAR
|
||
except NameError:
|
||
DAYS_PER_YEAR = 252
|
||
|
||
def _select_isins_for_topN(df_sum: pd.DataFrame, top_n: int):
|
||
"""Seleziona i migliori 'top_n' ISIN in base allo Score."""
|
||
df_sum_loc = df_sum.copy()
|
||
base_isins_N = (
|
||
df_sum_loc
|
||
.sort_values("Score", ascending=False)
|
||
.head(top_n)["ISIN"].astype(str).str.strip().tolist()
|
||
)
|
||
return base_isins_N
|
||
|
||
def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl):
|
||
"""
|
||
Costruisce i rendimenti di portafoglio Equal Weight e Risk Parity
|
||
per l'insieme di ISIN in base_isins_N.
|
||
|
||
Ritorna:
|
||
ret_eq_N : pd.Series
|
||
ret_rp_N : pd.Series
|
||
"""
|
||
|
||
# Colonne effettivamente disponibili
|
||
colsN = [c for c in base_isins_N if c in wide_pnl.columns]
|
||
|
||
if len(colsN) == 0:
|
||
# Nessun ISIN valido → portafogli in cash (linea piatta)
|
||
idx = wide_pnl.index
|
||
ret_eq_N = pd.Series(0.0, index=idx, name="Ret_EqW_N")
|
||
ret_rp_N = pd.Series(0.0, index=idx, name="Ret_RP_N")
|
||
return ret_eq_N, ret_rp_N
|
||
|
||
# -------- Equal Weight --------
|
||
ret_eq_N = wide_pnl[colsN].mean(axis=1)
|
||
|
||
# -------- Risk Parity con cap --------
|
||
weights_rp_N = inverse_vol_weights(
|
||
wide_pnl[colsN],
|
||
window=60,
|
||
max_weight=RP_MAX_WEIGHT # es. RP_MAX_WEIGHT = 2 / TOP_N_MAX = 0.1333
|
||
)
|
||
ret_rp_N = (wide_pnl[colsN] * weights_rp_N).sum(axis=1)
|
||
|
||
return ret_eq_N, ret_rp_N
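
# Sketch del loop Top-N annunciato nell'intestazione (illustrativo: il flag RUN_TOPN_SWEEP_EXAMPLE
# è un'assunzione e il blocco non viene eseguito): per ogni N calcola le metriche dei due portafogli.
if globals().get("RUN_TOPN_SWEEP_EXAMPLE", False):
    _rows_sweep = []
    for _n in range(6, 21):
        _isins_n = _select_isins_for_topN(df_sum, _n)
        _ret_eq_n, _ret_rp_n = _build_portfolio_returns_for_isins(_isins_n, wide_pnl)
        for _name, _r in [("Equal_Weight", _ret_eq_n), ("Risk_Parity", _ret_rp_n)]:
            _m = _calc_all_metrics_from_returns(_r)
            _m.update({"TopN": _n, "Strategy": _name})
            _rows_sweep.append(_m)
    print(pd.DataFrame(_rows_sweep).head())
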
|
||
|
||
# ==============================
|
||
# Metriche portafoglio (TOP_N corrente) → Excel
|
||
# ==============================
|
||
metrics_rows = []
|
||
for strategy_name, rser in [
|
||
("Equal_Weight", ret_eq),
|
||
("Risk_Parity", ret_rp),
|
||
]:
|
||
m = _calc_all_metrics_from_returns(rser)
|
||
m["TopN"] = TOP_N
|
||
m["Strategy"] = strategy_name
|
||
metrics_rows.append(m)
|
||
|
||
df_metrics = pd.DataFrame(metrics_rows)[[
|
||
"TopN", "Strategy",
|
||
"Rendimento_Ann", "Volatilita_Ann", "CAGR", "R2_Equity",
|
||
"MaxDD", "DD_Duration_Max", "TTR_from_MDD",
|
||
"AAW", "AUW", "Heal_Index", "H_min_100m_5Y",
|
||
]]
|
||
|
||
try:
|
||
with pd.ExcelWriter(FINAL_METRICS_XLSX, engine="openpyxl", mode="a", if_sheet_exists="replace") as xw:
|
||
df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False)
|
||
except Exception:
|
||
with pd.ExcelWriter(FINAL_METRICS_XLSX) as xw:
|
||
df_metrics.to_excel(xw, sheet_name="Portfolio_Metrics", index=False)
|
||
print(f"[INFO] Salvato: {FINAL_METRICS_XLSX} (Portfolio_Metrics)")
|