# -*- coding: utf-8 -*-
"""
Created on 22 Oct 2025

@author: Federico
"""

# =========================
# IMPORT & PARAMETRI
# =========================
import os
import sys
from urllib.parse import quote_plus

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pypfopt import risk_models
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt.exceptions import OptimizationError
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

# Input / output / plot folders (paths are relative to the working directory)
OUTPUT_DIR = "Output"
INPUT_DIR = "Input"
PLOT_DIR = "Plot"

# Make sure the three folders exist before anything reads from or writes to them.
for _folder in (OUTPUT_DIR, INPUT_DIR, PLOT_DIR):
    os.makedirs(_folder, exist_ok=True)


def excel_path(filename: str) -> str:
    """Return the full path of an Excel output file inside ``OUTPUT_DIR``."""
    full_path = os.path.join(OUTPUT_DIR, filename)
    return full_path


def plot_path(filename: str) -> str:
    """Return the full path of a chart file inside ``PLOT_DIR``."""
    full_path = os.path.join(PLOT_DIR, filename)
    return full_path

# --- Placeholders to avoid a NameError even when the Heal phase is skipped ---
optimized_weights_phase2 = pd.DataFrame()
summary_data_phase2 = []

# =========================
# TARGET CONFIGURATION
# =========================
# Maps (look-back window in years, target annual volatility) -> portfolio label.
# Commented-out entries are targets that are currently switched off.
volatility_targets = {
    # (1, 0.06): 'VAR3_1Y',
    # (3, 0.06): 'VAR3_3Y',
    (5, 0.06): 'VAR3_5Y',
    (1, 0.12): 'VAR6_1Y',
    (3, 0.12): 'VAR6_3Y',
    (5, 0.12): 'VAR6_5Y',
    # (1, 0.18): 'VAR9_1Y',
    # (3, 0.18): 'VAR9_3Y',
    (5, 0.18): 'VAR9_5Y'
}
# Number of daily observations per year used to annualise means and volatilities.
days_per_year = 252
# Risk-free rate used when computing Sharpe ratios.
riskfree_rate = 0.02

# Phase 2 expected-return floor, expressed as a fraction of the PH1 expected return.
mu_ph2_floor = 0.9

# ---------------------------------
# Utility: R^2 of the equity line
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
    """Return the R^2 of an OLS fit (with intercept) of log(equity) on time.

    The equity curve is the cumulative product of ``1 + returns`` after
    dropping NaN returns. Equity values equal to 0 or +/-inf are discarded
    before taking logs.

    Returns NaN when fewer than 3 usable points remain or when the total sum
    of squares is not positive, 0.0 when the log-equity variance is
    numerically zero, and otherwise the R^2 clipped to [0, 1].
    """
    min_points = 3

    clean = returns.dropna()
    if clean.size < min_points:
        return np.nan

    curve = (1.0 + clean).cumprod().replace([0, np.inf, -np.inf], np.nan).dropna()
    if curve.size < min_points:
        return np.nan

    log_curve = np.log(curve.values)
    # A flat line has nothing to explain: report 0 rather than dividing by ~0.
    if np.allclose(log_curve.var(ddof=1), 0.0):
        return 0.0

    time_idx = np.arange(log_curve.size, dtype=float)
    design = np.column_stack([np.ones_like(time_idx), time_idx])
    coeffs = np.linalg.lstsq(design, log_curve, rcond=None)[0]
    residuals = log_curve - design @ coeffs

    total_ss = np.sum((log_curve - log_curve.mean()) ** 2)
    if not total_ss > 0:
        return np.nan

    r2 = 1.0 - (np.sum(residuals ** 2) / total_ss)
    if np.isnan(r2):
        return np.nan
    return float(np.clip(r2, 0.0, 1.0))

# ---------------------------------
# Utility: drawdown metrics
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
    """Compute drawdown statistics of the equity curve built from ``returns``.

    NaN returns are treated as 0. The equity curve is the cumulative product
    of ``1 + returns`` and drawdowns are measured against its running maximum.

    Returns a tuple ``(max_dd, max_dd_duration, ttr_from_mdd)``:
      - max_dd: deepest drawdown (negative or zero);
      - max_dd_duration: longest run of consecutive observations with a
        negative drawdown;
      - ttr_from_mdd: number of observations from the trough of the maximum
        drawdown until the equity is back at (or above) the preceding peak;
        ``sentinel_ttr`` if it never recovers, NaN if the trough is the very
        first observation (which includes the no-drawdown case).

    All three values are NaN for an empty series.
    """
    filled = returns.fillna(0.0).astype(float)
    if filled.size == 0:
        return np.nan, np.nan, np.nan

    curve = (1.0 + filled).cumprod()
    underwater = curve / curve.cummax() - 1.0
    curve_arr = curve.values

    deepest = float(underwater.min())

    # Longest under-water spell: distance between consecutive "not under water"
    # positions (with sentinels at both ends) minus one.
    below = (underwater < 0).values
    edges = np.concatenate(([-1], np.flatnonzero(~below), [below.size]))
    longest_spell = int(np.diff(edges).max()) - 1

    trough = int(np.argmin(underwater.values))
    if trough == 0:
        return deepest, longest_spell, np.nan

    prior_peak = float(curve_arr[np.argmax(curve_arr[:trough + 1])])
    recovered = np.flatnonzero(curve_arr[trough + 1:] >= prior_peak)
    # recovered[0] is an offset from trough + 1, hence the "+ 1".
    ttr = (int(recovered[0]) + 1) if recovered.size else sentinel_ttr

    return deepest, longest_spell, (int(ttr) if not np.isnan(ttr) else np.nan)

# ---------------------------------
# Utility: AAW, AUW and Heal Index
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
    """Compute the above-water / under-water areas and the Heal Index.

    NaN returns are treated as 0 and the equity curve is the cumulative
    product of ``1 + returns``.

    Returns a tuple ``(AAW, AUW, heal)``:
      - AAW: sum of the positive run-ups of equity over its running minimum;
      - AUW: sum of the absolute drawdowns of equity below its running maximum;
      - heal: ``(AAW - AUW) / AUW``, or NaN when AUW is not a positive finite
        number.

    All three values are NaN for an empty series.
    """
    filled = returns.fillna(0.0).astype(float)
    if filled.size == 0:
        return np.nan, np.nan, np.nan

    curve = (1.0 + filled).cumprod()

    drawdown = curve / curve.cummax() - 1.0
    under_water = -drawdown[drawdown < 0]
    AUW = float(under_water.sum())

    runup = curve / curve.cummin() - 1.0
    above_water = runup[runup > 0]
    AAW = float(above_water.sum())

    if np.isfinite(AUW) and AUW > 0:
        heal = (AAW - AUW) / AUW
    else:
        heal = np.nan
    return AAW, AUW, heal

