435 lines
17 KiB
Python
435 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
Programma: metrics_builder_v1.py
|
||
Scopo: Leggere un file Excel di input (lista ISIN) e, usando la stessa SP
|
||
"opt_RendimentoGiornaliero1_ALL", scaricare le serie storiche dei
|
||
rendimenti e calcolare SOLO le metriche per-asset (come in asset_metrics).
|
||
|
||
Nota: scarta subito gli ISIN che NON hanno almeno 5 anni di storia utile.
|
||
|
||
Esecuzione (opzionale):
|
||
python metrics_builder_v1.py "Universo per metrics v.1.0.xlsx"
|
||
|
||
Dipendenze:
|
||
pip install numpy pandas SQLAlchemy pyodbc openpyxl
|
||
(assicurarsi di avere l'ODBC Driver 17 for SQL Server)
|
||
|
||
File richiesti:
|
||
- connection.txt con chiavi: username, password, host, port, database
|
||
- Excel di input con colonne: ISIN, Nome, Categoria, Asset Class
|
||
|
||
Output:
|
||
- output/AAAAMMGG Asset Metrics.xlsx
|
||
* foglio "Metriche_5Y"
|
||
* foglio "Scartati"
|
||
* foglio "Legenda"
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import math
|
||
import numpy as np
|
||
import pandas as pd
|
||
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.exc import SQLAlchemyError
|
||
|
||
# =========================
|
||
# CARTELLE INPUT/OUTPUT
|
||
# =========================
|
||
INPUT_DIR = "input"
|
||
OUTPUT_DIR = "output"
|
||
|
||
os.makedirs(INPUT_DIR, exist_ok=True)
|
||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||
|
||
# =========================
|
||
# PARAMETRI
|
||
# =========================
|
||
DAYS_PER_YEAR = 252
|
||
MIN_YEARS_REQ = 5
|
||
SP_SAMPLE_SIZE = 1305 # ~5 anni lavorativi (252*5 = 1260)
|
||
PTF_CURRENCY = "EUR"
|
||
|
||
# =========================
|
||
# UTILITY METRICHE
|
||
# =========================
|
||
|
||
def r2_equity_line(returns: pd.Series) -> float:
|
||
s = returns.dropna()
|
||
if s.size < 3:
|
||
return np.nan
|
||
equity = (1.0 + s).cumprod()
|
||
equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
|
||
if equity.size < 3:
|
||
return np.nan
|
||
y = np.log(equity.values)
|
||
if np.allclose(y.var(ddof=1), 0.0):
|
||
return 0.0
|
||
x = np.arange(y.size, dtype=float)
|
||
X = np.column_stack([np.ones_like(x), x])
|
||
beta, *_ = np.linalg.lstsq(X, y, rcond=None)
|
||
y_hat = X @ beta
|
||
ss_res = np.sum((y - y_hat) ** 2)
|
||
ss_tot = np.sum((y - y.mean()) ** 2)
|
||
r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
|
||
if np.isnan(r2):
|
||
return np.nan
|
||
return float(np.clip(r2, 0.0, 1.0))
|
||
|
||
|
||
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
|
||
s = returns.fillna(0.0).astype(float)
|
||
if s.size == 0:
|
||
return np.nan, np.nan, np.nan
|
||
equity = (1.0 + s).cumprod()
|
||
if equity.size == 0:
|
||
return np.nan, np.nan, np.nan
|
||
run_max = equity.cummax()
|
||
dd = equity / run_max - 1.0
|
||
max_dd = float(dd.min()) if dd.size else np.nan
|
||
under_water = dd < 0
|
||
if under_water.any():
|
||
max_dd_duration = 0
|
||
current = 0
|
||
for flag in under_water.values:
|
||
if flag:
|
||
current += 1
|
||
if current > max_dd_duration:
|
||
max_dd_duration = current
|
||
else:
|
||
current = 0
|
||
else:
|
||
max_dd_duration = 0
|
||
if dd.size:
|
||
trough_idx = int(np.argmin(dd.values))
|
||
if trough_idx > 0:
|
||
peak_idx = int(np.argmax(equity.values[: trough_idx + 1]))
|
||
peak_level = float(equity.values[peak_idx])
|
||
rec_idx = None
|
||
for t in range(trough_idx + 1, equity.size):
|
||
if equity.values[t] >= peak_level:
|
||
rec_idx = t
|
||
break
|
||
if rec_idx is None:
|
||
ttr_from_mdd = sentinel_ttr
|
||
else:
|
||
ttr_from_mdd = rec_idx - trough_idx
|
||
else:
|
||
ttr_from_mdd = np.nan
|
||
else:
|
||
ttr_from_mdd = np.nan
|
||
return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
|
||
|
||
|
||
def heal_index_metrics(returns: pd.Series):
|
||
s = returns.fillna(0.0).astype(float)
|
||
if s.size == 0:
|
||
return np.nan, np.nan, np.nan
|
||
equity = (1.0 + s).cumprod()
|
||
if equity.size == 0:
|
||
return np.nan, np.nan, np.nan
|
||
run_max = equity.cummax()
|
||
dd = equity / run_max - 1.0
|
||
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
|
||
run_min = equity.cummin()
|
||
ru = equity / run_min - 1.0
|
||
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
|
||
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
|
||
return AAW, AUW, heal
|
||
|
||
|
||
def h_min_100(returns: pd.Series, month_len: int = 21):
|
||
s = returns.dropna().astype(float)
|
||
n = s.size
|
||
if n == 0:
|
||
return np.nan, np.nan
|
||
log1p = np.log1p(s.values)
|
||
csum = np.cumsum(log1p)
|
||
|
||
def rolling_sum_k(k: int):
|
||
if k > n:
|
||
return np.array([])
|
||
head = csum[k - 1:]
|
||
tail = np.concatenate(([0.0], csum[:-k]))
|
||
return head - tail
|
||
|
||
for k in range(1, n + 1):
|
||
rs = rolling_sum_k(k)
|
||
if rs.size == 0:
|
||
break
|
||
roll_ret = np.exp(rs) - 1.0
|
||
if np.all(roll_ret >= 0):
|
||
h_days = k
|
||
h_months = int(math.ceil(h_days / month_len))
|
||
return h_days, h_months
|
||
return np.nan, np.nan
|
||
|
||
# =========================
|
||
# CONNESSIONE DB (connection.txt)
|
||
# =========================
|
||
|
||
def read_connection_params(path: str = "connection.txt") -> dict:
|
||
params = {}
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line and not line.startswith("#"):
|
||
key, value = line.split("=", 1)
|
||
params[key.strip()] = value.strip()
|
||
return params
|
||
|
||
|
||
def make_engine(params: dict):
|
||
username = params.get("username")
|
||
password = params.get("password")
|
||
host = params.get("host")
|
||
port = params.get("port", "1433")
|
||
database = params.get("database")
|
||
conn_str = (
|
||
f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
|
||
"?driver=ODBC+Driver+17+for+SQL+Server"
|
||
)
|
||
return create_engine(conn_str)
|
||
|
||
# =========================
|
||
# MAIN LOGIC
|
||
# =========================
|
||
|
||
def main(input_excel: str = os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx")):
|
||
# 1) Carica input
|
||
if not os.path.exists(input_excel):
|
||
raise FileNotFoundError(f"File di input non trovato: {input_excel}")
|
||
df_in = pd.read_excel(input_excel)
|
||
|
||
# Normalizza i nomi attesi
|
||
df_in.columns = [str(c).strip() for c in df_in.columns]
|
||
cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"]
|
||
for c in cols_needed:
|
||
if c not in df_in.columns:
|
||
raise ValueError(f"Manca la colonna richiesta nel file input: '{c}'")
|
||
|
||
# 2) Connessione DB
|
||
params = read_connection_params("connection.txt")
|
||
engine = make_engine(params)
|
||
with engine.connect() as con:
|
||
_ = con.execute(text("SELECT 1"))
|
||
|
||
# 3) Range date 5 anni (fino a ieri, business days)
|
||
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
|
||
start_date = end_date - pd.DateOffset(years=MIN_YEARS_REQ)
|
||
all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize()
|
||
five_year_len = len(all_dates)
|
||
|
||
# 4) Scarico serie per ciascun ISIN e filtro < 5 anni
|
||
final_df = pd.DataFrame(index=all_dates)
|
||
accepted_isins = []
|
||
dropped_info = [] # lista dizionari per report scartati
|
||
|
||
for isin in df_in["ISIN"].dropna().astype(str).unique():
|
||
print(f"[SP] Recupero: {isin}")
|
||
sp = (
|
||
f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', "
|
||
f"@n = {SP_SAMPLE_SIZE}, @PtfCurr = {PTF_CURRENCY}"
|
||
)
|
||
try:
|
||
tmp = pd.read_sql_query(sp, engine)
|
||
if tmp.empty:
|
||
print(f" - Nessun dato: SKIP {isin}")
|
||
dropped_info.append({"ISIN": isin, "Motivo": "SP vuota"})
|
||
continue
|
||
|
||
tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize()
|
||
tmp = tmp.dropna(subset=["Px_Date"])
|
||
|
||
# deduplicate per data PRIMA del reindex (media)
|
||
dup_cnt = tmp["Px_Date"].duplicated().sum()
|
||
if dup_cnt > 0:
|
||
print(f" - Attenzione: {dup_cnt} duplicati di Px_Date per {isin}, compattati con media.")
|
||
|
||
tmp["RendimentoGiornaliero"] = pd.to_numeric(tmp["RendimentoGiornaliero"], errors="coerce")
|
||
|
||
ser = (
|
||
tmp.sort_values("Px_Date")
|
||
.groupby("Px_Date", as_index=True)["RendimentoGiornaliero"]
|
||
.mean()
|
||
.div(100.0) # percentuali → frazione
|
||
.reindex(all_dates)
|
||
)
|
||
|
||
# Criteri di accettazione (almeno 5 anni effettivi):
|
||
first_valid = ser.first_valid_index()
|
||
last_valid = ser.last_valid_index()
|
||
nonnull = int(ser.notna().sum())
|
||
|
||
if first_valid is None or last_valid is None:
|
||
dropped_info.append({"ISIN": isin, "Motivo": "nessun valore valido"})
|
||
print(f" - Nessun valore valido: SKIP {isin}")
|
||
continue
|
||
|
||
span_days = (last_valid - first_valid).days
|
||
cond_span = span_days >= (365 * MIN_YEARS_REQ - 7) # tolleranza 1 settimana
|
||
cond_count = nonnull >= int(0.98 * five_year_len) # copertura ~98%
|
||
cond_start = first_valid <= (start_date + pd.Timedelta(days=7))
|
||
|
||
if not (cond_span and cond_count and cond_start):
|
||
dropped_info.append({
|
||
"ISIN": isin,
|
||
"Motivo": f"storia insufficiente (span={span_days}d, count={nonnull}/{five_year_len})"
|
||
})
|
||
print(f" - Storia insufficiente: SKIP {isin}")
|
||
continue
|
||
|
||
# Riempie eventuali buchi residui con 0 (nessun rendimento quel giorno)
|
||
ser = ser.fillna(0.0)
|
||
final_df[isin] = ser
|
||
accepted_isins.append(isin)
|
||
print(f" - OK: {nonnull} osservazioni utili")
|
||
except SQLAlchemyError as e:
|
||
print(f" - Errore SP per {isin}: {e}")
|
||
dropped_info.append({"ISIN": isin, "Motivo": "errore SP"})
|
||
continue
|
||
|
||
if not accepted_isins:
|
||
print("Nessun ISIN con 5 anni pieni: nessun output generato.")
|
||
# comunque salvo l'elenco scartati se presente
|
||
if dropped_info:
|
||
today_str = pd.Timestamp.now().strftime("%Y%m%d")
|
||
filename = f"{today_str} Asset Metrics.xlsx"
|
||
out_path = os.path.join(OUTPUT_DIR, filename)
|
||
with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
|
||
pd.DataFrame(dropped_info).to_excel(writer, sheet_name="Scartati", index=False)
|
||
print(f"Creato: {out_path} | ISIN metricati: 0 | Scartati: {len(dropped_info)}")
|
||
return
|
||
|
||
# 5) Calcolo metriche per-asset sui 5 anni
|
||
period_df = final_df[accepted_isins]
|
||
n_days = int(period_df.shape[0])
|
||
years_elapsed = n_days / DAYS_PER_YEAR if n_days > 0 else np.nan
|
||
|
||
daily_mean = period_df.mean()
|
||
ann_return = daily_mean * DAYS_PER_YEAR
|
||
ann_vol = period_df.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)
|
||
|
||
gross = (1.0 + period_df).prod(skipna=True)
|
||
cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
|
||
|
||
r2_series = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
|
||
|
||
maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
|
||
aaw_dict, auw_dict, heal_dict = {}, {}, {}
|
||
hmin_5y_months_dict = {}
|
||
|
||
for col in period_df.columns:
|
||
mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
|
||
maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
|
||
aaw, auw, heal = heal_index_metrics(period_df[col])
|
||
aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
|
||
_, h_months_5y = h_min_100(period_df[col], month_len=21)
|
||
hmin_5y_months_dict[col] = h_months_5y
|
||
|
||
metrics_df = pd.DataFrame({
|
||
'ISIN': period_df.columns,
|
||
'Rendimento_Ann': ann_return.reindex(period_df.columns).values,
|
||
'Volatilita_Ann': ann_vol.reindex(period_df.columns).values,
|
||
'CAGR': cagr.reindex(period_df.columns).values,
|
||
'R2_Equity': r2_series.reindex(period_df.columns).values,
|
||
'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
|
||
'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
|
||
'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
|
||
'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
|
||
'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
|
||
'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
|
||
'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
|
||
})
|
||
|
||
# Merge con info descrittive
|
||
metrics_df = (
|
||
metrics_df
|
||
.merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
|
||
)
|
||
|
||
# Ordine base delle colonne (prima di aggiungere il ranking)
|
||
metrics_df = metrics_df[
|
||
['ISIN', 'Nome', 'Categoria', 'Asset Class',
|
||
'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
|
||
'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
|
||
'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']
|
||
]
|
||
|
||
# =========================
|
||
# AGGIUNTA COLONNE DI RANKING
|
||
# =========================
|
||
# Ranking raw: usiamo direttamente Heal_Index come score
|
||
metrics_df["Ranking raw"] = metrics_df["Heal_Index"]
|
||
|
||
# Ranking: ranking decrescente su Ranking raw (1 = migliore)
|
||
metrics_df["Ranking"] = (
|
||
metrics_df["Ranking raw"]
|
||
.rank(method="dense", ascending=False)
|
||
.astype("Int64") # consente NaN dove Ranking raw è NaN
|
||
)
|
||
|
||
# Ordinamento dell'output per Ranking crescente (1 = in alto)
|
||
metrics_df = metrics_df.sort_values("Ranking", na_position="last").reset_index(drop=True)
|
||
|
||
# =========================
|
||
# TRASFORMAZIONE ISIN IN HYPERLINK
|
||
# =========================
|
||
def isin_to_hyperlink(isin: str) -> str:
|
||
if pd.isna(isin):
|
||
return ""
|
||
isin_str = str(isin).strip()
|
||
if not isin_str:
|
||
return ""
|
||
url = f"https://www.justetf.com/it/etf-profile.html?isin={isin_str}"
|
||
# Formula Excel: visualizza l'ISIN come testo cliccabile
|
||
return f'=HYPERLINK("{url}", "{isin_str}")'
|
||
|
||
metrics_df["ISIN"] = metrics_df["ISIN"].apply(isin_to_hyperlink)
|
||
|
||
# =========================
|
||
# LEGENDA (FOGLIO DEDICATO)
|
||
# =========================
|
||
legenda_rows = [
|
||
("ISIN", "Codice ISIN dello strumento. È un hyperlink verso la pagina justETF corrispondente."),
|
||
("Nome", "Nome/descrizione commerciale dello strumento (ETF/fondo)."),
|
||
("Categoria", "Categoria logica dello strumento (es. Azionario Globale, Obbligazionario Corporate, ecc.)."),
|
||
("Asset Class", "Macro asset class (Azionari, Obbligazionari, Metalli Preziosi, Materie Prime, Immobiliare, Criptovalute, Monetari)."),
|
||
("Rendimento_Ann", "Rendimento medio annuo (media dei rendimenti giornalieri × 252)."),
|
||
("Volatilita_Ann", "Volatilità annua (deviazione standard giornaliera × √252)."),
|
||
("CAGR", "Tasso di crescita annuale composto sul periodo di 5 anni."),
|
||
("R2_Equity", "R² della regressione dei logaritmi dell’equity line sul tempo (stima della “regolarità” del trend)."),
|
||
("MaxDD", "Massimo drawdown storico (peggior perdita percentuale dal massimo precedente; valore tipicamente negativo)."),
|
||
("DD_Duration_Max", "Durata massima (in giorni) di un qualunque periodo di drawdown continuo."),
|
||
("TTR_from_MDD", "Time To Recovery, in giorni, dal minimo del MaxDD al pieno recupero del picco precedente (1250 se mai recuperato)."),
|
||
("AAW", "Area Above Water: somma degli scostamenti positivi dell’equity rispetto al minimo cumulato."),
|
||
("AUW", "Area Under Water: somma degli scostamenti negativi (in valore assoluto) dell’equity rispetto al massimo cumulato."),
|
||
("Heal_Index", "(AAW - AUW) / AUW, indice sintetico che bilancia run-up e drawdown (valori più alti = profilo migliore)."),
|
||
("H_min_100m_5Y", "Numero minimo di mesi (21 giorni borsistici ≈ 1 mese) tali che TUTTE le finestre rolling di tale durata abbiano rendimento cumulato ≥ 0."),
|
||
("Ranking raw", "Valore grezzo usato per il ranking, coincidente con Heal_Index."),
|
||
("Ranking", "Classifica dallo strumento migliore (1) al peggiore, basata su Ranking raw in ordine decrescente (metodo dense).")
|
||
]
|
||
legenda_df = pd.DataFrame(legenda_rows, columns=["Campo", "Descrizione"])
|
||
|
||
# =========================
|
||
# EXPORT EXCEL — con prefisso data AAAAMMGG
|
||
# =========================
|
||
today_str = pd.Timestamp.now().strftime("%Y%m%d")
|
||
filename = f"{today_str} Asset Metrics.xlsx"
|
||
out_path = os.path.join(OUTPUT_DIR, filename)
|
||
|
||
with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
|
||
metrics_df.to_excel(writer, sheet_name='Metriche_5Y', index=False)
|
||
|
||
if dropped_info:
|
||
pd.DataFrame(dropped_info).to_excel(writer, sheet_name='Scartati', index=False)
|
||
|
||
legenda_df.to_excel(writer, sheet_name='Legenda', index=False)
|
||
|
||
print(f"Creato: {out_path} | ISIN metricati: {len(metrics_df)} | Scartati: {len(dropped_info)}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
inp = sys.argv[1] if len(sys.argv) > 1 else os.path.join(INPUT_DIR, "Universo per metrics v.1.0.xlsx")
|
||
main(inp)
|