inserita chiamata prezzi open
This commit is contained in:
@@ -21,6 +21,10 @@ import sqlalchemy as sa
|
||||
from sqlalchemy import text
|
||||
import matplotlib.pyplot as plt
|
||||
from pathlib import Path
|
||||
import json
|
||||
import ssl
|
||||
from urllib.request import urlopen
|
||||
from urllib.error import URLError, HTTPError
|
||||
|
||||
from shared_utils import (
|
||||
build_pattern_library,
|
||||
@@ -68,6 +72,118 @@ def savefig_safe(path, **kwargs):
|
||||
|
||||
|
||||
|
||||
# Calcolo Score (riusabile anche rolling)
|
||||
def _apply_score(df_sum: pd.DataFrame) -> pd.DataFrame:
|
||||
"""Applica la calibrazione dei pesi su df_sum e aggiunge la colonna Score."""
|
||||
def _available_cols(df, cols):
|
||||
return [c for c in cols if (c in df.columns and df[c].notna().sum() > 0)]
|
||||
|
||||
primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
|
||||
alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)]
|
||||
|
||||
mm = [(c,gh) for (c,gh) in primary_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]
|
||||
if len(mm) < 2:
|
||||
mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]
|
||||
|
||||
# Se ancora insufficienti, prova ad allargare al set unito
|
||||
if len(mm) < 2:
|
||||
union_candidates = list({x[0] for x in primary_cols+alt_cols})
|
||||
mm = [(c, True) for c in _available_cols(df_sum, union_candidates)]
|
||||
|
||||
if len(mm) == 0:
|
||||
print("[WARN] Nessuna metrica numerica disponibile: uso Score=0 e ordino per ISIN.")
|
||||
df_sum["Score"] = 0.0
|
||||
df_sum["Score_mode"] = "degenerate_equal"
|
||||
return df_sum
|
||||
|
||||
res = calibrate_score_weights(
|
||||
df_sum,
|
||||
metrics_map=mm,
|
||||
target_col=None
|
||||
)
|
||||
X_ranked = res["X_ranked"]
|
||||
w = res["weights"]
|
||||
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
|
||||
df_sum["Score_mode"] = res["mode"]
|
||||
print("Pesi stimati automaticamente (metriche usate):")
|
||||
print("Disponibilita' metriche (righe non-NaN):",
|
||||
{c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]})
|
||||
print(w)
|
||||
return df_sum
|
||||
|
||||
# =============================
|
||||
# PRICE FETCH (OPEN/CLOSE) - storico
|
||||
# =============================
|
||||
def _build_symbol_euronext(row: pd.Series) -> tuple[str, str]:
|
||||
isin = str(row.get("ISIN", "")).strip()
|
||||
venue = str(row.get("Mercato", "")).strip()
|
||||
tok = str(row.get("TickerOpen", "") or "").strip()
|
||||
base = OPEN_PRICE_BASE_URL
|
||||
if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper():
|
||||
return base, tok
|
||||
if isin and venue:
|
||||
return base, f"{isin}-{venue}"
|
||||
return base, isin
|
||||
|
||||
def fetch_price_history(isins, universe: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame:
|
||||
"""
|
||||
Scarica la serie storica open/close per una lista di ISIN usando l'endpoint storico.
|
||||
Ritorna DataFrame con colonne: Date (datetime), ISIN, Open, Close.
|
||||
"""
|
||||
records = []
|
||||
for i, isin in enumerate(isins, 1):
|
||||
try:
|
||||
row = universe.loc[universe["ISIN"] == str(isin)].iloc[0]
|
||||
except Exception:
|
||||
print(f"[WARN] ISIN {isin} non trovato nell'universo.")
|
||||
continue
|
||||
base, symbol = _build_symbol_euronext(row)
|
||||
url = f"{base}/{symbol}?fromDate={start_date}&toDate={end_date}"
|
||||
ok = False
|
||||
for attempt in range(1, OPEN_MAX_RETRY + 1):
|
||||
try:
|
||||
with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp:
|
||||
data = json.loads(resp.read().decode("utf-8"))
|
||||
if not isinstance(data, list) or not data:
|
||||
print(f"[WARN] Nessun dato per {symbol}")
|
||||
break
|
||||
for d in data:
|
||||
dt_str = d.get("date") or d.get("Date") or d.get("data")
|
||||
if not dt_str:
|
||||
continue
|
||||
try:
|
||||
dt_parsed = pd.to_datetime(dt_str).tz_localize(None)
|
||||
except Exception:
|
||||
continue
|
||||
records.append({
|
||||
"Date": dt_parsed,
|
||||
"ISIN": str(isin),
|
||||
"Open": _to_float_safe(d.get("open")),
|
||||
"Close": _to_float_safe(d.get("close"))
|
||||
})
|
||||
ok = True
|
||||
break
|
||||
except (HTTPError, URLError, ssl.SSLError) as e:
|
||||
if attempt < OPEN_MAX_RETRY:
|
||||
print(f"[WARN] Download {symbol} tentativo {attempt}/{OPEN_MAX_RETRY} fallito: {e}. Retry in {OPEN_SLEEP_SEC}s")
|
||||
time.sleep(OPEN_SLEEP_SEC)
|
||||
else:
|
||||
print(f"[ERROR] Download {symbol} fallito: {e}")
|
||||
if not ok:
|
||||
print(f"[WARN] Serie open/close non disponibile per {isin}")
|
||||
if not records:
|
||||
return pd.DataFrame(columns=["Date","ISIN","Open","Close"])
|
||||
df_px = pd.DataFrame(records)
|
||||
df_px = df_px.sort_values(["ISIN","Date"])
|
||||
return df_px
|
||||
|
||||
def _to_float_safe(x):
|
||||
try:
|
||||
return float(x)
|
||||
except Exception:
|
||||
return np.nan
|
||||
|
||||
# LEGACY: blocco originale mantenuto ma non eseguito (usiamo _apply_score sopra)
|
||||
# =========================================
|
||||
# PARAMETRI GLOBALI
|
||||
# =========================================
|
||||
@@ -79,6 +195,8 @@ RANKING_CONFIG = require_section(CONFIG, "ranking")
|
||||
PATHS_CONFIG = require_section(CONFIG, "paths")
|
||||
HURST_CONFIG = CONFIG.get("hurst", {})
|
||||
RUN_CONFIG = CONFIG.get("run", {})
|
||||
SIGNALS_CONFIG = CONFIG.get("signals", {})
|
||||
PRICES_CONFIG = CONFIG.get("prices", {})
|
||||
|
||||
OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "output"))
|
||||
PLOT_DIR = Path(PATHS_CONFIG.get("plot_dir", "plot"))
|
||||
@@ -94,6 +212,7 @@ ERROR_LOG_CSV = OUTPUT_DIR / "errori_isin.csv"
|
||||
FORWARD_BT_SIGNALS_XLSX = OUTPUT_DIR / "forward_bt_signals.xlsx"
|
||||
FORWARD_BT_SUMMARY_XLSX = OUTPUT_DIR / "forward_bt_summary.xlsx"
|
||||
TRADES_REPORT_XLSX = OUTPUT_DIR / "trades_report.xlsx"
|
||||
PERF_ATTRIB_XLSX = OUTPUT_DIR / "performance_attribution.xlsx"
|
||||
DAILY_FROM_TRADES_CSV = OUTPUT_DIR / "daily_from_trades.csv"
|
||||
DAILY_FROM_TRADES_XLSX = OUTPUT_DIR / "daily_from_trades.xlsx"
|
||||
FINAL_METRICS_XLSX = OUTPUT_DIR / "final_metrics.xlsx"
|
||||
@@ -102,6 +221,12 @@ FINAL_METRICS_XLSX = OUTPUT_DIR / "final_metrics.xlsx"
|
||||
STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db"))
|
||||
N_BARS = int(require_value(DB_CONFIG, "n_bars", "db"))
|
||||
PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db"))
|
||||
RANKING_WINDOW_BARS = int(RANKING_CONFIG.get("rolling_window_bars", N_BARS))
|
||||
RP_LOOKBACK = int(SIGNALS_CONFIG.get("risk_parity_lookback", 60))
|
||||
OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/euronext/price"))
|
||||
OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3))
|
||||
OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1))
|
||||
OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10))
|
||||
|
||||
# Pattern-matching (iper-parametri)
|
||||
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) # lunghezza finestra pattern (barre)
|
||||
@@ -934,6 +1059,11 @@ def equity_from_returns(r: pd.Series) -> pd.Series:
|
||||
|
||||
def monthly_returns(r: pd.Series) -> pd.Series:
|
||||
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
|
||||
if not isinstance(r.index, (pd.DatetimeIndex, pd.PeriodIndex, pd.TimedeltaIndex)):
|
||||
try:
|
||||
r.index = pd.to_datetime(r.index)
|
||||
except Exception:
|
||||
return pd.Series(dtype=float)
|
||||
return (1 + r).resample("M").prod() - 1
|
||||
|
||||
def plot_heatmap_monthly(r: pd.Series, title: str, save_path: str = None):
|
||||
@@ -1222,43 +1352,7 @@ if "is_crypto" not in df_sum.columns:
|
||||
|
||||
|
||||
|
||||
# Calibra i pesi (senza target supervisionato)
|
||||
# --- DYNAMIC METRICS MAP + CALL -------------------------------------------
|
||||
def _available_cols(df, cols):
|
||||
return [c for c in cols if (c in df.columns and df[c].notna().sum() > 0)]
|
||||
|
||||
primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
|
||||
alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)]
|
||||
|
||||
mm = [(c,gh) for (c,gh) in primary_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]
|
||||
if len(mm) < 2:
|
||||
mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum()>0]
|
||||
|
||||
# Se ancora insufficienti, prova ad allargare al set unito
|
||||
if len(mm) < 2:
|
||||
union_candidates = list({x[0] for x in primary_cols+alt_cols})
|
||||
mm = [(c, True) for c in _available_cols(df_sum, union_candidates)]
|
||||
|
||||
# Se proprio non abbiamo nulla, mettiamo pesi uguali su tutti gli ISIN (Score costante)
|
||||
if len(mm) == 0:
|
||||
print("[WARN] Nessuna metrica numerica disponibile: uso Score=0 e ordino per ISIN.")
|
||||
df_sum["Score"] = 0.0
|
||||
df_sum["Score_mode"] = "degenerate_equal"
|
||||
else:
|
||||
# Chiama la calibrazione (unsupervised ERC se non hai un target)
|
||||
res = calibrate_score_weights(
|
||||
df_sum,
|
||||
metrics_map=mm,
|
||||
target_col=None # se in futuro aggiungi 'FWD_CAGR_%' etc., metti qui il nome
|
||||
)
|
||||
X_ranked = res["X_ranked"]
|
||||
w = res["weights"]
|
||||
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
|
||||
df_sum["Score_mode"] = res["mode"]
|
||||
print("Pesi stimati automaticamente (metriche usate):")
|
||||
print("Disponibilità metriche (righe non-NaN):",
|
||||
{c: int(df_sum[c].notna().sum()) for c in df_sum.columns if c in ["Sharpe","CAGR_%","MaxDD_%eq","QualityScore","Confidence","OutcomeScore"]})
|
||||
print(w)
|
||||
df_sum = _apply_score(df_sum)
|
||||
|
||||
TOP_N = 15
|
||||
base_isins = (
|
||||
@@ -1270,7 +1364,7 @@ base_isins = (
|
||||
# Nessuna strategia cripto separata: le criptovalute sono trattate come gli altri asset
|
||||
crypto_isin = None
|
||||
|
||||
print(f"🧩 ISIN selezionati dinamicamente ({len(base_isins)}): {base_isins}")
|
||||
print(f"[INFO] Ranking full-sample (solo debug, i portafogli usano ranking rolling): {base_isins}")
|
||||
|
||||
# -----------------------------
|
||||
# 5.3 Costruzione portafogli
|
||||
@@ -1291,27 +1385,34 @@ wide_sig = (
|
||||
.astype(int)
|
||||
)
|
||||
|
||||
# ISIN effettivamente disponibili nel portafoglio
|
||||
cols = [c for c in base_isins if c in wide_pnl.columns]
|
||||
|
||||
if len(cols) == 0:
|
||||
# Nessun ISIN valido → portafogli in cash (ritorni a 0)
|
||||
idx = wide_pnl.index
|
||||
ret_eq = pd.Series(0.0, index=idx, name="Ret_EqW")
|
||||
ret_rp = pd.Series(0.0, index=idx, name="Ret_RP")
|
||||
weights_rp = pd.DataFrame(0.0, index=idx, columns=[])
|
||||
else:
|
||||
# ---------- Equal Weight ----------
|
||||
ret_eq = wide_pnl[cols].mean(axis=1)
|
||||
|
||||
# ---------- Risk Parity con cap ----------
|
||||
# inverse_vol_weights deve accettare il parametro max_weight
|
||||
weights_rp = inverse_vol_weights(
|
||||
wide_pnl[cols],
|
||||
window=60,
|
||||
max_weight=RP_MAX_WEIGHT # es. 2 / TOP_N_MAX = 0.1333
|
||||
# Sostituisce i PnL close->close con rendimenti open->open usando l'API storica
|
||||
try:
|
||||
date_min = (bt["Date"].min() - pd.Timedelta(days=5)).date()
|
||||
date_max = (bt["Date"].max() + pd.Timedelta(days=5)).date()
|
||||
px_hist = fetch_price_history(
|
||||
isins=bt["ISIN"].unique(),
|
||||
universe=meta_df if 'meta_df' in globals() else pd.DataFrame(),
|
||||
start_date=date_min.isoformat(),
|
||||
end_date=date_max.isoformat()
|
||||
)
|
||||
ret_rp = (wide_pnl[cols] * weights_rp).sum(axis=1)
|
||||
open_pivot = (
|
||||
px_hist.pivot(index="Date", columns="ISIN", values="Open")
|
||||
.sort_index()
|
||||
)
|
||||
open_ret = open_pivot.pct_change()
|
||||
wide_pnl = open_ret.shift(-1) # segnale su giorno t, esecuzione a open t+1
|
||||
common_idx = wide_sig.index.intersection(wide_pnl.index)
|
||||
# forza DatetimeIndex per sicurezza
|
||||
common_idx = pd.to_datetime(common_idx)
|
||||
wide_sig = wide_sig.reindex(common_idx).fillna(0).astype(int)
|
||||
wide_pnl = wide_pnl.reindex(common_idx).fillna(0.0)
|
||||
wide_sig.index = pd.to_datetime(wide_sig.index)
|
||||
wide_pnl.index = pd.to_datetime(wide_pnl.index)
|
||||
print(f"[INFO] PnL ricostruito su open->open per {len(open_pivot.columns)} ISIN.")
|
||||
except Exception as e:
|
||||
print(f"[WARN] Ricostruzione PnL open->open fallita, uso PnL originale: {e}")
|
||||
|
||||
# I portafogli verranno costruiti piu' sotto con ranking rolling (vedi _build_dynamic_portfolio_returns).
|
||||
|
||||
def plot_portfolio_composition(weights: pd.DataFrame,
|
||||
title: str,
|
||||
@@ -1457,6 +1558,113 @@ def make_active_weights(w_base: pd.DataFrame,
|
||||
keep = [c for c in W_active.columns if W_active[c].abs().sum() > 0]
|
||||
return W_active[keep]
|
||||
|
||||
# -----------------------------
|
||||
# Portafogli dinamici con ranking rolling
|
||||
# -----------------------------
|
||||
_dynamic_portfolio_cache: dict[int, dict] = {}
|
||||
|
||||
def _build_dynamic_portfolio_returns(
|
||||
wide_pnl: pd.DataFrame,
|
||||
wide_sig: pd.DataFrame,
|
||||
top_n: int,
|
||||
window_bars: int = RANKING_WINDOW_BARS,
|
||||
rp_lookback: int = RP_LOOKBACK
|
||||
) -> dict:
|
||||
if wide_pnl is None or wide_pnl.empty:
|
||||
idx = pd.Index([])
|
||||
empty_w = pd.DataFrame(index=idx, columns=[])
|
||||
return {
|
||||
"ret_eq": pd.Series(dtype=float),
|
||||
"ret_rp": pd.Series(dtype=float),
|
||||
"w_eq": empty_w,
|
||||
"w_rp": empty_w,
|
||||
"w_eq_act": empty_w,
|
||||
"w_rp_act": empty_w,
|
||||
"selection": {}
|
||||
}
|
||||
|
||||
dates = wide_pnl.index.sort_values()
|
||||
all_cols = wide_pnl.columns.tolist()
|
||||
|
||||
w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols)
|
||||
w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols)
|
||||
selection = {}
|
||||
|
||||
for dt in dates:
|
||||
window_df = wide_pnl.loc[:dt].tail(window_bars)
|
||||
metrics_rows = []
|
||||
for c in all_cols:
|
||||
s = pd.to_numeric(window_df[c], errors="coerce").dropna()
|
||||
if s.empty:
|
||||
continue
|
||||
stats = drawdown_stats_simple(s)
|
||||
stats["ISIN"] = str(c)
|
||||
metrics_rows.append(stats)
|
||||
|
||||
if not metrics_rows:
|
||||
selection[dt] = []
|
||||
continue
|
||||
|
||||
df_window = pd.DataFrame(metrics_rows)
|
||||
df_window = _apply_score(df_window)
|
||||
base_isins_dt = (
|
||||
df_window.sort_values("Score", ascending=False)
|
||||
.head(top_n)["ISIN"].astype(str).str.strip().tolist()
|
||||
)
|
||||
selection[dt] = base_isins_dt
|
||||
if not base_isins_dt:
|
||||
continue
|
||||
|
||||
w_eq.loc[dt, base_isins_dt] = 1 / len(base_isins_dt)
|
||||
|
||||
rp_hist = window_df[base_isins_dt]
|
||||
rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT)
|
||||
if not rp_w.empty:
|
||||
last = rp_w.iloc[-1].fillna(0.0)
|
||||
last_sum = float(last.sum())
|
||||
if last_sum > 0:
|
||||
last = last / last_sum
|
||||
w_rp.loc[dt, last.index] = last.values
|
||||
|
||||
w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||||
w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||||
|
||||
ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
|
||||
ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
|
||||
|
||||
return {
|
||||
"ret_eq": ret_eq,
|
||||
"ret_rp": ret_rp,
|
||||
"w_eq": w_eq,
|
||||
"w_rp": w_rp,
|
||||
"w_eq_act": w_eq_act,
|
||||
"w_rp_act": w_rp_act,
|
||||
"selection": selection
|
||||
}
|
||||
|
||||
def _get_dynamic_portfolio(top_n: int) -> dict:
|
||||
if top_n not in _dynamic_portfolio_cache:
|
||||
_dynamic_portfolio_cache[top_n] = _build_dynamic_portfolio_returns(
|
||||
wide_pnl=wide_pnl,
|
||||
wide_sig=wide_sig,
|
||||
top_n=top_n,
|
||||
window_bars=RANKING_WINDOW_BARS,
|
||||
rp_lookback=RP_LOOKBACK
|
||||
)
|
||||
return _dynamic_portfolio_cache[top_n]
|
||||
|
||||
# Portafoglio principale (Top_N di default) calcolato in modo rolling
|
||||
_main_port = _get_dynamic_portfolio(TOP_N)
|
||||
ret_eq = _main_port["ret_eq"]
|
||||
ret_rp = _main_port["ret_rp"]
|
||||
w_eq = _main_port["w_eq"]
|
||||
w_rp = _main_port["w_rp"]
|
||||
w_eq_act = _main_port["w_eq_act"]
|
||||
w_rp_act = _main_port["w_rp_act"]
|
||||
selection_by_date = _main_port["selection"]
|
||||
weights_rp = w_rp.copy()
|
||||
print(f"[INFO] Portafoglio rolling calcolato (TopN={TOP_N}, finestra={RANKING_WINDOW_BARS} barre, rp_lookback={RP_LOOKBACK}).")
|
||||
|
||||
# -----------------------------
|
||||
# 5.4 Equity line + Heatmap (salva PNG)
|
||||
# -----------------------------
|
||||
@@ -1582,23 +1790,13 @@ def _sanitize_weights(W: pd.DataFrame, index_like: pd.Index) -> pd.DataFrame:
|
||||
rs = W.sum(1).replace(0, np.nan)
|
||||
return W.div(rs, axis=0).fillna(0.0).clip(lower=0.0)
|
||||
|
||||
# ricostruisco coerentemente nel caso non fossero già definiti
|
||||
# ricostruisco coerentemente nel caso non fossero gia definiti
|
||||
if 'w_eq' not in globals():
|
||||
if len(cols) > 0:
|
||||
w_eq = pd.DataFrame(1/len(cols), index=wide_pnl.index, columns=cols)
|
||||
else:
|
||||
w_eq = pd.DataFrame(index=wide_pnl.index, columns=[])
|
||||
w_eq = pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
|
||||
if 'w_rp' not in globals():
|
||||
w_rp = weights_rp.copy() if isinstance(weights_rp, pd.DataFrame) else pd.DataFrame(index=wide_pnl.index, columns=[])
|
||||
w_rp = weights_rp.copy() if isinstance(weights_rp, pd.DataFrame) else pd.DataFrame(index=wide_pnl.index, columns=wide_pnl.columns)
|
||||
if 'w_agg' not in globals():
|
||||
if (len(cols)>0) and (crypto_isin is not None) and (crypto_isin in wide_pnl.columns):
|
||||
cols_agg = cols + [crypto_isin]
|
||||
w_agg = pd.DataFrame(0.0, index=wide_pnl.index, columns=cols_agg)
|
||||
w_agg[cols] = 0.85/len(cols)
|
||||
w_agg[crypto_isin] = 0.15
|
||||
else:
|
||||
w_agg = (pd.DataFrame(1/len(cols), index=wide_pnl.index, columns=cols)
|
||||
if len(cols)>0 else pd.DataFrame(index=wide_pnl.index, columns=[]))
|
||||
w_agg = w_eq.copy()
|
||||
|
||||
w_eq = _sanitize_weights(w_eq, wide_pnl.index)
|
||||
w_rp = _sanitize_weights(w_rp, wide_pnl.index)
|
||||
@@ -1701,33 +1899,55 @@ def make_trades_report(sig: pd.DataFrame, pnl: pd.DataFrame, weights: pd.DataFra
|
||||
out = out[[c for c in cols if c in out.columns] + [c for c in out.columns if c not in cols]]
|
||||
return out
|
||||
|
||||
# Colonne asset effettivamente usate nel portafoglio principale
|
||||
asset_cols = [c for c in w_eq.columns if float(pd.to_numeric(w_eq[c], errors="coerce").abs().sum()) > 0.0]
|
||||
if not asset_cols:
|
||||
asset_cols = list(wide_pnl.columns)
|
||||
|
||||
if len(cols) > 0:
|
||||
w_eq = pd.DataFrame(1/len(cols), index=wide_pnl.index, columns=cols)
|
||||
w_rp = weights_rp.copy()
|
||||
if crypto_isin and crypto_isin in wide_pnl.columns:
|
||||
cols_agg = cols + [crypto_isin]
|
||||
w_agg = pd.DataFrame(0.0, index=wide_pnl.index, columns=cols_agg)
|
||||
w_agg[cols] = 0.85/len(cols)
|
||||
w_agg[crypto_isin] = 0.15
|
||||
else:
|
||||
cols_agg, w_agg = cols, w_eq.copy()
|
||||
else:
|
||||
w_eq = pd.DataFrame(index=wide_pnl.index, columns=[])
|
||||
w_rp = pd.DataFrame(index=wide_pnl.index, columns=[])
|
||||
cols_agg, w_agg = [], pd.DataFrame(index=wide_pnl.index, columns=[])
|
||||
|
||||
rep_eq = make_trades_report(wide_sig[[c for c in cols if c in wide_sig.columns]],
|
||||
wide_pnl[[c for c in cols if c in wide_pnl.columns]],
|
||||
w_eq, "Equal Weight")
|
||||
rep_rp = make_trades_report(wide_sig[[c for c in cols if c in wide_sig.columns]],
|
||||
wide_pnl[[c for c in cols if c in wide_pnl.columns]],
|
||||
w_rp, "Risk Parity")
|
||||
rep_eq = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]],
|
||||
wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]],
|
||||
w_eq_act, "Equal Weight")
|
||||
rep_rp = make_trades_report(wide_sig[[c for c in asset_cols if c in wide_sig.columns]],
|
||||
wide_pnl[[c for c in asset_cols if c in wide_pnl.columns]],
|
||||
w_rp_act, "Risk Parity")
|
||||
|
||||
with pd.ExcelWriter(TRADES_REPORT_XLSX) as xw:
|
||||
rep_eq.to_excel(xw, "Equal_Weight", index=False)
|
||||
rep_rp.to_excel(xw, "Risk_Parity", index=False)
|
||||
|
||||
# Performance attribution per ISIN
|
||||
def _build_performance_attribution(trades_df: pd.DataFrame, meta_df: pd.DataFrame | None) -> pd.DataFrame:
|
||||
if trades_df is None or trades_df.empty:
|
||||
return pd.DataFrame(columns=["ISIN","Nome","Tot_Trades","Positivi","Negativi","Positivi_%","Negativi_%","PnL_Cum_%"])
|
||||
df = trades_df.copy()
|
||||
df["PnL_%"] = pd.to_numeric(df["PnL_%"], errors="coerce")
|
||||
rows = []
|
||||
for isin, g in df.groupby("ISIN"):
|
||||
tot = len(g)
|
||||
pos = int((g["PnL_%"] > 0).sum())
|
||||
neg = int((g["PnL_%"] < 0).sum())
|
||||
rows.append({
|
||||
"ISIN": str(isin),
|
||||
"Tot_Trades": tot,
|
||||
"Positivi": pos,
|
||||
"Negativi": neg,
|
||||
"Positivi_%": (pos / tot * 100.0) if tot > 0 else np.nan,
|
||||
"Negativi_%": (neg / tot * 100.0) if tot > 0 else np.nan,
|
||||
"PnL_Cum_%": float(g["PnL_%"].sum())
|
||||
})
|
||||
out = pd.DataFrame(rows)
|
||||
if meta_df is not None and "ISIN" in meta_df.columns:
|
||||
meta_cols = [c for c in ["ISIN","Nome","Descrizione","Categoria","Asset Class"] if c in meta_df.columns]
|
||||
if meta_cols:
|
||||
out = out.merge(meta_df[meta_cols].drop_duplicates("ISIN"), on="ISIN", how="left")
|
||||
# ordina colonne con Nome subito dopo ISIN
|
||||
cols = [c for c in ["ISIN","Nome"] if c in out.columns] + [c for c in out.columns if c not in ["ISIN","Nome"]]
|
||||
return out[cols]
|
||||
|
||||
perf_attr_df = _build_performance_attribution(pd.concat([rep_eq, rep_rp], ignore_index=True), df_sum if 'df_sum' in globals() else None)
|
||||
perf_attr_df.to_excel(PERF_ATTRIB_XLSX, index=False)
|
||||
print(f"[INFO] Performance attribution salvata in {PERF_ATTRIB_XLSX}")
|
||||
|
||||
print(f"✅ Report trades salvato in {TRADES_REPORT_XLSX}")
|
||||
# ============================================================
|
||||
# 5.6 Rebuild DAILY PnL from trades_report (calendarized)
|
||||
@@ -2062,12 +2282,9 @@ def _build_portfolio_returns_for_isins(base_isins_N, wide_pnl):
|
||||
# --- calcolo metriche per TopN 8..15 ---
|
||||
rows_byN = []
|
||||
for top_n in range(8, 16):
|
||||
# Selezione ISIN per questo TopN
|
||||
base_isins_N, crypto_isin_N = _select_isins_for_topN(df_sum, top_n)
|
||||
|
||||
# Costruisce i rendimenti di portafoglio EqW + RP per questo N
|
||||
# Nota: la nuova _build_portfolio_returns_for_isins accetta (base_isins_N, wide_pnl)
|
||||
ret_eq_N, ret_rp_N = _build_portfolio_returns_for_isins(base_isins_N, wide_pnl)
|
||||
portN = _get_dynamic_portfolio(top_n)
|
||||
ret_eq_N = portN["ret_eq"]
|
||||
ret_rp_N = portN["ret_rp"]
|
||||
|
||||
# (OPZIONALE) se vuoi anche salvare equity/heatmap per ciascun N:
|
||||
# _save_equity_plot_byN(ret_eq_N, ret_rp_N, top_n)
|
||||
@@ -2156,8 +2373,9 @@ def _save_heatmaps_byN(ret_eq, ret_rp, top_n: int):
|
||||
|
||||
# Loop 8..15 replicando i plot per ciascuna combinazione
|
||||
for top_n in range(8, 16):
|
||||
base_isins_N, crypto_isin_N = _select_isins_for_topN(df_sum, top_n)
|
||||
ret_eq_N, ret_rp_N = _build_portfolio_returns_for_isins(base_isins_N, wide_pnl)
|
||||
portN = _get_dynamic_portfolio(top_n)
|
||||
ret_eq_N = portN["ret_eq"]
|
||||
ret_rp_N = portN["ret_rp"]
|
||||
|
||||
_save_equity_plot_byN(ret_eq_N, ret_rp_N, top_n)
|
||||
_save_heatmaps_byN(ret_eq_N, ret_rp_N, top_n)
|
||||
@@ -2292,14 +2510,9 @@ def _build_weights_for_isins(base_isins_N, crypto_isin_N, wide_pnl):
|
||||
# === Loop 8..15: crea pesi, attiva coi Signal, plotta e SALVA in OUT_DIR ===
|
||||
# === Loop 8..15: crea pesi, attiva coi Signal, plotta e SALVA in OUT_DIR ===
|
||||
for top_n in range(8, 16):
|
||||
base_isins_N, crypto_isin_N = _select_isins_for_topN(df_sum, top_n)
|
||||
|
||||
# pesi teorici per N (Aggressiva non più usata)
|
||||
w_eq_N, w_rp_N, _ = _build_weights_for_isins(base_isins_N, crypto_isin_N, wide_pnl)
|
||||
|
||||
# pesi ATTIVI (maschera con i Signal; lascia Cash per la parte non investita)
|
||||
w_eq_act_N = make_active_weights(w_eq_N, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||||
w_rp_act_N = make_active_weights(w_rp_N, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
|
||||
portN = _get_dynamic_portfolio(top_n)
|
||||
w_eq_act_N = portN["w_eq_act"]
|
||||
w_rp_act_N = portN["w_rp_act"]
|
||||
|
||||
# path di salvataggio
|
||||
sp_eq = OUT_DIR / f"composition_equal_topN_{top_n}.png"
|
||||
|
||||
Reference in New Issue
Block a user