# ---------------------------------
# Utility: H_min (100% positive windows)
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
    """Find the shortest horizon whose rolling windows are all non-negative.

    Scans window widths 1..n (n = number of non-NaN returns) and returns the
    first width for which EVERY rolling window has a compounded return >= 0.

    Returns ``(h_days, h_months)`` with ``h_months = ceil(h_days / month_len)``,
    or ``(nan, nan)`` when the series is empty or no width qualifies.
    """
    clean = returns.dropna().astype(float)
    n_obs = clean.size
    if n_obs == 0:
        return np.nan, np.nan

    # Cumulative log-growth with a leading zero: the log-return of the window
    # ending at position i with a given width is cum_log[i + 1] - cum_log[i + 1 - width].
    cum_log = np.concatenate(([0.0], np.cumsum(np.log1p(clean.values))))

    for width in range(1, n_obs + 1):
        window_ret = np.exp(cum_log[width:] - cum_log[:-width]) - 1.0
        if np.all(window_ret >= 0):
            return width, int(np.ceil(width / month_len))

    return np.nan, np.nan

# ---------------------------------
# Utilities: portfolio return series and path-based metrics
# ---------------------------------
def portfolio_series_from_weights(period_df: pd.DataFrame, w: np.ndarray, cols: list) -> pd.Series:
    """Return the row-wise weighted sum of ``period_df[cols]``.

    ``w`` holds one weight per entry of ``cols``, in the same order.
    """
    weights_by_col = pd.Series(w, index=cols)
    weighted = period_df[cols].mul(weights_by_col)
    return weighted.sum(axis=1)

def portfolio_path_metrics(period_df: pd.DataFrame,
                           five_year_df: pd.DataFrame,
                           w: np.ndarray,
                           cols: list,
                           days_per_year: int) -> dict:
    """Path-based metrics of a weighted portfolio.

    All metrics are measured on ``period_df`` except ``Hmin_100m_5Y``, which
    is measured on ``five_year_df`` restricted to the columns of ``cols``
    that it contains.

    Args:
        period_df: daily returns, one column per asset, for the analysis window.
        five_year_df: daily returns used for the H_min metric.
        w: one weight per entry of ``cols``, in the same order.
        cols: asset columns of ``period_df`` that make up the portfolio.
        days_per_year: annualisation factor.

    Returns:
        dict with keys AnnReturn, AnnVol, CAGR, R2, MaxDD, DD_Duration, TTR,
        AAW, AUW, Heal and Hmin_100m_5Y. Entries are NaN where the metric
        cannot be computed.
    """
    w = np.asarray(w, dtype=float)
    cols = list(cols)

    daily = portfolio_series_from_weights(period_df, w, cols)
    n_obs = int(daily.shape[0])

    if n_obs > 0:
        span_years = n_obs / days_per_year
        mean_ann = float(daily.mean() * days_per_year)
        growth = float((1.0 + daily).prod())
    else:
        span_years = mean_ann = growth = np.nan
    vol_ann = float(daily.std(ddof=1) * np.sqrt(days_per_year)) if n_obs > 1 else np.nan

    # CAGR only makes sense for a positive time span and a positive gross growth.
    cagr = np.nan
    if span_years and span_years > 0 and growth and growth > 0:
        cagr = growth ** (1.0 / span_years) - 1.0

    r2 = r2_equity_line(daily)
    maxdd, dddur, ttr = drawdown_metrics(daily, sentinel_ttr=1250)
    aaw, auw, heal = heal_index_metrics(daily)

    hmin_months = np.nan
    shared = [c for c in cols if c in five_year_df.columns]
    if shared:
        w_shared = pd.Series(w, index=cols).reindex(shared).fillna(0.0).values
        daily_5y = portfolio_series_from_weights(five_year_df, w_shared, shared)
        _, hmin_months = h_min_100(daily_5y, month_len=21)

    metrics = {
        "AnnReturn": mean_ann,
        "AnnVol": vol_ann,
        "CAGR": cagr,
        "R2": r2,
        "MaxDD": maxdd,
        "DD_Duration": dddur,
        "TTR": ttr,
        "AAW": aaw,
        "AUW": auw,
        "Heal": heal,
        "Hmin_100m_5Y": hmin_months,
    }
    return metrics

# --- Read the connection parameters from connection.txt ("key = value" lines) ---
params = {}
with open("connection.txt", "r") as f:
    for line in f:
        line = line.strip()
        # Skip blank lines, comments and lines without a "=" separator
        # (unpacking the split of such a line would raise ValueError).
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        params[key.strip()] = value.strip()

username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")

# Fail early with a readable message instead of building a URL that contains
# the literal text "None".
_missing_params = [k for k in ("username", "password", "host", "database") if not params.get(k)]
if _missing_params:
    print("Parametri mancanti in connection.txt:", ", ".join(_missing_params))
    sys.exit(1)

# Credentials are URL-escaped: characters such as "@", ":" or "/" inside a
# password would otherwise be parsed as URL delimiters.
connection_string = (
    f"mssql+pyodbc://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{database}"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)

print("Connection string letta correttamente")

# =========================
# DATABASE CONNECTION
# =========================
# Create the engine and run a trivial query so that a bad configuration is
# detected here rather than in the middle of the download loop.
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        _ = connection.execute(text("SELECT 1"))
    print("Connessione al database riuscita.")
except SQLAlchemyError as e:
    print("Errore durante la connessione al database:", e)
    # Non-zero status: a bare sys.exit() would report success to the caller.
    sys.exit(1)

# =========================
# INPUT / TEMPLATE
# =========================
# Template workbook for the export files.
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)

# Investable universe: identifiers, classification and weight limits per instrument.
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx')
_universe_read_options = {
    'usecols': ['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'],
    # Read the security code as text instead of letting pandas infer a dtype.
    'dtype': {'Codice Titolo': str},
}
df = pd.read_excel(file_path, **_universe_read_options)

# =========================
# HISTORICAL RETURN SERIES
# =========================
# Five years of business days ending yesterday; every series is aligned to it.
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize()

# The ISIN is passed as a bound parameter instead of being interpolated into
# the SQL text, so an odd value in the Excel universe cannot alter the statement.
procedure_call = text(
    "EXEC opt_RendimentoGiornaliero1_ALL @ISIN = :isin, @n = 1305, @PtfCurr = EUR"
)

isin_from_db = set()
# Series are collected in a dict and assembled once at the end: inserting
# columns one at a time fragments the DataFrame.
_series_by_isin = {}

