commit e3e4304b75b155e2cdcfd71a63ac0feb3a13572f
Author: fredmaloggia
Date:   Thu Nov 20 15:20:03 2025 +0100

    Initial commit - Ranking

diff --git a/Metriche v.1.0.py b/Metriche v.1.0.py
new file mode 100644
index 0000000..cb4802d
--- /dev/null
+++ b/Metriche v.1.0.py
@@ -0,0 +1,434 @@
+# -*- coding: utf-8 -*-
+"""
+Program: Metriche v.1.0.py
+Purpose: read an input Excel file (list of ISINs) and, using the same stored procedure
+         "opt_RendimentoGiornaliero1_ALL", download the daily return series and compute
+         ONLY the per-asset metrics (as in asset_metrics).
+
+Note: ISINs that do NOT have at least 5 years of usable history are discarded immediately.
+
+Usage (the input path argument is optional):
+    python "Metriche v.1.0.py" "Universo per metrics v.1.0.xlsx"
+
+Dependencies:
+    pip install numpy pandas SQLAlchemy pyodbc openpyxl
+    (make sure the ODBC Driver 17 for SQL Server is installed)
+
+Required files:
+    - connection.txt with keys: username, password, host, port, database
+    - input Excel file with columns: ISIN, Nome, Categoria, Asset Class
+
+Output:
+    - output/YYYYMMDD Asset Metrics.xlsx
+      * sheet "Metriche_5Y"
+      * sheet "Scartati"
+      * sheet "Legenda"
+"""
+
+import sys
+import os
+import math
+import numpy as np
+import pandas as pd
+from sqlalchemy import create_engine, text
+from sqlalchemy.exc import SQLAlchemyError
+
+# =========================
+# INPUT/OUTPUT FOLDERS
+# =========================
+INPUT_DIR = "input"
+OUTPUT_DIR = "output"
+
+os.makedirs(INPUT_DIR, exist_ok=True)
+os.makedirs(OUTPUT_DIR, exist_ok=True)
+
+# =========================
+# PARAMETERS
+# =========================
+DAYS_PER_YEAR = 252
+MIN_YEARS_REQ = 5
+SP_SAMPLE_SIZE = 1305  # ~5 business years (252 * 5 = 1260), with some margin
+PTF_CURRENCY = "EUR"
+
+# =========================
+# METRIC UTILITIES
+# =========================
+
+def r2_equity_line(returns: pd.Series) -> float:
+    """R2 of a linear fit of log-equity vs. time (trend regularity), clipped to [0, 1]."""
+    s = returns.dropna()
+    if s.size < 3:
+        return np.nan
+    equity = (1.0 + s).cumprod()
+    equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
+    if equity.size < 3:
+        return np.nan
+    y = np.log(equity.values)
+    if np.allclose(y.var(ddof=1), 0.0):
+        return 0.0
+    x = np.arange(y.size, dtype=float)
+    X = np.column_stack([np.ones_like(x), x])
+    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
+    y_hat = X @ beta
+    ss_res = np.sum((y - y_hat) ** 2)
+    ss_tot = np.sum((y - y.mean()) ** 2)
+    r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
+    if np.isnan(r2):
+        return np.nan
+    return float(np.clip(r2, 0.0, 1.0))
+
+
+def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
+    """Return (max drawdown, longest underwater streak in days, time-to-recovery in days
+    from the MaxDD trough; sentinel_ttr if the previous peak is never recovered)."""
+    s = returns.fillna(0.0).astype(float)
+    if s.size == 0:
+        return np.nan, np.nan, np.nan
+    equity = (1.0 + s).cumprod()
+    if equity.size == 0:
+        return np.nan, np.nan, np.nan
+    run_max = equity.cummax()
+    dd = equity / run_max - 1.0
+    max_dd = float(dd.min()) if dd.size else np.nan
+    under_water = dd < 0
+    if under_water.any():
+        max_dd_duration = 0
+        current = 0
+        for flag in under_water.values:
+            if flag:
+                current += 1
+                if current > max_dd_duration:
+                    max_dd_duration = current
+            else:
+                current = 0
+    else:
+        max_dd_duration = 0
+    if dd.size:
+        trough_idx = int(np.argmin(dd.values))
+        if trough_idx > 0:
+            peak_idx = int(np.argmax(equity.values[: trough_idx + 1]))
+            peak_level = float(equity.values[peak_idx])
+            rec_idx = None
+            for t in range(trough_idx + 1, equity.size):
+                if equity.values[t] >= peak_level:
+                    rec_idx = t
+                    break
+            if rec_idx is None:
+                ttr_from_mdd = sentinel_ttr
+            else:
+                ttr_from_mdd = rec_idx - trough_idx
+        else:
+            ttr_from_mdd = np.nan
+    else:
+        ttr_from_mdd = np.nan
+    return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
+
+
+def heal_index_metrics(returns: pd.Series):
+    """Return (AAW, AUW, Heal_Index) with Heal_Index = (AAW - AUW) / AUW."""
+    s = returns.fillna(0.0).astype(float)
+    if s.size == 0:
+        return np.nan, np.nan, np.nan
+    equity = (1.0 + s).cumprod()
+    if equity.size == 0:
+        return np.nan, np.nan, np.nan
+    run_max = equity.cummax()
+    dd = equity / run_max - 1.0
+    AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
+    run_min = equity.cummin()
+    ru = equity / run_min - 1.0
+    AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
+    heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
+    return AAW, AUW, heal
+
+
+def h_min_100(returns: pd.Series, month_len: int = 21):
+    """Smallest horizon (in days and in months of `month_len` trading days) such that
+    EVERY rolling window of that length has a cumulative return >= 0."""
+    s = returns.dropna().astype(float)
+    n = s.size
+    if n == 0:
+        return np.nan, np.nan
+    log1p = np.log1p(s.values)
+    csum = np.cumsum(log1p)
+
+    def rolling_sum_k(k: int):
+        if k > n:
+            return np.array([])
+        head = csum[k - 1:]
+        tail = np.concatenate(([0.0], csum[:-k]))
+        return head - tail
+
+    for k in range(1, n + 1):
+        rs = rolling_sum_k(k)
+        if rs.size == 0:
+            break
+        roll_ret = np.exp(rs) - 1.0
+        if np.all(roll_ret >= 0):
+            h_days = k
+            h_months = int(math.ceil(h_days / month_len))
+            return h_days, h_months
+    return np.nan, np.nan
+
+# =========================
+# DB CONNECTION (connection.txt)
+# =========================
+
+def read_connection_params(path: str = "connection.txt") -> dict:
+    """Parse connection.txt (key=value lines; lines starting with '#' are ignored)."""
+    params = {}
+    with open(path, "r", encoding="utf-8") as f:
+        for line in f:
+            line = line.strip()
+            if line and not line.startswith("#"):
+                key, value = line.split("=", 1)
+                params[key.strip()] = value.strip()
+    return params
+
+
+def make_engine(params: dict):
+    """Build a SQLAlchemy engine for SQL Server via pyodbc (ODBC Driver 17)."""
+    username = params.get("username")
+    password = params.get("password")
+    host = params.get("host")
+    port = params.get("port", "1433")
+    database = params.get("database")
+    conn_str = (
+        f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
+        "?driver=ODBC+Driver+17+for+SQL+Server"
+    )
+    return create_engine(conn_str)
+
+# =========================
+# MAIN LOGIC
+# =========================
+
+def main(input_excel: str = os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx")):
+    # 1) Load the input universe
+    if not os.path.exists(input_excel):
+        raise FileNotFoundError(f"File di input non trovato: {input_excel}")
+    df_in = pd.read_excel(input_excel)
+
+    # Normalize the expected column names
+    df_in.columns = [str(c).strip() for c in df_in.columns]
+    cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"]
+    for c in cols_needed:
+        if c not in df_in.columns:
+            raise ValueError(f"Manca la colonna richiesta nel file input: '{c}'")
+
+    # 2) DB connection (sanity check with SELECT 1)
+    params = read_connection_params("connection.txt")
+    engine = make_engine(params)
+    with engine.connect() as con:
+        _ = con.execute(text("SELECT 1"))
+
+    # 3) 5-year date range (business days, up to yesterday)
+    end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
+    start_date = end_date - pd.DateOffset(years=MIN_YEARS_REQ)
+    all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize()
+    five_year_len = len(all_dates)
+
+    # 4) Download the series for each ISIN and drop those with < 5 years of history
+    final_df = pd.DataFrame(index=all_dates)
+    accepted_isins = []
+    dropped_info = []  # list of dicts for the "Scartati" (discarded) report
+
+    for isin in df_in["ISIN"].dropna().astype(str).unique():
+        print(f"[SP] Recupero: {isin}")
+        sp = (
+            f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', "
+            f"@n = {SP_SAMPLE_SIZE}, @PtfCurr = {PTF_CURRENCY}"
+        )
+        try:
+            tmp = pd.read_sql_query(sp, engine)
+            if tmp.empty:
+                print(f" - Nessun dato: SKIP {isin}")
+                dropped_info.append({"ISIN": isin, "Motivo": "SP vuota"})
+                continue
+
+            tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize()
+            tmp = tmp.dropna(subset=["Px_Date"])
+
+            # Deduplicate by date BEFORE the reindex (duplicates are averaged)
+            dup_cnt = tmp["Px_Date"].duplicated().sum()
+            if dup_cnt > 0:
+                print(f" - Attenzione: {dup_cnt} duplicati di Px_Date per {isin}, compattati con media.")
+
+            tmp["RendimentoGiornaliero"] = pd.to_numeric(tmp["RendimentoGiornaliero"], errors="coerce")
+
+            ser = (
+                tmp.sort_values("Px_Date")
+                .groupby("Px_Date", as_index=True)["RendimentoGiornaliero"]
+                .mean()
+                .div(100.0)  # percentages -> fractions
+                .reindex(all_dates)
+            )
+
+            # Acceptance criteria (at least 5 effective years of history):
+            first_valid = ser.first_valid_index()
+            last_valid = ser.last_valid_index()
+            nonnull = int(ser.notna().sum())
+
+            if first_valid is None or last_valid is None:
+                dropped_info.append({"ISIN": isin, "Motivo": "nessun valore valido"})
+                print(f" - Nessun valore valido: SKIP {isin}")
+                continue
+
+            span_days = (last_valid - first_valid).days
+            cond_span = span_days >= (365 * MIN_YEARS_REQ - 7)  # 1-week tolerance
+            cond_count = nonnull >= int(0.98 * five_year_len)   # ~98% coverage
+            cond_start = first_valid <= (start_date + pd.Timedelta(days=7))
+
+            if not (cond_span and cond_count and cond_start):
+                dropped_info.append({
+                    "ISIN": isin,
+                    "Motivo": f"storia insufficiente (span={span_days}d, count={nonnull}/{five_year_len})"
+                })
+                print(f" - Storia insufficiente: SKIP {isin}")
+                continue
+
+            # Fill any residual gaps with 0 (no return on that day)
+            ser = ser.fillna(0.0)
+            final_df[isin] = ser
+            accepted_isins.append(isin)
+            print(f" - OK: {nonnull} osservazioni utili")
+        except SQLAlchemyError as e:
+            print(f" - Errore SP per {isin}: {e}")
+            dropped_info.append({"ISIN": isin, "Motivo": "errore SP"})
+            continue
+
+    if not accepted_isins:
+        print("Nessun ISIN con 5 anni pieni: nessun output generato.")
+        # Still save the list of discarded ISINs, if any
+        if dropped_info:
+            today_str = pd.Timestamp.now().strftime("%Y%m%d")
+            filename = f"{today_str} Asset Metrics.xlsx"
+            out_path = os.path.join(OUTPUT_DIR, filename)
+            with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
+                pd.DataFrame(dropped_info).to_excel(writer, sheet_name="Scartati", index=False)
+            print(f"Creato: {out_path} | ISIN metricati: 0 | Scartati: {len(dropped_info)}")
+        return
+
+    # 5) Per-asset metrics over the 5-year window
+    period_df = final_df[accepted_isins]
+    n_days = int(period_df.shape[0])
+    years_elapsed = n_days / DAYS_PER_YEAR if n_days > 0 else np.nan
+
+    daily_mean = period_df.mean()
+    ann_return = daily_mean * DAYS_PER_YEAR
+    ann_vol = period_df.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)
+
+    gross = (1.0 + period_df).prod(skipna=True)
+    cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
+
+    r2_series = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
+
+    maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
+    aaw_dict, auw_dict, heal_dict = {}, {}, {}
+    hmin_5y_months_dict = {}
+
+    for col in period_df.columns:
+        mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
+        maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
+        aaw, auw, heal = heal_index_metrics(period_df[col])
+        aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
+        _, h_months_5y = h_min_100(period_df[col], month_len=21)
+        hmin_5y_months_dict[col] = h_months_5y
+
+    metrics_df = pd.DataFrame({
+        'ISIN': period_df.columns,
+        'Rendimento_Ann': ann_return.reindex(period_df.columns).values,
+        'Volatilita_Ann': ann_vol.reindex(period_df.columns).values,
+        'CAGR': cagr.reindex(period_df.columns).values,
+        'R2_Equity': r2_series.reindex(period_df.columns).values,
+        'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
+        'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
+        'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
+        'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
+        'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
+        'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
+        'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
+    })
+
+    # Merge with the descriptive info from the input file
+    metrics_df = (
+        metrics_df
+        .merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
+    )
+
+    # Base column order (before adding the ranking columns)
+    metrics_df = metrics_df[
+        ['ISIN', 'Nome', 'Categoria', 'Asset Class',
+         'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
+         'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
+         'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']
+    ]
+
+    # =========================
+    # RANKING COLUMNS
+    # =========================
+    # Ranking raw: Heal_Index is used directly as the score
+    metrics_df["Ranking raw"] = metrics_df["Heal_Index"]
+
+    # Ranking: descending rank on Ranking raw (1 = best)
+    metrics_df["Ranking"] = (
+        metrics_df["Ranking raw"]
+        .rank(method="dense", ascending=False)
+        .astype("Int64")  # allows NaN where Ranking raw is NaN
+    )
+
+    # Sort the output by ascending Ranking (1 = top)
+    metrics_df = metrics_df.sort_values("Ranking", na_position="last").reset_index(drop=True)
+
+    # =========================
+    # TURN ISIN INTO A HYPERLINK
+    # =========================
+    def isin_to_hyperlink(isin: str) -> str:
+        if pd.isna(isin):
+            return ""
+        isin_str = str(isin).strip()
+        if not isin_str:
+            return ""
+        url = f"https://www.justetf.com/it/etf-profile.html?isin={isin_str}"
+        # Excel formula: shows the ISIN as clickable text
+        return f'=HYPERLINK("{url}", "{isin_str}")'
+
+    metrics_df["ISIN"] = metrics_df["ISIN"].apply(isin_to_hyperlink)
+
+    # =========================
+    # LEGEND (DEDICATED SHEET)
+    # =========================
+    legenda_rows = [
+        ("ISIN", "Codice ISIN dello strumento. È un hyperlink verso la pagina justETF corrispondente."),
+        ("Nome", "Nome/descrizione commerciale dello strumento (ETF/fondo)."),
+        ("Categoria", "Categoria logica dello strumento (es. Azionario Globale, Obbligazionario Corporate, ecc.)."),
+        ("Asset Class", "Macro asset class (Azionari, Obbligazionari, Metalli Preziosi, Materie Prime, Immobiliare, Criptovalute, Monetari)."),
+        ("Rendimento_Ann", "Rendimento medio annuo (media dei rendimenti giornalieri × 252)."),
+        ("Volatilita_Ann", "Volatilità annua (deviazione standard giornaliera × √252)."),
+        ("CAGR", "Tasso di crescita annuale composto sul periodo di 5 anni."),
+        ("R2_Equity", "R² della regressione dei logaritmi dell’equity line sul tempo (stima della “regolarità” del trend)."),
+        ("MaxDD", "Massimo drawdown storico (peggior perdita percentuale dal massimo precedente; valore tipicamente negativo)."),
+        ("DD_Duration_Max", "Durata massima (in giorni) di un qualunque periodo di drawdown continuo."),
+        ("TTR_from_MDD", "Time To Recovery, in giorni, dal minimo del MaxDD al pieno recupero del picco precedente (1250 se mai recuperato)."),
+        ("AAW", "Area Above Water: somma degli scostamenti positivi dell’equity rispetto al minimo cumulato."),
+        ("AUW", "Area Under Water: somma degli scostamenti negativi (in valore assoluto) dell’equity rispetto al massimo cumulato."),
+        ("Heal_Index", "(AAW - AUW) / AUW, indice sintetico che bilancia run-up e drawdown (valori più alti = profilo migliore)."),
+        ("H_min_100m_5Y", "Numero minimo di mesi (21 giorni borsistici ≈ 1 mese) tali che TUTTE le finestre rolling di tale durata abbiano rendimento cumulato ≥ 0."),
+        ("Ranking raw", "Valore grezzo usato per il ranking, coincidente con Heal_Index."),
+        ("Ranking", "Classifica dallo strumento migliore (1) al peggiore, basata su Ranking raw in ordine decrescente (metodo dense).")
+    ]
+    legenda_df = pd.DataFrame(legenda_rows, columns=["Campo", "Descrizione"])
+
+    # =========================
+    # EXCEL EXPORT (filename prefixed with the YYYYMMDD date)
+    # =========================
+    today_str = pd.Timestamp.now().strftime("%Y%m%d")
+    filename = f"{today_str} Asset Metrics.xlsx"
+    out_path = os.path.join(OUTPUT_DIR, filename)
+
+    with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
+        metrics_df.to_excel(writer, sheet_name='Metriche_5Y', index=False)
+
+        if dropped_info:
+            pd.DataFrame(dropped_info).to_excel(writer, sheet_name='Scartati', index=False)
+
+        legenda_df.to_excel(writer, sheet_name='Legenda', index=False)
+
+    print(f"Creato: {out_path} | ISIN metricati: {len(metrics_df)} | Scartati: {len(dropped_info)}")
+
+
+if __name__ == "__main__":
+    inp = sys.argv[1] if len(sys.argv) > 1 else os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx")
+    main(inp)
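
For a quick sanity check that does not require the SQL Server connection, the per-asset metric utilities defined in the script can be exercised on a synthetic return series. The sketch below is illustrative only: the connection.txt values in the comment are placeholders, the random series is made-up data, and loading the script by file path via importlib is simply one convenient way to reuse its functions given the space in the file name; none of this is part of the repository.

# --- minimal smoke test for the metric utilities (illustrative sketch, synthetic data) ---
import importlib.util

import numpy as np
import pandas as pd

# connection.txt is only needed by main(); it holds key=value lines such as (placeholder values):
#   username=my_user
#   password=my_password
#   host=sqlserver.example.local
#   port=1433
#   database=MarketData

# Load "Metriche v.1.0.py" by path (the space and dots in the name prevent a normal import).
# Executing the module only defines its functions and creates the input/output folders;
# main() stays behind the __main__ guard, so no database access happens here.
spec = importlib.util.spec_from_file_location("metriche", "Metriche v.1.0.py")
metriche = importlib.util.module_from_spec(spec)
spec.loader.exec_module(metriche)

# Synthetic ~5-year daily return series (fractions, not percentages).
rng = np.random.default_rng(42)
idx = pd.date_range("2020-01-01", periods=5 * 252, freq="B")
ret = pd.Series(rng.normal(0.0003, 0.01, size=len(idx)), index=idx)

mdd, dd_dur, ttr = metriche.drawdown_metrics(ret)   # MaxDD, longest underwater streak, time to recovery
aaw, auw, heal = metriche.heal_index_metrics(ret)   # AAW, AUW, Heal_Index
h_days, h_months = metriche.h_min_100(ret)          # minimum all-non-negative rolling horizon
r2 = metriche.r2_equity_line(ret)                   # trend regularity of the equity line

print(f"MaxDD={mdd:.2%}  DD_dur={dd_dur}d  TTR={ttr}  Heal={heal:.2f}  H_min={h_months}m  R2={r2:.3f}")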