# -*- coding: utf-8 -*- """ Programma: metrics_builder_v1.py Scopo: Leggere un file Excel di input (lista ISIN) e, usando la stessa SP "opt_RendimentoGiornaliero1_ALL", scaricare le serie storiche dei rendimenti e calcolare SOLO le metriche per-asset (come in asset_metrics). Nota: scarta subito gli ISIN che NON hanno almeno 5 anni di storia utile. Esecuzione (opzionale): python metrics_builder_v1.py "Universo per metrics v.1.0.xlsx" Dipendenze: pip install numpy pandas SQLAlchemy pyodbc openpyxl (assicurarsi di avere l'ODBC Driver 17 for SQL Server) File richiesti: - connection.txt con chiavi: username, password, host, port, database - Excel di input con colonne: ISIN, Nome, Categoria, Asset Class Output: - output/AAAAMMGG Asset Metrics.xlsx * foglio "Metriche_5Y" * foglio "Scartati" * foglio "Legenda" """ import sys import os import math import numpy as np import pandas as pd from sqlalchemy import create_engine, text from sqlalchemy.exc import SQLAlchemyError # ========================= # CARTELLE INPUT/OUTPUT # ========================= INPUT_DIR = "input" OUTPUT_DIR = "output" os.makedirs(INPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) # ========================= # PARAMETRI # ========================= DAYS_PER_YEAR = 252 MIN_YEARS_REQ = 5 SP_SAMPLE_SIZE = 1305 # ~5 anni lavorativi (252*5 = 1260) PTF_CURRENCY = "EUR" # ========================= # UTILITY METRICHE # ========================= def r2_equity_line(returns: pd.Series) -> float: s = returns.dropna() if s.size < 3: return np.nan equity = (1.0 + s).cumprod() equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna() if equity.size < 3: return np.nan y = np.log(equity.values) if np.allclose(y.var(ddof=1), 0.0): return 0.0 x = np.arange(y.size, dtype=float) X = np.column_stack([np.ones_like(x), x]) beta, *_ = np.linalg.lstsq(X, y, rcond=None) y_hat = X @ beta ss_res = np.sum((y - y_hat) ** 2) ss_tot = np.sum((y - y.mean()) ** 2) r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan if np.isnan(r2): return np.nan return float(np.clip(r2, 0.0, 1.0)) def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): s = returns.fillna(0.0).astype(float) if s.size == 0: return np.nan, np.nan, np.nan equity = (1.0 + s).cumprod() if equity.size == 0: return np.nan, np.nan, np.nan run_max = equity.cummax() dd = equity / run_max - 1.0 max_dd = float(dd.min()) if dd.size else np.nan under_water = dd < 0 if under_water.any(): max_dd_duration = 0 current = 0 for flag in under_water.values: if flag: current += 1 if current > max_dd_duration: max_dd_duration = current else: current = 0 else: max_dd_duration = 0 if dd.size: trough_idx = int(np.argmin(dd.values)) if trough_idx > 0: peak_idx = int(np.argmax(equity.values[: trough_idx + 1])) peak_level = float(equity.values[peak_idx]) rec_idx = None for t in range(trough_idx + 1, equity.size): if equity.values[t] >= peak_level: rec_idx = t break if rec_idx is None: ttr_from_mdd = sentinel_ttr else: ttr_from_mdd = rec_idx - trough_idx else: ttr_from_mdd = np.nan else: ttr_from_mdd = np.nan return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan) def heal_index_metrics(returns: pd.Series): s = returns.fillna(0.0).astype(float) if s.size == 0: return np.nan, np.nan, np.nan equity = (1.0 + s).cumprod() if equity.size == 0: return np.nan, np.nan, np.nan run_max = equity.cummax() dd = equity / run_max - 1.0 AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan run_min = equity.cummin() ru = equity / run_min - 1.0 AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan return AAW, AUW, heal def h_min_100(returns: pd.Series, month_len: int = 21): s = returns.dropna().astype(float) n = s.size if n == 0: return np.nan, np.nan log1p = np.log1p(s.values) csum = np.cumsum(log1p) def rolling_sum_k(k: int): if k > n: return np.array([]) head = csum[k - 1:] tail = np.concatenate(([0.0], csum[:-k])) return head - tail for k in range(1, n + 1): rs = rolling_sum_k(k) if rs.size == 0: break roll_ret = np.exp(rs) - 1.0 if np.all(roll_ret >= 0): h_days = k h_months = int(math.ceil(h_days / month_len)) return h_days, h_months return np.nan, np.nan # ========================= # CONNESSIONE DB (connection.txt) # ========================= def read_connection_params(path: str = "connection.txt") -> dict: params = {} with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line and not line.startswith("#"): key, value = line.split("=", 1) params[key.strip()] = value.strip() return params def make_engine(params: dict): username = params.get("username") password = params.get("password") host = params.get("host") port = params.get("port", "1433") database = params.get("database") conn_str = ( f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}" "?driver=ODBC+Driver+17+for+SQL+Server" ) return create_engine(conn_str) # ========================= # MAIN LOGIC # ========================= def main(input_excel: str = os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx")): # 1) Carica input if not os.path.exists(input_excel): raise FileNotFoundError(f"File di input non trovato: {input_excel}") df_in = pd.read_excel(input_excel) # Normalizza i nomi attesi df_in.columns = [str(c).strip() for c in df_in.columns] cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"] for c in cols_needed: if c not in df_in.columns: raise ValueError(f"Manca la colonna richiesta nel file input: '{c}'") # 2) Connessione DB params = read_connection_params("connection.txt") engine = make_engine(params) with engine.connect() as con: _ = con.execute(text("SELECT 1")) # 3) Range date 5 anni (fino a ieri, business days) end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1) start_date = end_date - pd.DateOffset(years=MIN_YEARS_REQ) all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize() five_year_len = len(all_dates) # 4) Scarico serie per ciascun ISIN e filtro < 5 anni final_df = pd.DataFrame(index=all_dates) accepted_isins = [] dropped_info = [] # lista dizionari per report scartati for isin in df_in["ISIN"].dropna().astype(str).unique(): print(f"[SP] Recupero: {isin}") sp = ( f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', " f"@n = {SP_SAMPLE_SIZE}, @PtfCurr = {PTF_CURRENCY}" ) try: tmp = pd.read_sql_query(sp, engine) if tmp.empty: print(f" - Nessun dato: SKIP {isin}") dropped_info.append({"ISIN": isin, "Motivo": "SP vuota"}) continue tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize() tmp = tmp.dropna(subset=["Px_Date"]) # deduplicate per data PRIMA del reindex (media) dup_cnt = tmp["Px_Date"].duplicated().sum() if dup_cnt > 0: print(f" - Attenzione: {dup_cnt} duplicati di Px_Date per {isin}, compattati con media.") tmp["RendimentoGiornaliero"] = pd.to_numeric(tmp["RendimentoGiornaliero"], errors="coerce") ser = ( tmp.sort_values("Px_Date") .groupby("Px_Date", as_index=True)["RendimentoGiornaliero"] .mean() .div(100.0) # percentuali → frazione .reindex(all_dates) ) # Criteri di accettazione (almeno 5 anni effettivi): first_valid = ser.first_valid_index() last_valid = ser.last_valid_index() nonnull = int(ser.notna().sum()) if first_valid is None or last_valid is None: dropped_info.append({"ISIN": isin, "Motivo": "nessun valore valido"}) print(f" - Nessun valore valido: SKIP {isin}") continue span_days = (last_valid - first_valid).days cond_span = span_days >= (365 * MIN_YEARS_REQ - 7) # tolleranza 1 settimana cond_count = nonnull >= int(0.98 * five_year_len) # copertura ~98% cond_start = first_valid <= (start_date + pd.Timedelta(days=7)) if not (cond_span and cond_count and cond_start): dropped_info.append({ "ISIN": isin, "Motivo": f"storia insufficiente (span={span_days}d, count={nonnull}/{five_year_len})" }) print(f" - Storia insufficiente: SKIP {isin}") continue # Riempie eventuali buchi residui con 0 (nessun rendimento quel giorno) ser = ser.fillna(0.0) final_df[isin] = ser accepted_isins.append(isin) print(f" - OK: {nonnull} osservazioni utili") except SQLAlchemyError as e: print(f" - Errore SP per {isin}: {e}") dropped_info.append({"ISIN": isin, "Motivo": "errore SP"}) continue if not accepted_isins: print("Nessun ISIN con 5 anni pieni: nessun output generato.") # comunque salvo l'elenco scartati se presente if dropped_info: today_str = pd.Timestamp.now().strftime("%Y%m%d") filename = f"{today_str} Asset Metrics.xlsx" out_path = os.path.join(OUTPUT_DIR, filename) with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer: pd.DataFrame(dropped_info).to_excel(writer, sheet_name="Scartati", index=False) print(f"Creato: {out_path} | ISIN metricati: 0 | Scartati: {len(dropped_info)}") return # 5) Calcolo metriche per-asset sui 5 anni period_df = final_df[accepted_isins] n_days = int(period_df.shape[0]) years_elapsed = n_days / DAYS_PER_YEAR if n_days > 0 else np.nan daily_mean = period_df.mean() ann_return = daily_mean * DAYS_PER_YEAR ann_vol = period_df.std(ddof=1) * np.sqrt(DAYS_PER_YEAR) gross = (1.0 + period_df).prod(skipna=True) cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns) r2_series = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns) maxdd_dict, dddur_dict, ttr_dict = {}, {}, {} aaw_dict, auw_dict, heal_dict = {}, {}, {} hmin_5y_months_dict = {} for col in period_df.columns: mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250) maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr aaw, auw, heal = heal_index_metrics(period_df[col]) aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal _, h_months_5y = h_min_100(period_df[col], month_len=21) hmin_5y_months_dict[col] = h_months_5y metrics_df = pd.DataFrame({ 'ISIN': period_df.columns, 'Rendimento_Ann': ann_return.reindex(period_df.columns).values, 'Volatilita_Ann': ann_vol.reindex(period_df.columns).values, 'CAGR': cagr.reindex(period_df.columns).values, 'R2_Equity': r2_series.reindex(period_df.columns).values, 'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values, 'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values, 'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values, 'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values, 'AUW': pd.Series(auw_dict).reindex(period_df.columns).values, 'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values, 'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values }) # Merge con info descrittive metrics_df = ( metrics_df .merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left') ) # Ordine base delle colonne (prima di aggiungere il ranking) metrics_df = metrics_df[ ['ISIN', 'Nome', 'Categoria', 'Asset Class', 'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity', 'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD', 'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y'] ] # ========================= # AGGIUNTA COLONNE DI RANKING # ========================= # Ranking raw: usiamo direttamente Heal_Index come score metrics_df["Ranking raw"] = metrics_df["Heal_Index"] # Ranking: ranking decrescente su Ranking raw (1 = migliore) metrics_df["Ranking"] = ( metrics_df["Ranking raw"] .rank(method="dense", ascending=False) .astype("Int64") # consente NaN dove Ranking raw è NaN ) # Ordinamento dell'output per Ranking crescente (1 = in alto) metrics_df = metrics_df.sort_values("Ranking", na_position="last").reset_index(drop=True) # ========================= # TRASFORMAZIONE ISIN IN HYPERLINK # ========================= def isin_to_hyperlink(isin: str) -> str: if pd.isna(isin): return "" isin_str = str(isin).strip() if not isin_str: return "" url = f"https://www.justetf.com/it/etf-profile.html?isin={isin_str}" # Formula Excel: visualizza l'ISIN come testo cliccabile return f'=HYPERLINK("{url}", "{isin_str}")' metrics_df["ISIN"] = metrics_df["ISIN"].apply(isin_to_hyperlink) # ========================= # LEGENDA (FOGLIO DEDICATO) # ========================= legenda_rows = [ ("ISIN", "Codice ISIN dello strumento. È un hyperlink verso la pagina justETF corrispondente."), ("Nome", "Nome/descrizione commerciale dello strumento (ETF/fondo)."), ("Categoria", "Categoria logica dello strumento (es. Azionario Globale, Obbligazionario Corporate, ecc.)."), ("Asset Class", "Macro asset class (Azionari, Obbligazionari, Metalli Preziosi, Materie Prime, Immobiliare, Criptovalute, Monetari)."), ("Rendimento_Ann", "Rendimento medio annuo (media dei rendimenti giornalieri × 252)."), ("Volatilita_Ann", "Volatilità annua (deviazione standard giornaliera × √252)."), ("CAGR", "Tasso di crescita annuale composto sul periodo di 5 anni."), ("R2_Equity", "R² della regressione dei logaritmi dell’equity line sul tempo (stima della “regolarità” del trend)."), ("MaxDD", "Massimo drawdown storico (peggior perdita percentuale dal massimo precedente; valore tipicamente negativo)."), ("DD_Duration_Max", "Durata massima (in giorni) di un qualunque periodo di drawdown continuo."), ("TTR_from_MDD", "Time To Recovery, in giorni, dal minimo del MaxDD al pieno recupero del picco precedente (1250 se mai recuperato)."), ("AAW", "Area Above Water: somma degli scostamenti positivi dell’equity rispetto al minimo cumulato."), ("AUW", "Area Under Water: somma degli scostamenti negativi (in valore assoluto) dell’equity rispetto al massimo cumulato."), ("Heal_Index", "(AAW - AUW) / AUW, indice sintetico che bilancia run-up e drawdown (valori più alti = profilo migliore)."), ("H_min_100m_5Y", "Numero minimo di mesi (21 giorni borsistici ≈ 1 mese) tali che TUTTE le finestre rolling di tale durata abbiano rendimento cumulato ≥ 0."), ("Ranking raw", "Valore grezzo usato per il ranking, coincidente con Heal_Index."), ("Ranking", "Classifica dallo strumento migliore (1) al peggiore, basata su Ranking raw in ordine decrescente (metodo dense).") ] legenda_df = pd.DataFrame(legenda_rows, columns=["Campo", "Descrizione"]) # ========================= # EXPORT EXCEL — con prefisso data AAAAMMGG # ========================= today_str = pd.Timestamp.now().strftime("%Y%m%d") filename = f"{today_str} Asset Metrics.xlsx" out_path = os.path.join(OUTPUT_DIR, filename) with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer: metrics_df.to_excel(writer, sheet_name='Metriche_5Y', index=False) if dropped_info: pd.DataFrame(dropped_info).to_excel(writer, sheet_name='Scartati', index=False) legenda_df.to_excel(writer, sheet_name='Legenda', index=False) print(f"Creato: {out_path} | ISIN metricati: {len(metrics_df)} | Scartati: {len(dropped_info)}") if __name__ == "__main__": inp = sys.argv[1] if len(sys.argv) > 1 else os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx") main(inp)