# dropna(): an empty ISIN cell would otherwise be queried as "nan".
for isin in df['ISIN'].dropna().unique():
    print(f"Working on ISIN: {isin}")
    try:
        temp_df = pd.read_sql_query(procedure_call, engine, params={"isin": str(isin)})
    except SQLAlchemyError as e:
        print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)
        continue

    if temp_df.empty:
        print(f"Nessun dato recuperato per {isin}, skipping...")
        continue

    temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize()
    temp_df = temp_df.dropna(subset=['Px_Date'])
    temp_df.set_index('Px_Date', inplace=True)
    # The procedure returns percentages; convert to fractional returns.
    temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100

    daily = temp_df['RendimentoGiornaliero']
    # reindex() raises on duplicate labels, so keep one row per date.
    daily = daily[~daily.index.duplicated(keep='last')]

    _series_by_isin[isin] = daily.reindex(all_dates)
    isin_from_db.add(isin)
    print(f"Dati recuperati per {isin}: {_series_by_isin[isin].count()} righe di dati non-null prelevate.")

final_df = pd.DataFrame(_series_by_isin, index=all_dates)

# NOTE(review): missing observations are treated as 0% daily returns, which
# lowers the measured volatility of assets with a short history; confirm
# this is intended.
final_df.fillna(0, inplace=True)

# -------- H_min is always measured on 5 years (21 days = 1 month) --------
five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date]

# =========================
# OPTIMISATION LOOP (traditional PH1)
# =========================
# For every (years, target volatility) target: compute per-asset metrics on the
# look-back window, maximise return at the target volatility under the weight
# constraints, then write the import file and the allocation pie chart.
optimized_weights = pd.DataFrame()
per_asset_metrics = {}

# ---- Loop-invariant inputs, built once instead of on every iteration ----
asset_class_limits = {
    'Azionari': 0.75, 'Obbligazionari': 0.75,
    'Metalli Preziosi': 0.20, 'Materie Prime': 0.05,
    'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1
}
# Cap of each category = largest PesoMax among its members.
categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
template_cols = list(template_df.columns)

# H_min is always measured on the 5-year window, so it does not depend on the
# target being optimised; h_min_100 is quadratic in the number of
# observations, so it is computed once per asset here.
hmin_5y_months_dict = {
    col: h_min_100(five_year_df[col], month_len=21)[1]
    for col in five_year_df.columns
}

for (years, target_vol), name in volatility_targets.items():
    period_start_date = end_date - pd.DateOffset(years=years)
    period_df = final_df.loc[period_start_date:end_date]

    daily_returns_mean = period_df.mean()
    annual_returns_mean = daily_returns_mean * days_per_year
    annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)

    # ---------- PER-ASSET METRICS ----------
    n_days = int(period_df.shape[0])
    years_elapsed = n_days / days_per_year if n_days > 0 else np.nan

    asset_ann_return = daily_returns_mean * days_per_year
    asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year)

    gross = (1.0 + period_df).prod(skipna=True)
    if years_elapsed and years_elapsed > 0:
        asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0
    else:
        asset_cagr = pd.Series(np.nan, index=period_df.columns)

    asset_r2 = pd.Series(
        {col: r2_equity_line(period_df[col]) for col in period_df.columns},
        index=period_df.columns,
    )

    maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
    aaw_dict, auw_dict, heal_dict = {}, {}, {}
    for col in period_df.columns:
        mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
        maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
        aaw, auw, heal = heal_index_metrics(period_df[col])
        aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal

    asset_metrics_df = (
        pd.DataFrame({
            'ISIN': period_df.columns,
            'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values,
            'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values,
            'CAGR': asset_cagr.reindex(period_df.columns).values,
            'R2_Equity': asset_r2.reindex(period_df.columns).values,
            'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
            'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
            'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
            'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
            'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
            'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
            'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
        })
        .merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
        [['ISIN', 'Nome', 'Categoria', 'Asset Class',
          'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
          'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
          'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
        .sort_values('ISIN', kind='stable')
        .reset_index(drop=True)
    )
    per_asset_metrics[name] = asset_metrics_df

    # ---------- OPTIMISATION ----------
    ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)

    # Per-instrument constraints: a fixed weight wins over a maximum weight.
    # The lambdas bind their values as defaults to avoid late-binding closures.
    for _, row in df.iterrows():
        isin_i = row['ISIN']
        if isin_i in period_df.columns:
            idx = period_df.columns.get_loc(isin_i)
            pf = row.get('PesoFisso')
            pm = row.get('PesoMax')
            if pd.notna(pf):
                ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
            elif pd.notna(pm):
                ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)

    # Per-category constraints
    for cat, maxw in categories_limits.items():
        # A category whose members have no PesoMax yields a NaN cap; a
        # "<= NaN" constraint is meaningless, so such a category is left uncapped.
        if pd.isna(maxw):
            continue
        isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
        idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
        if idxs:
            ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)

    # Per-asset-class constraints
    for ac, maxw in asset_class_limits.items():
        isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
        idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
        if idxs:
            ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)

    # ---------- Solve ----------
    # Only the solver calls are guarded. ValueError is caught as well because
    # efficient_risk raises it when the target volatility is below the
    # global minimum volatility.
    try:
        ef.efficient_risk(target_volatility=target_vol)
        weights = ef.clean_weights()
        exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate)
    except (OptimizationError, ValueError) as e:
        print(f"Optimization failed for {name}: {e}")
        # The zero fallback must carry the asset index: a positional
        # RangeIndex would become the frame's index if the first target
        # fails, and every later ISIN-indexed column would align to NaN.
        optimized_weights[name] = pd.Series(0.0, index=period_df.columns)
        continue

    optimized_weights[name] = pd.Series(weights)

    print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===")
    print(f"Expected annual return: {exp_ret:.2%}")
    print(f"Annual volatility: {exp_vol:.2%}")
    print(f"Sharpe Ratio: {sharpe:.2f}")

    # --- Diversification benefit ---
    w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns])
    indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values))
    weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols))
    diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
    print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")

    # --- Excel import file (one per portfolio) ---
    results_rows = []
    for isin, weight in weights.items():
        if weight > 0:
            r_sel = df.loc[df['ISIN'] == isin]
            codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else ""
            nome = r_sel['Nome'].iloc[0] if not r_sel.empty else ""
            row = {col: "" for col in template_cols}
            row['cod_por'] = f'PTFOPT{name}'
            row['cod_tit'] = codice_titolo
            row['des_tit'] = nome
            # NOTE(review): weights are scaled by 99, not 100, so exported
            # weights sum to about 99; presumably this leaves a 1% residual
            # on purpose. Confirm.
            row['peso'] = float(weight * 99)
            results_rows.append(row)

    results_full_df = pd.DataFrame(results_rows, columns=template_cols)
    output_df = pd.concat([template_df.iloc[0:0], results_full_df], ignore_index=True)
    output_file_path = excel_path(f'PTFOPT{name}.xlsx')
    output_df.to_excel(output_file_path, index=False)
    print(f"File {output_file_path} saved successfully.")

    # --- Asset-allocation pie chart (only if some weight is > 0) ---
    asset_allocations = {asset: 0 for asset in asset_class_limits}
    for isin, weight in weights.items():
        r_sel = df.loc[df['ISIN'] == isin]
        if r_sel.empty:
            continue
        asset_allocations.setdefault(r_sel['Asset Class'].iloc[0], 0)
        asset_allocations[r_sel['Asset Class'].iloc[0]] += weight

    if sum(asset_allocations.values()) > 0:
        plt.figure(figsize=(8, 6))
        plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
        plt.title(f'Asset Allocation for {name}')
        pie_path = plot_path(f'Asset_Allocation_{name}.png')
        plt.savefig(pie_path, dpi=150, bbox_inches='tight')
        plt.close()

