# -*- coding: utf-8 -*-
"""
Trading Pattern Recon w Hurst - Stocks (corrected to match v3.1.6 logic)

Key points (faithful to v3.1.6):
- Uses shared_utils.py for: build_pattern_library, z_norm, predict_from_library
- Pulls history from https://fin.scorer.app/finance/v2/history/<TICKER>?fromDate=YYYYMMDD
- Builds per-ticker forward backtest (EOD, uses Ret+1)
- Computes per-ticker summary metrics and a dynamic Score (same calibrate_score_weights logic)
- Selects TOP_N = 15 tickers by Score
- Builds portfolios ONLY on Top-15:
    * Equal Weight
    * Risk Parity (inverse-vol weights on strategy PnL, 60d window) with cap = 2/TOP_N
      (cap applied with renormalization, as in the non-stocks version)
"""

import importlib.util
import sys
import types
from pathlib import Path
from urllib.parse import quote

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests

# ------------------------------------------------------------
# shared_utils import (local file next to this script)
# ------------------------------------------------------------
# If shared_utils imports optional deps you don't have (e.g.
# pyodbc), register an empty stand-in module in sys.modules -- but only when
# the real package cannot be imported, so an installed pyodbc is never shadowed.
try:
    import pyodbc  # noqa: F401  (availability probe only)
except ImportError:
    sys.modules["pyodbc"] = types.SimpleNamespace()

# shared_utils.py is loaded by path so the script works regardless of the
# current working directory or sys.path.
SHARED_UTILS_PATH = Path(__file__).with_name("shared_utils.py")
if not SHARED_UTILS_PATH.is_file():
    raise FileNotFoundError(f"shared_utils.py not found next to this script: {SHARED_UTILS_PATH}")

spec = importlib.util.spec_from_file_location("shared_utils", str(SHARED_UTILS_PATH))
if spec is None or spec.loader is None:
    raise ImportError(f"Cannot build an import spec for {SHARED_UTILS_PATH}")
shared_utils = importlib.util.module_from_spec(spec)
# Register before executing so imports of "shared_utils" from inside the
# module (or later in this script) resolve to this same object.
sys.modules["shared_utils"] = shared_utils
spec.loader.exec_module(shared_utils)

build_pattern_library = shared_utils.build_pattern_library
predict_from_library = shared_utils.predict_from_library
z_norm = shared_utils.z_norm

# ------------------------------------------------------------
# CONFIG
# ------------------------------------------------------------
TICKERS = [
    "NVDA US Equity",
    "AAPL US Equity",
    "GOOGL US Equity",
    "MSFT US Equity",
    "AMZN US Equity",
    "META US Equity",
    "AVGO US Equity",
    "TSLA US Equity",
    "BRK/B US Equity",
    "LLY US Equity",
    "JPM US Equity",
    "WMT US Equity",
    "V US Equity",
    "ORCL US Equity",
    "MA US Equity",
    "XOM US Equity",
    "JNJ US Equity",
    "PLTR US Equity",
    "NFLX US Equity",
    "BAC US Equity",
    "ABBV US Equity",
    "COST US Equity",
    "AMD US Equity",
    "HD US Equity",
    "PG US Equity",
    "GE US Equity",
    "MU US Equity",
    "CSCO US Equity",
    "CVX US Equity",
    "KO US Equity",
    "WFC US Equity",
    "UNH US Equity",
    "MS US Equity",
    "IBM US Equity",
    "CAT US Equity",
    "GS US Equity",
    "MRK US Equity",
    "AXP US Equity",
    "PM US Equity",
    "RTX US Equity",
    "CRM US Equity",
    "APP US Equity",
    "MCD US Equity",
    "LRCX US Equity",
    "TMUS US Equity",
    "TMO US Equity",
    "C US Equity",
    "ABT US Equity",
    "AMAT US Equity",
    "ISRG US Equity",
]

# Filter out non-strings (e.g. accidental "..."
# / Ellipsis)
TICKERS = [t.strip() for t in TICKERS if isinstance(t, str) and t.strip()]

BASE_URL = "https://fin.scorer.app/finance/v2/history"
FROM_DATE = "20201224"

# Strategy params
WP = 60
HA = 10
KNN_K = 25

# Entry threshold derived from Hurst
THETA_FALLBACK = 0.005

# Costs
FEE_BPS = 10

# Exit controls (EOD forward approximation)
SL_BPS = 300.0
TP_BPS = 800.0
TRAIL_BPS = 300.0
TIME_STOP_BARS = 20
THETA_EXIT = 0.0

# Portfolio construction (faithful to v3.1.6)
TOP_N = 15
RP_MAX_WEIGHT = 2.0 / TOP_N  # cap per-asset weight in RP
RANKING_WINDOW_BARS = 252
RP_LOOKBACK = 60

DAYS_PER_YEAR = 252

OUT_DIR = Path("./out_stocks_usa")
PLOT_DIR = Path("./plot_stocks_usa")
OUT_DIR.mkdir(parents=True, exist_ok=True)
PLOT_DIR.mkdir(parents=True, exist_ok=True)


