perfezionata distillazione strumenti per correlazione e per entropia

This commit is contained in:
fredmaloggia
2025-12-01 21:40:36 +01:00
parent 5ddfdde8bc
commit 6370b31d41
2 changed files with 734 additions and 29 deletions

View File

@@ -1,31 +1,9 @@
# -*- coding: utf-8 -*-
"""
Programma: metrics_builder_v1.py
Scopo: Leggere un file Excel di input (lista ISIN) e, usando la stessa SP
"opt_RendimentoGiornaliero1_ALL", scaricare le serie storiche dei
rendimenti e calcolare SOLO le metriche per-asset (come in asset_metrics).
Nota: scarta subito gli ISIN che NON hanno almeno 5 anni di storia utile.
Esecuzione (opzionale):
python metrics_builder_v1.py ".Universo per metrics v.1.0.xlsx"
Dipendenze:
pip install numpy pandas SQLAlchemy pyodbc openpyxl
(assicurarsi di avere l'ODBC Driver 17 for SQL Server)
File richiesti:
- connection.txt con chiavi: username, password, host, port, database
- Excel di input con colonne: ISIN, Nome, Categoria, Asset Class
Output:
- asset_metrics_only.xlsx (un foglio "Metriche_5Y" e uno "Scartati")
- strumenti_info.xlsx (un foglio "Strumenti" con ISIN, Valuta, Strumento, MorningStarCat, MacroAsset)
"""
import sys
import os
import math
from pathlib import Path
from typing import Union
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
@@ -38,6 +16,8 @@ DAYS_PER_YEAR = 252
MIN_YEARS_REQ = 5
SP_SAMPLE_SIZE = 1305 # ~5 anni lavorativi (252*5 = 1260)
PTF_CURRENCY = "EUR"
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_INPUT_EXCEL = BASE_DIR / "Input" / "Universo per metrics v.1.0.xlsx"
# =========================
# UTILITY METRICHE
@@ -184,11 +164,14 @@ def make_engine(params: dict):
# MAIN LOGIC
# =========================
def main(input_excel: str = "./Universo per metrics v.1.0.xlsx"):
def main(input_excel: Union[str, Path] = DEFAULT_INPUT_EXCEL):
# 1) Carica input
if not os.path.exists(input_excel):
raise FileNotFoundError(f"File di input non trovato: {input_excel}")
df_in = pd.read_excel(input_excel)
input_path = Path(input_excel)
if not input_path.is_absolute():
input_path = (BASE_DIR / input_path).resolve()
if not input_path.exists():
raise FileNotFoundError(f"File di input non trovato: {input_path}")
df_in = pd.read_excel(input_path)
# Normalizza i nomi attesi
df_in.columns = [str(c).strip() for c in df_in.columns]
cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"]
@@ -414,5 +397,5 @@ def main(input_excel: str = "./Universo per metrics v.1.0.xlsx"):
print("Nessuna informazione da StrumentiFinanziari da esportare.")
if __name__ == "__main__":
inp = sys.argv[1] if len(sys.argv) > 1 else "./Universo per metrics v.1.0.xlsx"
inp = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT_EXCEL
main(inp)

722
Metriche v.3.0.py Normal file
View File