# =========================
# METRICS SUMMARY (PH1 PORTFOLIOS)
# =========================
summary_data = []


def _pct_or_blank(value):
    """Format ``value`` as a percentage with 2 decimals, or "" when missing."""
    return f"{value:.2%}" if pd.notna(value) else ""


def _int_or_blank(value):
    """Return ``int(value)``, or "" when missing."""
    return int(value) if pd.notna(value) else ""


def _float_or_nan(value):
    """Return ``float(value)``, or NaN when missing."""
    return float(value) if pd.notna(value) else np.nan


for (years, target_vol), name in volatility_targets.items():
    if name not in optimized_weights.columns:
        continue

    period_start_date = end_date - pd.DateOffset(years=years)
    period_df = final_df.loc[period_start_date:end_date]
    asset_cols = period_df.columns

    daily_returns_mean = period_df.mean()
    annual_returns_mean = daily_returns_mean * days_per_year
    annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)

    w_series = optimized_weights[name].reindex(asset_cols).fillna(0.0)
    w_vec = w_series.values

    # Realised (path-based) metrics come from the shared helper, which
    # implements the same formulas this section relies on.
    path = portfolio_path_metrics(period_df, five_year_df, w_vec, asset_cols, days_per_year)
    port_ann_return = path["AnnReturn"]
    port_ann_vol = path["AnnVol"]
    port_cagr = path["CAGR"]
    port_r2 = path["R2"]
    port_maxdd = path["MaxDD"]
    port_dddur = path["DD_Duration"]
    port_ttr = path["TTR"]
    port_aaw = path["AAW"]
    port_auw = path["AUW"]
    port_heal = path["Heal"]
    port_hmin_5y_months = path["Hmin_100m_5Y"]

    # Model-based (mean / covariance) figures.
    exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[asset_cols].values))
    cov_mat = annual_covariance_matrix.loc[asset_cols, asset_cols].values
    exp_vol = float(np.sqrt(np.dot(w_vec, np.dot(cov_mat, w_vec))))
    sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan

    indiv_ann_vols = np.sqrt(np.diag(cov_mat))
    weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols))
    if weighted_avg_vol > 0 and exp_vol > 0:
        diversification_benefit = (exp_vol / weighted_avg_vol) - 1
        diversification_ratio = weighted_avg_vol / exp_vol
    else:
        diversification_benefit = np.nan
        diversification_ratio = np.nan

    print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===")
    print(f"Expected annual return: {exp_ret:.2%}")
    print(f"Annual volatility: {exp_vol:.2%}")
    print(f"Sharpe Ratio: {sharpe:.2f}")
    if np.isnan(diversification_ratio):
        print("Diversification Ratio: NaN")
    else:
        print(f"Diversification Ratio: {diversification_ratio:.3f}")
    print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")

    summary_row = {
        "Portafoglio": name,
        "Years": years,
        "Target Vol": f"{target_vol:.2%}",
        "Expected annual return": f"{exp_ret:.2%}",
        "Annual volatility": f"{exp_vol:.2%}",
        "Sharpe Ratio": f"{sharpe:.2f}",
        "Beneficio di diversificazione": f"{diversification_benefit:.2%}",
        "Rendimento_Ann": _pct_or_blank(port_ann_return),
        "Volatilita_Ann": _pct_or_blank(port_ann_vol),
        "CAGR": _pct_or_blank(port_cagr),
        "R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan,
        "MaxDD": _pct_or_blank(port_maxdd),
        "DD_Duration_Max": _int_or_blank(port_dddur),
        "TTR_from_MDD": _int_or_blank(port_ttr),
        "AAW": _float_or_nan(port_aaw),
        "AUW": _float_or_nan(port_auw),
        "Heal_Index": _float_or_nan(port_heal),
        "H_min_100m_5Y": _int_or_blank(port_hmin_5y_months),
    }
    summary_data.append(summary_row)