# ------------------------------------------------------------
# Data loading from URL (same schema as your previous JSON)
# ------------------------------------------------------------
def _detect_col(cols, candidates):
    """Return the first column in *cols* matching one of *candidates*, else None.

    Matching is case-insensitive. An exact name match on any candidate wins
    over a substring match; within each pass candidates are tried in order.
    Labels are compared through ``str()`` so non-string column labels do not
    raise.
    """
    names = list(cols)
    by_lower = {str(c).lower(): c for c in names}
    wanted = [str(c).lower() for c in candidates]

    # Pass 1: exact (case-insensitive) match.
    for key in wanted:
        if key in by_lower:
            return by_lower[key]

    # Pass 2: candidate appears as a substring of a column name.
    for key in wanted:
        for c in names:
            if key in str(c).lower():
                return c
    return None


def fetch_price_series(ticker: str, from_date: str) -> pd.DataFrame:
    """Download the price history of *ticker* and return it as a DataFrame.

    Requests ``{BASE_URL}/{ticker}?fromDate={from_date}`` and accepts either a
    bare list of row dicts or a one-element list wrapping the rows under a
    ``"data"`` key.

    Returns:
        DataFrame indexed by normalized, tz-naive ``Date`` (sorted ascending,
        one row per day, last row kept on duplicates) with column
        ``AdjClose`` and, when an open-price column is detected, ``Open``.

    Raises:
        requests.HTTPError: on a non-2xx response.
        ValueError: if the payload is not a list, is empty, or lacks a
            recognizable date or price column.
    """
    # NOTE(review): quote() leaves "/" unescaped by default, so a ticker such
    # as "BRK/B US Equity" produces an extra path segment. Confirm what the
    # endpoint expects before switching to quote(ticker, safe="").
    url = f"{BASE_URL}/{quote(ticker)}?fromDate={from_date}"
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    obj = r.json()

    # unwrap: [{"ticker":..., "data":[...], ...}]
    if isinstance(obj, list) and len(obj) == 1 and isinstance(obj[0], dict) and "data" in obj[0]:
        obj = obj[0]["data"]

    if not isinstance(obj, list):
        raise ValueError(f"Unexpected JSON schema for {ticker}: {type(obj)}")

    df = pd.DataFrame(obj)
    if df.empty:
        raise ValueError(f"No rows returned for {ticker}")

    col_date = _detect_col(df.columns, ["date", "datetime", "timestamp", "time"])
    if col_date is None:
        raise ValueError(f"Date column not found for {ticker}. Columns: {df.columns.tolist()[:30]}")

    # Prefer an adjusted close; fall back to any plain close/last price.
    col_px = _detect_col(df.columns, ["adj_close", "adjclose", "adjusted_close", "Adj Close", "AdjClose"])
    if col_px is None:
        col_px = _detect_col(df.columns, ["close", "px_last", "last", "price"])
    if col_px is None:
        raise ValueError(f"Price column not found for {ticker}. Columns: {df.columns.tolist()[:30]}")
    col_open = _detect_col(df.columns, ["open", "open_price", "px_open"])

    # Parse as UTC, then drop the timezone to get naive timestamps.
    df[col_date] = pd.to_datetime(df[col_date], errors="coerce", utc=True).dt.tz_localize(None)
    df[col_px] = pd.to_numeric(df[col_px], errors="coerce")
    if col_open is not None:
        df[col_open] = pd.to_numeric(df[col_open], errors="coerce")

    df = df.dropna(subset=[col_date, col_px]).sort_values(col_date)
    df = df.drop_duplicates(subset=[col_date]).set_index(col_date)

    # Collapse intraday timestamps to calendar days; keep the last row per day.
    df.index = pd.to_datetime(df.index).normalize()
    df = df[~df.index.duplicated(keep="last")]

    out = pd.DataFrame(index=df.index)
    out["AdjClose"] = df[col_px].astype(float)
    if col_open is not None:
        out["Open"] = df[col_open].astype(float)
    out.index.name = "Date"
    return out.sort_index()


# ------------------------------------------------------------
# Hurst (theta_entry = H/100)
# ------------------------------------------------------------
def _loglog_slope(sizes, values):
    """Slope of log(values) vs log(sizes), clipped to [0, 1]; NaN if < 3 usable points."""
    sizes = np.asarray(sizes, dtype=float)
    values = np.asarray(values, dtype=float)
    usable = np.isfinite(values) & (values > 0)
    sizes, values = sizes[usable], values[usable]
    if sizes.size < 3:
        return np.nan
    slope, _ = np.polyfit(np.log(sizes), np.log(values), 1)
    return float(np.clip(slope, 0.0, 1.0)) if np.isfinite(slope) else np.nan