@@ -0,0 +1,722 @@
# -*- coding: utf-8 -*-
"""
Created on Sun Nov 30 22:59:00 2025
@author: Federico
"""
# -*- coding: utf-8 -*-
import sys
import math
from pathlib import Path
from typing import Union
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
# =========================
# PARAMETRI
# =========================
DAYS_PER_YEAR = 252
MIN_YEARS_REQ = 5
SP_SAMPLE_SIZE = 1305 # ~5 anni lavorativi (252*5 = 1260)
PTF_CURRENCY = "EUR"
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_INPUT_EXCEL = BASE_DIR / "Input" / "Universo per metrics v.1.0.xlsx"
OUTPUT_DIR = BASE_DIR / "Output"
# massimo numero di strumenti da mantenere nell'"universo selezionato"
MAX_UNIVERSE_SIZE = 100
SELECTION_MODE = "entropy" # "corr" (greedy CorrIndex) oppure "entropy" (massimizza entropia degli autovalori della correlazione)
# =========================
# UTILITY METRICHE
# =========================
def r2_equity_line(returns: pd.Series) -> float:
s = returns.dropna()
if s.size < 3:
return np.nan
equity = (1.0 + s).cumprod()
equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
if equity.size < 3:
return np.nan
y = np.log(equity.values)
if np.allclose(y.var(ddof=1), 0.0):
return 0.0
x = np.arange(y.size, dtype=float)
X = np.column_stack([np.ones_like(x), x])
beta, *_ = np.linalg.lstsq(X, y, rcond=None)
y_hat = X @ beta
ss_res = np.sum((y - y_hat) ** 2)
ss_tot = np.sum((y - y.mean()) ** 2)
r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
if np.isnan(r2):
return np.nan
return float(np.clip(r2, 0.0, 1.0))
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
max_dd = float(dd.min()) if dd.size else np.nan
under_water = dd < 0
if under_water.any():
max_dd_duration = 0
current = 0
for flag in under_water.values:
if flag:
current += 1
if current > max_dd_duration:
max_dd_duration = current
else:
current = 0
else:
max_dd_duration = 0
if dd.size:
trough_idx = int(np.argmin(dd.values))
if trough_idx > 0:
peak_idx = int(np.argmax(equity.values[: trough_idx + 1]))
peak_level = float(equity.values[peak_idx])
rec_idx = None
for t in range(trough_idx + 1, equity.size):
if equity.values[t] >= peak_level:
rec_idx = t
break
if rec_idx is None:
ttr_from_mdd = sentinel_ttr
else:
ttr_from_mdd = rec_idx - trough_idx
else:
ttr_from_mdd = np.nan
else:
ttr_from_mdd = np.nan
return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
def heal_index_metrics(returns: pd.Series):
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
run_min = equity.cummin()
ru = equity / run_min - 1.0
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
return AAW, AUW, heal
def h_min_100(returns: pd.Series, month_len: int = 21):
s = returns.dropna().astype(float)
n = s.size
if n == 0:
return np.nan, np.nan
log1p = np.log1p(s.values)
csum = np.cumsum(log1p)
def rolling_sum_k(k: int):
if k > n:
return np.array([])
head = csum[k - 1 :]
tail = np.concatenate(([0.0], csum[: -k]))
return head - tail
for k in range(1, n + 1):
rs = rolling_sum_k(k)
if rs.size == 0:
break
roll_ret = np.exp(rs) - 1.0
if np.all(roll_ret >= 0):
h_days = k
h_months = int(math.ceil(h_days / month_len))
return h_days, h_months
return np.nan, np.nan
# =========================
# UTILITY CORRELAZIONE / DIVERSIFICAZIONE
# =========================
def compute_corr_diversification(corr: pd.DataFrame):
"""
Calcola:
- CorrIndex in [0,1] (0 = molto diversificato, 1 = altamente correlato)
- DiversificationScore = 1 - CorrIndex
sulla base della matrice di correlazione 'corr'.
"""
m = corr.shape[0]
if m <= 1:
return np.nan, np.nan
mask = np.triu(np.ones((m, m), dtype=bool), k=1)
off_vals = corr.values[mask]
if off_vals.size == 0:
return np.nan, np.nan
mean_rho = float(np.nanmean(off_vals))
rho_min = -1.0 / (m - 1) if m > 1 else 0.0
corr_index = (mean_rho - rho_min) / (1.0 - rho_min)
corr_index = float(np.clip(corr_index, 0.0, 1.0))
divers_score = 1.0 - corr_index
return corr_index, divers_score
def corr_entropy_metrics(corr: pd.DataFrame):
"""
Ritorna:
- entropy: entropia (ln) degli autovalori di correlazione (clippati >=0)
- entropy_norm: entropy / ln(N)
- n_eff: exp(entropy)
- decorrelation_potential: (n_eff - 1)/(N - 1) se N>1
"""
n = corr.shape[0]
if n == 0:
return np.nan, np.nan, np.nan, np.nan
# simmetrizza e clip per stabilità
m = corr.values.astype(float)
m = 0.5 * (m + m.T)
m = np.clip(m, -1.0, 1.0)
eig = np.linalg.eigvalsh(m)
eig = np.clip(eig, 0.0, None)
s = float(eig.sum())
if s <= 0:
return np.nan, np.nan, np.nan, np.nan
p = eig / s
entropy = float(-(p * np.log(p + 1e-18)).sum())
entropy_norm = float(entropy / np.log(n)) if n > 1 else np.nan
n_eff = float(np.exp(entropy))
decorrelation_potential = float((n_eff - 1.0) / (n - 1.0)) if n > 1 else np.nan
return entropy, entropy_norm, n_eff, decorrelation_potential
def select_universe_by_diversification(
returns: pd.DataFrame,
max_assets: int = 100,
selection_mode: str = "corr", # "corr" (min CorrIndex) oppure "entropy" (max entropia autovalori corr)
):
"""
Seleziona al massimo 'max_assets' colonne da 'returns' (rendimento giornaliero),
cercando di massimizzare la diversificazione in termini di correlazione.
Ritorna:
- lista di ISIN selezionati
- CorrIndex finale
- DiversificationScore finale
"""
cols = list(returns.columns)
m = len(cols)
if m == 0:
return [], np.nan, np.nan
corr_full = returns.corr().fillna(0.0)
if m <= max_assets:
corr_index, divers_score = compute_corr_diversification(corr_full)
print(
f"[Diversification] Universo con {m} asset (<= {max_assets}), "
f"CorrIndex={corr_index:.4f}, DiversScore={divers_score:.4f} (nessun filtro applicato)."
)
return cols, corr_index, divers_score
print(
f"[Diversification-{selection_mode}] Universo iniziale: {m} asset. "
f"Selezione greedy fino a max {max_assets} asset..."
)
avg_corr = corr_full.apply(
lambda s: s.drop(s.name).mean() if s.drop(s.name).size > 0 else 0.0,
axis=1,
)
first_asset = avg_corr.idxmin()
selected = [first_asset]
remaining = set(cols) - {first_asset}
while len(selected) < max_assets and remaining:
best_asset = None
best_score = None
best_corr_index = None
for c in list(remaining):
subset = selected + [c]
sub_corr = corr_full.loc[subset, subset]
ci_tmp, _ = compute_corr_diversification(sub_corr)
if selection_mode == "entropy":
_, h_norm_tmp, n_eff_tmp, decor_tmp = corr_entropy_metrics(sub_corr)
score_tmp = h_norm_tmp if np.isfinite(h_norm_tmp) else np.nan
# fallback: se score non finito, usa -CorrIndex per penalizzare correlazione alta
if not np.isfinite(score_tmp):
score_tmp = -ci_tmp if np.isfinite(ci_tmp) else np.nan
else:
# modalità standard: minimizza CorrIndex -> score = -CorrIndex
score_tmp = -ci_tmp if np.isfinite(ci_tmp) else np.nan
if best_score is None or (np.isfinite(score_tmp) and score_tmp > best_score):
best_score = score_tmp
best_corr_index = ci_tmp
best_asset = c
elif best_score is not None and np.isclose(score_tmp, best_score, equal_nan=False):
# parità: prendo CorrIndex più basso
if np.isfinite(ci_tmp) and (best_corr_index is None or ci_tmp < best_corr_index):
best_corr_index = ci_tmp
best_asset = c
if best_asset is None:
print(f"[Diversification-{selection_mode}] Nessun asset migliorativo trovato, interrompo la selezione.")
break
selected.append(best_asset)
remaining.remove(best_asset)
sub_corr_final = corr_full.loc[selected, selected]
corr_index_final, divers_score_final = compute_corr_diversification(sub_corr_final)
if selection_mode == "entropy":
ent, ent_norm, n_eff, decor = corr_entropy_metrics(sub_corr_final)
print(
f"[Diversification-{selection_mode}] Selezionati {len(selected)} asset su {m}. "
f"H_norm={ent_norm:.4f} | N_eff={n_eff:.2f} | DecorrPot={decor:.4f} | "
f"CorrIndex={corr_index_final:.4f}, DiversScore={divers_score_final:.4f}"
)
else:
print(
f"[Diversification-{selection_mode}] Selezionati {len(selected)} asset su {m}. "
f"CorrIndex={corr_index_final:.4f}, DiversScore={divers_score_final:.4f}"
)
return selected, corr_index_final, divers_score_final
# =========================
# CONNESSIONE DB (connection.txt)
# =========================
def read_connection_params(path: str = "connection.txt") -> dict:
params = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
key, value = line.split("=", 1)
params[key.strip()] = value.strip()
return params
def make_engine(params: dict):
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
conn_str = (
f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
"?driver=ODBC+Driver+17+for+SQL+Server"
)
return create_engine(conn_str)
# =========================
# MAIN LOGIC
# =========================
def main(input_excel: Union[str, Path] = DEFAULT_INPUT_EXCEL):
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# 1) Carica input
input_path = Path(input_excel)
if not input_path.is_absolute():
input_path = (BASE_DIR / input_path).resolve()
if not input_path.exists():
raise FileNotFoundError(f"File di input non trovato: {input_path}")
df_in = pd.read_excel(input_path)
df_in.columns = [str(c).strip() for c in df_in.columns]
cols_needed = ["ISIN", "Nome", "Categoria", "Asset Class"]
for c in cols_needed:
if c not in df_in.columns:
raise ValueError(f"Manca la colonna richiesta nel file input: '{c}'")
# 2) Connessione DB
params = read_connection_params("connection.txt")
engine = make_engine(params)
with engine.connect() as con:
_ = con.execute(text("SELECT 1"))
# 3) Range date 5 anni
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=MIN_YEARS_REQ)
all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize()
five_year_len = len(all_dates)
# 4) Scarico serie per ciascun ISIN e filtro < 5 anni
final_df = pd.DataFrame(index=all_dates)
accepted_isins = []
dropped_info = []
strumenti_info_rows = []
sql_strumenti = text("""
SELECT ISIN, Valuta, Strumento, MorningStarCat, MacroAsset
FROM StrumentiFinanziari
WHERE ISIN = :isin
""")
for isin in df_in["ISIN"].dropna().astype(str).unique():
print(f"[SP] Recupero: {isin}")
sp = (
f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', "
f"@n = {SP_SAMPLE_SIZE}, @PtfCurr = {PTF_CURRENCY}"
)
try:
tmp = pd.read_sql_query(sp, engine)
if tmp.empty:
print(f" - Nessun dato: SKIP {isin}")
dropped_info.append({"ISIN": isin, "Motivo": "SP vuota"})
continue
tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize()
tmp = tmp.dropna(subset=["Px_Date"])
dup_cnt = tmp["Px_Date"].duplicated().sum()
if dup_cnt > 0:
print(f" - Attenzione: {dup_cnt} duplicati di Px_Date per {isin}, compattati con media.")
tmp["RendimentoGiornaliero"] = pd.to_numeric(tmp["RendimentoGiornaliero"], errors="coerce")
ser = (
tmp.sort_values("Px_Date")
.groupby("Px_Date", as_index=True)["RendimentoGiornaliero"]
.mean()
.div(100.0)
.reindex(all_dates)
)
first_valid = ser.first_valid_index()
last_valid = ser.last_valid_index()
nonnull = int(ser.notna().sum())
if first_valid is None or last_valid is None:
dropped_info.append({"ISIN": isin, "Motivo": "nessun valore valido"})
print(f" - Nessun valore valido: SKIP {isin}")
continue
span_days = (last_valid - first_valid).days
cond_span = span_days >= (365 * MIN_YEARS_REQ - 7)
cond_count = nonnull >= int(0.98 * five_year_len)
cond_start = first_valid <= (start_date + pd.Timedelta(days=7))
if not (cond_span and cond_count and cond_start):
dropped_info.append({
"ISIN": isin,
"Motivo": f"storia insufficiente (span={span_days}d, count={nonnull}/{five_year_len})"
})
print(f" - Storia insufficiente: SKIP {isin}")
continue
ser = ser.fillna(0.0)
final_df[isin] = ser
accepted_isins.append(isin)
print(f" - OK: {nonnull} osservazioni utili")
try:
df_strum = pd.read_sql_query(sql_strumenti, engine, params={"isin": isin})
if df_strum.empty:
strumenti_info_rows.append({
"ISIN": isin,
"Valuta": np.nan,
"Strumento": np.nan,
"MorningStarCat": np.nan,
"MacroAsset": np.nan
})
print(" - StrumentiFinanziari: nessuna riga trovata.")
else:
r = df_strum.iloc[0]
strumenti_info_rows.append({
"ISIN": r.get("ISIN", isin),
"Valuta": r.get("Valuta", np.nan),
"Strumento": r.get("Strumento", np.nan),
"MorningStarCat": r.get("MorningStarCat", np.nan),
"MacroAsset": r.get("MacroAsset", np.nan)
})
print(" - StrumentiFinanziari: info recuperate.")
except SQLAlchemyError as e_info:
print(f" - Errore SELECT StrumentiFinanziari per {isin}: {e_info}")
strumenti_info_rows.append({
"ISIN": isin,
"Valuta": np.nan,
"Strumento": np.nan,
"MorningStarCat": np.nan,
"MacroAsset": np.nan
})
except SQLAlchemyError as e:
print(f" - Errore SP per {isin}: {e}")
dropped_info.append({"ISIN": isin, "Motivo": "errore SP"})
continue
if not accepted_isins:
print("Nessun ISIN con 5 anni pieni: nessun output generato.")
if dropped_info:
pd.DataFrame(dropped_info).to_excel("asset_metrics.xlsx", sheet_name="Scartati", index=False)
if strumenti_info_rows:
df_info_out = (
pd.DataFrame(strumenti_info_rows)
[["ISIN", "Valuta", "Strumento", "MorningStarCat", "MacroAsset"]]
.sort_values("ISIN", kind="stable")
.reset_index(drop=True)
)
df_info_out.to_excel("strumenti_info.xlsx", sheet_name="Strumenti", index=False)
return
# ===== METRICHE SU TUTTI GLI ISIN METRICABILI =====
period_df = final_df[accepted_isins]
n_days = int(period_df.shape[0])
years_elapsed = n_days / DAYS_PER_YEAR if n_days > 0 else np.nan
daily_mean = period_df.mean()
ann_return = daily_mean * DAYS_PER_YEAR
ann_vol = period_df.std(ddof=1) * np.sqrt(DAYS_PER_YEAR)
gross = (1.0 + period_df).prod(skipna=True)
if years_elapsed and years_elapsed > 0:
cagr = gross.pow(1.0 / years_elapsed) - 1.0
else:
cagr = pd.Series(np.nan, index=period_df.columns)
r2_series = pd.Series(
{col: r2_equity_line(period_df[col]) for col in period_df.columns},
index=period_df.columns
)
maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
aaw_dict, auw_dict, heal_dict = {}, {}, {}
hmin_5y_months_dict = {}
for col in period_df.columns:
mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
aaw, auw, heal = heal_index_metrics(period_df[col])
aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
_, h_months_5y = h_min_100(period_df[col], month_len=21)
hmin_5y_months_dict[col] = h_months_5y
metrics_df = (
pd.DataFrame({
'ISIN': period_df.columns,
'Rendimento_Ann': ann_return.reindex(period_df.columns).values,
'Volatilita_Ann': ann_vol.reindex(period_df.columns).values,
'CAGR': cagr.reindex(period_df.columns).values,
'R2_Equity': r2_series.reindex(period_df.columns).values,
'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
})
.merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
[['ISIN', 'Nome', 'Categoria', 'Asset Class',
'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
.sort_values('ISIN', kind='stable')
.reset_index(drop=True)
)
# ===== EXPORT 1: ASSET METRICS (tutti gli ISIN metricabili + scartati) =====
out_path = OUTPUT_DIR / "asset_metrics.xlsx"
with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
metrics_df.to_excel(writer, sheet_name='Metriche_5Y', index=False)
if dropped_info:
pd.DataFrame(dropped_info).to_excel(writer, sheet_name='Scartati', index=False)
print(
f"Creato: {out_path} | ISIN metricati: "
f"{len(metrics_df)} | Scartati: {len(dropped_info)}"
)
# ===== PARTE 2: UNIVERSO SELEZIONATO + MATRICE + QUALITA =====
# 2.1 Selezione universo massimo 100 strumenti per diversificazione
selected_isins, corr_index_sel, divers_score_sel = select_universe_by_diversification(
period_df, max_assets=MAX_UNIVERSE_SIZE, selection_mode=SELECTION_MODE
)
if not selected_isins:
print("Filtro di diversificazione ha restituito 0 asset: nessun 'universo_selezionato' generato.")
return
sel_returns = period_df[selected_isins]
sel_corr = sel_returns.corr()
sel_cov = sel_returns.cov()
K = len(selected_isins)
# 2.2 Pesi min-var basati sulla covarianza (combinazione più efficiente)
Sigma = sel_cov.values.astype(float)
Sigma = 0.5 * (Sigma + Sigma.T) # simmetrizzo
# uso la pseudo-inversa per robustezza
ones = np.ones(K)
try:
invSigma = np.linalg.pinv(Sigma)
w_raw = invSigma @ ones
if np.allclose(w_raw.sum(), 0.0):
w_raw = np.ones(K)
except np.linalg.LinAlgError:
w_raw = np.ones(K)
w_minvar = w_raw / w_raw.sum()
w_minvar = w_minvar.astype(float)
# 2.3 Sheet "Universo_Selezionato"
df_sel = (
pd.DataFrame({"ISIN": selected_isins})
.merge(df_in[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
)
df_sel["Peso_MinVar"] = w_minvar
# 2.4 Sheet "Matrice" (covarianza)
df_cov = sel_cov.copy()
df_cov.index.name = "ISIN"
# 2.5 Sheet "Qualita_Matrice"
mask = np.triu(np.ones((K, K), dtype=bool), k=1)
off_vals = sel_corr.values[mask]
mean_corr_off = float(np.nanmean(off_vals)) if off_vals.size else np.nan
min_corr_off = float(np.nanmin(off_vals)) if off_vals.size else np.nan
max_corr_off = float(np.nanmax(off_vals)) if off_vals.size else np.nan
var_corr_off = float(np.nanvar(off_vals)) if off_vals.size else np.nan
std_corr_off = float(np.nanstd(off_vals)) if off_vals.size else np.nan
std_corr_off_norm = (
float(np.clip(std_corr_off / 1.0, 0.0, 1.0)) if np.isfinite(std_corr_off) else np.nan
) # normalizzazione grezza, ρ ∈ [-1,1] ⇒ std ≤ 1
ci_sel, ds_sel = compute_corr_diversification(sel_corr)
eig_corr = np.linalg.eigvalsh(sel_corr.values.astype(float))
eig_corr_clipped = np.clip(eig_corr, 0.0, None)
sum_eig_corr = float(eig_corr_clipped.sum())
if sum_eig_corr > 0:
p_corr = eig_corr_clipped / sum_eig_corr
entropy_corr = float(-(p_corr * np.log(p_corr + 1e-18)).sum())
entropy_corr_norm = float(entropy_corr / np.log(K)) if K > 1 else np.nan
n_eff = float(np.exp(entropy_corr))
else:
p_corr = None
entropy_corr = np.nan
entropy_corr_norm = np.nan
n_eff = np.nan
if K > 1 and np.isfinite(n_eff):
decorrelation_potential = float(np.clip((n_eff - 1.0) / (K - 1.0), 0.0, 1.0))
else:
decorrelation_potential = np.nan
neg_entropy_index = float(1.0 - entropy_corr_norm) if np.isfinite(entropy_corr_norm) else np.nan
eig_cov = np.linalg.eigvalsh(Sigma)
min_eig_cov = float(eig_cov.min())
max_eig_cov = float(eig_cov.max())
num_neg_eig = int((eig_cov < 0).sum())
pos_eigs = eig_cov[eig_cov > 1e-12]
if pos_eigs.size > 0:
cond_cov = float(max_eig_cov / pos_eigs.min())
else:
cond_cov = np.nan
qual_dict = {
"N_Asset": [K],
"Mean_Corr_offdiag": [mean_corr_off],
"Min_Corr_offdiag": [min_corr_off],
"Max_Corr_offdiag": [max_corr_off],
"Var_Corr_offdiag": [var_corr_off],
"Std_Corr_offdiag": [std_corr_off],
"Std_Corr_offdiag_Norm": [std_corr_off_norm],
"CorrIndex": [ci_sel],
"DiversificationScore": [ds_sel],
"Entropy_Eig_Corr": [entropy_corr],
"Entropy_Eig_Corr_Norm": [entropy_corr_norm],
"N_Eff_Fattori": [n_eff],
"Decorrelation_Potential": [decorrelation_potential],
"Negative_Entropy_Index": [neg_entropy_index],
"Min_Eig_Corr": [float(eig_corr.min())],
"Max_Eig_Corr": [float(eig_corr.max())],
"Sum_Eig_Corr": [sum_eig_corr],
"Selection_Mode": [SELECTION_MODE],
"Selection_Score": [entropy_corr_norm if SELECTION_MODE == "entropy" else -corr_index_sel],
"Min_Eig_Cov": [min_eig_cov],
"Max_Eig_Cov": [max_eig_cov],
"Num_Neg_Eig_Cov": [num_neg_eig],
"Condition_Number_Cov": [cond_cov],
"Trace_Cov": [float(np.trace(Sigma))]
}
df_qual = pd.DataFrame(qual_dict)
legenda_qual_rows = [
("N_Asset", "Numero di asset inclusi nella matrice (dimensione N della matrice NxN)."),
("Mean_Corr_offdiag", "Media delle correlazioni pairwise escludendo la diagonale; misura la correlazione media tra asset."),
("Min_Corr_offdiag", "Correlazione più bassa (più negativa) osservata tra due asset, esclusa la diagonale."),
("Max_Corr_offdiag", "Correlazione più alta (più positiva) osservata tra due asset."),
("Var_Corr_offdiag", "Varianza delle correlazioni off-diagonal; valori più alti = maggiore eterogeneità delle relazioni."),
("Std_Corr_offdiag", "Deviazione standard delle correlazioni off-diagonal (dispersione grezza)."),
("Std_Corr_offdiag_Norm", "Std_Corr_offdiag normalizzata su [0,1] assumendo std ≤ 1 per ρ ∈ [-1,1]."),
("CorrIndex", "Indice sintetico di correlazione complessiva (media delle correlazioni off-diagonali rispetto al minimo teorico); valori alti = asset che si muovono insieme."),
("DiversificationScore", "1 - CorrIndex; valori vicini a 1 indicano bassa correlazione e maggiore diversificazione attesa."),
("Entropy_Eig_Corr", "Entropia (log naturale) della distribuzione degli autovalori della matrice di correlazione; misura quanto il rischio è distribuito tra i fattori."),
("Entropy_Eig_Corr_Norm", "Entropia degli autovalori normalizzata in [0,1] dividendo per ln(N); 1 = rischio equamente distribuito su N fattori."),
("N_Eff_Fattori", "Numero effettivo di fattori indipendenti = exp(Entropy_Eig_Corr); stima quanti “bet” distinti offre luniverso."),
("Decorrelation_Potential", "(N_Eff_Fattori - 1)/(N-1) in [0,1]; 0 = un solo fattore dominante, 1 = N fattori equivalenti."),
("Negative_Entropy_Index", "1 - Entropy_Eig_Corr_Norm; misura la concentrazione del rischio (più alto = rischio concentrato)."),
("Min_Eig_Corr", "Autovalore minimo della matrice di correlazione (può essere negativo se la matrice non è semidefinita positiva)."),
("Max_Eig_Corr", "Autovalore massimo della matrice di correlazione."),
("Sum_Eig_Corr", "Somma degli autovalori della matrice di correlazione (ideale ≈ N; deviazioni indicano problemi di definizione positiva)."),
("Selection_Mode", "Modalità di selezione usata: 'corr' (min CorrIndex) oppure 'entropy' (max entropia autovalori correlazione)."),
("Selection_Score", "Score finale della selezione: H_norm se Selection_Mode='entropy', altrimenti -CorrIndex."),
("Min_Eig_Cov", "Autovalore minimo della matrice di covarianza (può risultare negativo se la matrice non è definita positiva)."),
("Max_Eig_Cov", "Autovalore massimo della matrice di covarianza, legato alla componente principale di varianza."),
("Num_Neg_Eig_Cov", "Numero di autovalori negativi della matrice di covarianza; >0 segnala matrice non semidefinita positiva."),
("Condition_Number_Cov", "Numero di condizionamento della covarianza (rapporto tra autovalore massimo e minimo in valore assoluto); valori alti = matrice mal condizionata e inversione instabile."),
("Trace_Cov", "Traccia della matrice di covarianza (somma degli autovalori/varianze), pari alla varianza totale cumulata.")
]
df_legenda_qual = pd.DataFrame(legenda_qual_rows, columns=["Campo", "Descrizione"])
# 2.6 Export file "universo_selezionato.xlsx"
out_univ = OUTPUT_DIR / "universo_selezionato.xlsx"
with pd.ExcelWriter(out_univ, engine="openpyxl", mode="w") as writer:
df_sel.to_excel(writer, sheet_name="Universo_Selezionato", index=False)
df_cov.to_excel(writer, sheet_name="Matrice", index=True)
df_qual.to_excel(writer, sheet_name="Qualita_Matrice", index=False)
df_legenda_qual.to_excel(writer, sheet_name="Legenda_Qualita_Matrice", index=False)
print(
f"Creato: {out_univ} | Asset selezionati: {K} | "
f"CorrIndex={ci_sel:.4f}, DiversScore={ds_sel:.4f}"
)
# ===== EXPORT info da StrumentiFinanziari (solo selezionati) =====
if strumenti_info_rows:
df_info_out = (
pd.DataFrame(strumenti_info_rows)
[["ISIN", "Valuta", "Strumento", "MorningStarCat", "MacroAsset"]]
)
df_info_sel = (
df_info_out[df_info_out["ISIN"].isin(selected_isins)]
.drop_duplicates(subset=["ISIN"])
.sort_values("ISIN", kind="stable")
.reset_index(drop=True)
)
out_info = OUTPUT_DIR / "strumenti_info.xlsx"
df_info_sel.to_excel(out_info, sheet_name="Strumenti", index=False)
print(f"Creato: {out_info} | Righe: {len(df_info_sel)}")
else:
print("Nessuna informazione da StrumentiFinanziari da esportare.")
if __name__ == "__main__":
inp = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT_EXCEL
main(inp)