# =========================
# EQUITY / UNDERWATER PLOTS (PH1)
# =========================
def plot_equity_overlay_all(port_names=None):
    """Plot the 5-year equity lines of the given PH1 portfolios on one chart.

    Reads the module-level ``final_df``, ``end_date`` and ``optimized_weights``.
    Portfolios missing from ``optimized_weights`` are reported and skipped; if
    none is left, nothing is saved. The chart is written to
    ``Equity_ALL_PORTS.png`` in the plot folder.
    """
    if port_names is None:
        port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']

    window_start = end_date - pd.DateOffset(years=5)
    window_df = final_df.loc[window_start:end_date]
    known = set(optimized_weights.columns)

    fig, ax = plt.subplots(figsize=(11, 6))
    drawn = 0
    for pname in port_names:
        if pname not in known:
            print(f"[plot] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
            continue
        weights = optimized_weights[pname].reindex(window_df.columns).fillna(0.0)
        daily = (window_df[weights.index] * weights).sum(axis=1)
        curve = (1.0 + daily).cumprod()
        ax.plot(curve.index, curve.values, label=pname)
        drawn += 1

    if not drawn:
        print("[plot] Nessun portafoglio valido da plottare.")
        plt.close(fig)
        return

    ax.set_title("Equity line - Portafogli ottimizzati (ultimi 5 anni)")
    ax.set_xlabel("Data")
    ax.set_ylabel("Equity (base=1.0)")
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()

    out_png = plot_path("Equity_ALL_PORTS.png")
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[plot] Grafico sovrapposto salvato: {out_png}")

def plot_underwater_overlay_all(port_names=None, ylim=(-0.3, 0.0)):
    """Plot the 5-year drawdown curves of the given PH1 portfolios on one chart.

    Reads the module-level ``final_df``, ``end_date`` and ``optimized_weights``.
    Portfolios missing from ``optimized_weights`` are reported and skipped; if
    none is left, nothing is saved. ``ylim`` sets the y-axis range (pass None
    to let matplotlib choose). The chart is written to
    ``Underwater_ALL_PORTS.png`` in the plot folder.
    """
    if port_names is None:
        port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']

    window_start = end_date - pd.DateOffset(years=5)
    window_df = final_df.loc[window_start:end_date]
    known = set(optimized_weights.columns)

    fig, ax = plt.subplots(figsize=(11, 6))
    drawn = 0
    for pname in port_names:
        if pname not in known:
            print(f"[underwater] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
            continue
        weights = optimized_weights[pname].reindex(window_df.columns).fillna(0.0)
        daily = (window_df[weights.index] * weights).sum(axis=1)
        curve = (1.0 + daily).cumprod()
        underwater = curve / curve.cummax() - 1.0
        ax.plot(underwater.index, underwater.values, label=pname)
        drawn += 1

    if not drawn:
        print("[underwater] Nessun portafoglio valido da plottare.")
        plt.close(fig)
        return

    ax.set_title("Underwater (Drawdown) - Portafogli ottimizzati (ultimi 5 anni)")
    ax.set_xlabel("Data")
    ax.set_ylabel("Drawdown")
    if ylim is not None:
        ax.set_ylim(*ylim)
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()

    out_png = plot_path("Underwater_ALL_PORTS.png")
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[underwater] Grafico sovrapposto salvato: {out_png}")

# Render the PH1 overlay charts (equity and underwater) for the active portfolios.
_ph1_overlay_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']
plot_equity_overlay_all(port_names=list(_ph1_overlay_names))
plot_underwater_overlay_all(port_names=list(_ph1_overlay_names))

# =========================
# PHASE 2
# =========================
# Re-optimise every PH1 portfolio at the same target volatility, adding
#   - an objective term that rewards weight on assets with a high equity-line R^2;
#   - a floor on the expected return relative to the PH1 baseline.
try:
    import cvxpy as cp
except Exception as _e:
    print("[Phase2] Warning: cvxpy non disponibile. Salto questa variante.")
    cp = None

if cp is not None:
    # ---- Loop-invariant inputs, built once ----
    asset_class_limits = {
        'Azionari': 0.75, 'Obbligazionari': 0.75,
        'Metalli Preziosi': 0.20, 'Materie Prime': 0.05,
        'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1
    }
    # Cap of each category = largest PesoMax among its members.
    categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()

    for (years, target_vol), name in volatility_targets.items():
        if name not in optimized_weights.columns or name not in per_asset_metrics:
            print(f"[Phase2] '{name}': niente PH1 o metriche per-asset -> skip")
            continue

        period_start_date = end_date - pd.DateOffset(years=years)
        period_df_p = final_df.loc[period_start_date:end_date]
        cols = list(period_df_p.columns)

        mu = period_df_p.mean().reindex(cols).fillna(0.0) * days_per_year
        Sigma = risk_models.sample_cov(period_df_p, returns_data=True).loc[cols, cols]

        w_base = optimized_weights[name].reindex(cols).fillna(0.0).values
        # PH1 stores an all-zero column when its optimisation failed; there
        # is no baseline to improve on in that case.
        if not np.any(w_base > 0):
            print(f"[Phase2] '{name}': pesi PH1 nulli (ottimizzazione PH1 fallita) -> skip")
            continue
        exp_ret_base = float(np.dot(w_base, mu.values))

        metr_df = per_asset_metrics[name]
        r2_map = metr_df.set_index('ISIN')['R2_Equity'].to_dict()
        # Missing or NaN R^2 counts as 0, i.e. no reward for that asset.
        r2_vec = np.array(
            [r2_map.get(c, 0.0) if pd.notna(r2_map.get(c, np.nan)) else 0.0 for c in cols],
            dtype=float,
        )

        ef_h = EfficientFrontier(mu, Sigma)

        # Per-instrument constraints: a fixed weight wins over a maximum weight.
        for _, row in df.iterrows():
            isin_i = row['ISIN']
            if isin_i in period_df_p.columns:
                idx = period_df_p.columns.get_loc(isin_i)
                pf = row.get('PesoFisso')
                pm = row.get('PesoMax')
                if pd.notna(pf):
                    ef_h.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
                elif pd.notna(pm):
                    ef_h.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)

        # Per-category constraints
        for cat, maxw in categories_limits.items():
            # A NaN cap (no PesoMax in the category) cannot be expressed as
            # a constraint; such a category is left uncapped.
            if pd.isna(maxw):
                continue
            isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
            idxs = [period_df_p.columns.get_loc(isin) for isin in isin_list if isin in period_df_p.columns]
            if idxs:
                ef_h.add_constraint(lambda w, idxs=idxs, maxw=maxw: cp.sum(w[idxs]) <= maxw)

        # Per-asset-class constraints
        for ac, maxw in asset_class_limits.items():
            isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
            idxs = [period_df_p.columns.get_loc(isin) for isin in isin_list if isin in period_df_p.columns]
            if idxs:
                ef_h.add_constraint(lambda w, idxs=idxs, maxw=maxw: cp.sum(w[idxs]) <= maxw)

        # Expected-return floor: give up at most (1 - mu_ph2_floor) of the
        # baseline's magnitude. Equal to mu_ph2_floor * baseline when the
        # baseline is positive; for a negative baseline the plain product
        # would sit ABOVE the baseline and make the problem infeasible.
        ret_floor = exp_ret_base - (1.0 - mu_ph2_floor) * abs(exp_ret_base)
        ef_h.add_constraint(lambda w, mu_vec=mu.values, floor=ret_floor: (mu_vec @ w) >= floor)
        # Extra objective term (minimised): minus the R^2-weighted sum of weights.
        ef_h.add_objective(lambda w, r2=r2_vec: -cp.sum(cp.multiply(r2, w)))

        try:
            ef_h.efficient_risk(target_volatility=target_vol)
            w_heal = ef_h.clean_weights()
            optimized_weights_phase2[name] = pd.Series(w_heal)

            # Control figures recomputed from the cleaned weights
            w_arr = np.array([w_heal.get(isin, 0.0) for isin in cols], dtype=float)
            exp_ret = float(w_arr @ mu.values)
            exp_vol = float(np.sqrt(np.maximum(w_arr @ Sigma.values @ w_arr, 0.0)))
            sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan
            summary_data_phase2.append({
                "Portafoglio": f"{name}_PH2",
                "Years": years,
                "Target Vol": f"{target_vol:.2%}",
                "Expected annual return": f"{exp_ret:.2%}",
                "Annual volatility": f"{exp_vol:.2%}",
                "Sharpe Ratio": f"{sharpe:.2f}"
            })
            print(f"[Phase2] {name}: ottimizzazione completata.")
        except Exception as e:
            # Best effort by design: a failed variant is reported and skipped
            # so the remaining targets still run.
            print(f"[Phase2] {name}: fallita ({e}). Skipping.")

# =========================
|
||
# CONFRONTO PH1 vs PH2 (Equity & Underwater)
|
||
# =========================
|
||
def _portfolio_returns_from_weights_generic(period_df: pd.DataFrame, w_series: pd.Series) -> pd.Series:
|
||
w_series = w_series.reindex(period_df.columns).fillna(0.0)
|
||
return (period_df[w_series.index] * w_series).sum(axis=1)
|
||
|
||
def _plot_equity_compare_generic(name: str,
                                 wA: pd.Series, labelA: str,
                                 wB: pd.Series, labelB: str,
                                 period_df: pd.DataFrame,
                                 out_prefix: str):
    """Plot the equity lines of two weightings of portfolio ``name`` and save the chart.

    The file is written to the plot folder as
    ``{out_prefix}_{name}_{labelA}_vs_{labelB}.png`` with any "/" replaced by "_".
    """
    curves = []
    for weights, label in ((wA, labelA), (wB, labelB)):
        daily = _portfolio_returns_from_weights_generic(period_df, weights)
        curves.append((label, (1.0 + daily).cumprod()))

    fig, ax = plt.subplots(figsize=(10, 5))
    for label, curve in curves:
        ax.plot(curve.index, curve.values, label=f"{name} {label}")
    ax.set_title(f"Equity line - {name} ({labelA} vs {labelB}) - ultimi 5 anni")
    ax.set_xlabel("Data")
    ax.set_ylabel("Equity (base=1.0)")
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()

    file_name = f"{out_prefix}_{name}_{labelA}_vs_{labelB}.png".replace("/", "_")
    out_png = plot_path(file_name)
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[compare-equity-ph2] Salvato: {out_png}")

def _plot_underwater_compare_generic(name: str,
                                     wA: pd.Series, labelA: str,
                                     wB: pd.Series, labelB: str,
                                     period_df: pd.DataFrame,
                                     ylim: tuple, out_prefix: str):
    """Plot the drawdown curves of two weightings of portfolio ``name`` and save the chart.

    ``ylim`` sets the y-axis range (None lets matplotlib choose). The file is
    written to the plot folder as
    ``{out_prefix}_{name}_{labelA}_vs_{labelB}.png`` with any "/" replaced by "_".
    """
    drawdowns = []
    for weights, label in ((wA, labelA), (wB, labelB)):
        daily = _portfolio_returns_from_weights_generic(period_df, weights)
        curve = (1.0 + daily).cumprod()
        drawdowns.append((label, curve / curve.cummax() - 1.0))

    fig, ax = plt.subplots(figsize=(10, 5))
    for label, underwater in drawdowns:
        ax.plot(underwater.index, underwater.values, label=f"{name} {label}")
    ax.set_title(f"Underwater (Drawdown) - {name} ({labelA} vs {labelB}) - ultimi 5 anni")
    ax.set_xlabel("Data")
    ax.set_ylabel("Drawdown")
    if ylim is not None:
        ax.set_ylim(*ylim)
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()

    file_name = f"{out_prefix}_{name}_{labelA}_vs_{labelB}.png".replace("/", "_")
    out_png = plot_path(file_name)
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[compare-underwater-ph2] Salvato: {out_png}")

def plot_phase1_vs_phase2_all(port_names=None, underwater_ylim=(-0.5, 0.0)):
    """Save equity and underwater comparison plots (PH1 vs PH2) per portfolio.

    The comparison window is always the 5 years ending at the module-level
    ``end_date``, sliced from ``final_df``. A portfolio is skipped (with a
    console message) when it is missing from either ``optimized_weights`` or
    ``optimized_weights_phase2``; nothing is plotted at all when the phase 2
    table is empty.

    Args:
        port_names: Portfolio names to plot; ``None`` selects the built-in
            list of five names.
        underwater_ylim: y-axis limits forwarded to the underwater plot.
    """
    if port_names is None:
        port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']

    if optimized_weights_phase2.empty:
        print("[PH1 vs PH2] Nessun risultato PH2 disponibile. Salto i plot.")
        return

    window_start = end_date - pd.DateOffset(years=5)
    window_df = final_df.loc[window_start:end_date]
    assets = window_df.columns

    phase1_names = set(optimized_weights.columns)
    phase2_names = set(optimized_weights_phase2.columns)

    for name in port_names:
        if name not in phase1_names:
            print(f"[PH1 vs PH2] '{name}' assente in PH1. Skip.")
            continue
        if name not in phase2_names:
            print(f"[PH1 vs PH2] '{name}' assente in PH1+PH2. Skip.")
            continue

        # Align both weight vectors on the window's assets; absent assets get 0.
        weights_ph1 = optimized_weights[name].reindex(assets).fillna(0.0)
        weights_ph2 = optimized_weights_phase2[name].reindex(assets).fillna(0.0)

        # Positional arguments shared by the two plotting helpers.
        shared_args = (name, weights_ph1, "PH1", weights_ph2, "PH2", window_df)

        _plot_equity_compare_generic(
            *shared_args,
            out_prefix="Equity_Compare_PH1_vs_PH2",
        )
        _plot_underwater_compare_generic(
            *shared_args,
            ylim=underwater_ylim,
            out_prefix="Underwater_Compare_PH1_vs_PH2",
        )
|
||
|
||
# Draw the PH1 vs PH2 comparison plots for the active target portfolios.
plot_phase1_vs_phase2_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'], underwater_ylim=(-0.5, 0.0))

# =========================
# EXPORT — (3) PHASE 2 MANAGEMENT FILES (PTFOPT<name>_PH2.xlsx)
# =========================
# `optimized_weights_phase2` is always bound at this point: a placeholder is
# assigned at the top of the file and the plotting call just above already
# reads it, so no NameError guard is needed here.
if optimized_weights_phase2.empty:
    print("[Fase2] Nessun risultato di Fase 2 trovato: skip export PTFOPTVARx_nY.xlsx")
else:
    template_cols = list(template_df.columns)

    # ISIN -> security code / name lookups, built once instead of scanning
    # `df` for every exported row. The first row per ISIN wins, and rows
    # without an ISIN are ignored (they could never be matched anyway).
    first_rows_by_isin = df.dropna(subset=['ISIN']).drop_duplicates(subset='ISIN', keep='first')
    code_by_isin = dict(zip(first_rows_by_isin['ISIN'], first_rows_by_isin['Codice Titolo']))
    name_by_isin = dict(zip(first_rows_by_isin['ISIN'], first_rows_by_isin['Nome']))

    # One workbook per reference portfolio among the active targets.
    for (years, target_vol), name in volatility_targets.items():
        if name not in optimized_weights_phase2.columns:
            print(f"[Fase2] '{name}' non presente tra i risultati di Fase 2. Skip.")
            continue

        # Weight series (ISIN -> weight); keep strictly positive weights only.
        w_series = optimized_weights_phase2[name].reindex(df['ISIN']).dropna()
        w_series = w_series[w_series > 0]

        results_rows = []
        for isin, weight in w_series.items():
            codice_titolo = code_by_isin.get(isin, "")
            nome = name_by_isin.get(isin, "")

            # Start from a blank template row and fill the known fields.
            row = dict.fromkeys(template_cols, "")
            row['cod_por'] = f'PTFOPT{name}_PH2'  # e.g. PTFOPTVAR6_3Y_PH2
            row['cod_tit'] = codice_titolo
            row['des_tit'] = nome
            # Scaled by 99 (not 100); the original note says this matches the
            # Phase 1 export. Presumably 1% is left unallocated — TODO confirm.
            row['peso'] = float(weight * 99)
            results_rows.append(row)

        # Sheet = template header + result rows.
        results_full_df = pd.DataFrame(results_rows, columns=template_cols)
        output_df = pd.concat([template_df.iloc[0:0], results_full_df], ignore_index=True)

        # File name: PTFOPT<name> plus a _PH2 suffix.
        output_file_path = excel_path(f'PTFOPT{name}_PH2.xlsx')
        output_df.to_excel(output_file_path, index=False, engine='openpyxl')
        print(f"[Fase2] File {output_file_path} salvato con successo.")
|
||
|
||
# =========================
|
||
# RIEPILOGO TABELLARE: PH1 vs PH2 (stesse metriche path)
|
||
# =========================
|
||
def _port_metrics_row(name, variant_label, years, target_vol, w_series, period_df, metrics_asset_df):
    """Build one row of the PH1 vs PH2 comparison table.

    The row combines:
      - path-based metrics returned by ``portfolio_path_metrics`` (annualised
        return/volatility, CAGR, R^2 of the equity line, max drawdown,
        drawdown duration, time to recovery, AAW/AUW, Heal index, horizon);
      - the diversification benefit: portfolio volatility from the sample
        covariance divided by the weighted average of the single-asset
        volatilities, minus 1 (so it is typically negative);
      - the time benefit: portfolio horizon divided by the weighted average of
        the per-asset horizons, minus 1 (negative values mean a benefit).

    Args:
        name: Portfolio name.
        variant_label: Variant label written in the "Variante" column.
        years: Look-back length, reported as is.
        target_vol: Target volatility, reported rounded to 4 decimals.
        w_series: Weights indexed by asset; re-aligned to ``period_df``
            columns with missing assets set to 0.
        period_df: Asset returns for the evaluation window.
        metrics_asset_df: Per-asset metrics with ``ISIN`` and
            ``H_min_100m_5Y`` columns.

    Returns:
        dict: column name -> value. Missing values are encoded as ``None``,
        NaN or ``""`` depending on the field.
    """
    def _rounded(value, digits, missing, as_float=False):
        """Round *value* to *digits*, or return *missing* when it is NA."""
        if not pd.notna(value):
            return missing
        return round(float(value) if as_float else value, digits)

    def _whole(value):
        """Cast *value* to int, or return an empty string when it is NA."""
        return int(value) if pd.notna(value) else ""

    cols = list(period_df.columns)
    w_vec = w_series.reindex(cols).fillna(0.0).values

    # --- covariance-based volatility --------------------------------------
    # sample_cov scales by pypfopt's default `frequency` (annualised figures).
    Sigma = risk_models.sample_cov(period_df, returns_data=True).loc[cols, cols].values
    exp_vol = float(np.sqrt(max(w_vec @ Sigma @ w_vec, 0.0)))

    # --- diversification benefit ------------------------------------------
    asset_vols = np.sqrt(np.clip(np.diag(Sigma), 0.0, None))
    weighted_avg_vol = float(np.dot(w_vec, asset_vols))
    diversification_benefit = np.nan
    if weighted_avg_vol > 0 and exp_vol > 0:
        diversification_benefit = (exp_vol / weighted_avg_vol) - 1  # typically negative

    # --- path-based metrics -----------------------------------------------
    metr = portfolio_path_metrics(period_df, five_year_df, w_vec, cols, days_per_year)
    port_hmin = metr['Hmin_100m_5Y']  # months, per the original note

    # --- time benefit -----------------------------------------------------
    horizon_by_isin = metrics_asset_df.set_index('ISIN')['H_min_100m_5Y'].to_dict()
    h_assets = np.array([horizon_by_isin.get(c, np.nan) for c in cols], dtype=float)

    # Weighted average horizon over held assets that have a finite horizon,
    # with the weights renormalised on that subset.
    h_wavg = np.nan
    usable = np.isfinite(h_assets) & (w_vec > 0)
    if usable.any():
        held_weights = w_vec[usable].astype(float)
        held_total = held_weights.sum()
        if held_total > 0:
            h_wavg = float(np.dot(held_weights / held_total, h_assets[usable].astype(float)))

    avg_usable = (h_wavg is not None) and np.isfinite(h_wavg) and h_wavg > 0
    if avg_usable and (port_hmin is not None) and np.isfinite(port_hmin):
        # A shorter portfolio horizon is a benefit; the sign is flipped so that
        # benefits show up as negative numbers, like the diversification one.
        beneficio_temporale = - (1.0 - (float(port_hmin) / float(h_wavg)))
    else:
        beneficio_temporale = np.nan

    return {
        "Portafoglio": name,
        "Variante": variant_label,
        "Years": years,

        "Target Vol": round(target_vol, 4),
        "Volatilita Ann": _rounded(metr['AnnVol'], 4, None),

        "Rendimento Ann": _rounded(metr['AnnReturn'], 4, None),
        "CAGR": _rounded(metr['CAGR'], 4, None),

        "R^2 Equity": _rounded(metr['R2'], 3, np.nan),

        "MaxDD": _rounded(metr['MaxDD'], 4, None),
        "DD Duration Max": _whole(metr['DD_Duration']),
        "Time to Recovery": _whole(metr['TTR']),

        "AAW": _rounded(metr['AAW'], 2, np.nan, as_float=True),
        "AUW": _rounded(metr['AUW'], 2, np.nan, as_float=True),
        "Heal Index": _rounded(metr['Heal'], 2, np.nan, as_float=True),

        "Horizon": _whole(metr['Hmin_100m_5Y']),
        "Horizon average": _rounded(h_wavg, 2, np.nan, as_float=True),

        "Beneficio di diversificazione": _rounded(diversification_benefit, 4, None),
        "Beneficio temporale": _rounded(beneficio_temporale, 4, None),
    }
|
||
|
||
# Two rows (PH1 and PH2) per target portfolio that has weights in both phases.
comparison_rows = []
for (years, target_vol), name in volatility_targets.items():
    available_in_both = (
        name in optimized_weights.columns
        and not optimized_weights_phase2.empty
        and name in optimized_weights_phase2.columns
    )
    if not available_in_both:
        continue

    # Evaluation window: the portfolio's own look-back ending at end_date.
    period_start_date = end_date - pd.DateOffset(years=years)
    period_df_cmp = final_df.loc[period_start_date:end_date]

    # Align both weight vectors on the window's assets; absent assets get 0.
    w_ph1 = optimized_weights[name].reindex(period_df_cmp.columns).fillna(0.0)
    w_h = optimized_weights_phase2[name].reindex(period_df_cmp.columns).fillna(0.0)

    # Per-asset metrics table; _port_metrics_row reads its H_min_100m_5Y column.
    metrics_asset_df = per_asset_metrics[name]

    comparison_rows.extend(
        _port_metrics_row(name, variant, years, target_vol, weights, period_df_cmp, metrics_asset_df)
        for variant, weights in (("PH1", w_ph1), ("PH2", w_h))
    )

comparison_df = pd.DataFrame(comparison_rows)
|
||
|
||
# =========================
# EXPORT — (1) ASSET METRICS ONLY
# =========================
# One sheet per portfolio ("Metriche_<name>") plus a consolidated sheet where
# every table is stacked and tagged with its portfolio name in "Periodo".
asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx')
with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer:
    consolidated = []
    for name, metrics_df in per_asset_metrics.items():
        metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False)

        tmp = metrics_df.copy()
        tmp.insert(0, 'Periodo', name)
        consolidated.append(tmp)

    consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame()
    if not consolidated_df.empty:
        consolidated_df.to_excel(writer, sheet_name='Metriche Consolidate', index=False)

    if not per_asset_metrics:
        # With no metrics at all nothing has been written, and closing an
        # openpyxl workbook that has no sheet raises. Write an explanatory
        # note sheet instead, as the summary workbook export does.
        pd.DataFrame({
            "Nota": ["Nessuna metrica per-asset disponibile."]
        }).to_excel(writer, sheet_name='Nota', index=False)

