Added Stocks USA, Stocks EU, and Forex versions; made a few small changes

fredmaloggia
2025-12-27 19:42:46 +01:00
parent 71c796b1f1
commit f5c668f70f
4 changed files with 2887 additions and 11 deletions


@@ -398,7 +398,7 @@ RUN_CONFIG = CONFIG.get("run", {})
SIGNALS_CONFIG = CONFIG.get("signals", {})
PRICES_CONFIG = CONFIG.get("prices", {})
-OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "output"))
+OUTPUT_DIR = Path(PATHS_CONFIG.get("output_dir", "output_etf"))
PLOT_DIR = Path(PATHS_CONFIG.get("plot_dir", "plot"))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
PLOT_DIR.mkdir(parents=True, exist_ok=True)
@@ -732,6 +732,33 @@ def h_min_100(returns: pd.Series, month_len: int = 21):
return np.nan, np.nan
if "portfolio_metric_row" not in globals():
def portfolio_metric_row(name: str, r: pd.Series):
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
if r.empty:
return {
"Portfolio": name, "CAGR_%": np.nan, "MaxDD_%": np.nan, "Sharpe": np.nan,
"Heal_Index": np.nan, "AAW": np.nan, "AUW": np.nan,
"H100_min_days": np.nan, "H100_min_months": np.nan
}
eq = (1 + r).cumprod()
cagr = (eq.iloc[-1] / eq.iloc[0]) ** (DAYS_PER_YEAR / max(1, len(r))) - 1
maxdd = (eq / eq.cummax() - 1.0).min()
sharpe = (r.mean() / (r.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR)
aaw, auw, heal = heal_index_metrics(r)
h_days, h_months = h_min_100(r, month_len=21)
return {
"Portfolio": name,
"CAGR_%": round(float(cagr) * 100, 2) if np.isfinite(cagr) else np.nan,
"MaxDD_%": round(float(maxdd) * 100, 2) if np.isfinite(maxdd) else np.nan,
"Sharpe": round(float(sharpe), 2) if np.isfinite(sharpe) else np.nan,
"Heal_Index": round(float(heal), 4) if np.isfinite(heal) else np.nan,
"AAW": round(float(aaw), 4) if np.isfinite(aaw) else np.nan,
"AUW": round(float(auw), 4) if np.isfinite(auw) else np.nan,
"H100_min_days": h_days,
"H100_min_months": h_months
}
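# Illustrative output (hypothetical numbers): for a daily-returns Series r,
# portfolio_metric_row("EqW_Top5", r) yields a dict like
# {"Portfolio": "EqW_Top5", "CAGR_%": 7.45, "MaxDD_%": -12.31, "Sharpe": 0.81,
#  "Heal_Index": 1.9, "AAW": 3.2, "AUW": 1.1, "H100_min_days": 180, "H100_min_months": 9},
# one row per portfolio when collected into a DataFrame.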
# =========================================
# 1) UNIVERSO: ISIN + METADATI
# =========================================
@@ -1931,6 +1958,17 @@ weights_rp = w_rp.copy()
print(f"[INFO] Portafoglio rolling calcolato (TopN={TOP_N}, finestra={RANKING_WINDOW_BARS} barre, rp_lookback={RP_LOOKBACK}).")
checkpoint_post_timer("Portafoglio rolling")
# Metriche riepilogative portafogli EW/RP
try:
port_metrics = [
portfolio_metric_row(f"EqW_Top{TOP_N}", ret_eq),
portfolio_metric_row(f"RP_Top{TOP_N}", ret_rp),
]
pd.DataFrame(port_metrics).to_excel(OUTPUT_DIR / "portfolio_metrics.xlsx", index=False)
print(f"[INFO] Salvato: {OUTPUT_DIR / 'portfolio_metrics.xlsx'}")
except Exception as e:
print(f"[WARN] Salvataggio portfolio_metrics.xlsx fallito: {e}")
# -----------------------------
# 5.4 Equity line + Heatmap (salva PNG)
# -----------------------------


@@ -0,0 +1,847 @@
# -*- coding: utf-8 -*-
"""Trading Pattern Recon w Hurst - Forex
VERSIONE CORRETTA (EOD Close-to-Close) con SHORT, Wavelet denoise (DB) su EstOutcome,
fee 1bp e cap esposizione per singola valuta 35%.
REQUISITI IMPLEMENTATI (come da tue istruzioni):
1) Denoise su EstOutcome (rolling/online, NO look-ahead)
2) Wavelet Daubechies (DB) -> db4
3) Operatività EOD: decisione a close(t), esecuzione a close(t)
4) PnL sempre su close(t+1)/close(t) (cioè Ret_fwd = close[t+1]/close[t]-1)
5) Fee = 1 bp applicata sui cambi di posizione (enter/exit/flip) al close(t)
6) Cap esposizione per singola valuta = 35% (net exposure per currency)
7) Per il resto: mantiene lo schema originale (shared_utils, per-asset backtest, ranking, Top15, EW e Risk Parity)
NOTA:
- Lo script usa SOLO serie AdjClose dal tuo endpoint (stesso JSON di prima).
- Non usa Open/High/Low e non introduce logiche intraday.
Output principali (cartella OUT_DIR):
- forward_bt_signals.csv (per-ticker: Signal, EstOutcomeRaw, EstOutcomeDenoised, Ret_fwd, PnL)
- forward_bt_summary.csv (metriche per ticker)
- ranking_score.csv (ranking + score)
- topn_tickers.csv (TopN dal ranking)
- portfolio_daily.csv (ret/eq line EW e RP)
- weights_eq_active_capped.csv / weights_rp_active_capped.csv (pesi con cash + cap valuta)
- currency_exposure_eq.csv / currency_exposure_rp.csv (diagnostica esposizioni)
Dipendenze:
- numpy, pandas, matplotlib, requests
- PyWavelets (pip install PyWavelets)
"""
from __future__ import annotations
import sys
import types
from pathlib import Path
from urllib.parse import quote
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import requests
# -------------------------
# Wavelets
# -------------------------
try:
import pywt
except ImportError as e:
raise ImportError("Manca PyWavelets (pywt). Installa con: pip install PyWavelets") from e
# ------------------------------------------------------------
# shared_utils import (local file next to this script)
# ------------------------------------------------------------
# Some versions of shared_utils import optional modules (e.g. pyodbc). Avoid a crash at import time.
sys.modules.setdefault("pyodbc", types.SimpleNamespace())
import importlib.util
SHARED_UTILS_PATH = Path(__file__).with_name("shared_utils.py")
if not SHARED_UTILS_PATH.exists():
raise FileNotFoundError(f"shared_utils.py non trovato accanto allo script: {SHARED_UTILS_PATH}")
spec = importlib.util.spec_from_file_location("shared_utils", str(SHARED_UTILS_PATH))
shared_utils = importlib.util.module_from_spec(spec)
sys.modules["shared_utils"] = shared_utils
assert spec.loader is not None
spec.loader.exec_module(shared_utils)
build_pattern_library = shared_utils.build_pattern_library
predict_from_library = shared_utils.predict_from_library
z_norm = shared_utils.z_norm
# ============================================================
# CONFIG
# ============================================================
TICKERS = [
"EURUSD Curncy",
"USDJPY Curncy",
"GBPUSD Curncy",
"USDCHF Curncy",
"USDCAD Curncy",
"AUDUSD Curncy",
"NZDUSD Curncy",
"EURJPY Curncy",
"EURGBP Curncy",
"EURCHF Curncy",
"EURCAD Curncy",
"EURAUD Curncy",
"EURNZD Curncy",
"GBPJPY Curncy",
"GBPCHF Curncy",
"GBPCAD Curncy",
"GBPAUD Curncy",
"GBPNZD Curncy",
"CHFJPY Curncy",
"CADJPY Curncy",
"AUDJPY Curncy",
"NZDJPY Curncy",
"AUDNZD Curncy",
"AUDCAD Curncy",
"AUDCHF Curncy",
"NZDCAD Curncy",
"NZDCHF Curncy",
"CADCHF Curncy",
]
TICKERS = [t.strip() for t in TICKERS if isinstance(t, str) and t.strip()]
BASE_URL = "https://fin.scorer.app/finance/v2/history"
FROM_DATE = "20201224"
# Pattern params (as in the original layout)
WP = 60
HA = 10
KNN_K = 25
# Shorting enabled for FX
ALLOW_SHORT = True
# Fee (FIX: 1 bp)
FEE_BPS = 1
# Exit controls (EOD approximation on Ret_fwd)
SL_BPS = 300.0
TP_BPS = 800.0
TRAIL_BPS = 300.0
TIME_STOP_BARS = 20
THETA_EXIT = 0.0
# Entry theta fallback when Hurst is unavailable
THETA_FALLBACK = 0.005
# Portfolio construction
TOP_N = 15
RP_MAX_WEIGHT = 2.0 / TOP_N # 0.1333... (as in the legend)
RANKING_WINDOW_BARS = 252
RP_LOOKBACK = 60
# Currency cap (net exposure per currency)
CURRENCY_CAP = 0.35
# Wavelet denoise on EstOutcome
DENOISE_ENABLED = True
DENOISE_WAVELET = "db4" # DB family (Daubechies)
DENOISE_LEVEL = 3
DENOISE_MIN_LEN = 96
DENOISE_THRESHOLD_MODE = "soft"
DAYS_PER_YEAR = 252
OUT_DIR = Path("./out_shared_utils_url_forex")
OUT_DIR.mkdir(parents=True, exist_ok=True)
# ============================================================
# Helpers: JSON -> DataFrame (AdjClose)
# ============================================================
def _detect_col(cols, candidates):
cols_l = {c.lower(): c for c in cols}
for cand in candidates:
if cand.lower() in cols_l:
return cols_l[cand.lower()]
# fallback: contains
for cand in candidates:
for c in cols:
if cand.lower() in str(c).lower():
return c
return None
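# Example (hypothetical columns): _detect_col(["Date", "AdjClose"], ["date"])
# returns "Date" via the exact case-insensitive match, while
# _detect_col(["px_adj_close"], ["adj_close"]) falls through to the substring
# pass and returns "px_adj_close".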
def fetch_price_series(ticker: str, from_date: str) -> pd.DataFrame:
"""Scarica il JSON e restituisce DataFrame indicizzato per Date con colonna AdjClose."""
url = f"{BASE_URL}/{quote(ticker)}?fromDate={from_date}"
r = requests.get(url, timeout=30)
r.raise_for_status()
obj = r.json()
# Handle the possible JSON schemas
if isinstance(obj, list) and len(obj) == 1 and isinstance(obj[0], dict) and "data" in obj[0]:
obj = obj[0]["data"]
if not isinstance(obj, list):
raise ValueError(f"Schema JSON inatteso per {ticker}: {type(obj)}")
df = pd.DataFrame(obj)
if df.empty:
raise ValueError(f"Nessuna riga per {ticker}")
col_date = _detect_col(df.columns, ["date", "datetime", "timestamp", "time"])
if col_date is None:
raise ValueError(f"Colonna date non trovata per {ticker}. Colonne: {df.columns.tolist()}")
# Prefer AdjClose, otherwise Close
col_px = _detect_col(df.columns, ["adj_close", "adjclose", "adjusted_close", "Adj Close", "AdjClose"])
if col_px is None:
col_px = _detect_col(df.columns, ["close", "px_last", "last", "price"])
if col_px is None:
raise ValueError(f"Colonna prezzo non trovata per {ticker}. Colonne: {df.columns.tolist()}")
df[col_date] = pd.to_datetime(df[col_date], errors="coerce", utc=True).dt.tz_localize(None)
df[col_px] = pd.to_numeric(df[col_px], errors="coerce")
df = df.dropna(subset=[col_date, col_px]).sort_values(col_date)
df = df.drop_duplicates(subset=[col_date]).set_index(col_date)
# Normalize the index to plain dates
df.index = pd.to_datetime(df.index).normalize()
df = df[~df.index.duplicated(keep="last")]
out = pd.DataFrame(index=df.index)
out.index.name = "Date"
out["AdjClose"] = df[col_px].astype(float)
return out.sort_index()
# ============================================================
# Hurst (RS + DFA) su returns
# ============================================================
def hurst_rs_returns(r: np.ndarray, win_grid=None, min_seg=1) -> float:
r = pd.Series(r).dropna().astype("float64").values
n = len(r)
if n < 200:
return np.nan
if win_grid is None:
base = np.array([16, 24, 32, 48, 64, 96, 128, 192, 256, 384], dtype=int)
win_grid = [w for w in base if w <= n // 2]
RS_vals, sizes = [], []
for w in win_grid:
m = n // w
if w < 8 or m < min_seg:
continue
rs_list = []
for i in range(m):
seg = r[i * w : (i + 1) * w]
seg = seg - np.mean(seg)
sd = seg.std(ddof=1)
if sd == 0 or not np.isfinite(sd):
continue
y = np.cumsum(seg)
rs = (np.max(y) - np.min(y)) / sd
if np.isfinite(rs) and rs > 0:
rs_list.append(rs)
if rs_list:
RS_vals.append(np.mean(rs_list))
sizes.append(w)
if len(RS_vals) < 3:
return np.nan
sizes = np.array(sizes, float)
RS_vals = np.array(RS_vals, float)
mask = np.isfinite(RS_vals) & (RS_vals > 0)
sizes, RS_vals = sizes[mask], RS_vals[mask]
if sizes.size < 3:
return np.nan
slope, _ = np.polyfit(np.log(sizes), np.log(RS_vals), 1)
return float(np.clip(slope, 0.0, 1.0)) if np.isfinite(slope) else np.nan
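# Sanity check (sketch, not part of the pipeline): for i.i.d. Gaussian returns
# the R/S log-log slope should sit near 0.5, e.g.
#   hurst_rs_returns(np.random.default_rng(0).normal(size=2000))  # roughly 0.5
# Values well above 0.5 suggest trending behavior, well below 0.5 mean reversion.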
def hurst_dfa_returns(r: np.ndarray, win_grid=None) -> float:
r = pd.Series(r).dropna().astype("float64").values
n = len(r)
if n < 200:
return np.nan
y = np.cumsum(r - np.mean(r))
if win_grid is None:
base = np.array([16, 24, 32, 48, 64, 96, 128, 192, 256], dtype=int)
win_grid = [w for w in base if w <= n // 2]
F_vals, sizes = [], []
for s in win_grid:
m = n // s
if s < 8 or m < 2:
continue
rms_list = []
for i in range(m):
seg = y[i * s : (i + 1) * s]
t = np.arange(s, dtype=float)
A = np.vstack([t, np.ones(s)]).T
coeff, *_ = np.linalg.lstsq(A, seg, rcond=None)
detr = seg - (A @ coeff)
rms = np.sqrt(np.mean(detr**2))
if np.isfinite(rms) and rms > 0:
rms_list.append(rms)
if rms_list:
F_vals.append(np.mean(rms_list))
sizes.append(s)
if len(F_vals) < 3:
return np.nan
sizes = np.array(sizes, float)
F_vals = np.array(F_vals, float)
mask = np.isfinite(F_vals) & (F_vals > 0)
sizes, F_vals = sizes[mask], F_vals[mask]
if sizes.size < 3:
return np.nan
slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1)
return float(np.clip(slope, 0.0, 1.0)) if np.isfinite(slope) else np.nan
# ============================================================
# Wavelet denoise (ONLINE, NO look-ahead)
# ============================================================
def wavelet_denoise_last_online(x: np.ndarray) -> float:
"""Denoise usando SOLO la storia fino a t. Ritorna l'ultimo punto denoisato."""
x = np.asarray(x, dtype=float)
n = x.size
if n < DENOISE_MIN_LEN:
return float(x[-1])
s = pd.Series(x).ffill().bfill().values
wav = pywt.Wavelet(DENOISE_WAVELET)
max_level = pywt.dwt_max_level(n, wav.dec_len)
level = int(min(DENOISE_LEVEL, max_level)) if max_level > 0 else 1
coeffs = pywt.wavedec(s, DENOISE_WAVELET, level=level)
detail = coeffs[-1]
if detail.size == 0:
return float(s[-1])
sigma = np.median(np.abs(detail - np.median(detail))) / 0.6745
if not np.isfinite(sigma) or sigma <= 0:
return float(s[-1])
thr = sigma * np.sqrt(2 * np.log(n))
coeffs_th = [coeffs[0]]
for d in coeffs[1:]:
coeffs_th.append(pywt.threshold(d, thr, mode=DENOISE_THRESHOLD_MODE))
rec = pywt.waverec(coeffs_th, DENOISE_WAVELET)
rec = rec[:n]
return float(rec[-1])
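# Online usage sketch: the backtest appends each new EstOutcome to est_hist and
# reruns the denoise on the full history, keeping ONLY the last reconstructed
# point. Recomputing the transform every bar costs O(n) per step, but guarantees
# the value used at t never depends on data after t (no look-ahead).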
# ============================================================
# Per-asset forward EOD backtest
# ============================================================
def forward_backtest_one_asset(close: pd.Series, theta_entry: float, allow_short: bool) -> pd.DataFrame:
"""Backtest EOD:
- Segnale calcolato a close(t) (si usa finestra di returns che include il return di t).
- Esecuzione al close(t).
- PnL per step t = Signal(t) * Ret_fwd(t) - fee * abs(diff(Signal)).
dove Ret_fwd(t) = close(t+1)/close(t)-1.
Output per Date=t con Ret_fwd già allineato.
"""
close = pd.to_numeric(close, errors="coerce").dropna()
close = close[~close.index.duplicated(keep="last")].sort_index()
# log returns for pattern matching
r_log = np.log(close / close.shift(1))
r_log = r_log.dropna()
# forward return (close-to-close): Ret_fwd[t] = close[t+1]/close[t]-1
ret_fwd = close.pct_change().shift(-1)
# Align on common dates
idx = r_log.index.intersection(ret_fwd.index)
r_log = r_log.loc[idx]
ret_fwd = ret_fwd.loc[idx]
pos = 0
entry_t = None
trade_pnl = 0.0
trade_peak = 0.0
est_hist = []
rows = []
# t is a positional index into r_log/ret_fwd
for t in range(WP, len(r_log) - 1):
dt = r_log.index[t]
past = r_log.iloc[:t]
if past.dropna().shape[0] < (WP + HA):
rows.append((dt, pos, np.nan, np.nan, np.nan, float(ret_fwd.iloc[t])))
continue
lib_wins, lib_out = build_pattern_library(past, WP, HA)
if lib_wins is None or lib_out is None or len(lib_out) == 0:
rows.append((dt, pos, np.nan, np.nan, np.nan, float(ret_fwd.iloc[t])))
continue
win_last = r_log.iloc[t - WP : t].values
curr_zn = z_norm(win_last)
if curr_zn is None:
rows.append((dt, pos, np.nan, np.nan, np.nan, float(ret_fwd.iloc[t])))
continue
est_raw, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
est_raw = float(est_raw)
avg_dist = float(avg_dist)
est_hist.append(est_raw)
est_use = wavelet_denoise_last_online(np.array(est_hist, dtype=float)) if DENOISE_ENABLED else est_raw
# ===== ENTRY =====
if pos == 0:
if est_use > theta_entry:
pos = 1
entry_t = t
trade_pnl = 0.0
trade_peak = 0.0
elif allow_short and est_use < -theta_entry:
pos = -1
entry_t = t
trade_pnl = 0.0
trade_peak = 0.0
# ===== EXIT (EOD approximation: uses the next bar's return Ret_fwd(t) to update the pnl) =====
else:
next_ret = float(ret_fwd.iloc[t])
signed_next = next_ret if pos == 1 else (-next_ret)
pnl_if_stay = (1.0 + trade_pnl) * (1.0 + signed_next) - 1.0
peak_if_stay = max(trade_peak, pnl_if_stay)
exit_now = False
if SL_BPS is not None and pnl_if_stay <= -SL_BPS / 10000.0:
exit_now = True
if TP_BPS is not None and pnl_if_stay >= TP_BPS / 10000.0:
exit_now = True
if TRAIL_BPS is not None and (peak_if_stay - pnl_if_stay) >= TRAIL_BPS / 10000.0:
exit_now = True
if TIME_STOP_BARS is not None and entry_t is not None and (t - entry_t + 1) >= TIME_STOP_BARS:
exit_now = True
# Symmetric theta exit
if THETA_EXIT is not None:
if pos == 1:
is_weak = est_use <= THETA_EXIT
else:
is_weak = est_use >= -THETA_EXIT
if is_weak:
exit_now = True
if exit_now:
pos = 0
entry_t = None
trade_pnl = 0.0
trade_peak = 0.0
else:
trade_pnl = pnl_if_stay
trade_peak = peak_if_stay
rows.append((dt, pos, est_raw, est_use, avg_dist, float(ret_fwd.iloc[t])))
df = pd.DataFrame(
rows,
columns=["Date", "Signal", "EstOutcomeRaw", "EstOutcome", "AvgDist", "Ret_fwd"],
).set_index("Date")
fee = FEE_BPS / 10000.0
trade_chg = df["Signal"].diff().abs().fillna(0.0)
df["PnL"] = df["Signal"] * df["Ret_fwd"] - trade_chg * fee
return df
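# Worked exit example (hypothetical trade): pos=+1 with trade_pnl=+5.0% and
# trade_peak=+6.0%; the next bar returns -3.0%, so
#   pnl_if_stay  = 1.05 * 0.97 - 1 = +1.85%
#   peak_if_stay = max(6.0%, +1.85%) = 6.0%
# Neither SL (-3%) nor TP (+8%) trips, but peak - pnl = 4.15% >= TRAIL (3%),
# so the position is flattened at close(t).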
# ============================================================
# Portfolio utilities
# ============================================================
def equity_from_returns(r: pd.Series) -> pd.Series:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
return (1 + r).cumprod() * 100
def monthly_returns(r: pd.Series) -> pd.Series:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
return (1 + r).resample("M").prod() - 1
def plot_heatmap_monthly(r: pd.Series, title: str):
m = monthly_returns(r)
df = m.to_frame("ret")
df["Year"], df["Month"] = df.index.year, df.index.month
pv = df.pivot(index="Year", columns="Month", values="ret")
fig, ax = plt.subplots(figsize=(10, 6))
im = ax.imshow(pv.fillna(0) * 100, aspect="auto")
for i in range(pv.shape[0]):
for j in range(pv.shape[1]):
val = pv.iloc[i, j]
if not np.isnan(val):
ax.text(j, i, f"{val*100:.1f}", ha="center", va="center", fontsize=8)
ax.set_title(title)
ax.set_xlabel("Mese")
ax.set_ylabel("Anno")
ax.set_xticks(range(12))
ax.set_xticklabels(range(1, 13))
fig.colorbar(im, ax=ax, label="%")
plt.tight_layout()
return fig
def inverse_vol_weights(returns_df: pd.DataFrame, window: int, max_weight: float | None) -> pd.DataFrame:
vol = returns_df.rolling(window).std()
inv = 1 / vol.replace(0, np.nan)
w = inv.div(inv.sum(axis=1), axis=0)
w = w.ffill()
if max_weight is not None:
w = w.clip(upper=max_weight)
return w
def make_active_weights(w_target: pd.DataFrame, sig: pd.DataFrame, add_cash: bool = True, cash_label: str = "Cash") -> pd.DataFrame:
# Active if Signal != 0 (both long and short)
w = (w_target * (sig != 0)).fillna(0.0)
if add_cash:
w[cash_label] = (1.0 - w.sum(axis=1)).clip(lower=0.0)
return w
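# Illustrative masking (hypothetical row): targets {A: 0.5, B: 0.5} with signals
# {A: +1, B: 0} become {A: 0.5, B: 0.0, Cash: 0.5}; with B short instead
# (sig = -1), both legs stay invested and Cash is 0.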
# ============================================================
# FX currency exposure cap (net exposure per currency)
# ============================================================
def parse_fx_pair(ticker: str) -> tuple[str, str] | None:
# "EURUSD Curncy" -> ("EUR","USD")
if not isinstance(ticker, str):
return None
pair = ticker.split()[0].strip().upper()
if len(pair) < 6:
return None
return pair[:3], pair[3:6]
def currency_exposure_from_weights(weights_row: pd.Series, sig_row: pd.Series) -> dict[str, float]:
"""Net exposure per currency, considerando anche la direzione (long/short).
Long EURUSD (sig=+1): +w EUR, -w USD
Short EURUSD (sig=-1): -w EUR, +w USD
"""
exp: dict[str, float] = {}
for tkr, w in weights_row.items():
if tkr == "Cash":
continue
s = float(sig_row.get(tkr, 0.0))
if s == 0 or w == 0 or not np.isfinite(w):
continue
pq = parse_fx_pair(tkr)
if pq is None:
continue
base, quote = pq
exp[base] = exp.get(base, 0.0) + w * s
exp[quote] = exp.get(quote, 0.0) - w * s
return exp
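# Sign-convention example (hypothetical book): long EURUSD at w=0.25 contributes
# {EUR: +0.25, USD: -0.25}; long GBPUSD at w=0.15 contributes {GBP: +0.15, USD: -0.15}.
# Net USD exposure is -0.40, so |USD| = 0.40 is what the cap below acts on.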
def apply_currency_cap(weights_act: pd.DataFrame, sig: pd.DataFrame, cap: float) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Applica cap |exposure(currency)| <= cap.
Metodo conservativo ma robusto (NO look-ahead):
- Se max_abs_exposure > cap, scala TUTTE le posizioni risky di fattore cap/max_abs_exposure,
e manda il residuo a Cash.
Ritorna:
- weights_capped (con Cash)
- exposure_wide (diagnostica: esposizioni per currency)
"""
w = weights_act.copy()
if "Cash" not in w.columns:
w["Cash"] = (1.0 - w.sum(axis=1)).clip(lower=0.0)
tick_cols = [c for c in w.columns if c != "Cash"]
# also collect exposures
all_ccy = set()
exp_dicts: dict[pd.Timestamp, dict[str, float]] = {}
for dt in w.index:
sig_row = sig.loc[dt] if dt in sig.index else pd.Series(dtype=float)
wr = w.loc[dt, tick_cols].fillna(0.0)
exp = currency_exposure_from_weights(wr, sig_row)
exp_dicts[pd.to_datetime(dt)] = exp
all_ccy.update(exp.keys())
if not exp:
w.loc[dt, "Cash"] = (1.0 - float(wr.sum())).clip(lower=0.0) if hasattr(float(wr.sum()), 'clip') else max(0.0, 1.0 - float(wr.sum()))
continue
max_abs = max(abs(v) for v in exp.values())
if np.isfinite(max_abs) and max_abs > cap and max_abs > 1e-12:
scale = cap / max_abs
wr2 = wr * scale
w.loc[dt, tick_cols] = wr2.values
w.loc[dt, "Cash"] = max(0.0, 1.0 - float(wr2.sum()))
else:
w.loc[dt, "Cash"] = max(0.0, 1.0 - float(wr.sum()))
# exposure wide
all_ccy = sorted(all_ccy)
exp_wide = pd.DataFrame(0.0, index=w.index, columns=all_ccy)
for dt, ex in exp_dicts.items():
if dt in exp_wide.index:
for c, v in ex.items():
exp_wide.loc[dt, c] = v
return w, exp_wide
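# Continuing the example above: with max |exposure| = 0.40 > cap = 0.35, every
# risky weight is scaled by 0.35 / 0.40 = 0.875 (EURUSD 0.25 -> 0.21875,
# GBPUSD 0.15 -> 0.13125) and the freed 5% of notional moves to Cash.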
# ============================================================
# Ranking / scoring (simple, robust)
# ============================================================
def drawdown_stats_simple(ret: pd.Series) -> dict:
ret = pd.to_numeric(ret, errors="coerce").fillna(0.0)
eq = (1 + ret).cumprod()
peak = eq.cummax()
dd = eq / peak - 1.0
cagr = eq.iloc[-1] ** (DAYS_PER_YEAR / max(1, len(eq))) - 1
annvol = ret.std() * np.sqrt(DAYS_PER_YEAR)
sharpe = (ret.mean() / (ret.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR)
maxdd = dd.min() if len(dd) else np.nan
calmar = (cagr / abs(maxdd)) if (np.isfinite(cagr) and np.isfinite(maxdd) and maxdd < 0) else np.nan
return {
"CAGR_%": float(np.round(cagr * 100, 3)) if np.isfinite(cagr) else np.nan,
"AnnVol_%": float(np.round(annvol * 100, 3)) if np.isfinite(annvol) else np.nan,
"Sharpe": float(np.round(sharpe, 3)) if np.isfinite(sharpe) else np.nan,
"MaxDD_%": float(np.round(maxdd * 100, 3)) if np.isfinite(maxdd) else np.nan,
"Calmar": float(np.round(calmar, 3)) if np.isfinite(calmar) else np.nan,
}
def compute_score(summary_df: pd.DataFrame) -> pd.DataFrame:
"""Score semplice e stabile (senza ottimizzazioni fancy):
- alto Sharpe e CAGR, basso MaxDD
- normalizzazione via rank percentili
Mantiene colonne di diagnostica (mode) per trasparenza.
"""
df = summary_df.copy()
# ranks 0..1
def rnk(s):
s = pd.to_numeric(s, errors="coerce")
n = s.notna().sum()
if n <= 1:
return pd.Series(np.nan, index=s.index)
return s.rank(method="average") / n
sharpe_r = rnk(df.get("Sharpe"))
cagr_r = rnk(df.get("CAGR_%"))
maxdd_r = rnk(-pd.to_numeric(df.get("MaxDD_%"), errors="coerce")) # less negative = better
df["Score_mode"] = "rank_sharpe_cagr_maxdd"
df["Score"] = (0.5 * sharpe_r.fillna(0) + 0.3 * cagr_r.fillna(0) + 0.2 * maxdd_r.fillna(0))
return df
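# Score arithmetic (hypothetical ranks): a ticker ranked 0.9 on Sharpe, 0.8 on
# CAGR_% and 0.7 on -MaxDD_% scores 0.5*0.9 + 0.3*0.8 + 0.2*0.7 = 0.83.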
# ============================================================
# MAIN
# ============================================================
def main():
print("\n=== Trading Pattern Recon w Hurst - Forex (CORRETTO) ===")
print(f"Fee: {FEE_BPS} bp | Short: {ALLOW_SHORT} | Currency cap: {CURRENCY_CAP:.2f}")
print(f"Wavelet denoise: {DENOISE_ENABLED} ({DENOISE_WAVELET}, level={DENOISE_LEVEL}, min_len={DENOISE_MIN_LEN})")
print("Esecuzione: close(t), PnL: close(t+1)/close(t)\n")
# 1) Fetch prices
prices: dict[str, pd.DataFrame] = {}
for tkr in TICKERS:
try:
print(f"Fetching {tkr} ...")
prices[tkr] = fetch_price_series(tkr, FROM_DATE)
except Exception as e:
print(f"[WARN] Skip {tkr}: {e}")
if len(prices) < 5:
raise RuntimeError(f"Pochi ticker validi ({len(prices)}).")
# 2) Per ticker backtest
hurst_rows = []
summary_rows = []
sig_rows = []
for tkr, dfp in prices.items():
if "AdjClose" not in dfp.columns:
continue
close = dfp["AdjClose"].copy()
close = pd.to_numeric(close, errors="coerce").dropna()
if len(close) < (WP + HA + 80):
print(f"[WARN] Serie troppo corta per {tkr}: len={len(close)}")
continue
r_log = np.log(close / close.shift(1)).dropna()
if len(r_log) < 250:
print(f"[WARN] Series too short for a reliable Hurst estimate for {tkr}")
h_rs = hurst_rs_returns(r_log.values)
h_dfa = hurst_dfa_returns(r_log.values)
H = np.nanmedian([h_rs, h_dfa])
H = float(H) if np.isfinite(H) else np.nan
theta_entry = (H / 100.0) if np.isfinite(H) else THETA_FALLBACK
hurst_rows.append({"Ticker": tkr, "Hurst": H, "theta_entry": float(theta_entry)})
bt = forward_backtest_one_asset(close=close, theta_entry=float(theta_entry), allow_short=ALLOW_SHORT)
bt = bt.copy()
bt.insert(0, "Ticker", tkr)
sig_rows.append(bt.reset_index())
stats = drawdown_stats_simple(bt["PnL"])
hit = 100.0 * ((bt["PnL"] > 0).sum() / max(1, bt["PnL"].notna().sum()))
turnover = bt["Signal"].diff().abs().fillna(0.0).mean() * 100.0 # % steps changed (0..200)
stats.update({
"Ticker": tkr,
"HitRate_%": float(np.round(hit, 3)),
"AvgStepRet_bps": float(np.round(bt["PnL"].mean() * 10000, 3)),
"Turnover_%/day": float(np.round(turnover, 3)),
"N_Steps": int(bt.shape[0]),
"theta_entry": float(theta_entry),
"theta_exit": float(THETA_EXIT) if THETA_EXIT is not None else np.nan,
"sl_bps": float(SL_BPS) if SL_BPS is not None else np.nan,
"tp_bps": float(TP_BPS) if TP_BPS is not None else np.nan,
"trail_bps": float(TRAIL_BPS) if TRAIL_BPS is not None else np.nan,
"time_stop_bars": int(TIME_STOP_BARS) if TIME_STOP_BARS is not None else np.nan,
"allow_short": bool(ALLOW_SHORT),
"fee_bps": float(FEE_BPS),
"wavelet": DENOISE_WAVELET if DENOISE_ENABLED else "none",
"currency_cap": float(CURRENCY_CAP),
})
summary_rows.append(stats)
if not sig_rows:
raise RuntimeError("Nessun ticker backtestato con successo")
hurst_df = pd.DataFrame(hurst_rows).sort_values("Ticker").reset_index(drop=True)
summary_df = pd.DataFrame(summary_rows).sort_values("Ticker").reset_index(drop=True)
signals_df = pd.concat(sig_rows, ignore_index=True)
signals_df["Date"] = pd.to_datetime(signals_df["Date"]).dt.normalize()
# 3) Ranking / Score / TopN
ranking_df = compute_score(summary_df).sort_values("Score", ascending=False).reset_index(drop=True)
top_tickers = ranking_df.head(TOP_N)["Ticker"].astype(str).tolist()
print(f"\nTop{TOP_N} tickers (dal ranking): {top_tickers}\n")
# 4) Build the wide matrices
wide_pnl = signals_df.pivot_table(index="Date", columns="Ticker", values="PnL", aggfunc="sum").fillna(0.0).sort_index()
wide_sig = signals_df.pivot_table(index="Date", columns="Ticker", values="Signal", aggfunc="last").fillna(0).astype(int).sort_index()
wide_est = signals_df.pivot_table(index="Date", columns="Ticker", values="EstOutcome", aggfunc="last").sort_index()
# 5) Daily dynamic TopN selection: keep the classic layout (active mask + static ranking)
# - Equal Weight: 1/N over the ranked tickers (static TopN)
# - Risk Parity: inverse-vol on the returns of the static TopN tickers
cols = [c for c in top_tickers if c in wide_pnl.columns]
if not cols:
raise RuntimeError("Nessun ticker del TopN presente in wide_pnl")
dates = wide_pnl.index
# target weights
w_eq_target = pd.DataFrame(0.0, index=dates, columns=wide_pnl.columns)
w_eq_target.loc[:, cols] = 1.0 / len(cols)
rp_hist = wide_pnl[cols]
w_rp_hist = inverse_vol_weights(rp_hist, window=RP_LOOKBACK, max_weight=RP_MAX_WEIGHT).reindex(dates).ffill().fillna(0.0)
w_rp_target = pd.DataFrame(0.0, index=dates, columns=wide_pnl.columns)
w_rp_target.loc[:, cols] = w_rp_hist.values
# renormalize over cols
s = w_rp_target[cols].sum(axis=1).replace(0, np.nan)
w_rp_target.loc[:, cols] = w_rp_target[cols].div(s, axis=0).fillna(0.0)
# active weights (mask on Signal != 0) + cash
w_eq_act = make_active_weights(w_eq_target, wide_sig, add_cash=True)
w_rp_act = make_active_weights(w_rp_target, wide_sig, add_cash=True)
# currency cap
w_eq_cap, ccy_eq = apply_currency_cap(w_eq_act, wide_sig, cap=CURRENCY_CAP)
w_rp_cap, ccy_rp = apply_currency_cap(w_rp_act, wide_sig, cap=CURRENCY_CAP)
# portfolio returns (risky assets only)
ret_eq = (wide_pnl * w_eq_cap.drop(columns=["Cash"], errors="ignore")).sum(axis=1).rename("Ret_EqW_TopN")
ret_rp = (wide_pnl * w_rp_cap.drop(columns=["Cash"], errors="ignore")).sum(axis=1).rename("Ret_RP_TopN")
eq_eq = equity_from_returns(ret_eq).rename("Eq_EqW_TopN")
eq_rp = equity_from_returns(ret_rp).rename("Eq_RP_TopN")
# 6) Plots
plt.figure(figsize=(10, 5))
plt.plot(eq_eq, label=f"Equal Weight (Top{TOP_N}, fee {FEE_BPS}bp)")
plt.plot(eq_rp, label=f"Risk Parity (Top{TOP_N}, cap {RP_MAX_WEIGHT:.4f}, fee {FEE_BPS}bp)")
plt.title(f"Equity line portafogli FX (base 100) Top{TOP_N} (shared_utils + URL)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()
plot_heatmap_monthly(ret_eq, f"Heatmap mensile Equal Weight FX (Top{TOP_N})")
plt.show()
plot_heatmap_monthly(ret_rp, f"Heatmap mensile Risk Parity FX (Top{TOP_N})")
plt.show()
# 7) Export
hurst_df.to_csv(OUT_DIR / "hurst.csv", index=False)
summary_df.to_csv(OUT_DIR / "forward_bt_summary.csv", index=False)
signals_df.to_csv(OUT_DIR / "forward_bt_signals.csv", index=False)
ranking_df.to_csv(OUT_DIR / "ranking_score.csv", index=False)
pd.Series(top_tickers, name="TopN_Tickers").to_csv(OUT_DIR / "topn_tickers.csv", index=False)
pd.concat([ret_eq, ret_rp, eq_eq, eq_rp], axis=1).to_csv(OUT_DIR / "portfolio_daily.csv")
w_eq_cap.to_csv(OUT_DIR / "weights_eq_active_capped.csv")
w_rp_cap.to_csv(OUT_DIR / "weights_rp_active_capped.csv")
ccy_eq.to_csv(OUT_DIR / "currency_exposure_eq.csv")
ccy_rp.to_csv(OUT_DIR / "currency_exposure_rp.csv")
print(f"\nSaved to: {OUT_DIR.resolve()}\n")
if __name__ == "__main__":
main()

File diff suppressed because it is too large


@@ -0,0 +1,989 @@
# -*- coding: utf-8 -*-
"""
Trading Pattern Recon w Hurst - Stocks (corrected to match v3.1.6 logic)
Key points (faithful to v3.1.6):
- Uses shared_utils.py for: build_pattern_library, z_norm, predict_from_library
- Pulls history from https://fin.scorer.app/finance/v2/history/<TICKER>?fromDate=YYYYMMDD
- Builds per-ticker forward backtest (EOD, uses Ret+1)
- Computes per-ticker summary metrics and a dynamic Score (same calibrate_score_weights logic)
- Selects TOP_N = 15 tickers by Score
- Builds portfolios ONLY on Top-15:
* Equal Weight
* Risk Parity (inverse-vol weights on strategy PnL, 60d window) with cap = 2/TOP_N
(cap applied with renormalization, as in the non-stocks version)
"""
import sys
import types
from pathlib import Path
from urllib.parse import quote
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import requests
# ------------------------------------------------------------
# shared_utils import (local file next to this script)
# ------------------------------------------------------------
# If shared_utils imports optional deps you don't have (e.g. pyodbc), monkeypatch:
sys.modules["pyodbc"] = types.SimpleNamespace()
import importlib.util
SHARED_UTILS_PATH = Path(__file__).with_name("shared_utils.py")
spec = importlib.util.spec_from_file_location("shared_utils", str(SHARED_UTILS_PATH))
shared_utils = importlib.util.module_from_spec(spec)
sys.modules["shared_utils"] = shared_utils
spec.loader.exec_module(shared_utils)
build_pattern_library = shared_utils.build_pattern_library
predict_from_library = shared_utils.predict_from_library
z_norm = shared_utils.z_norm
# ------------------------------------------------------------
# CONFIG
# ------------------------------------------------------------
TICKERS = [
"NVDA US Equity",
"AAPL US Equity",
"GOOGL US Equity",
"MSFT US Equity",
"AMZN US Equity",
"META US Equity",
"AVGO US Equity",
"TSLA US Equity",
"BRK/B US Equity",
"LLY US Equity",
"JPM US Equity",
"WMT US Equity",
"V US Equity",
"ORCL US Equity",
"MA US Equity",
"XOM US Equity",
"JNJ US Equity",
"PLTR US Equity",
"NFLX US Equity",
"BAC US Equity",
"ABBV US Equity",
"COST US Equity",
"AMD US Equity",
"HD US Equity",
"PG US Equity",
"GE US Equity",
"MU US Equity",
"CSCO US Equity",
"CVX US Equity",
"KO US Equity",
"WFC US Equity",
"UNH US Equity",
"MS US Equity",
"IBM US Equity",
"CAT US Equity",
"GS US Equity",
"MRK US Equity",
"AXP US Equity",
"PM US Equity",
"RTX US Equity",
"CRM US Equity",
"APP US Equity",
"MCD US Equity",
"LRCX US Equity",
"TMUS US Equity",
"TMO US Equity",
"C US Equity",
"ABT US Equity",
"AMAT US Equity",
"ISRG US Equity"
]
# Filter out non-strings (e.g. accidental "..." / Ellipsis)
TICKERS = [t.strip() for t in TICKERS if isinstance(t, str) and str(t).strip()]
BASE_URL = "https://fin.scorer.app/finance/v2/history"
FROM_DATE = "20201224"
# Strategy params
WP = 60
HA = 10
KNN_K = 25
# Entry threshold derived from Hurst
THETA_FALLBACK = 0.005
# Costs
FEE_BPS = 10
# Exit controls (EOD forward approximation)
SL_BPS = 300.0
TP_BPS = 800.0
TRAIL_BPS = 300.0
TIME_STOP_BARS = 20
THETA_EXIT = 0.0
# Portfolio construction (faithful to v3.1.6)
TOP_N = 15
RP_MAX_WEIGHT = 2.0 / TOP_N # cap per-asset weight in RP
RANKING_WINDOW_BARS = 252
RP_LOOKBACK = 60
DAYS_PER_YEAR = 252
OUT_DIR = Path("./out_stocks_usa")
OUT_DIR.mkdir(parents=True, exist_ok=True)
# ------------------------------------------------------------
# Data loading from URL (same schema as your previous JSON)
# ------------------------------------------------------------
def _detect_col(cols, candidates):
cols_l = {c.lower(): c for c in cols}
for cand in candidates:
if cand.lower() in cols_l:
return cols_l[cand.lower()]
for cand in candidates:
for c in cols:
if cand.lower() in c.lower():
return c
return None
def fetch_price_series(ticker: str, from_date: str) -> pd.DataFrame:
"""
Downloads JSON from:
https://fin.scorer.app/finance/v2/history/<TICKER>?fromDate=YYYYMMDD
Assumes schema like the JSON you used previously:
- payload is list with single wrapper dict containing "data": [...]
- "data" is list of rows with date + close/adj_close
Returns a Series AdjClose indexed by Date.
"""
url = f"{BASE_URL}/{quote(ticker)}?fromDate={from_date}"
r = requests.get(url, timeout=30)
r.raise_for_status()
obj = r.json()
# unwrap: [{"ticker":..., "data":[...], ...}]
if isinstance(obj, list) and len(obj) == 1 and isinstance(obj[0], dict) and "data" in obj[0]:
obj = obj[0]["data"]
if not isinstance(obj, list):
raise ValueError(f"Unexpected JSON schema for {ticker}: {type(obj)}")
df = pd.DataFrame(obj)
if df.empty:
raise ValueError(f"No rows returned for {ticker}")
col_date = _detect_col(df.columns, ["date", "datetime", "timestamp", "time"])
if col_date is None:
raise ValueError(f"Date column not found for {ticker}. Columns: {df.columns.tolist()[:30]}")
col_px = _detect_col(df.columns, ["adj_close", "adjclose", "adjusted_close", "Adj Close", "AdjClose"])
if col_px is None:
col_px = _detect_col(df.columns, ["close", "px_last", "last", "price"])
if col_px is None:
raise ValueError(f"Price column not found for {ticker}. Columns: {df.columns.tolist()[:30]}")
col_open = _detect_col(df.columns, ["open", "open_price", "px_open"])
df[col_date] = pd.to_datetime(df[col_date], errors="coerce", utc=True).dt.tz_localize(None)
df[col_px] = pd.to_numeric(df[col_px], errors="coerce")
if col_open is not None:
df[col_open] = pd.to_numeric(df[col_open], errors="coerce")
df = df.dropna(subset=[col_date, col_px]).sort_values(col_date)
df = df.drop_duplicates(subset=[col_date]).set_index(col_date)
idx_norm = pd.to_datetime(df.index).normalize()
df.index = idx_norm
df = df[~df.index.duplicated(keep="last")]
out = pd.DataFrame(index=df.index)
out["AdjClose"] = df[col_px].astype(float)
if col_open is not None:
out["Open"] = df[col_open].astype(float)
out.index.name = "Date"
return out.sort_index()
# ------------------------------------------------------------
# Hurst (theta_entry = H/100)
# ------------------------------------------------------------
def hurst_rs_returns(r, win_grid=None, min_seg=1):
r = pd.Series(r).dropna().astype("float64").values
n = len(r)
if n < 200:
return np.nan
if win_grid is None:
base = np.array([16, 24, 32, 48, 64, 96, 128, 192, 256, 384], dtype=int)
win_grid = [w for w in base if w <= n // 2]
RS_vals, sizes = [], []
for w in win_grid:
m = n // w
if w < 8 or m < min_seg:
continue
rs_list = []
for i in range(m):
seg = r[i*w:(i+1)*w]
seg = seg - np.mean(seg)
sd = seg.std(ddof=1)
if sd == 0 or not np.isfinite(sd):
continue
y = np.cumsum(seg)
rs = (np.max(y) - np.min(y)) / sd
if np.isfinite(rs) and rs > 0:
rs_list.append(rs)
if rs_list:
RS_vals.append(np.mean(rs_list))
sizes.append(w)
if len(RS_vals) < 3:
return np.nan
sizes = np.array(sizes, float)
RS_vals = np.array(RS_vals, float)
mask = np.isfinite(RS_vals) & (RS_vals > 0)
sizes, RS_vals = sizes[mask], RS_vals[mask]
if sizes.size < 3:
return np.nan
slope, _ = np.polyfit(np.log(sizes), np.log(RS_vals), 1)
return float(np.clip(slope, 0.0, 1.0)) if np.isfinite(slope) else np.nan
def hurst_dfa_returns(r, win_grid=None):
r = pd.Series(r).dropna().astype("float64").values
n = len(r)
if n < 200:
return np.nan
y = np.cumsum(r - np.mean(r))
if win_grid is None:
base = np.array([16, 24, 32, 48, 64, 96, 128, 192, 256], dtype=int)
win_grid = [w for w in base if w <= n // 2]
F_vals, sizes = [], []
for s in win_grid:
m = n // s
if s < 8 or m < 2:
continue
rms_list = []
for i in range(m):
seg = y[i*s:(i+1)*s]
t = np.arange(s, dtype=float)
A = np.vstack([t, np.ones(s)]).T
coeff, *_ = np.linalg.lstsq(A, seg, rcond=None)
detr = seg - (A @ coeff)
rms = np.sqrt(np.mean(detr**2))
if np.isfinite(rms) and rms > 0:
rms_list.append(rms)
if rms_list:
F_vals.append(np.mean(rms_list))
sizes.append(s)
if len(F_vals) < 3:
return np.nan
sizes = np.array(sizes, float)
F_vals = np.array(F_vals, float)
mask = np.isfinite(F_vals) & (F_vals > 0)
sizes, F_vals = sizes[mask], F_vals[mask]
if sizes.size < 3:
return np.nan
slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1)
return float(np.clip(slope, 0.0, 1.0)) if np.isfinite(slope) else np.nan
# ------------------------------------------------------------
# Backtest (uses shared_utils) no look-ahead in pattern library
# ------------------------------------------------------------
def forward_backtest_one_asset(
r_pct: pd.Series,
theta_entry: float,
exec_ret: pd.Series | None = None,
weak_days_exit: int | None = None
) -> pd.DataFrame:
"""
r_pct: percent log returns series, indexed by Date.
Uses only past returns to build library at each time t: past = r[:t]
PnL uses Ret+1 (forward EOD style).
"""
r = (r_pct / 100.0).astype(float) # decimals (close/close for the signal)
idx = r.index
if exec_ret is not None:
r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float)
r_exec.index = pd.to_datetime(r_exec.index)
r_exec = r_exec.reindex(idx)
else:
r_exec = r
in_pos = False
entry_t = None
trade_pnl = 0.0
trade_peak = 0.0
weak_streak = 0
rows = []
for t in range(WP, len(r) - 1):
past = r.iloc[:t]
if past.dropna().shape[0] < (WP + HA):
rows.append((idx[t], 0, np.nan, np.nan, float(r_exec.iloc[t+1])))
continue
lib_wins, lib_out = build_pattern_library(past, WP, HA)
if lib_wins is None or lib_out is None or len(lib_out) == 0:
rows.append((idx[t], 0, np.nan, np.nan, float(r_exec.iloc[t+1])))
continue
win_last = r.iloc[t-WP:t].values
curr_zn = z_norm(win_last)
if curr_zn is None:
rows.append((idx[t], 1 if in_pos else 0, np.nan, np.nan, float(r_exec.iloc[t+1])))
continue
est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
est_out = float(est_out)
avg_dist = float(avg_dist)
sig = 1 if in_pos else 0
# ENTRY
if (not in_pos) and (est_out > theta_entry):
in_pos = True
entry_t = t
trade_pnl = 0.0
trade_peak = 0.0
sig = 1
weak_streak = 0
# EXIT checks (EOD forward approximation)
elif in_pos:
next_ret = float(r_exec.iloc[t+1])
pnl_if_stay = (1.0 + trade_pnl) * (1.0 + next_ret) - 1.0
peak_if_stay = max(trade_peak, pnl_if_stay)
exit_now = False
if SL_BPS is not None and pnl_if_stay <= -SL_BPS / 10000.0:
exit_now = True
if TP_BPS is not None and pnl_if_stay >= TP_BPS / 10000.0:
exit_now = True
if TRAIL_BPS is not None and (peak_if_stay - pnl_if_stay) >= TRAIL_BPS / 10000.0:
exit_now = True
if TIME_STOP_BARS is not None and entry_t is not None and (t - entry_t + 1) >= TIME_STOP_BARS:
exit_now = True
if THETA_EXIT is not None:
if est_out <= THETA_EXIT:
if weak_days_exit is None:
exit_now = True
else:
weak_streak += 1
if weak_streak >= weak_days_exit:
exit_now = True
else:
weak_streak = 0
if exit_now:
in_pos = False
entry_t = None
trade_pnl = 0.0
trade_peak = 0.0
weak_streak = 0
sig = 0
else:
trade_pnl = pnl_if_stay
trade_peak = peak_if_stay
sig = 1
rows.append((idx[t], sig, est_out, avg_dist, float(r_exec.iloc[t+1])))
df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"]).set_index("Date")
fee = FEE_BPS / 10000.0
trade_chg = df["Signal"].diff().abs().fillna(0.0)
df["PnL"] = df["Signal"] * df["Ret+1"] - trade_chg * fee
return df
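# weak_days_exit sketch (hypothetical values): with THETA_EXIT = 0.0 and
# weak_days_exit = 3, a long position survives two consecutive bars of
# est_out <= 0 and exits on the third; with weak_days_exit = None (the default
# used below) a single weak bar triggers the exit.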
# ------------------------------------------------------------
# Metrics / utilities (aligned with v3.1.6 approach)
# ------------------------------------------------------------
def equity_from_returns(r: pd.Series) -> pd.Series:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
return (1 + r).cumprod() * 100
def drawdown_stats_simple(ret_series: pd.Series) -> dict:
# Geometric metrics consistent with the portfolio equity
ret_series = pd.to_numeric(ret_series, errors="coerce").fillna(0.0)
eq = (1 + ret_series).cumprod()
if eq.empty:
return {"CAGR_%": np.nan, "AnnVol_%": np.nan, "Sharpe": np.nan, "MaxDD_%eq": np.nan, "Calmar": np.nan}
roll_max = eq.cummax()
dd = eq / roll_max - 1.0
maxdd = float(dd.min()) if len(dd) else np.nan
cagr = (eq.iloc[-1] / eq.iloc[0]) ** (DAYS_PER_YEAR / max(1, len(ret_series))) - 1
annvol = ret_series.std() * np.sqrt(DAYS_PER_YEAR)
sharpe = (ret_series.mean() / (ret_series.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR)
calmar = (cagr / abs(maxdd)) if (maxdd is not None and maxdd < 0) else np.nan
return {
"CAGR_%": round(cagr * 100, 2) if np.isfinite(cagr) else np.nan,
"AnnVol_%": round(annvol * 100, 2) if np.isfinite(annvol) else np.nan,
"Sharpe": round(float(sharpe), 2) if np.isfinite(sharpe) else np.nan,
"MaxDD_%eq": round(maxdd * 100, 2) if np.isfinite(maxdd) else np.nan,
"Calmar": round(float(calmar), 2) if np.isfinite(calmar) else np.nan
}
def heal_index_metrics(returns: pd.Series):
"""
Calcola:
- AAW: area sopra acqua (run-up vs minimo cumulato)
- AUW: area sotto acqua (drawdown vs massimo cumulato)
- Heal Index: (AAW - AUW) / AUW
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
run_min = equity.cummin()
ru = equity / run_min - 1.0
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
return AAW, AUW, heal
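# Worked example (two hypothetical daily returns +10%, -10%): equity = [1.10, 0.99],
# drawdown vs cummax = [0, -0.10] so AUW = 0.10; run-up vs cummin = [0, 0] so
# AAW = 0.0, hence Heal = (0.0 - 0.10) / 0.10 = -1.0.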
def h_min_100(returns: pd.Series, month_len: int = 21):
"""
Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days
hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)).
"""
s = returns.dropna().astype(float)
n = s.size
if n == 0:
return np.nan, np.nan
eq = (1 + s).cumprod()
best = None
for h in range(1, n + 1):
roll = eq / eq.shift(h)
roll = roll.dropna()
if (roll >= 1.0).all():
best = h
break
if best is None:
return np.nan, np.nan
return best, int(np.ceil(best / month_len))
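# Examples (hypothetical series): if every daily return is positive, every 1-day
# window already has cumulative return >= 0 and the function returns (1, 1).
# More generally h_days is roughly the longest peak-to-full-recovery span,
# reported in months as ceil(h_days / month_len).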
def monthly_returns(r: pd.Series) -> pd.Series:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
return (1 + r).resample("M").prod() - 1
def plot_heatmap_monthly(r: pd.Series, title: str):
m = monthly_returns(r)
df = m.to_frame("ret")
df["Year"], df["Month"] = df.index.year, df.index.month
pv = df.pivot(index="Year", columns="Month", values="ret")
fig, ax = plt.subplots(figsize=(10, 6))
im = ax.imshow(pv.fillna(0) * 100, aspect="auto")
for i in range(pv.shape[0]):
for j in range(pv.shape[1]):
val = pv.iloc[i, j]
if not np.isnan(val):
ax.text(j, i, f"{val*100:.1f}", ha="center", va="center", fontsize=8)
ax.set_title(title)
ax.set_xlabel("Mese")
ax.set_ylabel("Anno")
ax.set_xticks(range(12))
ax.set_xticklabels(range(1, 13))
fig.colorbar(im, ax=ax, label="%")
plt.tight_layout()
return fig
def _portfolio_metric_row(name: str, r: pd.Series) -> dict:
r = pd.to_numeric(r, errors="coerce").fillna(0.0)
if r.empty:
return {
"Portfolio": name, "CAGR_%": np.nan, "MaxDD_%": np.nan, "Sharpe": np.nan,
"Heal_Index": np.nan, "AAW": np.nan, "AUW": np.nan,
"H100_min_days": np.nan, "H100_min_months": np.nan
}
eq = (1 + r).cumprod()
cagr = (eq.iloc[-1] / eq.iloc[0]) ** (DAYS_PER_YEAR / max(1, len(r))) - 1
maxdd = (eq / eq.cummax() - 1.0).min()
sharpe = (r.mean() / (r.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR)
aaw, auw, heal = heal_index_metrics(r)
h_days, h_months = h_min_100(r, month_len=21)
return {
"Portfolio": name,
"CAGR_%": round(float(cagr) * 100, 2) if np.isfinite(cagr) else np.nan,
"MaxDD_%": round(float(maxdd) * 100, 2) if np.isfinite(maxdd) else np.nan,
"Sharpe": round(float(sharpe), 2) if np.isfinite(sharpe) else np.nan,
"Heal_Index": round(float(heal), 4) if np.isfinite(heal) else np.nan,
"AAW": round(float(aaw), 4) if np.isfinite(aaw) else np.nan,
"AUW": round(float(auw), 4) if np.isfinite(auw) else np.nan,
"H100_min_days": h_days,
"H100_min_months": h_months
}
def save_portfolio_metrics(ret_eq: pd.Series, ret_rp: pd.Series, path: Path, top_n: int):
"""Salva metriche EqW/RP in Excel; fallback CSV se engine Excel mancante."""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
rows = [
_portfolio_metric_row(f"EqW_Top{top_n}", ret_eq),
_portfolio_metric_row(f"RP_Top{top_n}", ret_rp),
]
df = pd.DataFrame(rows)
try:
df.to_excel(path, index=False)
print(f"[INFO] Salvato: {path.resolve()}")
except Exception as e:
alt = path.with_suffix(".csv")
df.to_csv(alt, index=False)
print(f"[WARN] to_excel fallita ({e}), salvato CSV: {alt.resolve()}")
def inverse_vol_weights(df: pd.DataFrame, window=60, max_weight=None) -> pd.DataFrame:
"""Faithful to v3.1.6: inv-vol weights normalized per day, then clipped (no renorm after clip)."""
vol = df.rolling(window).std()
inv = 1 / vol.replace(0, np.nan)
w = inv.div(inv.sum(axis=1), axis=0)
w = w.ffill().fillna(1 / max(1, df.shape[1]))
if max_weight is not None:
w = w.clip(upper=max_weight)
return w
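# Illustrative weights (hypothetical vols): two assets with rolling std 1% and 2%
# get inverse-vol weights 2/3 and 1/3; with max_weight = 2/TOP_N ~ 0.1333 both
# are clipped to 0.1333 and the sum is NOT renormalized inside this function
# (the caller decides whether to rescale, as _build_dynamic_portfolio_returns does).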
def make_active_weights(
w_target: pd.DataFrame,
wide_sig: pd.DataFrame,
renorm_to_1: bool = False,
add_cash: bool = True,
cash_label: str = "Cash"
) -> pd.DataFrame:
"""
Applies the signal mask to the target weights; optionally renormalizes and adds Cash.
"""
if w_target is None or w_target.empty:
return pd.DataFrame()
all_dates = w_target.index
all_cols = list(w_target.columns)
res = pd.DataFrame(0.0, index=all_dates, columns=all_cols)
for dt in all_dates:
wt = w_target.loc[dt].copy()
sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float)
mask = sig_row.reindex(all_cols).fillna(0)
wt = wt * (mask == 1)
if renorm_to_1:
s = wt.sum()
wt = wt / s if s > 0 else wt
res.loc[dt, wt.index] = wt.values
if add_cash:
cash = 1.0 - res.sum(axis=1)
res[cash_label] = cash.clip(lower=0.0)
return res
def _build_dynamic_portfolio_returns(
wide_pnl: pd.DataFrame,
wide_sig: pd.DataFrame,
wide_est: pd.DataFrame,
top_n: int,
window_bars: int = RANKING_WINDOW_BARS,
rp_lookback: int = RP_LOOKBACK
) -> dict:
if wide_pnl is None or wide_pnl.empty:
idx = pd.Index([])
empty_w = pd.DataFrame(index=idx, columns=[])
return {
"ret_eq": pd.Series(dtype=float),
"ret_rp": pd.Series(dtype=float),
"w_eq": empty_w,
"w_rp": empty_w,
"w_eq_act": empty_w,
"w_rp_act": empty_w,
"selection": {}
}
dates = wide_pnl.index.sort_values()
all_cols = wide_pnl.columns.tolist()
w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols)
w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols)
selection = {}
for dt in dates:
sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float)
on_cols = [c for c in all_cols if sig_row.get(c, 0) == 1]
if not on_cols:
selection[dt] = []
continue
window_est = wide_est.loc[:dt].tail(window_bars) if not wide_est.empty else pd.DataFrame()
scores = []
for c in on_cols:
s = pd.to_numeric(window_est[c], errors="coerce") if c in window_est.columns else pd.Series(dtype=float)
est_score = s.mean(skipna=True)
if pd.isna(est_score):
continue
scores.append((c, est_score))
if not scores:
selection[dt] = []
continue
scores_sorted = sorted(scores, key=lambda x: x[1], reverse=True)
base_cols = [c for c, _ in scores_sorted[:top_n]]
selection[dt] = base_cols
if not base_cols:
continue
w_eq.loc[dt, base_cols] = 1 / len(base_cols)
window_pnl = wide_pnl.loc[:dt].tail(window_bars)
rp_hist = window_pnl[base_cols]
rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT)
if not rp_w.empty:
last = rp_w.iloc[-1].fillna(0.0)
last_sum = float(last.sum())
if last_sum > 0:
last = last / last_sum
w_rp.loc[dt, last.index] = last.values
w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash")
ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1)
return {
"ret_eq": ret_eq,
"ret_rp": ret_rp,
"w_eq": w_eq,
"w_rp": w_rp,
"w_eq_act": w_eq_act,
"w_rp_act": w_rp_act,
"selection": selection
}
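# Daily selection sketch (hypothetical date): among tickers with Signal == 1, the
# mean EstOutcome over the trailing window_bars is the score; the top_n best get
# 1/top_n each in w_eq, while w_rp takes the last row of the inverse-vol weights
# on their strategy PnL, renormalized to sum to 1. Tickers whose signal is 0 are
# then masked out by make_active_weights and their weight parks in Cash.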
# ------------------------------------------------------------
# Score calibration (copied logic from v3.1.6)
# ------------------------------------------------------------
def _safe_rank_ser(s: pd.Series) -> pd.Series:
"""Rank robusto (0..1), gestisce NaN."""
s = s.copy()
denom = s.notna().sum()
if denom <= 1:
return pd.Series(np.nan, index=s.index)
return s.rank(method="average", na_option="keep") / denom
def _winsorize(s: pd.Series, p=0.005):
s = s.astype(float).copy()
lo, hi = s.quantile(p), s.quantile(1 - p)
return s.clip(lower=lo, upper=hi)
def _corr_shrink(C: np.ndarray, alpha: float = 0.10) -> np.ndarray:
"""Shrink correlation/covariance toward identity for stability."""
C = np.asarray(C, dtype=float)
k = C.shape[0]
I = np.eye(k)
# ensure symmetric
C = 0.5 * (C + C.T)
return (1 - alpha) * C + alpha * I
def _pos_normalize(w: np.ndarray) -> np.ndarray:
w = np.asarray(w, dtype=float)
w = np.where(np.isfinite(w), w, 0.0)
w = np.maximum(w, 0.0)
s = w.sum()
if s <= 0:
return np.ones_like(w) / len(w)
return w / s
def calibrate_score_weights(
df_sum: pd.DataFrame,
metrics_map=None,
target_col: str | None = None,
k_folds: int = 5,
shrink_equal: float = 0.25,
corr_shrink: float = 0.10
):
"""
metrics_map: list of (colname, good_is_high) tuples
target_col: if None => unsupervised_erc (used in v3.1.6)
Returns: dict with 'weights' (pd.Series), 'X_ranked' (DataFrame), 'mode'
"""
if metrics_map is None or len(metrics_map) == 0:
raise ValueError("metrics_map vuoto: non posso calibrare Score.")
# Build ranked feature matrix X
X_cols = []
X = pd.DataFrame(index=df_sum.index)
for col, good_high in metrics_map:
if col not in df_sum.columns:
continue
s = pd.to_numeric(df_sum[col], errors="coerce")
s = _winsorize(s)
# invert if good is low
if not good_high:
s = -s
X[col] = _safe_rank_ser(s)
X_cols.append(col)
X = X.loc[:, X.columns[X.notna().sum(axis=0) > 0]]
k = X.shape[1]
if k == 0:
raise ValueError("Nessuna metrica valida per la calibrazione.")
# Unsupervised ERC (aligned with the non-stocks version)
if target_col is None or target_col not in df_sum.columns:
Xv = np.nan_to_num(X.values, nan=np.nanmean(X.values))
C = np.cov(Xv, rowvar=False)
C = _corr_shrink(C, alpha=corr_shrink)
vol = np.sqrt(np.clip(np.diag(C), 1e-12, None))
w0 = 1.0 / vol
w = _pos_normalize(w0)
return {
"mode": "unsupervised_erc",
"weights": pd.Series(w, index=X.columns, name="weight"),
"X_ranked": X
}
# (Supervised path not used here, but kept for completeness)
y = pd.to_numeric(df_sum[target_col], errors="coerce")
y = _winsorize(y)
y_rank = _safe_rank_ser(y)
mask = y_rank.notna() & X.notna().any(axis=1)
Xf, yf = X[mask].copy(), y_rank[mask].copy()
if len(Xf) < 30:
# fallback: unsupervised
Xv = np.nan_to_num(X.values, nan=np.nanmean(X.values))
C = np.cov(Xv, rowvar=False)
C = _corr_shrink(C, alpha=corr_shrink)
vol = np.sqrt(np.clip(np.diag(C), 1e-12, None))
w0 = 1.0 / vol
w = _pos_normalize(w0)
return {
"mode": "unsupervised_erc_fallback",
"weights": pd.Series(w, index=X.columns, name="weight"),
"X_ranked": X
}
# Simple supervised: corr with target, whiten by covariance
Xv = np.nan_to_num(Xf.values, nan=np.nanmean(Xf.values))
C = np.cov(Xv, rowvar=False)
C = _corr_shrink(C, alpha=corr_shrink)
ic = np.array([pd.Series(Xf.iloc[:, j]).corr(yf, method="spearman") for j in range(Xf.shape[1])], dtype=float)
ic = np.nan_to_num(ic, nan=0.0)
try:
w_raw = np.linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic)
except Exception:
w_raw = ic.copy()
w = _pos_normalize(w_raw)
w = (1 - shrink_equal) * w + shrink_equal * np.ones_like(w) / len(w)
w = _pos_normalize(w)
return {
"mode": "supervised_icSigmaInv",
"weights": pd.Series(w, index=X.columns, name="weight"),
"X_ranked": X
}
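# Usage sketch (as called below with target_col=None): the unsupervised_erc
# branch ranks each metric into [0, 1], shrinks the rank covariance toward the
# identity, and returns inverse-vol weights across metrics -- roughly equal
# weights when the ranked metrics have similar dispersion.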
# ------------------------------------------------------------
# MAIN
# ------------------------------------------------------------
def main():
# 1) Fetch prices
prices = {}
for tkr in TICKERS:
print(f"Fetching {tkr} ...")
try:
prices[tkr] = fetch_price_series(tkr, FROM_DATE)
except Exception as e:
print(f"[WARN] Skip {tkr}: {e}")
if len(prices) < 5:
raise RuntimeError(f"Pochi ticker validi ({len(prices)}). Controlla TICKERS e/o endpoint.")
# 2) Backtest each ticker
hurst_rows = []
summary_rows = []
signals_rows = []
for tkr, px in prices.items():
if not isinstance(px, pd.DataFrame) or "AdjClose" not in px.columns:
print(f"[WARN] Serie senza AdjClose per {tkr}: skip")
continue
close = pd.to_numeric(px["AdjClose"], errors="coerce")
open_px = pd.to_numeric(px.get("Open"), errors="coerce") if "Open" in px.columns else None
r_dec = np.log(close / close.shift(1)).dropna()
if len(r_dec) < (WP + HA + 50):
print(f"[WARN] Serie troppo corta per {tkr} (len={len(r_dec)}): skip")
continue
r_pct = (r_dec * 100.0).rename("Ret") # percent log returns
exec_ret = None
if open_px is not None:
exec_ret = open_px.pct_change()
exec_ret.index = close.index
h_rs = hurst_rs_returns(r_dec)
h_dfa = hurst_dfa_returns(r_dec)
H = np.nanmedian([h_rs, h_dfa])
H = float(H) if np.isfinite(H) else np.nan
theta_entry = (H / 100.0) if np.isfinite(H) else THETA_FALLBACK
hurst_rows.append({"Ticker": tkr, "Hurst": H, "theta_entry": theta_entry})
sig_df = forward_backtest_one_asset(r_pct, theta_entry=theta_entry, exec_ret=exec_ret)
sig_df = sig_df.copy()
sig_df.insert(0, "Ticker", tkr)
signals_rows.append(sig_df.reset_index())
# Per-ticker summary metrics (like v3.1.6)
stats = drawdown_stats_simple(sig_df["PnL"])
hit = 100.0 * ((sig_df["PnL"] > 0).sum() / max(1, sig_df["PnL"].notna().sum()))
turnover = 100.0 * sig_df["Signal"].diff().abs().fillna(0.0).mean()
stats.update({
"Ticker": tkr,
"HitRate_%": round(float(hit), 2),
"AvgTradeRet_bps": round(float(sig_df["PnL"].mean() * 10000), 2),
"Turnover_%/step": round(float(turnover), 2),
"N_Steps": int(sig_df.shape[0]),
"theta_entry": float(theta_entry),
"theta_exit": (None if THETA_EXIT is None else float(THETA_EXIT)),
"sl_bps": (None if SL_BPS is None else float(SL_BPS)),
"tp_bps": (None if TP_BPS is None else float(TP_BPS)),
"trail_bps": (None if TRAIL_BPS is None else float(TRAIL_BPS)),
"time_stop_bars": (None if TIME_STOP_BARS is None else int(TIME_STOP_BARS)),
})
summary_rows.append(stats)
if not signals_rows:
raise RuntimeError("Nessun ticker backtestato con successo.")
hurst_df = pd.DataFrame(hurst_rows).sort_values("Ticker").reset_index(drop=True)
forward_bt_summary = pd.DataFrame(summary_rows).sort_values("Ticker").reset_index(drop=True)
forward_bt_signals = pd.concat(signals_rows, ignore_index=True)
forward_bt_signals["Date"] = pd.to_datetime(forward_bt_signals["Date"]).dt.normalize()
# 3) Build Score + select Top-15 (faithful to v3.1.6)
df_sum = forward_bt_summary.copy()
def _coerce_num(s: pd.Series) -> pd.Series:
return pd.to_numeric(s, errors="coerce").replace([np.inf, -np.inf], np.nan)
for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%",
"QualityScore","Confidence","OutcomeScore"]:
if c in df_sum.columns:
df_sum[c] = _coerce_num(df_sum[c])
primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)]
alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)]
mm = [(c,gh) for (c,gh) in primary_cols if c in df_sum.columns and df_sum[c].notna().sum() > 0]
if len(mm) < 2:
mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum() > 0]
if len(mm) < 2:
union_candidates = list({x[0] for x in primary_cols + alt_cols})
mm = [(c, True) for c in union_candidates if (c in df_sum.columns and df_sum[c].notna().sum() > 0)]
if len(mm) == 0:
print("[WARN] Nessuna metrica numerica disponibile: uso Score=0 e ordino per Ticker.")
df_sum["Score"] = 0.0
df_sum["Score_mode"] = "degenerate_equal"
else:
res = calibrate_score_weights(df_sum, metrics_map=mm, target_col=None)
X_ranked = res["X_ranked"]
w = res["weights"]
df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1)
df_sum["Score_mode"] = res["mode"]
print("Pesi stimati automaticamente (metriche usate):")
print(w)
df_sum = df_sum.sort_values("Score", ascending=False).reset_index(drop=True)
base_tickers = df_sum.head(TOP_N)["Ticker"].astype(str).str.strip().tolist()
print(f"Tickers selezionati dinamicamente (Top{TOP_N}): {base_tickers}")
# 4) Dynamic portfolios (aligned with the non-stocks version)
wide_pnl = forward_bt_signals.pivot_table(index="Date", columns="Ticker", values="PnL", aggfunc="sum").fillna(0.0)
wide_sig = forward_bt_signals.pivot_table(index="Date", columns="Ticker", values="Signal", aggfunc="last").fillna(0).astype(int)
wide_est = forward_bt_signals.pivot_table(index="Date", columns="Ticker", values="EstOutcome", aggfunc="last").sort_index()
dyn_port = _build_dynamic_portfolio_returns(
wide_pnl=wide_pnl,
wide_sig=wide_sig,
wide_est=wide_est,
top_n=TOP_N,
window_bars=RANKING_WINDOW_BARS,
rp_lookback=RP_LOOKBACK
)
ret_eq = dyn_port["ret_eq"].rename("Ret_EqW_TopN")
ret_rp = dyn_port["ret_rp"].rename("Ret_RP_TopN")
eq_eq = equity_from_returns(ret_eq).rename("Eq_EqW_TopN")
eq_rp = equity_from_returns(ret_rp).rename("Eq_RP_TopN")
# 5) Plots
plt.figure(figsize=(10, 5))
plt.plot(eq_eq, label=f"Equal Weight (Top{TOP_N})")
plt.plot(eq_rp, label=f"Risk Parity (Top{TOP_N}, cap {RP_MAX_WEIGHT:.4f})")
plt.title(f"Equity line portafogli (base 100) Top{TOP_N} (v3.1.6 style)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()
plot_heatmap_monthly(ret_eq, f"Heatmap mensile Equal Weight (Top{TOP_N})")
plt.show()
plot_heatmap_monthly(ret_rp, f"Heatmap mensile Risk Parity (Top{TOP_N})")
plt.show()
# 6) Save outputs
hurst_df.to_csv(OUT_DIR / "hurst.csv", index=False)
forward_bt_summary.to_csv(OUT_DIR / "forward_bt_summary.csv", index=False)
forward_bt_signals.to_csv(OUT_DIR / "forward_bt_signals.csv", index=False)
pd.concat([ret_eq, ret_rp, eq_eq, eq_rp], axis=1).to_csv(OUT_DIR / "portfolio_daily.csv")
df_sum.to_csv(OUT_DIR / "ranking_score.csv", index=False)
pd.Series(base_tickers, name="TopN_Tickers").to_csv(OUT_DIR / "topn_tickers.csv", index=False)
save_portfolio_metrics(ret_eq, ret_rp, OUT_DIR / "portfolio_metrics.xlsx", TOP_N)
print(f"\nSaved to: {OUT_DIR.resolve()}")
if __name__ == "__main__":
main()