Splice ETFP-XPAR-XAMS historical series

fredmaloggia
2025-12-04 23:16:28 +01:00
parent e36f3e1577
commit 134d5879f8
2 changed files with 362 additions and 113 deletions


@@ -23,6 +23,7 @@ import matplotlib.pyplot as plt
from pathlib import Path
import json
import ssl
import re
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
@@ -96,19 +97,38 @@ def _apply_score(df_sum: pd.DataFrame) -> pd.DataFrame:
df_sum["Score_mode"] = "degenerate_equal"
return df_sum
res = calibrate_score_weights(
df_sum,
metrics_map=mm,
target_col=None
)
X_ranked = res["X_ranked"]
w = res["weights"]
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
df_sum["Score_mode"] = res["mode"]
print("Pesi stimati automaticamente (metriche usate):")
print("Disponibilita' metriche (righe non-NaN):",
{c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]})
print(w)
# If fixed weights are defined in config, use them; otherwise calibrate automatically
use_fixed = False
if SCORE_WEIGHTS:
weights_raw = {k: float(v) for k, v in SCORE_WEIGHTS.items() if k in df_sum.columns}
weights_raw = {k: v for k, v in weights_raw.items() if df_sum[k].notna().sum() > 0}
if weights_raw:
use_fixed = True
w = pd.Series(weights_raw)
w = w / w.sum()
X_ranked = df_sum[w.index].rank(pct=True)
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
df_sum["Score_mode"] = "fixed_weights"
if SCORE_VERBOSE:
print("Pesi fissi (config):", w.to_dict())
else:
print("[WARN] score_weights in config non compatibili con le metriche disponibili. Uso calibrazione automatica.")
if not use_fixed:
res = calibrate_score_weights(
df_sum,
metrics_map=mm,
target_col=None
)
X_ranked = res["X_ranked"]
w = res["weights"]
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
df_sum["Score_mode"] = res["mode"]
if SCORE_VERBOSE:
print("Pesi stimati automaticamente (metriche usate):")
print("Disponibilita' metriche (righe non-NaN):",
{c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]})
print(w)
return df_sum
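# Worked example of the fixed-weights path (hypothetical numbers, weights as in the sample config):
# with score_weights = {"Sharpe": 0.4, "CAGR_%": 0.4, "MaxDD_%eq": 0.2} (already summing to 1.0),
# an ETF sitting at the 90th / 80th / 50th percentile of those three metrics gets
# Score = 0.4*0.90 + 0.4*0.80 + 0.2*0.50 = 0.78; rank(pct=True) puts every metric on a common
# 0-1 scale before weighting.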
# =============================
@@ -123,13 +143,141 @@ def _build_symbol_euronext(row: pd.Series) -> tuple[str, str]:
return base, tok
if isin and venue:
return base, f"{isin}-{venue}"
if isin:
return base, f"{isin}-ETFP" # generic fallback for the history endpoint
return base, isin
def fetch_price_history(isins, universe: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame:
"""
Scarica la serie storica open/close per una lista di ISIN usando l'endpoint storico.
- API chiamata 1 ISIN alla volta: https://fin.scorer.app/finance/etf-inv/history/{ticker}?fromDate=YYYYMMDD&toDate=YYYYMMDD
- Caching locale su CSV per ridurre le richieste; se l'API fallisce, tenta di usare la cache.
- Fallback mercati: ETFP → XPAR → XAMS. Se si estende una serie con un altro mercato,
la giunta avviene solo se il prezzo all'ultimo punto del segmento precedente e al primo del successivo
differisce < 2% (per evitare salti di valuta/quotazione).
Ritorna DataFrame con colonne: Date (datetime), ISIN, Open, Close.
"""
start_dt = pd.to_datetime(start_date).date()
end_dt = pd.to_datetime(end_date).date()
def _symbol_cache_path(symbol: str) -> Path:
safe = re.sub(r"[^A-Za-z0-9_-]+", "_", str(symbol))
return OPEN_CACHE_DIR / f"{safe}.csv"
def _load_cache(path: Path) -> pd.DataFrame | None:
try:
if path.exists():
dfc = pd.read_csv(path, parse_dates=["Date"])
dfc["ISIN"] = dfc["ISIN"].astype(str)
return dfc
except Exception as e:
print(f"[WARN] Cache prezzi corrotta {path}: {e}")
return None
def _normalize_payload_to_df(payload, isin):
# The new endpoint returns [{"ticker": "...", "data": [ {...}, ... ]}]
data_block = payload
if isinstance(payload, list) and payload:
if isinstance(payload[0], dict) and "data" in payload[0]:
data_block = payload[0].get("data", [])
else:
data_block = payload
if isinstance(payload, dict) and "data" in payload:
data_block = payload.get("data", [])
rows = []
for d in data_block or []:
dt_raw = d.get("date") or d.get("Date") or d.get("data") or d.get("timestamp")
if dt_raw is None:
continue
try:
if isinstance(dt_raw, (int, float)):
dt_parsed = pd.to_datetime(int(dt_raw), unit="ms").tz_localize(None)
else:
dt_parsed = pd.to_datetime(dt_raw).tz_localize(None)
except Exception:
continue
rows.append({
"Date": dt_parsed,
"ISIN": str(isin),
"Open": _to_float_safe(d.get("open")),
"Close": _to_float_safe(d.get("close") or d.get("last"))
})
return pd.DataFrame(rows) if rows else pd.DataFrame(columns=["Date","ISIN","Open","Close"])
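# Sketch of the payload shape handled above (hypothetical ticker; field aliases as in the lookups):
#   [{"ticker": "XX0000000000-ETFP", "data": [{"date": "2024-01-02", "open": 100.1, "close": 100.4}, ...]}]
# dates may also arrive as epoch milliseconds, and "last" is accepted in place of "close".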
def _fetch_symbol(symbol: str, isin: str):
url = f"{OPEN_PRICE_BASE_URL}/{symbol}?fromDate={start_dt.strftime('%Y%m%d')}&toDate={end_dt.strftime('%Y%m%d')}"
cache_path = _symbol_cache_path(symbol)
cache_df = _load_cache(cache_path)
df_api = pd.DataFrame()
ok = False
for attempt in range(1, OPEN_MAX_RETRY + 1):
try:
with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp:
payload = json.loads(resp.read().decode("utf-8"))
df_api = _normalize_payload_to_df(payload, isin)
if df_api.empty:
print(f"[WARN] Nessun dato per {symbol}")
ok = True
break
except (HTTPError, URLError, ssl.SSLError, json.JSONDecodeError) as e:
if attempt < OPEN_MAX_RETRY:
print(f"[WARN] Download {symbol} tentativo {attempt}/{OPEN_MAX_RETRY} fallito: {e}. Retry in {OPEN_SLEEP_SEC}s")
time.sleep(OPEN_SLEEP_SEC)
else:
print(f"[ERROR] Download {symbol} fallito: {e}")
df_use = pd.DataFrame()
if ok and not df_api.empty:
df_api = df_api.sort_values("Date")
if cache_df is not None and not cache_df.empty:
df_use = (
pd.concat([cache_df, df_api], ignore_index=True)
.drop_duplicates(subset=["Date"])
.sort_values("Date")
)
else:
df_use = df_api
try:
OPEN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
df_use.to_csv(cache_path, index=False)
except Exception as e:
print(f"[WARN] Salvataggio cache prezzi fallito ({cache_path}): {e}")
elif cache_df is not None and not cache_df.empty:
df_use = cache_df
print(f"[INFO] Uso cache prezzi per {symbol} (API indisponibile).")
return df_use
def _merge_with_check(df_base: pd.DataFrame, df_add: pd.DataFrame, label_prev: str, label_next: str):
"""
Estende df_base aggiungendo il tratto df_add antecedente al primo punto di df_base.
Controlla il salto di prezzo all'incrocio: se > 2%, non fonde e avvisa.
"""
if df_base is None or df_base.empty:
return df_add, False
if df_add is None or df_add.empty:
return df_base, False
cutoff = df_base["Date"].min()
prev_part = df_add[df_add["Date"] < cutoff]
if prev_part.empty:
return df_base, False
merged = pd.concat([prev_part, df_base], ignore_index=True)
merged = merged.sort_values("Date").drop_duplicates(subset=["Date"], keep="last")
# jump check: last price of the earlier segment vs first price of the later one
prev_last = prev_part.sort_values("Date").iloc[-1]
next_first = df_base[df_base["Date"] >= cutoff].sort_values("Date").iloc[0]
def _price(row):
return _to_float_safe(row.get("Close")) if pd.notna(row.get("Close")) else _to_float_safe(row.get("Open"))
p_prev = _price(prev_last)
p_next = _price(next_first)
if p_prev is None or p_next is None or not np.isfinite(p_prev) or not np.isfinite(p_next) or p_next == 0:
return merged, True
gap = abs(p_prev - p_next) / abs(p_next)
if gap > 0.02:
print(f"[WARN] Salto prezzo >2% tra {label_prev} e {label_next} su {prev_last['Date'].date()}{next_first['Date'].date()} (gap {gap:.2%}). Fallback non applicato.")
return df_base, False
return merged, True
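# Worked example (hypothetical prices): if the fallback segment ends at 101.5 and the base segment
# starts at 100.0, gap = |101.5 - 100.0| / 100.0 = 1.5% <= 2%, so the splice is kept; at 103.0 vs
# 100.0 the gap is 3% and the base series is returned unchanged.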
records = []
for i, isin in enumerate(isins, 1):
try:
@@ -138,43 +286,60 @@ def fetch_price_history(isins, universe: pd.DataFrame, start_date: str, end_date
print(f"[WARN] ISIN {isin} non trovato nell'universo.")
continue
base, symbol = _build_symbol_euronext(row)
url = f"{base}/{symbol}?fromDate={start_date}&toDate={end_date}"
ok = False
for attempt in range(1, OPEN_MAX_RETRY + 1):
try:
with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp:
data = json.loads(resp.read().decode("utf-8"))
if not isinstance(data, list) or not data:
print(f"[WARN] Nessun dato per {symbol}")
break
for d in data:
dt_str = d.get("date") or d.get("Date") or d.get("data")
if not dt_str:
continue
try:
dt_parsed = pd.to_datetime(dt_str).tz_localize(None)
except Exception:
continue
records.append({
"Date": dt_parsed,
"ISIN": str(isin),
"Open": _to_float_safe(d.get("open")),
"Close": _to_float_safe(d.get("close"))
})
ok = True
break
except (HTTPError, URLError, ssl.SSLError) as e:
if attempt < OPEN_MAX_RETRY:
print(f"[WARN] Download {symbol} tentativo {attempt}/{OPEN_MAX_RETRY} fallito: {e}. Retry in {OPEN_SLEEP_SEC}s")
time.sleep(OPEN_SLEEP_SEC)
df_primary = _fetch_symbol(symbol, isin)
# Additional market fallbacks (XPAR, then XAMS) to extend the series further back
fallback_symbols = []
if "-" in symbol:
root = symbol.rsplit("-", 1)[0]
fallback_symbols.append(f"{root}-XPAR")
fallback_symbols.append(f"{root}-XAMS")
else:
fallback_symbols.append(f"{symbol}-XPAR")
fallback_symbols.append(f"{symbol}-XAMS")
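# e.g. for a primary symbol "XX0000000000-ETFP" (hypothetical ISIN) the fallbacks tried, in order,
# are "XX0000000000-XPAR" and then "XX0000000000-XAMS"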
df_use = df_primary
applied_any = False
for fb_sym in fallback_symbols:
# only needed if the series does not start at start_dt
need_fb = df_use.empty or (df_use["Date"].min().date() > start_dt)
if not need_fb:
continue
df_fb = _fetch_symbol(fb_sym, isin)
if df_fb.empty:
print(f"[WARN] Fallback {fb_sym} assente per {isin}")
continue
if df_use.empty:
df_use = df_fb
applied_any = True
print(f"[INFO] Uso fallback {fb_sym} per tutto il periodo.")
else:
merged, merged_ok = _merge_with_check(df_use, df_fb, fb_sym, symbol)
if merged_ok:
df_use = merged
applied_any = True
cutoff = df_use["Date"].min()
print(f"[INFO] Serie estesa con {fb_sym} fino a {cutoff.date()} per {isin}")
else:
print(f"[ERROR] Download {symbol} fallito: {e}")
if not ok:
print(f"[WARN] Fallback {fb_sym} scartato per gap >2% su {isin}")
if df_use.empty:
print(f"[WARN] Serie open/close non disponibile per {isin}")
continue
# Filter to the requested date range
df_use["Date"] = pd.to_datetime(df_use["Date"])
mask = (df_use["Date"].dt.date >= start_dt) & (df_use["Date"].dt.date <= end_dt)
df_use = df_use.loc[mask]
if df_use.empty:
print(f"[WARN] Nessun dato nel range richiesto per {symbol}")
continue
records.append(df_use)
if not records:
return pd.DataFrame(columns=["Date","ISIN","Open","Close"])
df_px = pd.DataFrame(records)
df_px = df_px.sort_values(["ISIN","Date"])
df_px = pd.concat(records, ignore_index=True)
df_px = df_px.sort_values(["ISIN","Date"]).reset_index(drop=True)
return df_px
def _to_float_safe(x):
@@ -223,10 +388,12 @@ N_BARS = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db"))
RANKING_WINDOW_BARS = int(RANKING_CONFIG.get("rolling_window_bars", N_BARS))
RP_LOOKBACK = int(SIGNALS_CONFIG.get("risk_parity_lookback", 60))
OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/euronext/price"))
OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/etf-inv/history"))
OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3))
OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1))
OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10))
OPEN_CACHE_DIR = Path(PRICES_CONFIG.get("cache_dir", OUTPUT_DIR / "price_cache"))
RECOMPUTE_PORTF_FROM_OPEN = bool(PRICES_CONFIG.get("recompute_portfolio_open", False))
# Pattern-matching (hyper-parameters)
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) # pattern window length (bars)
@@ -250,6 +417,8 @@ if RP_MAX_WEIGHT is None:
RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
else:
RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)
SCORE_VERBOSE = bool(RANKING_CONFIG.get("score_verbose", False))
SCORE_WEIGHTS = RANKING_CONFIG.get("score_weights")
HURST_MIN_LENGTH = int(HURST_CONFIG.get("min_length", 200))
HURST_WIN_GRID = HURST_CONFIG.get("win_grid")
HURST_MIN_SEGMENTS = int(HURST_CONFIG.get("min_segments", 1))
@@ -277,6 +446,26 @@ def format_eta(seconds):
return f"{hours}h {minutes:02d}m {secs:02d}s"
return f"{minutes}m {secs:02d}s"
# Timer helpers for post-backtest phases
_post_timer = {"t0": None, "tprev": None, "total": None, "done": 0}
def start_post_timer(total_steps: int):
_post_timer["t0"] = time.perf_counter()
_post_timer["tprev"] = _post_timer["t0"]
_post_timer["total"] = total_steps
_post_timer["done"] = 0
def checkpoint_post_timer(label: str):
if _post_timer["t0"] is None or _post_timer["total"] is None:
return
_post_timer["done"] += 1
now = time.perf_counter()
step_dt = now - _post_timer["tprev"]
total_dt = now - _post_timer["t0"]
avg = total_dt / max(_post_timer["done"], 1)
eta = avg * max(_post_timer["total"] - _post_timer["done"], 0)
print(f"[TIMER] post { _post_timer['done']}/{_post_timer['total']} {label} — step {step_dt:.2f}s, total {total_dt:.2f}s, ETA {format_eta(eta)}")
_post_timer["tprev"] = now
# ================= HURST (on RETURNS) =================
def hurst_rs_returns(r, win_grid=None, min_seg=None):
r = pd.Series(r).dropna().astype("float64").values
@@ -746,13 +935,14 @@ def drawdown_stats_simple(ret_series: pd.Series):
}
def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret: str,
Wp: int, Ha: int, k: int,
theta_entry: float,
fee_bps: float = 10,
# --- EXIT PARAMS (all optional) ---
sl_bps: float | None = 300.0, # absolute stop loss (bps on the trade's cumulative PnL)
tp_bps: float | None = 800.0, # absolute take profit (bps)
trail_bps: float | None = 300.0, # trailing stop (drawdown from the peak, bps)
Wp: int, Ha: int, k: int,
theta_entry: float,
exec_ret: pd.Series | None = None,
fee_bps: float = 10,
# --- EXIT PARAMS (all optional) ---
sl_bps: float | None = 300.0, # absolute stop loss (bps on the trade's cumulative PnL)
tp_bps: float | None = 800.0, # absolute take profit (bps)
trail_bps: float | None = 300.0, # trailing stop (drawdown from the peak, bps)
time_stop_bars: int | None = 20, # maximum holding period
theta_exit: float | None = 0.0, # exit if est_out <= theta_exit (if None, ignore)
weak_days_exit: int | None = None # exit if est_out <= theta_exit for N consecutive days
@@ -763,8 +953,17 @@ def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret
Note: uses daily data only → thresholds are evaluated at end of day,
and the exit happens on the next bar (conservative model).
"""
r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0 # returns as decimals
r = pd.to_numeric(df_isin[col_ret], errors="coerce").astype(float) / 100.0 # returns as decimals (close/close)
idx = df_isin[col_date] if col_date in df_isin.columns else pd.RangeIndex(len(r))
if exec_ret is not None:
r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float)
if not r_exec.index.equals(idx):
r_exec = r_exec.reindex(idx)
if len(r_exec) != len(r):
# realign onto the same index; missing dates remain NaN
r_exec = pd.Series(r_exec.values, index=idx).reindex(idx)
else:
r_exec = r
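# Note: the pattern windows below are still built on r (close->close returns); r_exec (open->open
# execution returns) feeds the reported Ret+1 PnL and the end-of-day stop checks when exec_ret is provided.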
fee = fee_bps / 10000.0
# helper to build the past-only pattern library
@@ -794,7 +993,7 @@ def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret
if past.dropna().shape[0] < (Wp + Ha):
sig_out, est_out, avg_dist = 0, np.nan, np.nan
# PnL at t+1 is always reported in the Ret+1 column
rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r.iloc[t+1]))
rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan))
continue
win_last = r.iloc[t-Wp:t].values
@@ -818,7 +1017,7 @@ def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret
# 1) update the trade PnL with the bar return that will be *realized* tomorrow:
# For EOD consistency, today's PnL (to be reported) is on r[t+1] when Signal(t)=1.
# For end-of-day stop checks, we estimate the "PnL if I stay" by accumulating r[t+1] ex-ante.
next_ret = r.iloc[t+1] # return that will apply if I stay in the position
next_ret = r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan # return that will apply if I stay in the position
pnl_if_stay = (1.0 + trade_pnl) * (1.0 + next_ret) - 1.0
# 2) update the hypothetical trailing peak
@@ -871,7 +1070,7 @@ def knn_forward_backtest_one_asset(df_isin: pd.DataFrame, col_date: str, col_ret
trade_peak = peak_if_stay
# Record today's row; the reported PnL is always r[t+1] (r_exec[t+1] when execution returns are available)
rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r.iloc[t+1]))
rows.append((idx.iloc[t], sig_out, est_out, avg_dist, r_exec.iloc[t+1] if t+1 < len(r_exec) else np.nan))
sig_df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"])
@@ -938,6 +1137,29 @@ for i, isin in enumerate(isins, 1):
errors.append({"ISIN": isin, "Errore": f"Serie troppo corta (BT) ({df_isin[col_ret].dropna().shape[0]} punti)"})
continue
# --- Fetch open/close to compute execution returns (open->open) ---
try:
date_min = df_isin[col_date].min().date() if col_date else None
date_max = df_isin[col_date].max().date() if col_date else None
if date_min and date_max:
px_hist_one = fetch_price_history(
isins=[isin],
universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
start_date=date_min.isoformat(),
end_date=date_max.isoformat()
)
px_hist_one = px_hist_one.sort_values("Date")
open_series = px_hist_one[["Date","Open"]].dropna()
open_series = open_series.drop_duplicates(subset=["Date"]).set_index("Date")["Open"]
open_ret = open_series.pct_change()
# realign onto the same date sequence as df_isin
exec_ret = open_ret.reindex(pd.to_datetime(df_isin[col_date]))
else:
exec_ret = None
except Exception as e:
print(f"[WARN] Fetch open/close fallito per {isin}: {e}")
exec_ret = None
# ============================
# THETA = HURST AS A PERCENTAGE
# H = 0.50 -> theta_entry = 0.005 (0.5%)
@@ -957,6 +1179,7 @@ for i, isin in enumerate(isins, 1):
Ha=HA,
k=KNN_K,
theta_entry=theta_entry,
exec_ret=exec_ret,
fee_bps=10,
)
@@ -1015,6 +1238,9 @@ if errors:
pd.DataFrame(errors).to_csv(ERROR_LOG_CSV, index=False)
print(f" Log errori aggiornato: {ERROR_LOG_CSV} (tot: {len(errors)})")
# Timer for post-backtest phases (section 5 onwards)
start_post_timer(total_steps=4)
# ======================================================================
# 5) DYNAMIC PORTFOLIO STRATEGIES + EQUITY + HEATMAP + TRADE REPORT
# ======================================================================
@@ -1085,13 +1311,14 @@ def plot_heatmap_monthly(r: pd.Series, title: str, save_path: str = None):
plt.tight_layout()
if save_path:
savefig_safe(save_path, dpi=150)
plt.close(fig)
# Do not show the plot during execution
def inverse_vol_weights(df, window=60, max_weight=None):
vol = df.rolling(window).std()
inv = 1 / vol.replace(0, np.nan)
w = inv.div(inv.sum(axis=1), axis=0)
w = w.fillna(method="ffill").fillna(1 / max(1, df.shape[1]))
w = w.ffill().fillna(1 / max(1, df.shape[1]))
if max_weight is not None:
w = w.clip(upper=max_weight)
@@ -1385,32 +1612,34 @@ wide_sig = (
.astype(int)
)
# Replace close->close PnL with open->open returns using the history API
try:
date_min = (bt["Date"].min() - pd.Timedelta(days=5)).date()
date_max = (bt["Date"].max() + pd.Timedelta(days=5)).date()
px_hist = fetch_price_history(
isins=bt["ISIN"].unique(),
universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
start_date=date_min.isoformat(),
end_date=date_max.isoformat()
)
open_pivot = (
px_hist.pivot(index="Date", columns="ISIN", values="Open")
.sort_index()
)
open_ret = open_pivot.pct_change()
wide_pnl = open_ret.shift(-1) # signal on day t, execution at the open of t+1
common_idx = wide_sig.index.intersection(wide_pnl.index)
# force a DatetimeIndex just to be safe
common_idx = pd.to_datetime(common_idx)
wide_sig = wide_sig.reindex(common_idx).fillna(0).astype(int)
wide_pnl = wide_pnl.reindex(common_idx).fillna(0.0)
wide_sig.index = pd.to_datetime(wide_sig.index)
wide_pnl.index = pd.to_datetime(wide_pnl.index)
print(f"[INFO] PnL ricostruito su open->open per {len(open_pivot.columns)} ISIN.")
except Exception as e:
print(f"[WARN] Ricostruzione PnL open->open fallita, uso PnL originale: {e}")
# (Optional) portfolio PnL reconstruction from open->open returns: disabled by default because the PnL
# is already computed at the single-asset level using the opens.
if globals().get("RECOMPUTE_PORTF_FROM_OPEN", False):
try:
date_min = (bt["Date"].min() - pd.Timedelta(days=5)).date()
date_max = (bt["Date"].max() + pd.Timedelta(days=5)).date()
px_hist = fetch_price_history(
isins=bt["ISIN"].unique(),
universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
start_date=date_min.isoformat(),
end_date=date_max.isoformat()
)
open_pivot = (
px_hist.pivot(index="Date", columns="ISIN", values="Open")
.sort_index()
)
open_ret = open_pivot.pct_change()
# signal on day t, execution at the open of t+1
wide_pnl = wide_sig * open_ret.shift(-1)
common_idx = wide_sig.index.intersection(wide_pnl.index)
common_idx = pd.to_datetime(common_idx)
wide_sig = wide_sig.reindex(common_idx).fillna(0).astype(int)
wide_pnl = wide_pnl.reindex(common_idx).fillna(0.0)
wide_sig.index = pd.to_datetime(wide_sig.index)
wide_pnl.index = pd.to_datetime(wide_pnl.index)
print(f"[INFO] PnL ricostruito su open->open per {len(open_pivot.columns)} ISIN.")
except Exception as e:
print(f"[WARN] Ricostruzione PnL open->open fallita, uso PnL originale: {e}")
# Portfolios are built further below with a rolling ranking (see _build_dynamic_portfolio_returns).
@@ -1664,6 +1893,7 @@ w_rp_act = _main_port["w_rp_act"]
selection_by_date = _main_port["selection"]
weights_rp = w_rp.copy()
print(f"[INFO] Portafoglio rolling calcolato (TopN={TOP_N}, finestra={RANKING_WINDOW_BARS} barre, rp_lookback={RP_LOOKBACK}).")
checkpoint_post_timer("Portafoglio rolling")
# -----------------------------
# 5.4 Equity line + Heatmap (saves PNGs)
@@ -1759,6 +1989,7 @@ def plot_portfolio_composition_fixed(weights: pd.DataFrame,
ax.set_ylim(0, max(1.0, ymax))
ax.grid(True, alpha=0.3)
ax.set_ylabel("Peso")
ax.set_yticks(ax.get_yticks())
ax.set_yticklabels([f"{y*100:.0f}%" for y in ax.get_yticks()])
ncol = 2 if len(ordered) > 10 else 1
@@ -1811,6 +2042,7 @@ w_agg_act = make_active_weights(w_agg, wide_sig, renorm_to_1=False, add_cash=Tru
# --- 3) Plot + save ---
plot_portfolio_composition_fixed(w_eq_act, "Equal Weight (attivi + Cash)", str(PLOT_DIR / "composition_equal_weight_active.png"))
plot_portfolio_composition_fixed(w_rp_act, "Risk Parity (attivi + Cash)", str(PLOT_DIR / "composition_risk_parity_active.png"))
checkpoint_post_timer("Pesi/plot portafogli")
# -----------------------------
@@ -1836,7 +2068,7 @@ def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFra
# Sanitize
sig = sig.fillna(0).astype(int).clip(lower=0) # long only
pnl = pnl.apply(pd.to_numeric, errors="coerce").fillna(0.0)
pnl = pnl.apply(pd.to_numeric, errors="coerce") # keep NaN for missing open prices
rows = []
@@ -1845,7 +2077,7 @@ def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFra
s = sig[isin].fillna(0).astype(int).shift(1).fillna(0).astype(int)
# 2) PnL aligned to the exposure day (EOD) via pnl[isin].shift(1)
r = pnl[isin].shift(1).fillna(0.0)
r = pnl[isin].shift(1)
# 3) Pesi (se disponibili)
w = (weights[isin].fillna(0.0) if (isin in weights.columns) else pd.Series(0.0, index=s.index))
@@ -1857,17 +2089,20 @@ def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFra
# CLOSE: first 0 after a period in position → close today (dt)
if in_pos and (sig_t == 0):
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
rows.append(dict(
Strategy=name,
ISIN=isin,
OpenDate=start,
CloseDate=dt,
Direction="long",
Size=float(w.get(start, 0.0)),
Duration_bars=len(acc),
**{"PnL_%": pnl_val * 100.0}
))
if any(pd.isna(acc)):
print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{dt.date()}")
else:
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
rows.append(dict(
Strategy=name,
ISIN=isin,
OpenDate=start,
CloseDate=dt,
Direction="long",
Size=float(w.get(start, 0.0)),
Duration_bars=len(acc),
**{"PnL_%": pnl_val * 100.0}
))
in_pos, start, acc = False, None, []
# OPEN: first (lagged) 1 while not in position
@@ -1876,22 +2111,25 @@ def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFra
# ACCRUAL: PnL of the exposure day
if in_pos:
acc.append(float(r.at[dt]))
acc.append(r.at[dt])
# CLOSE AT END OF SERIES → next business day
if in_pos:
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
close_dt = s.index[-1] + BDay(1)
rows.append(dict(
Strategy=name,
ISIN=isin,
OpenDate=start,
CloseDate=close_dt,
Direction="long",
Size=float(w.get(start, 0.0)),
Duration_bars=len(acc),
**{"PnL_%": pnl_val * 100.0}
))
if any(pd.isna(acc)):
print(f"[WARN] Trade derubricato {name} {isin}: open/close price mancante nel range {start.date()}-{close_dt.date()}")
else:
pnl_val = np.prod([1.0 + x for x in acc]) - 1.0 if acc else 0.0
rows.append(dict(
Strategy=name,
ISIN=isin,
OpenDate=start,
CloseDate=close_dt,
Direction="long",
Size=float(w.get(start, 0.0)),
Duration_bars=len(acc),
**{"PnL_%": pnl_val * 100.0}
))
# Order the columns
cols = ["Strategy","ISIN","OpenDate","CloseDate","Direction","Size","Duration_bars","PnL_%"]
@@ -1914,6 +2152,7 @@ rep_rp = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.co
with pd.ExcelWriter(TRADES_REPORT_XLSX) as xw:
rep_eq.to_excel(xw, "Equal_Weight", index=False)
rep_rp.to_excel(xw, "Risk_Parity", index=False)
checkpoint_post_timer("Report trades")
# Performance attribution per ISIN
def _build_performance_attribution(trades_df: pd.DataFrame, meta_df: pd.DataFrame | None) -> pd.DataFrame:
@@ -2068,6 +2307,8 @@ if not daily_from_trades.empty:
else:
print("[INFO] daily_from_trades risulta vuoto: nessun plot/CSV generato.")
checkpoint_post_timer("Ricostruzione daily/plot")
# ============================================================
# METRICS UTILS (guard) — required by _calc_all_metrics_...


@@ -18,7 +18,13 @@
},
"ranking": {
"top_n_max": 15,
"rp_max_weight": 0.1333333333
"rp_max_weight": 0.1333333333,
"score_verbose": false,
"score_weights": {
"Sharpe": 0.4,
"CAGR_%": 0.4,
"MaxDD_%eq": 0.2
}
},
"signals": {
"sl_bps": 300.0,
@@ -66,10 +72,12 @@
"min_segments": 2
},
"prices": {
"base_url": "https://fin.scorer.app/finance/euronext/price",
"base_url": "https://fin.scorer.app/finance/etf-inv/history",
"max_retry": 3,
"sleep_sec": 0.1,
"timeout": 10
"timeout": 10,
"cache_dir": "output/price_cache",
"recompute_portfolio_open": false
},
"run": {
"business_days_only": true,