if per_asset_metrics:
    print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.")
else:
    print(f"File '{asset_metrics_path}' creato senza metriche (nessun dato per-asset disponibile).")
|
||
|
||
# =========================
# (NEW ORDER) BUILD THE "WITH NAMES" TABLES AFTER PHASE 2
# =========================
# ISIN -> name map used to label the weight tables (the last row wins when an
# ISIN appears more than once in `df`).
isin_to_name = dict(zip(df['ISIN'], df['Nome']))

# Phase 1: the name column is always added; unknown ISINs get an empty name.
optimized_weights_with_names = optimized_weights.assign(
    **{'Nome ETF': optimized_weights.index.map(isin_to_name).fillna("")}
)

# Phase 2: an empty table stays a plain copy, so the export can detect it.
if optimized_weights_phase2.empty:
    optimized_weights_phase2_with_names = optimized_weights_phase2.copy()
else:
    optimized_weights_phase2_with_names = optimized_weights_phase2.assign(
        **{'Nome ETF': optimized_weights_phase2.index.map(isin_to_name).fillna("")}
    )
|
||
|
||
# =========================
# EXPORT — (single xlsxwriter writer) + percentage formatting
# =========================
output_path = excel_path('optimized_weights_summary_v2.5.xlsx')
sheet_weights_ph1 = 'Pesi PH1'
sheet_weights_ph2 = 'Pesi PH2'
sheet_compare = 'Confronto PH1 vs PH2'

