# -*- coding: utf-8 -*-
"""
Programma: metrics_builder_v1.py
Scopo: Leggere un file Excel di input (lista ISIN) e, usando la stessa SP
"opt_RendimentoGiornaliero1_ALL", scaricare le serie storiche dei
rendimenti e calcolare SOLO le metriche per-asset (come in asset_metrics).
Nota: scarta subito gli ISIN che NON hanno almeno 5 anni di storia utile.
Esecuzione (opzionale):
python metrics_builder_v1.py "Universo per metrics v.1.0.xlsx"
Dipendenze:
pip install numpy pandas SQLAlchemy pyodbc openpyxl
(assicurarsi di avere l'ODBC Driver 17 for SQL Server)
File richiesti:
- connection.txt con chiavi: username, password, host, port, database
- Excel di input con colonne: ISIN, Nome, Categoria, Asset Class
Output:
- output/AAAAMMGG Asset Metrics.xlsx
* foglio "Metriche_5Y"
* foglio "Scartati"
* foglio "Legenda"
"""
import sys
import os
import math
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
# =========================
# INPUT/OUTPUT FOLDERS
# =========================
INPUT_DIR = "input"
OUTPUT_DIR = "output"
os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# =========================
# PARAMETERS
# =========================
DAYS_PER_YEAR = 252
MIN_YEARS_REQ = 5
SP_SAMPLE_SIZE = 1305  # a margin above 5 business years (252 * 5 = 1260)
PTF_CURRENCY = "EUR"
# =========================
# METRIC UTILITIES
# =========================
def r2_equity_line(returns: pd.Series) -> float:
    """R² of a linear regression of the log equity line on time (clipped to [0, 1])."""
    s = returns.dropna()
    if s.size < 3:
        return np.nan
    equity = (1.0 + s).cumprod()
    equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
    if equity.size < 3:
        return np.nan
    y = np.log(equity.values)
    if np.allclose(y.var(ddof=1), 0.0):
        return 0.0
    x = np.arange(y.size, dtype=float)
    X = np.column_stack([np.ones_like(x), x])
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    y_hat = X @ beta
    ss_res = np.sum((y - y_hat) ** 2)
    ss_tot = np.sum((y - y.mean()) ** 2)
    r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
    if np.isnan(r2):
        return np.nan
    return float(np.clip(r2, 0.0, 1.0))
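# Quick illustrative check (assumed values; not executed by this script): a
# constant daily return yields an exactly log-linear equity line, so the fit
# should be perfect.
#   r2_equity_line(pd.Series([0.01] * 10))  # -> 1.0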
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
    """Return (max drawdown, longest underwater streak in days, time-to-recovery from the MaxDD trough)."""
    s = returns.fillna(0.0).astype(float)
    if s.size == 0:
        return np.nan, np.nan, np.nan
    equity = (1.0 + s).cumprod()
    if equity.size == 0:
        return np.nan, np.nan, np.nan
    run_max = equity.cummax()
    dd = equity / run_max - 1.0
    max_dd = float(dd.min()) if dd.size else np.nan
    under_water = dd < 0
    if under_water.any():
        max_dd_duration = 0
        current = 0
        for flag in under_water.values:
            if flag:
                current += 1
                if current > max_dd_duration:
                    max_dd_duration = current
            else:
                current = 0
    else:
        max_dd_duration = 0
    if dd.size:
        trough_idx = int(np.argmin(dd.values))
        if trough_idx > 0:
            peak_idx = int(np.argmax(equity.values[: trough_idx + 1]))
            peak_level = float(equity.values[peak_idx])
            rec_idx = None
            for t in range(trough_idx + 1, equity.size):
                if equity.values[t] >= peak_level:
                    rec_idx = t
                    break
            # never recovered within the sample: fall back to the sentinel
            ttr_from_mdd = sentinel_ttr if rec_idx is None else rec_idx - trough_idx
        else:
            ttr_from_mdd = np.nan
    else:
        ttr_from_mdd = np.nan
    return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
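# Quick illustrative check (assumed values; not executed by this script): after
# +10%, -50%, +20% the equity never regains its old peak, so the TTR falls back
# to the sentinel.
#   drawdown_metrics(pd.Series([0.10, -0.50, 0.20]))  # -> (~-0.50, 2, 1250)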
def heal_index_metrics(returns: pd.Series):
    """Return (AAW, AUW, heal index) where heal = (AAW - AUW) / AUW."""
    s = returns.fillna(0.0).astype(float)
    if s.size == 0:
        return np.nan, np.nan, np.nan
    equity = (1.0 + s).cumprod()
    if equity.size == 0:
        return np.nan, np.nan, np.nan
    run_max = equity.cummax()
    dd = equity / run_max - 1.0
    AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan  # Area Under Water
    run_min = equity.cummin()
    ru = equity / run_min - 1.0
    AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan   # Area Above Water
    heal = ((AAW - AUW) / AUW) if (np.isfinite(AUW) and AUW > 0) else np.nan
    return AAW, AUW, heal
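# Quick illustrative check (same assumed series as above): AUW = 0.5 + 0.4 = 0.9,
# AAW = 0.2, so heal = (0.2 - 0.9) / 0.9 ≈ -0.778.
#   heal_index_metrics(pd.Series([0.10, -0.50, 0.20]))  # -> (~0.2, ~0.9, ~-0.778)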
def h_min_100(returns: pd.Series, month_len: int = 21):
    """Smallest holding period (days, months) such that EVERY rolling window has a cumulative return >= 0."""
    s = returns.dropna().astype(float)
    n = s.size
    if n == 0:
        return np.nan, np.nan
    log1p = np.log1p(s.values)
    csum = np.cumsum(log1p)

    def rolling_sum_k(k: int):
        # rolling sum of log returns over windows of length k, via the cumulative sum
        if k > n:
            return np.array([])
        head = csum[k - 1:]
        tail = np.concatenate(([0.0], csum[:-k]))
        return head - tail

    for k in range(1, n + 1):
        rs = rolling_sum_k(k)
        if rs.size == 0:
            break
        roll_ret = np.exp(rs) - 1.0
        if np.all(roll_ret >= 0):
            h_days = k
            h_months = int(math.ceil(h_days / month_len))
            return h_days, h_months
    return np.nan, np.nan
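# Quick illustrative check (assumed values; not executed by this script): the
# 1- and 2-day windows of [0.01, -0.02, 0.03] each have a negative case, but
# the single 3-day window compounds to ≈ +1.95%, so the answer is 3 days ≈ 1 month.
#   h_min_100(pd.Series([0.01, -0.02, 0.03]))  # -> (3, 1)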
# =========================
# DB CONNECTION (connection.txt)
# =========================
def read_connection_params(path: str = "connection.txt") -> dict:
    params = {}
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#"):
                key, value = line.split("=", 1)
                params[key.strip()] = value.strip()
    return params
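# A minimal connection.txt would look like this (illustrative values; lines
# starting with '#' are ignored by the parser above):
#   username=my_user
#   password=my_password
#   host=db.example.local
#   port=1433
#   database=MarketData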
def make_engine(params: dict):
    username = params.get("username")
    password = params.get("password")
    host = params.get("host")
    port = params.get("port", "1433")
    database = params.get("database")
    conn_str = (
        f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
        "?driver=ODBC+Driver+17+for+SQL+Server"
    )
    return create_engine(conn_str)
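# With the illustrative values above, the resulting SQLAlchemy URL would be:
#   mssql+pyodbc://my_user:my_password@db.example.local:1433/MarketData?driver=ODBC+Driver+17+for+SQL+Server
# Caveat: a password containing characters such as '@' or ':' would need to be
# percent-encoded (e.g. via urllib.parse.quote_plus) before being interpolated.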
# =========================
# MAIN LOGIC
# =========================
def main(input_excel: str = os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx")):
    # 1) Load the input file
    if not os.path.exists(input_excel):
        raise FileNotFoundError(f"Input file not found: {input_excel}")
    df_in = pd.read_excel(input_excel)
    # Normalize the expected column names
    df_in.columns = [str(c).strip() for c in df_in.columns]
    cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"]
    for c in cols_needed:
        if c not in df_in.columns:
            raise ValueError(f"Required column missing from the input file: '{c}'")
    # 2) DB connection (fail fast with a ping)
    params = read_connection_params("connection.txt")
    engine = make_engine(params)
    with engine.connect() as con:
        _ = con.execute(text("SELECT 1"))
    # 3) 5-year date range (business days, up to yesterday)
    end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
    start_date = end_date - pd.DateOffset(years=MIN_YEARS_REQ)
    all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize()
    five_year_len = len(all_dates)
    # 4) Download each ISIN's series and drop anything with < 5 years of history
    final_df = pd.DataFrame(index=all_dates)
    accepted_isins = []
    dropped_info = []  # list of dicts for the rejects report
    for isin in df_in["ISIN"].dropna().astype(str).unique():
        print(f"[SP] Fetching: {isin}")
        # the currency is passed as a quoted T-SQL string literal ('EUR')
        sp = (
            f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', "
            f"@n = {SP_SAMPLE_SIZE}, @PtfCurr = '{PTF_CURRENCY}'"
        )
        try:
            tmp = pd.read_sql_query(sp, engine)
            if tmp.empty:
                print(f"  - No data: SKIP {isin}")
                dropped_info.append({"ISIN": isin, "Motivo": "empty SP result"})
                continue
            tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize()
            tmp = tmp.dropna(subset=["Px_Date"])
            # de-duplicate by date BEFORE the reindex (using the mean)
            dup_cnt = tmp["Px_Date"].duplicated().sum()
            if dup_cnt > 0:
                print(f"  - Warning: {dup_cnt} duplicated Px_Date rows for {isin}, collapsed with the mean.")
            tmp["RendimentoGiornaliero"] = pd.to_numeric(tmp["RendimentoGiornaliero"], errors="coerce")
            ser = (
                tmp.sort_values("Px_Date")
                .groupby("Px_Date", as_index=True)["RendimentoGiornaliero"]
                .mean()
                .div(100.0)  # percentages -> fractions
                .reindex(all_dates)
            )
            # Acceptance criteria (at least 5 effective years):
            first_valid = ser.first_valid_index()
            last_valid = ser.last_valid_index()
            nonnull = int(ser.notna().sum())
            if first_valid is None or last_valid is None:
                dropped_info.append({"ISIN": isin, "Motivo": "no valid values"})
                print(f"  - No valid values: SKIP {isin}")
                continue
            span_days = (last_valid - first_valid).days
            cond_span = span_days >= (365 * MIN_YEARS_REQ - 7)  # one week of tolerance
            cond_count = nonnull >= int(0.98 * five_year_len)   # ~98% coverage
            cond_start = first_valid <= (start_date + pd.Timedelta(days=7))
            if not (cond_span and cond_count and cond_start):
                dropped_info.append({
                    "ISIN": isin,
                    "Motivo": f"insufficient history (span={span_days}d, count={nonnull}/{five_year_len})"
                })
                print(f"  - Insufficient history: SKIP {isin}")
                continue
            # fill any remaining gaps with 0 (no return on that day)
            ser = ser.fillna(0.0)
            final_df[isin] = ser
            accepted_isins.append(isin)
            print(f"  - OK: {nonnull} usable observations")
        except SQLAlchemyError as e:
            print(f"  - SP error for {isin}: {e}")
            dropped_info.append({"ISIN": isin, "Motivo": "SP error"})
            continue
    if not accepted_isins:
        print("No ISIN with a full 5 years of history: no metrics output generated.")
        # still save the rejects list, if there is one
        if dropped_info:
            today_str = pd.Timestamp.now().strftime("%Y%m%d")
            filename = f"{today_str} Asset Metrics.xlsx"
            out_path = os.path.join(OUTPUT_DIR, filename)
            with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
                pd.DataFrame(dropped_info).to_excel(writer, sheet_name="Scartati", index=False)
            print(f"Created: {out_path} | ISINs with metrics: 0 | Rejected: {len(dropped_info)}")
        return
    # 5) Per-asset metrics over the 5-year window
    period_df = final_df[accepted_isins]
    n_days = int(period_df.shape[0])
    years_elapsed = n_days / DAYS_PER_YEAR if n_days > 0 else np.nan
    daily_mean = period_df.mean()
    ann_return = daily_mean * DAYS_PER_YEAR
    ann_vol = period_df.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)
    gross = (1.0 + period_df).prod(skipna=True)
    cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
    r2_series = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
    maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
    aaw_dict, auw_dict, heal_dict = {}, {}, {}
    hmin_5y_months_dict = {}
    for col in period_df.columns:
        mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
        maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
        aaw, auw, heal = heal_index_metrics(period_df[col])
        aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
        _, h_months_5y = h_min_100(period_df[col], month_len=21)
        hmin_5y_months_dict[col] = h_months_5y
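    # Worked example of the annualization above (assumed numbers): a mean daily
    # return of 0.0004 annualizes to 0.0004 * 252 ≈ 10.08%/year, and a daily
    # standard deviation of 0.01 to 0.01 * √252 ≈ 15.87% annual volatility.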
    metrics_df = pd.DataFrame({
        'ISIN': period_df.columns,
        'Rendimento_Ann': ann_return.reindex(period_df.columns).values,
        'Volatilita_Ann': ann_vol.reindex(period_df.columns).values,
        'CAGR': cagr.reindex(period_df.columns).values,
        'R2_Equity': r2_series.reindex(period_df.columns).values,
        'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
        'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
        'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
        'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
        'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
        'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
        'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
    })
    # Merge in the descriptive columns
    metrics_df = (
        metrics_df
        .merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
    )
    # Base column order (before the ranking columns are added)
    metrics_df = metrics_df[
        ['ISIN', 'Nome', 'Categoria', 'Asset Class',
         'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
         'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
         'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']
    ]
    # =========================
    # RANKING COLUMNS
    # =========================
    # Raw ranking score: Heal_Index is used directly
    metrics_df["Ranking raw"] = metrics_df["Heal_Index"]
    # Ranking: dense descending rank of "Ranking raw" (1 = best)
    metrics_df["Ranking"] = (
        metrics_df["Ranking raw"]
        .rank(method="dense", ascending=False)
        .astype("Int64")  # nullable integer: keeps NaN where "Ranking raw" is NaN
    )
    # Sort the output by ascending ranking (1 at the top)
    metrics_df = metrics_df.sort_values("Ranking", na_position="last").reset_index(drop=True)
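    # Illustrative dense-ranking behaviour (assumed scores): Heal_Index values
    # [0.9, 0.9, 0.5, NaN] rank to [1, 1, 2, <NA>]; ties share a rank, the next
    # distinct value takes the next integer, and NaN stays missing.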
    # =========================
    # TURN ISINs INTO HYPERLINKS
    # =========================
    def isin_to_hyperlink(isin: str) -> str:
        if pd.isna(isin):
            return ""
        isin_str = str(isin).strip()
        if not isin_str:
            return ""
        url = f"https://www.justetf.com/it/etf-profile.html?isin={isin_str}"
        # Excel formula: shows the ISIN as clickable text
        return f'=HYPERLINK("{url}", "{isin_str}")'
    metrics_df["ISIN"] = metrics_df["ISIN"].apply(isin_to_hyperlink)
    # =========================
    # LEGEND (DEDICATED SHEET)
    # =========================
    legenda_rows = [
        ("ISIN", "The instrument's ISIN code. It is a hyperlink to the matching justETF page."),
        ("Nome", "Commercial name/description of the instrument (ETF/fund)."),
        ("Categoria", "Logical category of the instrument (e.g. Azionario Globale, Obbligazionario Corporate, etc.)."),
        ("Asset Class", "Macro asset class (Azionari, Obbligazionari, Metalli Preziosi, Materie Prime, Immobiliare, Criptovalute, Monetari)."),
        ("Rendimento_Ann", "Annualized mean return (mean daily return × 252)."),
        ("Volatilita_Ann", "Annualized volatility (daily standard deviation × √252)."),
        ("CAGR", "Compound annual growth rate over the 5-year window."),
        ("R2_Equity", "R² of the regression of the log equity line on time (a measure of how 'regular' the trend is)."),
        ("MaxDD", "Maximum historical drawdown (worst percentage loss from a prior peak; typically negative)."),
        ("DD_Duration_Max", "Maximum duration (in days) of any continuous drawdown period."),
        ("TTR_from_MDD", "Time To Recovery, in days, from the MaxDD trough back to the prior peak (1250 if never recovered)."),
        ("AAW", "Area Above Water: sum of the equity line's positive deviations from its running minimum."),
        ("AUW", "Area Under Water: sum of the equity line's negative deviations (in absolute value) from its running maximum."),
        ("Heal_Index", "(AAW - AUW) / AUW, a synthetic index balancing run-ups against drawdowns (higher = better profile)."),
        ("H_min_100m_5Y", "Smallest number of months (21 trading days ≈ 1 month) such that ALL rolling windows of that length have a cumulative return ≥ 0."),
        ("Ranking raw", "Raw value used for the ranking; identical to Heal_Index."),
        ("Ranking", "Rank from best (1) to worst, based on 'Ranking raw' in descending order (dense method).")
    ]
    legenda_df = pd.DataFrame(legenda_rows, columns=["Campo", "Descrizione"])
    # =========================
    # EXCEL EXPORT (filename prefixed with the date as YYYYMMDD)
    # =========================
    today_str = pd.Timestamp.now().strftime("%Y%m%d")
    filename = f"{today_str} Asset Metrics.xlsx"
    out_path = os.path.join(OUTPUT_DIR, filename)
    with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
        metrics_df.to_excel(writer, sheet_name='Metriche_5Y', index=False)
        if dropped_info:
            pd.DataFrame(dropped_info).to_excel(writer, sheet_name='Scartati', index=False)
        legenda_df.to_excel(writer, sheet_name='Legenda', index=False)
    print(f"Created: {out_path} | ISINs with metrics: {len(metrics_df)} | Rejected: {len(dropped_info)}")
if __name__ == "__main__":
inp = sys.argv[1] if len(sys.argv) > 1 else os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx")
main(inp)