def hurst_rs_returns(r, win_grid=None, min_seg=1):
    """Estimate the Hurst exponent of a return series by rescaled-range (R/S).

    For each window size the series is cut into non-overlapping segments; the
    mean R/S over the segments is regressed on the window size in log-log
    space and the slope is returned, clipped to [0, 1].

    Args:
        r: returns (anything ``pd.Series`` accepts); NaN and +/-inf are dropped.
        win_grid: window sizes to use; defaults to a fixed grid limited to n // 2.
        min_seg: minimum number of segments a window size must yield.

    Returns:
        float in [0, 1], or NaN with fewer than 200 observations or fewer
        than 3 usable window sizes.
    """
    r = pd.Series(r).dropna().astype("float64").values
    r = r[np.isfinite(r)]  # an inf return would poison every segment it touches
    n = len(r)
    if n < 200:
        return np.nan

    if win_grid is None:
        base = np.array([16, 24, 32, 48, 64, 96, 128, 192, 256, 384], dtype=int)
        win_grid = [w for w in base if w <= n // 2]

    RS_vals, sizes = [], []
    for w in win_grid:
        m = n // w
        if w < 8 or m < min_seg:
            continue
        rs_list = []
        for i in range(m):
            seg = r[i * w:(i + 1) * w]
            seg = seg - np.mean(seg)
            sd = seg.std(ddof=1)
            if sd == 0 or not np.isfinite(sd):
                continue
            y = np.cumsum(seg)
            rs = (np.max(y) - np.min(y)) / sd
            if np.isfinite(rs) and rs > 0:
                rs_list.append(rs)
        if rs_list:
            RS_vals.append(np.mean(rs_list))
            sizes.append(w)

    return _loglog_slope(sizes, RS_vals)


def hurst_dfa_returns(r, win_grid=None):
    """Estimate the Hurst exponent of a return series by detrended fluctuation analysis.

    The demeaned cumulative sum is cut into non-overlapping segments per window
    size, each segment is linearly detrended, and the mean residual RMS is
    regressed on the window size in log-log space. The slope is returned,
    clipped to [0, 1].

    Args:
        r: returns (anything ``pd.Series`` accepts); NaN and +/-inf are dropped.
        win_grid: window sizes to use; defaults to a fixed grid limited to n // 2.

    Returns:
        float in [0, 1], or NaN with fewer than 200 observations or fewer
        than 3 usable window sizes.
    """
    r = pd.Series(r).dropna().astype("float64").values
    r = r[np.isfinite(r)]
    n = len(r)
    if n < 200:
        return np.nan

    y = np.cumsum(r - np.mean(r))

    if win_grid is None:
        base = np.array([16, 24, 32, 48, 64, 96, 128, 192, 256], dtype=int)
        win_grid = [w for w in base if w <= n // 2]

    F_vals, sizes = [], []
    for s in win_grid:
        m = n // s
        if s < 8 or m < 2:
            continue
        # The linear-trend design matrix depends only on the window size, so
        # build it once and fit all m segments in a single least-squares call.
        t = np.arange(s, dtype=float)
        A = np.vstack([t, np.ones(s)]).T
        segs = y[: m * s].reshape(m, s).T  # one segment per column
        coeff, *_ = np.linalg.lstsq(A, segs, rcond=None)
        detr = segs - (A @ coeff)
        rms = np.sqrt(np.mean(detr ** 2, axis=0))
        rms = rms[np.isfinite(rms) & (rms > 0)]
        if rms.size:
            F_vals.append(float(np.mean(rms)))
            sizes.append(s)

    return _loglog_slope(sizes, F_vals)
------------------------------------------------------------ # Backtest (uses shared_utils) – no look-ahead in pattern library # ------------------------------------------------------------ def forward_backtest_one_asset( r_pct: pd.Series, theta_entry: float, exec_ret: pd.Series | None = None, weak_days_exit: int | None = None ) -> pd.DataFrame: """ r_pct: percent log returns series, indexed by Date. Uses only past returns to build library at each time t: past = r[:t] PnL uses Ret+1 (forward EOD style). """ r = (r_pct / 100.0).astype(float) # decimals (close/close for segnale) idx = r.index if exec_ret is not None: r_exec = pd.to_numeric(exec_ret, errors="coerce").astype(float) r_exec.index = pd.to_datetime(r_exec.index) r_exec = r_exec.reindex(idx) else: r_exec = r in_pos = False entry_t = None trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 rows = [] for t in range(WP, len(r) - 1): past = r.iloc[:t] if past.dropna().shape[0] < (WP + HA): rows.append((idx[t], 0, np.nan, np.nan, float(r_exec.iloc[t+1]))) continue lib_wins, lib_out = build_pattern_library(past, WP, HA) if lib_wins is None or lib_out is None or len(lib_out) == 0: rows.append((idx[t], 0, np.nan, np.nan, float(r_exec.iloc[t+1]))) continue win_last = r.iloc[t-WP:t].values curr_zn = z_norm(win_last) if curr_zn is None: rows.append((idx[t], 1 if in_pos else 0, np.nan, np.nan, float(r_exec.iloc[t+1]))) continue est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K) est_out = float(est_out) avg_dist = float(avg_dist) sig = 1 if in_pos else 0 # ENTRY if (not in_pos) and (est_out > theta_entry): in_pos = True entry_t = t trade_pnl = 0.0 trade_peak = 0.0 sig = 1 weak_streak = 0 # EXIT checks (EOD forward approximation) elif in_pos: next_ret = float(r_exec.iloc[t+1]) pnl_if_stay = (1.0 + trade_pnl) * (1.0 + next_ret) - 1.0 peak_if_stay = max(trade_peak, pnl_if_stay) exit_now = False if SL_BPS is not None and pnl_if_stay <= -SL_BPS / 10000.0: exit_now = True if TP_BPS is not None and 
pnl_if_stay >= TP_BPS / 10000.0: exit_now = True if TRAIL_BPS is not None and (peak_if_stay - pnl_if_stay) >= TRAIL_BPS / 10000.0: exit_now = True if TIME_STOP_BARS is not None and entry_t is not None and (t - entry_t + 1) >= TIME_STOP_BARS: exit_now = True if THETA_EXIT is not None: if est_out <= THETA_EXIT: if weak_days_exit is None: exit_now = True else: weak_streak += 1 if weak_streak >= weak_days_exit: exit_now = True else: weak_streak = 0 if exit_now: in_pos = False entry_t = None trade_pnl = 0.0 trade_peak = 0.0 weak_streak = 0 sig = 0 else: trade_pnl = pnl_if_stay trade_peak = peak_if_stay sig = 1 rows.append((idx[t], sig, est_out, avg_dist, float(r_exec.iloc[t+1]))) df = pd.DataFrame(rows, columns=["Date", "Signal", "EstOutcome", "AvgDist", "Ret+1"]).set_index("Date") fee = FEE_BPS / 10000.0 trade_chg = df["Signal"].diff().abs().fillna(0.0) df["PnL"] = df["Signal"] * df["Ret+1"] - trade_chg * fee return df # ------------------------------------------------------------ # Metrics / utilities (aligned with v3.1.6 approach) # ------------------------------------------------------------ def equity_from_returns(r: pd.Series) -> pd.Series: r = pd.to_numeric(r, errors="coerce").fillna(0.0) return (1 + r).cumprod() * 100 def drawdown_stats_simple(ret_series: pd.Series) -> dict: # Metriche geometriche coerenti con l'equity di portafoglio ret_series = pd.to_numeric(ret_series, errors="coerce").fillna(0.0) eq = (1 + ret_series).cumprod() if eq.empty: return {"CAGR_%": np.nan, "AnnVol_%": np.nan, "Sharpe": np.nan, "MaxDD_%eq": np.nan, "Calmar": np.nan} roll_max = eq.cummax() dd = eq / roll_max - 1.0 maxdd = float(dd.min()) if len(dd) else np.nan cagr = (eq.iloc[-1] / eq.iloc[0]) ** (DAYS_PER_YEAR / max(1, len(ret_series))) - 1 annvol = ret_series.std() * np.sqrt(DAYS_PER_YEAR) sharpe = (ret_series.mean() / (ret_series.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR) calmar = (cagr / abs(maxdd)) if (maxdd is not None and maxdd < 0) else np.nan return { "CAGR_%": round(cagr * 
100, 2) if np.isfinite(cagr) else np.nan, "AnnVol_%": round(annvol * 100, 2) if np.isfinite(annvol) else np.nan, "Sharpe": round(float(sharpe), 2) if np.isfinite(sharpe) else np.nan, "MaxDD_%eq": round(maxdd * 100, 2) if np.isfinite(maxdd) else np.nan, "Calmar": round(float(calmar), 2) if np.isfinite(calmar) else np.nan } def heal_index_metrics(returns: pd.Series): """ Calcola: - AAW: area sopra acqua (run-up vs minimo cumulato) - AUW: area sotto acqua (drawdown vs massimo cumulato) - Heal Index: (AAW - AUW) / AUW """ s = returns.fillna(0.0).astype(float) if s.size == 0: return np.nan, np.nan, np.nan equity = (1.0 + s).cumprod() if equity.size == 0: return np.nan, np.nan, np.nan run_max = equity.cummax() dd = equity / run_max - 1.0 AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan run_min = equity.cummin() ru = equity / run_min - 1.0 AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan return AAW, AUW, heal def h_min_100(returns: pd.Series, month_len: int = 21): """ Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)). 
""" s = returns.dropna().astype(float) n = s.size if n == 0: return np.nan, np.nan eq = (1 + s).cumprod() best = None for h in range(1, n + 1): roll = eq / eq.shift(h) roll = roll.dropna() if (roll >= 1.0).all(): best = h break if best is None: return np.nan, np.nan return best, int(np.ceil(best / month_len)) def monthly_returns(r: pd.Series) -> pd.Series: r = pd.to_numeric(r, errors="coerce").fillna(0.0) return (1 + r).resample("M").prod() - 1 def plot_heatmap_monthly(r: pd.Series, title: str): m = monthly_returns(r) df = m.to_frame("ret") df["Year"], df["Month"] = df.index.year, df.index.month pv = df.pivot(index="Year", columns="Month", values="ret") fig, ax = plt.subplots(figsize=(10, 6)) im = ax.imshow(pv.fillna(0) * 100, aspect="auto") for i in range(pv.shape[0]): for j in range(pv.shape[1]): val = pv.iloc[i, j] if not np.isnan(val): ax.text(j, i, f"{val*100:.1f}", ha="center", va="center", fontsize=8) ax.set_title(title) ax.set_xlabel("Mese") ax.set_ylabel("Anno") ax.set_xticks(range(12)) ax.set_xticklabels(range(1, 13)) fig.colorbar(im, ax=ax, label="%") plt.tight_layout() return fig def _portfolio_metric_row(name: str, r: pd.Series) -> dict: r = pd.to_numeric(r, errors="coerce").fillna(0.0) if r.empty: return { "Portfolio": name, "CAGR_%": np.nan, "MaxDD_%": np.nan, "Sharpe": np.nan, "Heal_Index": np.nan, "AAW": np.nan, "AUW": np.nan, "H100_min_days": np.nan, "H100_min_months": np.nan } eq = (1 + r).cumprod() cagr = (eq.iloc[-1] / eq.iloc[0]) ** (DAYS_PER_YEAR / max(1, len(r))) - 1 maxdd = (eq / eq.cummax() - 1.0).min() sharpe = (r.mean() / (r.std() + 1e-12)) * np.sqrt(DAYS_PER_YEAR) aaw, auw, heal = heal_index_metrics(r) h_days, h_months = h_min_100(r, month_len=21) return { "Portfolio": name, "CAGR_%": round(float(cagr) * 100, 2) if np.isfinite(cagr) else np.nan, "MaxDD_%": round(float(maxdd) * 100, 2) if np.isfinite(maxdd) else np.nan, "Sharpe": round(float(sharpe), 2) if np.isfinite(sharpe) else np.nan, "Heal_Index": round(float(heal), 4) if 
np.isfinite(heal) else np.nan, "AAW": round(float(aaw), 4) if np.isfinite(aaw) else np.nan, "AUW": round(float(auw), 4) if np.isfinite(auw) else np.nan, "H100_min_days": h_days, "H100_min_months": h_months } def save_portfolio_metrics(ret_eq: pd.Series, ret_rp: pd.Series, path: Path, top_n: int): """Salva metriche EqW/RP in Excel; fallback CSV se engine Excel mancante.""" path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) rows = [ _portfolio_metric_row(f"EqW_Top{top_n}", ret_eq), _portfolio_metric_row(f"RP_Top{top_n}", ret_rp), ] df = pd.DataFrame(rows) try: df.to_excel(path, index=False) print(f"[INFO] Salvato: {path.resolve()}") except Exception as e: alt = path.with_suffix(".csv") df.to_csv(alt, index=False) print(f"[WARN] to_excel fallita ({e}), salvato CSV: {alt.resolve()}") def inverse_vol_weights(df: pd.DataFrame, window=60, max_weight=None) -> pd.DataFrame: """Faithful to v3.1.6: inv-vol weights normalized per day, then clipped (no renorm after clip).""" vol = df.rolling(window).std() inv = 1 / vol.replace(0, np.nan) w = inv.div(inv.sum(axis=1), axis=0) w = w.ffill().fillna(1 / max(1, df.shape[1])) if max_weight is not None: w = w.clip(upper=max_weight) return w def make_active_weights( w_target: pd.DataFrame, wide_sig: pd.DataFrame, renorm_to_1: bool = False, add_cash: bool = True, cash_label: str = "Cash" ) -> pd.DataFrame: """ Applica il mask dei segnali ai pesi target; opzionalmente rinormalizza e aggiunge Cash. 
""" if w_target is None or w_target.empty: return pd.DataFrame() all_dates = w_target.index all_cols = list(w_target.columns) res = pd.DataFrame(0.0, index=all_dates, columns=all_cols) for dt in all_dates: wt = w_target.loc[dt].copy() sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float) mask = sig_row.reindex(all_cols).fillna(0) wt = wt * (mask == 1) if renorm_to_1: s = wt.sum() wt = wt / s if s > 0 else wt res.loc[dt, wt.index] = wt.values if add_cash: cash = 1.0 - res.sum(axis=1) res[cash_label] = cash.clip(lower=0.0) return res def _build_dynamic_portfolio_returns( wide_pnl: pd.DataFrame, wide_sig: pd.DataFrame, wide_est: pd.DataFrame, top_n: int, window_bars: int = RANKING_WINDOW_BARS, rp_lookback: int = RP_LOOKBACK ) -> dict: if wide_pnl is None or wide_pnl.empty: idx = pd.Index([]) empty_w = pd.DataFrame(index=idx, columns=[]) return { "ret_eq": pd.Series(dtype=float), "ret_rp": pd.Series(dtype=float), "w_eq": empty_w, "w_rp": empty_w, "w_eq_act": empty_w, "w_rp_act": empty_w, "selection": {} } dates = wide_pnl.index.sort_values() all_cols = wide_pnl.columns.tolist() w_eq = pd.DataFrame(0.0, index=dates, columns=all_cols) w_rp = pd.DataFrame(0.0, index=dates, columns=all_cols) selection = {} for dt in dates: sig_row = wide_sig.loc[dt] if dt in wide_sig.index else pd.Series(dtype=float) on_cols = [c for c in all_cols if sig_row.get(c, 0) == 1] if not on_cols: selection[dt] = [] continue window_est = wide_est.loc[:dt].tail(window_bars) if not wide_est.empty else pd.DataFrame() scores = [] for c in on_cols: s = pd.to_numeric(window_est[c], errors="coerce") if c in window_est.columns else pd.Series(dtype=float) est_score = s.mean(skipna=True) if pd.isna(est_score): continue scores.append((c, est_score)) if not scores: selection[dt] = [] continue scores_sorted = sorted(scores, key=lambda x: x[1], reverse=True) base_cols = [c for c, _ in scores_sorted[:top_n]] selection[dt] = base_cols if not base_cols: continue w_eq.loc[dt, base_cols] = 1 
/ len(base_cols) window_pnl = wide_pnl.loc[:dt].tail(window_bars) rp_hist = window_pnl[base_cols] rp_w = inverse_vol_weights(rp_hist, window=rp_lookback, max_weight=RP_MAX_WEIGHT) if not rp_w.empty: last = rp_w.iloc[-1].fillna(0.0) last_sum = float(last.sum()) if last_sum > 0: last = last / last_sum w_rp.loc[dt, last.index] = last.values w_eq_act = make_active_weights(w_eq, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") w_rp_act = make_active_weights(w_rp, wide_sig, renorm_to_1=False, add_cash=True, cash_label="Cash") ret_eq = (wide_pnl * w_eq_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1) ret_rp = (wide_pnl * w_rp_act.drop(columns=["Cash"], errors="ignore")).sum(axis=1) return { "ret_eq": ret_eq, "ret_rp": ret_rp, "w_eq": w_eq, "w_rp": w_rp, "w_eq_act": w_eq_act, "w_rp_act": w_rp_act, "selection": selection } # ------------------------------------------------------------ # Score calibration (copied logic from v3.1.6) # ------------------------------------------------------------ def _safe_rank_ser(s: pd.Series) -> pd.Series: """Rank robusto (0..1), gestisce NaN.""" s = s.copy() denom = s.notna().sum() if denom <= 1: return pd.Series(np.nan, index=s.index) return s.rank(method="average", na_option="keep") / denom def _winsorize(s: pd.Series, p=0.005): s = s.astype(float).copy() lo, hi = s.quantile(p), s.quantile(1 - p) return s.clip(lower=lo, upper=hi) def _corr_shrink(C: np.ndarray, alpha: float = 0.10) -> np.ndarray: """Shrink correlation/covariance toward identity for stability.""" C = np.asarray(C, dtype=float) k = C.shape[0] I = np.eye(k) # ensure symmetric C = 0.5 * (C + C.T) return (1 - alpha) * C + alpha * I def _pos_normalize(w: np.ndarray) -> np.ndarray: w = np.asarray(w, dtype=float) w = np.where(np.isfinite(w), w, 0.0) w = np.maximum(w, 0.0) s = w.sum() if s <= 0: return np.ones_like(w) / len(w) return w / s def calibrate_score_weights( df_sum: pd.DataFrame, metrics_map=None, target_col: str | None = None, k_folds: int = 5, 
shrink_equal: float = 0.25, corr_shrink: float = 0.10 ): """ metrics_map: lista di tuple (colname, good_is_high) target_col: se None => unsupervised_erc (used in v3.1.6) Returns: dict with 'weights' (pd.Series), 'X_ranked' (DataFrame), 'mode' """ if metrics_map is None or len(metrics_map) == 0: raise ValueError("metrics_map vuoto: non posso calibrare Score.") # Build ranked feature matrix X X_cols = [] X = pd.DataFrame(index=df_sum.index) for col, good_high in metrics_map: if col not in df_sum.columns: continue s = pd.to_numeric(df_sum[col], errors="coerce") s = _winsorize(s) # invert if good is low if not good_high: s = -s X[col] = _safe_rank_ser(s) X_cols.append(col) X = X.loc[:, X.columns[X.notna().sum(0) > 0]] k = X.shape[1] if k == 0: raise ValueError("Nessuna metrica valida per la calibrazione.") # Unsupervised ERC (allineato alla versione non-stocks) if target_col is None or target_col not in df_sum.columns: Xv = np.nan_to_num(X.values, nan=np.nanmean(X.values)) C = np.cov(Xv, rowvar=False) C = _corr_shrink(C, alpha=corr_shrink) vol = np.sqrt(np.clip(np.diag(C), 1e-12, None)) w0 = 1.0 / vol w = _pos_normalize(w0) return { "mode": "unsupervised_erc", "weights": pd.Series(w, index=X.columns, name="weight"), "X_ranked": X } # (Supervised path not used here, but kept for completeness) y = pd.to_numeric(df_sum[target_col], errors="coerce") y = _winsorize(y) y_rank = _safe_rank_ser(y) mask = y_rank.notna() & X.notna().any(1) Xf, yf = X[mask].copy(), y_rank[mask].copy() if len(Xf) < 30: # fallback: unsupervised Xv = np.nan_to_num(X.values, nan=np.nanmean(X.values)) C = np.cov(Xv, rowvar=False) C = _corr_shrink(C, alpha=corr_shrink) vol = np.sqrt(np.clip(np.diag(C), 1e-12, None)) w0 = 1.0 / vol w = _pos_normalize(w0) return { "mode": "unsupervised_erc_fallback", "weights": pd.Series(w, index=X.columns, name="weight"), "X_ranked": X } # Simple supervised: corr with target, whiten by covariance Xv = np.nan_to_num(Xf.values, nan=np.nanmean(Xf.values)) C = np.cov(Xv, 
rowvar=False) C = _corr_shrink(C, alpha=corr_shrink) ic = np.array([pd.Series(Xf.iloc[:, j]).corr(yf, method="spearman") for j in range(Xf.shape[1])], dtype=float) ic = np.nan_to_num(ic, nan=0.0) try: w_raw = np.linalg.solve(C + 1e-6*np.eye(C.shape[0]), ic) except Exception: w_raw = ic.copy() w = _pos_normalize(w_raw) w = (1 - shrink_equal) * w + shrink_equal * np.ones_like(w) / len(w) w = _pos_normalize(w) return { "mode": "supervised_icSigmaInv", "weights": pd.Series(w, index=X.columns, name="weight"), "X_ranked": X } # ------------------------------------------------------------ # MAIN # ------------------------------------------------------------ def main(): # 1) Fetch prices prices = {} for tkr in TICKERS: print(f"Fetching {tkr} ...") try: prices[tkr] = fetch_price_series(tkr, FROM_DATE) except Exception as e: print(f"[WARN] Skip {tkr}: {e}") if len(prices) < 5: raise RuntimeError(f"Pochi ticker validi ({len(prices)}). Controlla TICKERS e/o endpoint.") # 2) Backtest each ticker hurst_rows = [] summary_rows = [] signals_rows = [] for tkr, px in prices.items(): if not isinstance(px, pd.DataFrame) or "AdjClose" not in px.columns: print(f"[WARN] Serie senza AdjClose per {tkr}: skip") continue close = pd.to_numeric(px["AdjClose"], errors="coerce") open_px = pd.to_numeric(px.get("Open"), errors="coerce") if "Open" in px.columns else None r_dec = np.log(close / close.shift(1)).dropna() if len(r_dec) < (WP + HA + 50): print(f"[WARN] Serie troppo corta per {tkr} (len={len(r_dec)}): skip") continue r_pct = (r_dec * 100.0).rename("Ret") # percent log returns exec_ret = None if open_px is not None: exec_ret = open_px.pct_change() exec_ret.index = close.index h_rs = hurst_rs_returns(r_dec) h_dfa = hurst_dfa_returns(r_dec) H = np.nanmedian([h_rs, h_dfa]) H = float(H) if np.isfinite(H) else np.nan theta_entry = (H / 100.0) if np.isfinite(H) else THETA_FALLBACK hurst_rows.append({"Ticker": tkr, "Hurst": H, "theta_entry": theta_entry}) sig_df = 
forward_backtest_one_asset(r_pct, theta_entry=theta_entry, exec_ret=exec_ret) sig_df = sig_df.copy() sig_df.insert(0, "Ticker", tkr) signals_rows.append(sig_df.reset_index()) # Per-ticker summary metrics (like v3.1.6) stats = drawdown_stats_simple(sig_df["PnL"]) hit = 100.0 * ((sig_df["PnL"] > 0).sum() / max(1, sig_df["PnL"].notna().sum())) turnover = 100.0 * sig_df["Signal"].diff().abs().fillna(0.0).mean() stats.update({ "Ticker": tkr, "HitRate_%": round(float(hit), 2), "AvgTradeRet_bps": round(float(sig_df["PnL"].mean() * 10000), 2), "Turnover_%/step": round(float(turnover), 2), "N_Steps": int(sig_df.shape[0]), "theta_entry": float(theta_entry), "theta_exit": (None if THETA_EXIT is None else float(THETA_EXIT)), "sl_bps": (None if SL_BPS is None else float(SL_BPS)), "tp_bps": (None if TP_BPS is None else float(TP_BPS)), "trail_bps": (None if TRAIL_BPS is None else float(TRAIL_BPS)), "time_stop_bars": (None if TIME_STOP_BARS is None else int(TIME_STOP_BARS)), }) summary_rows.append(stats) if not signals_rows: raise RuntimeError("Nessun ticker backtestato con successo.") hurst_df = pd.DataFrame(hurst_rows).sort_values("Ticker").reset_index(drop=True) forward_bt_summary = pd.DataFrame(summary_rows).sort_values("Ticker").reset_index(drop=True) forward_bt_signals = pd.concat(signals_rows, ignore_index=True) forward_bt_signals["Date"] = pd.to_datetime(forward_bt_signals["Date"]).dt.normalize() # 3) Build Score + select Top-15 (faithful to v3.1.6) df_sum = forward_bt_summary.copy() def _coerce_num(s: pd.Series) -> pd.Series: return pd.to_numeric(s, errors="coerce").replace([np.inf, -np.inf], np.nan) for c in ["CAGR_%","Sharpe","Calmar","MaxDD_%eq","HitRate_%", "QualityScore","Confidence","OutcomeScore"]: if c in df_sum.columns: df_sum[c] = _coerce_num(df_sum[c]) primary_cols = [("Sharpe", True), ("CAGR_%", True), ("MaxDD_%eq", False)] alt_cols = [("QualityScore", True), ("Confidence", True), ("OutcomeScore", True)] mm = [(c,gh) for (c,gh) in primary_cols if c in 
df_sum.columns and df_sum[c].notna().sum() > 0] if len(mm) < 2: mm = [(c,gh) for (c,gh) in alt_cols if c in df_sum.columns and df_sum[c].notna().sum() > 0] if len(mm) < 2: union_candidates = list({x[0] for x in primary_cols + alt_cols}) mm = [(c, True) for c in union_candidates if (c in df_sum.columns and df_sum[c].notna().sum() > 0)] if len(mm) == 0: print("[WARN] Nessuna metrica numerica disponibile: uso Score=0 e ordino per Ticker.") df_sum["Score"] = 0.0 df_sum["Score_mode"] = "degenerate_equal" else: res = calibrate_score_weights(df_sum, metrics_map=mm, target_col=None) X_ranked = res["X_ranked"] w = res["weights"] df_sum["Score"] = (X_ranked[w.index] * w.values).sum(1) df_sum["Score_mode"] = res["mode"] print("Pesi stimati automaticamente (metriche usate):") print(w) df_sum = df_sum.sort_values("Score", ascending=False).reset_index(drop=True) base_tickers = df_sum.head(TOP_N)["Ticker"].astype(str).str.strip().tolist() print(f"Tickers selezionati dinamicamente (Top{TOP_N}): {base_tickers}") # 4) Portafogli dinamici (allineati alla versione non-stocks) wide_pnl = forward_bt_signals.pivot_table(index="Date", columns="Ticker", values="PnL", aggfunc="sum").fillna(0.0) wide_sig = forward_bt_signals.pivot_table(index="Date", columns="Ticker", values="Signal", aggfunc="last").fillna(0).astype(int) wide_est = forward_bt_signals.pivot_table(index="Date", columns="Ticker", values="EstOutcome", aggfunc="last").sort_index() dyn_port = _build_dynamic_portfolio_returns( wide_pnl=wide_pnl, wide_sig=wide_sig, wide_est=wide_est, top_n=TOP_N, window_bars=RANKING_WINDOW_BARS, rp_lookback=RP_LOOKBACK ) ret_eq = dyn_port["ret_eq"].rename("Ret_EqW_TopN") ret_rp = dyn_port["ret_rp"].rename("Ret_RP_TopN") eq_eq = equity_from_returns(ret_eq).rename("Eq_EqW_TopN") eq_rp = equity_from_returns(ret_rp).rename("Eq_RP_TopN") # 5) Plots plt.figure(figsize=(10, 5)) plt.plot(eq_eq, label=f"Equal Weight (Top{TOP_N})") plt.plot(eq_rp, label=f"Risk Parity (Top{TOP_N}, cap {RP_MAX_WEIGHT:.4f})") 
plt.title(f"Equity line portafogli (base 100) – Top{TOP_N} (v3.1.6 style)") plt.grid(True) plt.legend() plt.tight_layout() plt.savefig(PLOT_DIR / "equity_line.png", dpi=150) plt.show() plot_heatmap_monthly(ret_eq, f"Heatmap mensile – Equal Weight (Top{TOP_N})") plt.savefig(PLOT_DIR / "heatmap_eqw.png", dpi=150) plt.show() plot_heatmap_monthly(ret_rp, f"Heatmap mensile – Risk Parity (Top{TOP_N})") plt.savefig(PLOT_DIR / "heatmap_rp.png", dpi=150) plt.show() # 6) Save outputs hurst_df.to_csv(OUT_DIR / "hurst.csv", index=False) forward_bt_summary.to_csv(OUT_DIR / "forward_bt_summary.csv", index=False) forward_bt_signals.to_csv(OUT_DIR / "forward_bt_signals.csv", index=False) pd.concat([ret_eq, ret_rp, eq_eq, eq_rp], axis=1).to_csv(OUT_DIR / "portfolio_daily.csv") df_sum.to_csv(OUT_DIR / "ranking_score.csv", index=False) pd.Series(base_tickers, name="TopN_Tickers").to_csv(OUT_DIR / "topn_tickers.csv", index=False) save_portfolio_metrics(ret_eq, ret_rp, OUT_DIR / "portfolio_metrics.xlsx", TOP_N) print(f"\nSaved to: {OUT_DIR.resolve()}") if __name__ == "__main__": main()