# Comparison-sheet columns shown with a percentage number format.
percent_cols = [
    "Target Vol",
    "Volatilita Ann",
    "Rendimento Ann",
    "CAGR",
    "MaxDD",
    "Beneficio di diversificazione",
    "Beneficio temporale",
]

with pd.ExcelWriter(output_path, engine='xlsxwriter') as writer:
    # 1) Phase 1 weights.
    optimized_weights_with_names.to_excel(writer, sheet_name=sheet_weights_ph1, index=True)

    # 2) Phase 2 weights, or a one-cell note when there are none.
    if not optimized_weights_phase2_with_names.empty:
        optimized_weights_phase2_with_names.to_excel(writer, sheet_name=sheet_weights_ph2, index=True)
    else:
        pd.DataFrame({
            "Nota": ["Nessun risultato PH2 (cvxpy assente o ottimizzazione fallita)."]
        }).to_excel(writer, sheet_name=sheet_weights_ph2, index=False)

    # 3) Comparison table, or a one-cell note when it is empty.
    if not comparison_df.empty:
        comparison_df.to_excel(writer, sheet_name=sheet_compare, index=False)

        # Percentage format, applied only to the listed columns that exist.
        wb = writer.book
        ws = writer.sheets[sheet_compare]
        percent_fmt = wb.add_format({'num_format': '0.00%'})

        present_percent_cols = [col for col in percent_cols if col in comparison_df.columns]
        for col in present_percent_cols:
            # The sheet has no index column, so the 0-based DataFrame position
            # is also the worksheet column.
            col_idx = comparison_df.columns.get_loc(col)
            ws.set_column(col_idx, col_idx, 10, percent_fmt)
    else:
        # No percentage formatting when there is no comparison table.
        pd.DataFrame({
            "Nota": ["Confronto non disponibile (mancano risultati PH2)."]
        }).to_excel(writer, sheet_name=sheet_compare, index=False)

print(f"File '{output_path}' creato con '{sheet_weights_ph1}', '{sheet_weights_ph2}' e '{sheet_compare}'.")
|