# -*- coding: utf-8 -*-
import sys
import math
from pathlib import Path
from typing import Union
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
# =========================
# PARAMETERS
# =========================
DAYS_PER_YEAR = 252
MIN_YEARS_REQ = 5
SP_SAMPLE_SIZE = 1305  # ~5 business years (252*5 = 1260), requested with some margin
PTF_CURRENCY = "EUR"
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_INPUT_EXCEL = BASE_DIR / "Input" / "Universo per metrics v.1.0.xlsx"
# =========================
# METRIC UTILITIES
# =========================
def r2_equity_line(returns: pd.Series) -> float:
    s = returns.dropna()
    if s.size < 3:
        return np.nan
    equity = (1.0 + s).cumprod()
    equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
    if equity.size < 3:
        return np.nan
    y = np.log(equity.values)
    if np.allclose(y.var(ddof=1), 0.0):
        return 0.0
    x = np.arange(y.size, dtype=float)
    X = np.column_stack([np.ones_like(x), x])
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    y_hat = X @ beta
    ss_res = np.sum((y - y_hat) ** 2)
    ss_tot = np.sum((y - y.mean()) ** 2)
    r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
    if np.isnan(r2):
        return np.nan
    return float(np.clip(r2, 0.0, 1.0))
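
# Usage sketch for r2_equity_line (illustrative values, not source data): a constant
# daily return compounds along a perfectly straight log-equity line, so
#   r2_equity_line(pd.Series([0.01] * 10))   # ~1.0
# while adding trendless noise to the returns pushes the R^2 toward 0.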
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
    """Return (max drawdown, longest underwater streak in days, time-to-recovery from the MDD trough)."""
    s = returns.fillna(0.0).astype(float)
    if s.size == 0:
        return np.nan, np.nan, np.nan
    equity = (1.0 + s).cumprod()
    if equity.size == 0:
        return np.nan, np.nan, np.nan
    run_max = equity.cummax()
    dd = equity / run_max - 1.0
    max_dd = float(dd.min()) if dd.size else np.nan
    # Longest consecutive stretch spent below a prior peak
    under_water = dd < 0
    if under_water.any():
        max_dd_duration = 0
        current = 0
        for flag in under_water.values:
            if flag:
                current += 1
                if current > max_dd_duration:
                    max_dd_duration = current
            else:
                current = 0
    else:
        max_dd_duration = 0
    # Time-to-recovery: days from the MDD trough back up to the pre-drawdown peak
    if dd.size:
        trough_idx = int(np.argmin(dd.values))
        if trough_idx > 0:
            peak_idx = int(np.argmax(equity.values[: trough_idx + 1]))
            peak_level = float(equity.values[peak_idx])
            rec_idx = None
            for t in range(trough_idx + 1, equity.size):
                if equity.values[t] >= peak_level:
                    rec_idx = t
                    break
            # Never recovered within the sample: report the sentinel value instead
            ttr_from_mdd = sentinel_ttr if rec_idx is None else rec_idx - trough_idx
        else:
            ttr_from_mdd = np.nan
    else:
        ttr_from_mdd = np.nan
    return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
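
# Quick self-check with assumed toy values: +10%, -10%, then +11.12% gives
# max_dd = -0.10 (0.99/1.10 - 1), a 1-day underwater streak, and TTR = 1,
# since the third day closes back above the prior peak of 1.10:
#   drawdown_metrics(pd.Series([0.10, -0.10, 0.1112]))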
def heal_index_metrics(returns: pd.Series):
    """Return (AAW, AUW, heal index) where heal = (AAW - AUW) / AUW."""
    s = returns.fillna(0.0).astype(float)
    if s.size == 0:
        return np.nan, np.nan, np.nan
    equity = (1.0 + s).cumprod()
    if equity.size == 0:
        return np.nan, np.nan, np.nan
    # AUW: cumulated depth below the running maximum (area under water)
    run_max = equity.cummax()
    dd = equity / run_max - 1.0
    AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
    # AAW: cumulated rise above the running minimum (area above water)
    run_min = equity.cummin()
    ru = equity / run_min - 1.0
    AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
    heal = ((AAW - AUW) / AUW) if (np.isfinite(AUW) and AUW > 0) else np.nan
    return AAW, AUW, heal
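
# Interpretation sketch: AUW accumulates, day by day, the depth below the running
# peak, while AAW accumulates the rise above the running trough. The heal index is
# positive when cumulated recovery outweighs cumulated drawdown, and it is NaN for
# a series that never goes under water (AUW = 0).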
def h_min_100(returns: pd.Series, month_len: int = 21):
    """Shortest horizon (days, months) such that every rolling compounded return is non-negative."""
    s = returns.dropna().astype(float)
    n = s.size
    if n == 0:
        return np.nan, np.nan
    # Rolling compounded returns via cumulative sums of log(1 + r)
    log1p = np.log1p(s.values)
    csum = np.cumsum(log1p)

    def rolling_sum_k(k: int):
        if k > n:
            return np.array([])
        head = csum[k - 1:]
        tail = np.concatenate(([0.0], csum[:-k]))
        return head - tail

    for k in range(1, n + 1):
        rs = rolling_sum_k(k)
        if rs.size == 0:
            break
        roll_ret = np.exp(rs) - 1.0
        if np.all(roll_ret >= 0):
            h_days = k
            h_months = int(math.ceil(h_days / month_len))
            return h_days, h_months
    return np.nan, np.nan
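
# The rolling-window trick above uses the identity
#   prod_{i=t-k+1..t}(1 + r_i) = exp(csum[t] - csum[t-k]),  csum = cumsum(log1p(r)),
# so every k-day compounded return costs one subtraction. Illustrative check with
# assumed values: r = [0.02, -0.01], k = 2 gives exp(log1p(0.02) + log1p(-0.01)) - 1
# = 1.02 * 0.99 - 1 = 0.0098.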
# =========================
# DB CONNECTION (connection.txt)
# =========================
def read_connection_params(path: str = "connection.txt") -> dict:
    """Read key=value pairs from a plain-text file, ignoring blank lines and '#' comments."""
    params = {}
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#"):
                key, value = line.split("=", 1)
                params[key.strip()] = value.strip()
    return params
def make_engine(params: dict):
    username = params.get("username")
    password = params.get("password")
    host = params.get("host")
    port = params.get("port", "1433")
    database = params.get("database")
    conn_str = (
        f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
        "?driver=ODBC+Driver+17+for+SQL+Server"
    )
    return create_engine(conn_str)
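
# Expected connection.txt layout (a sketch; placeholder values, not real credentials).
# Lines are parsed as key=value; blank lines and lines starting with '#' are skipped,
# and 'port' falls back to 1433 when omitted:
#   username = my_user
#   password = my_password
#   host     = dbserver.example.local
#   database = MarketDB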
# =========================
# MAIN LOGIC
# =========================
def main(input_excel: Union[str, Path] = DEFAULT_INPUT_EXCEL):
    # 1) Load the input universe
    input_path = Path(input_excel)
    if not input_path.is_absolute():
        input_path = (BASE_DIR / input_path).resolve()
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")
    df_in = pd.read_excel(input_path)
    # Normalize the expected column names
    df_in.columns = [str(c).strip() for c in df_in.columns]
    cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"]
    for c in cols_needed:
        if c not in df_in.columns:
            raise ValueError(f"Required column missing from the input file: '{c}'")
    # 2) DB connection
    params = read_connection_params("connection.txt")
    engine = make_engine(params)
    with engine.connect() as con:
        _ = con.execute(text("SELECT 1"))  # fail fast if the connection does not work
    # 3) 5-year date range (business days, up to yesterday)
    end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
    start_date = end_date - pd.DateOffset(years=MIN_YEARS_REQ)
    all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize()
    five_year_len = len(all_dates)
    # 4) Fetch the return series for each ISIN and drop those with < 5 years of history
    final_df = pd.DataFrame(index=all_dates)
    accepted_isins = []
    dropped_info = []  # list of dicts for the rejected-ISIN report
    # Rows collected from the StrumentiFinanziari table
    strumenti_info_rows = []
    # Parametrized query for StrumentiFinanziari (including Valuta)
    sql_strumenti = text("""
        SELECT ISIN, Valuta, Strumento, MorningStarCat, MacroAsset
        FROM StrumentiFinanziari
        WHERE ISIN = :isin
    """)
    for isin in df_in["ISIN"].dropna().astype(str).unique():
        print(f"[SP] Fetching: {isin}")
        # NOTE: @PtfCurr is quoted so it reaches T-SQL as a string literal
        sp = (
            f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', "
            f"@n = {SP_SAMPLE_SIZE}, @PtfCurr = '{PTF_CURRENCY}'"
        )
        try:
            tmp = pd.read_sql_query(sp, engine)
            if tmp.empty:
                print(f" - No data: SKIP {isin}")
                dropped_info.append({"ISIN": isin, "Motivo": "SP vuota"})
                continue
            tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize()
            tmp = tmp.dropna(subset=["Px_Date"])
            # FIX: deduplicate by date BEFORE the reindex (taking the mean)
            dup_cnt = tmp["Px_Date"].duplicated().sum()
            if dup_cnt > 0:
                print(f" - Warning: {dup_cnt} duplicated Px_Date rows for {isin}, collapsed by mean.")
            tmp["RendimentoGiornaliero"] = pd.to_numeric(tmp["RendimentoGiornaliero"], errors="coerce")
            ser = (
                tmp.sort_values("Px_Date")
                .groupby("Px_Date", as_index=True)["RendimentoGiornaliero"]
                .mean()
                .div(100.0)  # the SP returns percentages -> convert to fractions
                .reindex(all_dates)
            )
            # Acceptance criteria (at least 5 effective years):
            first_valid = ser.first_valid_index()
            last_valid = ser.last_valid_index()
            nonnull = int(ser.notna().sum())
            if first_valid is None or last_valid is None:
                dropped_info.append({"ISIN": isin, "Motivo": "nessun valore valido"})
                print(f" - No valid values: SKIP {isin}")
                continue
            span_days = (last_valid - first_valid).days
            cond_span = span_days >= (365 * MIN_YEARS_REQ - 7)  # one-week tolerance
            cond_count = nonnull >= int(0.98 * five_year_len)   # ~98% coverage
            cond_start = first_valid <= (start_date + pd.Timedelta(days=7))
            if not (cond_span and cond_count and cond_start):
                dropped_info.append({
                    "ISIN": isin,
                    "Motivo": f"storia insufficiente (span={span_days}d, count={nonnull}/{five_year_len})"
                })
                print(f" - Insufficient history: SKIP {isin}")
                continue
            # Fill any residual gaps with 0 (no return on that day)
            ser = ser.fillna(0.0)
            final_df[isin] = ser
            accepted_isins.append(isin)
            print(f" - OK: {nonnull} usable observations")
            # ================================================================
            # NEW PART: SELECT from StrumentiFinanziari for this ISIN
            # ================================================================
            try:
                df_strum = pd.read_sql_query(sql_strumenti, engine, params={"isin": isin})
                if df_strum.empty:
                    # Nothing found: still append a row of NaNs
                    strumenti_info_rows.append({
                        "ISIN": isin,
                        "Valuta": np.nan,
                        "Strumento": np.nan,
                        "MorningStarCat": np.nan,
                        "MacroAsset": np.nan
                    })
                    print(" - StrumentiFinanziari: no row found.")
                else:
                    # Assume the ISIN is unique in the table
                    r = df_strum.iloc[0]
                    strumenti_info_rows.append({
                        "ISIN": r.get("ISIN", isin),
                        "Valuta": r.get("Valuta", np.nan),
                        "Strumento": r.get("Strumento", np.nan),
                        "MorningStarCat": r.get("MorningStarCat", np.nan),
                        "MacroAsset": r.get("MacroAsset", np.nan)
                    })
                    print(" - StrumentiFinanziari: info retrieved.")
            except SQLAlchemyError as e_info:
                print(f" - StrumentiFinanziari SELECT error for {isin}: {e_info}")
                strumenti_info_rows.append({
                    "ISIN": isin,
                    "Valuta": np.nan,
                    "Strumento": np.nan,
                    "MorningStarCat": np.nan,
                    "MacroAsset": np.nan
                })
            # ================================================================
        except SQLAlchemyError as e:
            print(f" - SP error for {isin}: {e}")
            dropped_info.append({"ISIN": isin, "Motivo": "errore SP"})
            continue
    if not accepted_isins:
        print("No ISIN with a full 5 years of history: no metrics output generated.")
        # still save the rejected list, if any
        if dropped_info:
            pd.DataFrame(dropped_info).to_excel("asset_metrics_only.xlsx", sheet_name="Scartati", index=False)
        # and the StrumentiFinanziari info file, if anything was collected
        if strumenti_info_rows:
            df_info_out = (
                pd.DataFrame(strumenti_info_rows)
                [["ISIN", "Valuta", "Strumento", "MorningStarCat", "MacroAsset"]]
                .sort_values("ISIN", kind="stable")
                .reset_index(drop=True)
            )
            df_info_out.to_excel("strumenti_info.xlsx", sheet_name="Strumenti", index=False)
        return
    # 5) Per-asset metrics over the 5-year window
    period_df = final_df[accepted_isins]
    n_days = int(period_df.shape[0])
    years_elapsed = n_days / DAYS_PER_YEAR if n_days > 0 else np.nan
    daily_mean = period_df.mean()
    ann_return = daily_mean * DAYS_PER_YEAR
    ann_vol = period_df.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)
    gross = (1.0 + period_df).prod(skipna=True)
    cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
    r2_series = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
    maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
    aaw_dict, auw_dict, heal_dict = {}, {}, {}
    hmin_5y_months_dict = {}
    for col in period_df.columns:
        mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
        maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
        aaw, auw, heal = heal_index_metrics(period_df[col])
        aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
        _, h_months_5y = h_min_100(period_df[col], month_len=21)
        hmin_5y_months_dict[col] = h_months_5y
    metrics_df = (
        pd.DataFrame({
            'ISIN': period_df.columns,
            'Rendimento_Ann': ann_return.reindex(period_df.columns).values,
            'Volatilita_Ann': ann_vol.reindex(period_df.columns).values,
            'CAGR': cagr.reindex(period_df.columns).values,
            'R2_Equity': r2_series.reindex(period_df.columns).values,
            'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
            'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
            'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
            'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
            'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
            'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
            'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
        })
        .merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
        [['ISIN', 'Nome', 'Categoria', 'Asset Class',
          'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
          'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
          'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
        .sort_values('ISIN', kind='stable')
        .reset_index(drop=True)
    )
    # 6) Export the metrics to Excel (openpyxl)
    out_path = "asset_metrics_only.xlsx"
    with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
        metrics_df.to_excel(writer, sheet_name='Metriche_5Y', index=False)
        if dropped_info:
            pd.DataFrame(dropped_info).to_excel(writer, sheet_name='Scartati', index=False)
    print(f"Created: {out_path} | ISINs with metrics: {len(metrics_df)} | Rejected: {len(dropped_info)}")
    # 7) Separate Excel export with the StrumentiFinanziari info
    if strumenti_info_rows:
        df_info_out = (
            pd.DataFrame(strumenti_info_rows)
            [["ISIN", "Valuta", "Strumento", "MorningStarCat", "MacroAsset"]]
            .sort_values("ISIN", kind="stable")
            .reset_index(drop=True)
        )
        out_info = "strumenti_info.xlsx"
        df_info_out.to_excel(out_info, sheet_name="Strumenti", index=False)
        print(f"Created: {out_info} | Rows: {len(df_info_out)}")
    else:
        print("No StrumentiFinanziari information to export.")
if __name__ == "__main__":
    inp = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT_EXCEL
    main(inp)
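
# Usage sketch (the argument is optional; the .xlsx path below is a placeholder):
#   python "Metriche v.2.0.py"                     -> uses DEFAULT_INPUT_EXCEL
#   python "Metriche v.2.0.py" my_universe.xlsx    -> explicit input file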