From 6370b31d41efdbfe5c3f84ecf0194f46ed27dc92 Mon Sep 17 00:00:00 2001 From: fredmaloggia Date: Mon, 1 Dec 2025 21:40:36 +0100 Subject: [PATCH] perfezionata distillazione strumenti per correlazione e per entropia --- Metriche v.2.0.py | 41 +-- Metriche v.3.0.py | 722 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 734 insertions(+), 29 deletions(-) create mode 100644 Metriche v.3.0.py diff --git a/Metriche v.2.0.py b/Metriche v.2.0.py index 20ee14c..2ee75be 100644 --- a/Metriche v.2.0.py +++ b/Metriche v.2.0.py @@ -1,31 +1,9 @@ # -*- coding: utf-8 -*- -""" -Programma: metrics_builder_v1.py -Scopo: Leggere un file Excel di input (lista ISIN) e, usando la stessa SP - "opt_RendimentoGiornaliero1_ALL", scaricare le serie storiche dei - rendimenti e calcolare SOLO le metriche per-asset (come in asset_metrics). - -Nota: scarta subito gli ISIN che NON hanno almeno 5 anni di storia utile. - -Esecuzione (opzionale): - python metrics_builder_v1.py ".Universo per metrics v.1.0.xlsx" - -Dipendenze: - pip install numpy pandas SQLAlchemy pyodbc openpyxl - (assicurarsi di avere l'ODBC Driver 17 for SQL Server) - -File richiesti: - - connection.txt con chiavi: username, password, host, port, database - - Excel di input con colonne: ISIN, Nome, Categoria, Asset Class - -Output: - - asset_metrics_only.xlsx (un foglio "Metriche_5Y" e uno "Scartati") - - strumenti_info.xlsx (un foglio "Strumenti" con ISIN, Valuta, Strumento, MorningStarCat, MacroAsset) -""" import sys -import os import math +from pathlib import Path +from typing import Union import numpy as np import pandas as pd from sqlalchemy import create_engine, text @@ -38,6 +16,8 @@ DAYS_PER_YEAR = 252 MIN_YEARS_REQ = 5 SP_SAMPLE_SIZE = 1305 # ~5 anni lavorativi (252*5 = 1260) PTF_CURRENCY = "EUR" +BASE_DIR = Path(__file__).resolve().parent +DEFAULT_INPUT_EXCEL = BASE_DIR / "Input" / "Universo per metrics v.1.0.xlsx" # ========================= # UTILITY METRICHE @@ -184,11 +164,14 @@ def make_engine(params: dict): # MAIN LOGIC # ========================= -def main(input_excel: str = "./Universo per metrics v.1.0.xlsx"): +def main(input_excel: Union[str, Path] = DEFAULT_INPUT_EXCEL): # 1) Carica input - if not os.path.exists(input_excel): - raise FileNotFoundError(f"File di input non trovato: {input_excel}") - df_in = pd.read_excel(input_excel) + input_path = Path(input_excel) + if not input_path.is_absolute(): + input_path = (BASE_DIR / input_path).resolve() + if not input_path.exists(): + raise FileNotFoundError(f"File di input non trovato: {input_path}") + df_in = pd.read_excel(input_path) # Normalizza i nomi attesi df_in.columns = [str(c).strip() for c in df_in.columns] cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"] @@ -414,5 +397,5 @@ def main(input_excel: str = "./Universo per metrics v.1.0.xlsx"): print("Nessuna informazione da StrumentiFinanziari da esportare.") if __name__ == "__main__": - inp = sys.argv[1] if len(sys.argv) > 1 else "./Universo per metrics v.1.0.xlsx" + inp = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT_EXCEL main(inp) diff --git a/Metriche v.3.0.py b/Metriche v.3.0.py new file mode 100644 index 0000000..78a8836 --- /dev/null +++ b/Metriche v.3.0.py @@ -0,0 +1,722 @@ +# -*- coding: utf-8 -*- +""" +Created on Sun Nov 30 22:59:00 2025 + +@author: Federico +""" + +# -*- coding: utf-8 -*- + +import sys +import math +from pathlib import Path +from typing import Union +import numpy as np +import pandas as pd +from sqlalchemy import create_engine, text +from sqlalchemy.exc import SQLAlchemyError + +# ========================= +# PARAMETRI +# ========================= +DAYS_PER_YEAR = 252 +MIN_YEARS_REQ = 5 +SP_SAMPLE_SIZE = 1305 # ~5 anni lavorativi (252*5 = 1260) +PTF_CURRENCY = "EUR" +BASE_DIR = Path(__file__).resolve().parent +DEFAULT_INPUT_EXCEL = BASE_DIR / "Input" / "Universo per metrics v.1.0.xlsx" +OUTPUT_DIR = BASE_DIR / "Output" + +# massimo numero di strumenti da mantenere nell'"universo selezionato" +MAX_UNIVERSE_SIZE = 100 +SELECTION_MODE = "entropy" # "corr" (greedy CorrIndex) oppure "entropy" (massimizza entropia degli autovalori della correlazione) + +# ========================= +# UTILITY METRICHE +# ========================= + +def r2_equity_line(returns: pd.Series) -> float: + s = returns.dropna() + if s.size < 3: + return np.nan + equity = (1.0 + s).cumprod() + equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna() + if equity.size < 3: + return np.nan + y = np.log(equity.values) + if np.allclose(y.var(ddof=1), 0.0): + return 0.0 + x = np.arange(y.size, dtype=float) + X = np.column_stack([np.ones_like(x), x]) + beta, *_ = np.linalg.lstsq(X, y, rcond=None) + y_hat = X @ beta + ss_res = np.sum((y - y_hat) ** 2) + ss_tot = np.sum((y - y.mean()) ** 2) + r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan + if np.isnan(r2): + return np.nan + return float(np.clip(r2, 0.0, 1.0)) + + +def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): + s = returns.fillna(0.0).astype(float) + if s.size == 0: + return np.nan, np.nan, np.nan + equity = (1.0 + s).cumprod() + if equity.size == 0: + return np.nan, np.nan, np.nan + run_max = equity.cummax() + dd = equity / run_max - 1.0 + max_dd = float(dd.min()) if dd.size else np.nan + under_water = dd < 0 + if under_water.any(): + max_dd_duration = 0 + current = 0 + for flag in under_water.values: + if flag: + current += 1 + if current > max_dd_duration: + max_dd_duration = current + else: + current = 0 + else: + max_dd_duration = 0 + if dd.size: + trough_idx = int(np.argmin(dd.values)) + if trough_idx > 0: + peak_idx = int(np.argmax(equity.values[: trough_idx + 1])) + peak_level = float(equity.values[peak_idx]) + rec_idx = None + for t in range(trough_idx + 1, equity.size): + if equity.values[t] >= peak_level: + rec_idx = t + break + if rec_idx is None: + ttr_from_mdd = sentinel_ttr + else: + ttr_from_mdd = rec_idx - trough_idx + else: + ttr_from_mdd = np.nan + else: + ttr_from_mdd = np.nan + return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan) + + +def heal_index_metrics(returns: pd.Series): + s = returns.fillna(0.0).astype(float) + if s.size == 0: + return np.nan, np.nan, np.nan + equity = (1.0 + s).cumprod() + if equity.size == 0: + return np.nan, np.nan, np.nan + run_max = equity.cummax() + dd = equity / run_max - 1.0 + AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan + run_min = equity.cummin() + ru = equity / run_min - 1.0 + AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan + heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan + return AAW, AUW, heal + + +def h_min_100(returns: pd.Series, month_len: int = 21): + s = returns.dropna().astype(float) + n = s.size + if n == 0: + return np.nan, np.nan + log1p = np.log1p(s.values) + csum = np.cumsum(log1p) + + def rolling_sum_k(k: int): + if k > n: + return np.array([]) + head = csum[k - 1 :] + tail = np.concatenate(([0.0], csum[: -k])) + return head - tail + + for k in range(1, n + 1): + rs = rolling_sum_k(k) + if rs.size == 0: + break + roll_ret = np.exp(rs) - 1.0 + if np.all(roll_ret >= 0): + h_days = k + h_months = int(math.ceil(h_days / month_len)) + return h_days, h_months + return np.nan, np.nan + +# ========================= +# UTILITY CORRELAZIONE / DIVERSIFICAZIONE +# ========================= + +def compute_corr_diversification(corr: pd.DataFrame): + """ + Calcola: + - CorrIndex in [0,1] (0 = molto diversificato, 1 = altamente correlato) + - DiversificationScore = 1 - CorrIndex + sulla base della matrice di correlazione 'corr'. + """ + m = corr.shape[0] + if m <= 1: + return np.nan, np.nan + + mask = np.triu(np.ones((m, m), dtype=bool), k=1) + off_vals = corr.values[mask] + if off_vals.size == 0: + return np.nan, np.nan + + mean_rho = float(np.nanmean(off_vals)) + rho_min = -1.0 / (m - 1) if m > 1 else 0.0 + + corr_index = (mean_rho - rho_min) / (1.0 - rho_min) + corr_index = float(np.clip(corr_index, 0.0, 1.0)) + divers_score = 1.0 - corr_index + return corr_index, divers_score + + +def corr_entropy_metrics(corr: pd.DataFrame): + """ + Ritorna: + - entropy: entropia (ln) degli autovalori di correlazione (clippati >=0) + - entropy_norm: entropy / ln(N) + - n_eff: exp(entropy) + - decorrelation_potential: (n_eff - 1)/(N - 1) se N>1 + """ + n = corr.shape[0] + if n == 0: + return np.nan, np.nan, np.nan, np.nan + # simmetrizza e clip per stabilità + m = corr.values.astype(float) + m = 0.5 * (m + m.T) + m = np.clip(m, -1.0, 1.0) + eig = np.linalg.eigvalsh(m) + eig = np.clip(eig, 0.0, None) + s = float(eig.sum()) + if s <= 0: + return np.nan, np.nan, np.nan, np.nan + p = eig / s + entropy = float(-(p * np.log(p + 1e-18)).sum()) + entropy_norm = float(entropy / np.log(n)) if n > 1 else np.nan + n_eff = float(np.exp(entropy)) + decorrelation_potential = float((n_eff - 1.0) / (n - 1.0)) if n > 1 else np.nan + return entropy, entropy_norm, n_eff, decorrelation_potential + + +def select_universe_by_diversification( + returns: pd.DataFrame, + max_assets: int = 100, + selection_mode: str = "corr", # "corr" (min CorrIndex) oppure "entropy" (max entropia autovalori corr) +): + """ + Seleziona al massimo 'max_assets' colonne da 'returns' (rendimento giornaliero), + cercando di massimizzare la diversificazione in termini di correlazione. + + Ritorna: + - lista di ISIN selezionati + - CorrIndex finale + - DiversificationScore finale + """ + cols = list(returns.columns) + m = len(cols) + if m == 0: + return [], np.nan, np.nan + + corr_full = returns.corr().fillna(0.0) + + if m <= max_assets: + corr_index, divers_score = compute_corr_diversification(corr_full) + print( + f"[Diversification] Universo con {m} asset (<= {max_assets}), " + f"CorrIndex={corr_index:.4f}, DiversScore={divers_score:.4f} (nessun filtro applicato)." + ) + return cols, corr_index, divers_score + + print( + f"[Diversification-{selection_mode}] Universo iniziale: {m} asset. " + f"Selezione greedy fino a max {max_assets} asset..." + ) + + avg_corr = corr_full.apply( + lambda s: s.drop(s.name).mean() if s.drop(s.name).size > 0 else 0.0, + axis=1, + ) + + first_asset = avg_corr.idxmin() + selected = [first_asset] + remaining = set(cols) - {first_asset} + + while len(selected) < max_assets and remaining: + best_asset = None + best_score = None + best_corr_index = None + + for c in list(remaining): + subset = selected + [c] + sub_corr = corr_full.loc[subset, subset] + ci_tmp, _ = compute_corr_diversification(sub_corr) + + if selection_mode == "entropy": + _, h_norm_tmp, n_eff_tmp, decor_tmp = corr_entropy_metrics(sub_corr) + score_tmp = h_norm_tmp if np.isfinite(h_norm_tmp) else np.nan + # fallback: se score non finito, usa -CorrIndex per penalizzare correlazione alta + if not np.isfinite(score_tmp): + score_tmp = -ci_tmp if np.isfinite(ci_tmp) else np.nan + else: + # modalità standard: minimizza CorrIndex -> score = -CorrIndex + score_tmp = -ci_tmp if np.isfinite(ci_tmp) else np.nan + + if best_score is None or (np.isfinite(score_tmp) and score_tmp > best_score): + best_score = score_tmp + best_corr_index = ci_tmp + best_asset = c + elif best_score is not None and np.isclose(score_tmp, best_score, equal_nan=False): + # parità: prendo CorrIndex più basso + if np.isfinite(ci_tmp) and (best_corr_index is None or ci_tmp < best_corr_index): + best_corr_index = ci_tmp + best_asset = c + + if best_asset is None: + print(f"[Diversification-{selection_mode}] Nessun asset migliorativo trovato, interrompo la selezione.") + break + + selected.append(best_asset) + remaining.remove(best_asset) + + sub_corr_final = corr_full.loc[selected, selected] + corr_index_final, divers_score_final = compute_corr_diversification(sub_corr_final) + if selection_mode == "entropy": + ent, ent_norm, n_eff, decor = corr_entropy_metrics(sub_corr_final) + print( + f"[Diversification-{selection_mode}] Selezionati {len(selected)} asset su {m}. " + f"H_norm={ent_norm:.4f} | N_eff={n_eff:.2f} | DecorrPot={decor:.4f} | " + f"CorrIndex={corr_index_final:.4f}, DiversScore={divers_score_final:.4f}" + ) + else: + print( + f"[Diversification-{selection_mode}] Selezionati {len(selected)} asset su {m}. " + f"CorrIndex={corr_index_final:.4f}, DiversScore={divers_score_final:.4f}" + ) + return selected, corr_index_final, divers_score_final + +# ========================= +# CONNESSIONE DB (connection.txt) +# ========================= + +def read_connection_params(path: str = "connection.txt") -> dict: + params = {} + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + key, value = line.split("=", 1) + params[key.strip()] = value.strip() + return params + + +def make_engine(params: dict): + username = params.get("username") + password = params.get("password") + host = params.get("host") + port = params.get("port", "1433") + database = params.get("database") + conn_str = ( + f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}" + "?driver=ODBC+Driver+17+for+SQL+Server" + ) + return create_engine(conn_str) + +# ========================= +# MAIN LOGIC +# ========================= + +def main(input_excel: Union[str, Path] = DEFAULT_INPUT_EXCEL): + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + # 1) Carica input + input_path = Path(input_excel) + if not input_path.is_absolute(): + input_path = (BASE_DIR / input_path).resolve() + if not input_path.exists(): + raise FileNotFoundError(f"File di input non trovato: {input_path}") + df_in = pd.read_excel(input_path) + df_in.columns = [str(c).strip() for c in df_in.columns] + cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"] + for c in cols_needed: + if c not in df_in.columns: + raise ValueError(f"Manca la colonna richiesta nel file input: '{c}'") + + # 2) Connessione DB + params = read_connection_params("connection.txt") + engine = make_engine(params) + with engine.connect() as con: + _ = con.execute(text("SELECT 1")) + + # 3) Range date 5 anni + end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1) + start_date = end_date - pd.DateOffset(years=MIN_YEARS_REQ) + all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize() + five_year_len = len(all_dates) + + # 4) Scarico serie per ciascun ISIN e filtro < 5 anni + final_df = pd.DataFrame(index=all_dates) + accepted_isins = [] + dropped_info = [] + strumenti_info_rows = [] + + sql_strumenti = text(""" + SELECT ISIN, Valuta, Strumento, MorningStarCat, MacroAsset + FROM StrumentiFinanziari + WHERE ISIN = :isin + """) + + for isin in df_in["ISIN"].dropna().astype(str).unique(): + print(f"[SP] Recupero: {isin}") + sp = ( + f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', " + f"@n = {SP_SAMPLE_SIZE}, @PtfCurr = {PTF_CURRENCY}" + ) + try: + tmp = pd.read_sql_query(sp, engine) + if tmp.empty: + print(f" - Nessun dato: SKIP {isin}") + dropped_info.append({"ISIN": isin, "Motivo": "SP vuota"}) + continue + + tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize() + tmp = tmp.dropna(subset=["Px_Date"]) + + dup_cnt = tmp["Px_Date"].duplicated().sum() + if dup_cnt > 0: + print(f" - Attenzione: {dup_cnt} duplicati di Px_Date per {isin}, compattati con media.") + + tmp["RendimentoGiornaliero"] = pd.to_numeric(tmp["RendimentoGiornaliero"], errors="coerce") + + ser = ( + tmp.sort_values("Px_Date") + .groupby("Px_Date", as_index=True)["RendimentoGiornaliero"] + .mean() + .div(100.0) + .reindex(all_dates) + ) + + first_valid = ser.first_valid_index() + last_valid = ser.last_valid_index() + nonnull = int(ser.notna().sum()) + + if first_valid is None or last_valid is None: + dropped_info.append({"ISIN": isin, "Motivo": "nessun valore valido"}) + print(f" - Nessun valore valido: SKIP {isin}") + continue + + span_days = (last_valid - first_valid).days + cond_span = span_days >= (365 * MIN_YEARS_REQ - 7) + cond_count = nonnull >= int(0.98 * five_year_len) + cond_start = first_valid <= (start_date + pd.Timedelta(days=7)) + + if not (cond_span and cond_count and cond_start): + dropped_info.append({ + "ISIN": isin, + "Motivo": f"storia insufficiente (span={span_days}d, count={nonnull}/{five_year_len})" + }) + print(f" - Storia insufficiente: SKIP {isin}") + continue + + ser = ser.fillna(0.0) + final_df[isin] = ser + accepted_isins.append(isin) + print(f" - OK: {nonnull} osservazioni utili") + + try: + df_strum = pd.read_sql_query(sql_strumenti, engine, params={"isin": isin}) + if df_strum.empty: + strumenti_info_rows.append({ + "ISIN": isin, + "Valuta": np.nan, + "Strumento": np.nan, + "MorningStarCat": np.nan, + "MacroAsset": np.nan + }) + print(" - StrumentiFinanziari: nessuna riga trovata.") + else: + r = df_strum.iloc[0] + strumenti_info_rows.append({ + "ISIN": r.get("ISIN", isin), + "Valuta": r.get("Valuta", np.nan), + "Strumento": r.get("Strumento", np.nan), + "MorningStarCat": r.get("MorningStarCat", np.nan), + "MacroAsset": r.get("MacroAsset", np.nan) + }) + print(" - StrumentiFinanziari: info recuperate.") + except SQLAlchemyError as e_info: + print(f" - Errore SELECT StrumentiFinanziari per {isin}: {e_info}") + strumenti_info_rows.append({ + "ISIN": isin, + "Valuta": np.nan, + "Strumento": np.nan, + "MorningStarCat": np.nan, + "MacroAsset": np.nan + }) + + except SQLAlchemyError as e: + print(f" - Errore SP per {isin}: {e}") + dropped_info.append({"ISIN": isin, "Motivo": "errore SP"}) + continue + + if not accepted_isins: + print("Nessun ISIN con 5 anni pieni: nessun output generato.") + if dropped_info: + pd.DataFrame(dropped_info).to_excel("asset_metrics.xlsx", sheet_name="Scartati", index=False) + if strumenti_info_rows: + df_info_out = ( + pd.DataFrame(strumenti_info_rows) + [["ISIN", "Valuta", "Strumento", "MorningStarCat", "MacroAsset"]] + .sort_values("ISIN", kind="stable") + .reset_index(drop=True) + ) + df_info_out.to_excel("strumenti_info.xlsx", sheet_name="Strumenti", index=False) + return + + # ===== METRICHE SU TUTTI GLI ISIN METRICABILI ===== + period_df = final_df[accepted_isins] + n_days = int(period_df.shape[0]) + years_elapsed = n_days / DAYS_PER_YEAR if n_days > 0 else np.nan + + daily_mean = period_df.mean() + ann_return = daily_mean * DAYS_PER_YEAR + ann_vol = period_df.std(ddof=1) * np.sqrt(DAYS_PER_YEAR) + + gross = (1.0 + period_df).prod(skipna=True) + if years_elapsed and years_elapsed > 0: + cagr = gross.pow(1.0 / years_elapsed) - 1.0 + else: + cagr = pd.Series(np.nan, index=period_df.columns) + + r2_series = pd.Series( + {col: r2_equity_line(period_df[col]) for col in period_df.columns}, + index=period_df.columns + ) + + maxdd_dict, dddur_dict, ttr_dict = {}, {}, {} + aaw_dict, auw_dict, heal_dict = {}, {}, {} + hmin_5y_months_dict = {} + + for col in period_df.columns: + mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250) + maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr + aaw, auw, heal = heal_index_metrics(period_df[col]) + aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal + _, h_months_5y = h_min_100(period_df[col], month_len=21) + hmin_5y_months_dict[col] = h_months_5y + + metrics_df = ( + pd.DataFrame({ + 'ISIN': period_df.columns, + 'Rendimento_Ann': ann_return.reindex(period_df.columns).values, + 'Volatilita_Ann': ann_vol.reindex(period_df.columns).values, + 'CAGR': cagr.reindex(period_df.columns).values, + 'R2_Equity': r2_series.reindex(period_df.columns).values, + 'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values, + 'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values, + 'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values, + 'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values, + 'AUW': pd.Series(auw_dict).reindex(period_df.columns).values, + 'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values, + 'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values + }) + .merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left') + [['ISIN', 'Nome', 'Categoria', 'Asset Class', + 'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity', + 'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD', + 'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']] + .sort_values('ISIN', kind='stable') + .reset_index(drop=True) + ) + + # ===== EXPORT 1: ASSET METRICS (tutti gli ISIN metricabili + scartati) ===== + out_path = OUTPUT_DIR / "asset_metrics.xlsx" + with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer: + metrics_df.to_excel(writer, sheet_name='Metriche_5Y', index=False) + if dropped_info: + pd.DataFrame(dropped_info).to_excel(writer, sheet_name='Scartati', index=False) + + print( + f"Creato: {out_path} | ISIN metricati: " + f"{len(metrics_df)} | Scartati: {len(dropped_info)}" + ) + + # ===== PARTE 2: UNIVERSO SELEZIONATO + MATRICE + QUALITA ===== + + # 2.1 Selezione universo massimo 100 strumenti per diversificazione + selected_isins, corr_index_sel, divers_score_sel = select_universe_by_diversification( + period_df, max_assets=MAX_UNIVERSE_SIZE, selection_mode=SELECTION_MODE + ) + if not selected_isins: + print("Filtro di diversificazione ha restituito 0 asset: nessun 'universo_selezionato' generato.") + return + + sel_returns = period_df[selected_isins] + sel_corr = sel_returns.corr() + sel_cov = sel_returns.cov() + + K = len(selected_isins) + + # 2.2 Pesi min-var basati sulla covarianza (combinazione più efficiente) + Sigma = sel_cov.values.astype(float) + Sigma = 0.5 * (Sigma + Sigma.T) # simmetrizzo + + # uso la pseudo-inversa per robustezza + ones = np.ones(K) + try: + invSigma = np.linalg.pinv(Sigma) + w_raw = invSigma @ ones + if np.allclose(w_raw.sum(), 0.0): + w_raw = np.ones(K) + except np.linalg.LinAlgError: + w_raw = np.ones(K) + + w_minvar = w_raw / w_raw.sum() + w_minvar = w_minvar.astype(float) + + # 2.3 Sheet "Universo_Selezionato" + df_sel = ( + pd.DataFrame({"ISIN": selected_isins}) + .merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left') + ) + df_sel["Peso_MinVar"] = w_minvar + + # 2.4 Sheet "Matrice" (covarianza) + df_cov = sel_cov.copy() + df_cov.index.name = "ISIN" + + # 2.5 Sheet "Qualita_Matrice" + mask = np.triu(np.ones((K, K), dtype=bool), k=1) + off_vals = sel_corr.values[mask] + mean_corr_off = float(np.nanmean(off_vals)) if off_vals.size else np.nan + min_corr_off = float(np.nanmin(off_vals)) if off_vals.size else np.nan + max_corr_off = float(np.nanmax(off_vals)) if off_vals.size else np.nan + var_corr_off = float(np.nanvar(off_vals)) if off_vals.size else np.nan + std_corr_off = float(np.nanstd(off_vals)) if off_vals.size else np.nan + std_corr_off_norm = ( + float(np.clip(std_corr_off / 1.0, 0.0, 1.0)) if np.isfinite(std_corr_off) else np.nan + ) # normalizzazione grezza, ρ ∈ [-1,1] ⇒ std ≤ 1 + ci_sel, ds_sel = compute_corr_diversification(sel_corr) + + eig_corr = np.linalg.eigvalsh(sel_corr.values.astype(float)) + eig_corr_clipped = np.clip(eig_corr, 0.0, None) + sum_eig_corr = float(eig_corr_clipped.sum()) + if sum_eig_corr > 0: + p_corr = eig_corr_clipped / sum_eig_corr + entropy_corr = float(-(p_corr * np.log(p_corr + 1e-18)).sum()) + entropy_corr_norm = float(entropy_corr / np.log(K)) if K > 1 else np.nan + n_eff = float(np.exp(entropy_corr)) + else: + p_corr = None + entropy_corr = np.nan + entropy_corr_norm = np.nan + n_eff = np.nan + + if K > 1 and np.isfinite(n_eff): + decorrelation_potential = float(np.clip((n_eff - 1.0) / (K - 1.0), 0.0, 1.0)) + else: + decorrelation_potential = np.nan + neg_entropy_index = float(1.0 - entropy_corr_norm) if np.isfinite(entropy_corr_norm) else np.nan + + eig_cov = np.linalg.eigvalsh(Sigma) + min_eig_cov = float(eig_cov.min()) + max_eig_cov = float(eig_cov.max()) + num_neg_eig = int((eig_cov < 0).sum()) + pos_eigs = eig_cov[eig_cov > 1e-12] + if pos_eigs.size > 0: + cond_cov = float(max_eig_cov / pos_eigs.min()) + else: + cond_cov = np.nan + + qual_dict = { + "N_Asset": [K], + "Mean_Corr_offdiag": [mean_corr_off], + "Min_Corr_offdiag": [min_corr_off], + "Max_Corr_offdiag": [max_corr_off], + "Var_Corr_offdiag": [var_corr_off], + "Std_Corr_offdiag": [std_corr_off], + "Std_Corr_offdiag_Norm": [std_corr_off_norm], + "CorrIndex": [ci_sel], + "DiversificationScore": [ds_sel], + "Entropy_Eig_Corr": [entropy_corr], + "Entropy_Eig_Corr_Norm": [entropy_corr_norm], + "N_Eff_Fattori": [n_eff], + "Decorrelation_Potential": [decorrelation_potential], + "Negative_Entropy_Index": [neg_entropy_index], + "Min_Eig_Corr": [float(eig_corr.min())], + "Max_Eig_Corr": [float(eig_corr.max())], + "Sum_Eig_Corr": [sum_eig_corr], + "Selection_Mode": [SELECTION_MODE], + "Selection_Score": [entropy_corr_norm if SELECTION_MODE == "entropy" else -corr_index_sel], + "Min_Eig_Cov": [min_eig_cov], + "Max_Eig_Cov": [max_eig_cov], + "Num_Neg_Eig_Cov": [num_neg_eig], + "Condition_Number_Cov": [cond_cov], + "Trace_Cov": [float(np.trace(Sigma))] + } + df_qual = pd.DataFrame(qual_dict) + legenda_qual_rows = [ + ("N_Asset", "Numero di asset inclusi nella matrice (dimensione N della matrice NxN)."), + ("Mean_Corr_offdiag", "Media delle correlazioni pairwise escludendo la diagonale; misura la correlazione media tra asset."), + ("Min_Corr_offdiag", "Correlazione più bassa (più negativa) osservata tra due asset, esclusa la diagonale."), + ("Max_Corr_offdiag", "Correlazione più alta (più positiva) osservata tra due asset."), + ("Var_Corr_offdiag", "Varianza delle correlazioni off-diagonal; valori più alti = maggiore eterogeneità delle relazioni."), + ("Std_Corr_offdiag", "Deviazione standard delle correlazioni off-diagonal (dispersione grezza)."), + ("Std_Corr_offdiag_Norm", "Std_Corr_offdiag normalizzata su [0,1] assumendo std ≤ 1 per ρ ∈ [-1,1]."), + ("CorrIndex", "Indice sintetico di correlazione complessiva (media delle correlazioni off-diagonali rispetto al minimo teorico); valori alti = asset che si muovono insieme."), + ("DiversificationScore", "1 - CorrIndex; valori vicini a 1 indicano bassa correlazione e maggiore diversificazione attesa."), + ("Entropy_Eig_Corr", "Entropia (log naturale) della distribuzione degli autovalori della matrice di correlazione; misura quanto il rischio è distribuito tra i fattori."), + ("Entropy_Eig_Corr_Norm", "Entropia degli autovalori normalizzata in [0,1] dividendo per ln(N); 1 = rischio equamente distribuito su N fattori."), + ("N_Eff_Fattori", "Numero effettivo di fattori indipendenti = exp(Entropy_Eig_Corr); stima quanti “bet” distinti offre l’universo."), + ("Decorrelation_Potential", "(N_Eff_Fattori - 1)/(N-1) in [0,1]; 0 = un solo fattore dominante, 1 = N fattori equivalenti."), + ("Negative_Entropy_Index", "1 - Entropy_Eig_Corr_Norm; misura la concentrazione del rischio (più alto = rischio concentrato)."), + ("Min_Eig_Corr", "Autovalore minimo della matrice di correlazione (può essere negativo se la matrice non è semidefinita positiva)."), + ("Max_Eig_Corr", "Autovalore massimo della matrice di correlazione."), + ("Sum_Eig_Corr", "Somma degli autovalori della matrice di correlazione (ideale ≈ N; deviazioni indicano problemi di definizione positiva)."), + ("Selection_Mode", "Modalità di selezione usata: 'corr' (min CorrIndex) oppure 'entropy' (max entropia autovalori correlazione)."), + ("Selection_Score", "Score finale della selezione: H_norm se Selection_Mode='entropy', altrimenti -CorrIndex."), + ("Min_Eig_Cov", "Autovalore minimo della matrice di covarianza (può risultare negativo se la matrice non è definita positiva)."), + ("Max_Eig_Cov", "Autovalore massimo della matrice di covarianza, legato alla componente principale di varianza."), + ("Num_Neg_Eig_Cov", "Numero di autovalori negativi della matrice di covarianza; >0 segnala matrice non semidefinita positiva."), + ("Condition_Number_Cov", "Numero di condizionamento della covarianza (rapporto tra autovalore massimo e minimo in valore assoluto); valori alti = matrice mal condizionata e inversione instabile."), + ("Trace_Cov", "Traccia della matrice di covarianza (somma degli autovalori/varianze), pari alla varianza totale cumulata.") + ] + df_legenda_qual = pd.DataFrame(legenda_qual_rows, columns=["Campo", "Descrizione"]) + + # 2.6 Export file "universo_selezionato.xlsx" + out_univ = OUTPUT_DIR / "universo_selezionato.xlsx" + with pd.ExcelWriter(out_univ, engine="openpyxl", mode="w") as writer: + df_sel.to_excel(writer, sheet_name="Universo_Selezionato", index=False) + df_cov.to_excel(writer, sheet_name="Matrice", index=True) + df_qual.to_excel(writer, sheet_name="Qualita_Matrice", index=False) + df_legenda_qual.to_excel(writer, sheet_name="Legenda_Qualita_Matrice", index=False) + + print( + f"Creato: {out_univ} | Asset selezionati: {K} | " + f"CorrIndex={ci_sel:.4f}, DiversScore={ds_sel:.4f}" + ) + + # ===== EXPORT info da StrumentiFinanziari (solo selezionati) ===== + if strumenti_info_rows: + df_info_out = ( + pd.DataFrame(strumenti_info_rows) + [["ISIN", "Valuta", "Strumento", "MorningStarCat", "MacroAsset"]] + ) + df_info_sel = ( + df_info_out[df_info_out["ISIN"].isin(selected_isins)] + .drop_duplicates(subset=["ISIN"]) + .sort_values("ISIN", kind="stable") + .reset_index(drop=True) + ) + out_info = OUTPUT_DIR / "strumenti_info.xlsx" + df_info_sel.to_excel(out_info, sheet_name="Strumenti", index=False) + print(f"Creato: {out_info} | Righe: {len(df_info_sel)}") + else: + print("Nessuna informazione da StrumentiFinanziari da esportare.") + + +if __name__ == "__main__": + inp = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT_EXCEL + main(inp)