Files
Ottimizzatore/Sviluppo/20251022 Ottimizzatore Versione 2.6.py
2025-11-20 14:47:51 +01:00

1022 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Created on 22 Oct 2025
@author: Federico
"""
# =========================
# IMPORT & PARAMETRI
# =========================
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from pypfopt import risk_models
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt.exceptions import OptimizationError
# Cartelle di input/output/plot
# Working folders, created up front so later reads/writes never hit a missing directory.
OUTPUT_DIR = "Output"
INPUT_DIR = "Input"
PLOT_DIR = "Plot"
for _folder in (OUTPUT_DIR, INPUT_DIR, PLOT_DIR):
    os.makedirs(_folder, exist_ok=True)
def excel_path(filename: str) -> str:
    """Return the full path of an Excel output file inside ``OUTPUT_DIR``."""
    target_dir = OUTPUT_DIR
    return os.path.join(target_dir, filename)
def plot_path(filename: str) -> str:
    """Return the full path of a chart file inside ``PLOT_DIR``."""
    target_dir = PLOT_DIR
    return os.path.join(target_dir, filename)
# --- Placeholders so these names exist (no NameError) even when the Heal phase is skipped ---
optimized_weights_phase1_heal = pd.DataFrame()
summary_data_phase1_heal = []
# =========================
# TARGET CONFIGURATION
# =========================
# Maps (look-back window in years, target volatility) -> portfolio name.
# The target volatility is the value handed to efficient_risk(); entries that
# are commented out are disabled targets.
volatility_targets = {
    # (1, 0.06): 'VAR3_1Y',
    # (3, 0.06): 'VAR3_3Y',
    (5, 0.06): 'VAR3_5Y',
    (1, 0.12): 'VAR6_1Y',
    (3, 0.12): 'VAR6_3Y',
    (5, 0.12): 'VAR6_5Y',
    # (1, 0.18): 'VAR9_1Y',
    # (3, 0.18): 'VAR9_3Y',
    (5, 0.18): 'VAR9_5Y'
}
# Factor used to annualise daily means and volatilities.
days_per_year = 252
# Risk-free rate used in the Sharpe ratio computations.
riskfree_rate = 0.02
# Multiplier applied to the PH1 expected return to obtain the return floor of the Heal variant.
mu_heal_floor = 0.85
# ---------------------------------
# Utility per R^2 sull'equity line
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
    """Return the R^2 of an OLS fit of log(equity) against time (with intercept).

    The equity curve is the cumulative product of ``1 + returns`` after
    dropping missing returns. Equity points that are not strictly positive
    and finite are discarded before taking the logarithm.

    Returns:
        A value clipped to [0, 1]; ``0.0`` when log-equity is (numerically)
        constant; ``nan`` when fewer than 3 usable observations remain.
    """
    clean = returns.dropna()
    if clean.size < 3:
        return np.nan
    equity = (1.0 + clean).cumprod()
    # log() is undefined for zero/negative equity (a loss of 100% or more) and
    # meaningless for inf/NaN: keep only strictly positive, finite levels so
    # the regression never receives NaN inputs.
    equity = equity[np.isfinite(equity) & (equity > 0)]
    if equity.size < 3:
        return np.nan
    y = np.log(equity.to_numpy(dtype=float))
    # A flat log-equity line has no variance to explain.
    if np.allclose(y.var(ddof=1), 0.0):
        return 0.0
    x = np.arange(y.size, dtype=float)
    design = np.column_stack([np.ones_like(x), x])
    beta, *_ = np.linalg.lstsq(design, y, rcond=None)
    residuals = y - design @ beta
    ss_res = np.sum(residuals ** 2)
    ss_tot = np.sum((y - y.mean()) ** 2)
    if not ss_tot > 0:
        return np.nan
    r2 = 1.0 - ss_res / ss_tot
    if np.isnan(r2):
        return np.nan
    return float(np.clip(r2, 0.0, 1.0))
# ---------------------------------
# Utility per metriche di drawdown
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
    """
    Compute drawdown statistics of the equity curve obtained by compounding ``returns``.

    Missing returns are treated as 0. Returns a tuple
    ``(max_dd, max_dd_duration, ttr_from_mdd)``:
    - max_dd: deepest drawdown (negative or zero)
    - max_dd_duration: longest run of consecutive observations below the running peak
    - ttr_from_mdd: observations from the max-drawdown trough until equity is back at
      (or above) the highest level seen up to the trough; ``sentinel_ttr`` when it
      never gets back; NaN when the trough is the very first observation
    All three values are NaN for empty input.
    """
    filled = returns.fillna(0.0).astype(float)
    if filled.size == 0:
        return np.nan, np.nan, np.nan

    curve = (1.0 + filled).cumprod()
    underwater = curve / curve.cummax() - 1.0
    curve_vals = curve.values
    dd_vals = underwater.values

    deepest = float(underwater.min())

    # Longest streak below the peak: pad the 0/1 flags with zeros, then each
    # streak is delimited by a +1 edge (start) and a -1 edge (end).
    below = (underwater < 0).values.astype(int)
    edges = np.diff(np.concatenate(([0], below, [0])))
    run_starts = np.flatnonzero(edges == 1)
    run_ends = np.flatnonzero(edges == -1)
    longest = int((run_ends - run_starts).max()) if run_starts.size else 0

    # Time to recovery measured from the trough of the deepest drawdown.
    trough = int(np.argmin(dd_vals))
    if trough == 0:
        recovery = np.nan
    else:
        prior_peak = float(curve_vals[:trough + 1].max())
        regained = np.flatnonzero(curve_vals[trough + 1:] >= prior_peak)
        recovery = int(regained[0]) + 1 if regained.size else sentinel_ttr

    if np.isnan(recovery):
        return deepest, longest, np.nan
    return deepest, longest, int(recovery)
# ---------------------------------
# Utility per AAW, AUW e Heal Index
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
    """
    Compute the "above water" / "under water" areas of the compounded equity curve.

    Missing returns are treated as 0. Returns ``(AAW, AUW, heal)``:
    - AAW: sum of the positive run-ups of equity versus its running minimum
    - AUW: sum of the absolute drawdowns of equity versus its running maximum
    - heal: (AAW - AUW) / AUW, or NaN when AUW is not a positive finite number
    All three values are NaN for empty input.
    """
    filled = returns.fillna(0.0).astype(float)
    if filled.size == 0:
        return np.nan, np.nan, np.nan

    curve = (1.0 + filled).cumprod()
    underwater = curve / curve.cummax() - 1.0
    runup = curve / curve.cummin() - 1.0

    area_under = float((-underwater[underwater < 0]).sum())
    area_above = float((runup[runup > 0]).sum())

    if np.isfinite(area_under) and area_under > 0:
        heal = (area_above - area_under) / area_under
    else:
        heal = np.nan
    return area_above, area_under, heal
# ---------------------------------
# Utility per H_min (100% finestre positive)
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
    """
    Find the shortest horizon for which every rolling window has a non-negative
    compounded return.

    Missing returns are dropped. Returns ``(h_days, h_months)`` where
    ``h_months = ceil(h_days / month_len)``, or ``(nan, nan)`` when the input
    is empty or no horizon up to the full sample length qualifies.
    """
    clean = returns.dropna().astype(float)
    n_obs = clean.size
    if n_obs == 0:
        return np.nan, np.nan

    # Prefix sums of log-returns with a leading 0: the log-return of any window
    # is then the difference between two entries of this array.
    prefix = np.concatenate(([0.0], np.cumsum(np.log1p(clean.values))))

    for horizon in range(1, n_obs + 1):
        window_log = prefix[horizon:] - prefix[:-horizon]
        window_ret = np.exp(window_log) - 1.0
        if np.all(window_ret >= 0):
            return horizon, int(np.ceil(horizon / month_len))
    return np.nan, np.nan
# ---------------------------------
# Utility di serie portafoglio e metriche path-based
# ---------------------------------
def portfolio_series_from_weights(period_df: pd.DataFrame, w: np.ndarray, cols: list) -> pd.Series:
    """Return the per-row portfolio return: the weighted sum of ``period_df[cols]`` using ``w``."""
    weights = pd.Series(w, index=cols)
    weighted = period_df[cols].mul(weights, axis="columns")
    return weighted.sum(axis=1)
def portfolio_path_metrics(period_df: pd.DataFrame,
                           five_year_df: pd.DataFrame,
                           w: np.ndarray,
                           cols: list,
                           days_per_year: int) -> dict:
    """Return path-based metrics of the weighted portfolio over ``period_df``.

    The result also carries ``Hmin_100m_5Y``: the month count from
    ``h_min_100`` evaluated on the same weights applied to ``five_year_df``
    (restricted to the columns both frames share).
    """
    weights = np.asarray(w, dtype=float)
    columns = list(cols)
    daily = portfolio_series_from_weights(period_df, weights, columns)
    n_obs = int(daily.shape[0])

    if n_obs > 0:
        years_elapsed = n_obs / days_per_year
        ann_return = float(daily.mean() * days_per_year)
        gross = float((1.0 + daily).prod())
    else:
        years_elapsed = np.nan
        ann_return = np.nan
        gross = np.nan
    ann_vol = float(daily.std(ddof=1) * np.sqrt(days_per_year)) if n_obs > 1 else np.nan

    # CAGR is only defined for a positive elapsed time and a positive gross return.
    cagr = np.nan
    if years_elapsed > 0 and gross > 0:
        cagr = gross ** (1.0 / years_elapsed) - 1.0

    r2 = r2_equity_line(daily)
    maxdd, dddur, ttr = drawdown_metrics(daily, sentinel_ttr=1250)
    aaw, auw, heal = heal_index_metrics(daily)

    shared = [c for c in columns if c in five_year_df.columns]
    if shared:
        w_shared = pd.Series(weights, index=columns).reindex(shared).fillna(0.0).values
        daily_5y = portfolio_series_from_weights(five_year_df, w_shared, shared)
        _, hmin_5y_months = h_min_100(daily_5y, month_len=21)
    else:
        hmin_5y_months = np.nan

    metrics = {}
    metrics["AnnReturn"] = ann_return
    metrics["AnnVol"] = ann_vol
    metrics["CAGR"] = cagr
    metrics["R2"] = r2
    metrics["MaxDD"] = maxdd
    metrics["DD_Duration"] = dddur
    metrics["TTR"] = ttr
    metrics["AAW"] = aaw
    metrics["AUW"] = auw
    metrics["Heal"] = heal
    metrics["Hmin_100m_5Y"] = hmin_5y_months
    return metrics
# --- Lettura parametri dal file connection.txt ---
# Read "key = value" pairs from connection.txt and build the SQLAlchemy URL.
from urllib.parse import quote_plus

params = {}
with open("connection.txt", "r") as f:
    for line in f:
        line = line.strip()
        # Skip blank lines, comments and malformed lines without a "=" separator
        # (the latter would otherwise crash the tuple unpacking below).
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        params[key.strip()] = value.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
# Fail early with a clear message instead of silently building a URL that
# contains the literal text "None".
_missing_params = [k for k in ("username", "password", "host", "database") if params.get(k) is None]
if _missing_params:
    sys.exit(f"connection.txt: parametri mancanti: {', '.join(_missing_params)}")
# Credentials are URL-escaped: characters such as "@", ":" or "/" in the
# password would otherwise be parsed as URL delimiters.
connection_string = (
    f"mssql+pyodbc://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{database}"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)
print("Connection string letta correttamente")
# =========================
# CONNESSIONE AL DB
# =========================
# Create the engine and run a trivial query so that a wrong host or wrong
# credentials fail here rather than in the middle of the run.
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        connection.execute(text("SELECT 1"))
    print("Connessione al database riuscita.")
except SQLAlchemyError as e:
    print("Errore durante la connessione al database:", e)
    # Non-zero exit status: a bare sys.exit() would report success to the caller.
    sys.exit(1)
# =========================
# INPUT / TEMPLATE
# =========================
# Columns read from the investable-universe workbook.
_universe_columns = ['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo']
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx')
# Template workbook for the per-portfolio export files.
template_df = pd.read_excel(template_path)
# 'Codice Titolo' is forced to str (presumably to keep codes verbatim, e.g. leading zeros - confirm).
df = pd.read_excel(file_path, usecols=_universe_columns, dtype={'Codice Titolo': str})
# =========================
# SERIE STORICHE RENDIMENTI
# =========================
# Daily-return matrix: one column per ISIN, indexed by the business days of
# the last 5 years ending yesterday.
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize()
isin_from_db = set()
# Bound parameters instead of string interpolation: an ISIN containing a quote
# can no longer break (or inject into) the statement.
# NOTE(review): @n = 1305 is presumably the number of daily observations
# requested (~5 years of business days) - confirm against the stored procedure.
procedure_call = text(
    "EXEC opt_RendimentoGiornaliero1_ALL @ISIN = :isin, @n = :n, @PtfCurr = :ptf_curr"
)
_returns_by_isin = {}
for isin in df['ISIN'].dropna().unique():
    print(f"Working on ISIN: {isin}")
    try:
        temp_df = pd.read_sql_query(
            procedure_call,
            engine,
            params={"isin": str(isin), "n": 1305, "ptf_curr": "EUR"},
        )
    except SQLAlchemyError as e:
        print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)
        continue
    if temp_df.empty:
        print(f"Nessun dato recuperato per {isin}, skipping...")
        continue
    temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize()
    temp_df = temp_df.dropna(subset=['Px_Date']).set_index('Px_Date')
    # reindex() raises on duplicate labels: keep the last row for each date.
    temp_df = temp_df[~temp_df.index.duplicated(keep='last')]
    # Returns are divided by 100 (stored as percentages) and aligned to the calendar.
    _daily = (temp_df['RendimentoGiornaliero'] / 100).reindex(all_dates)
    _returns_by_isin[isin] = _daily
    isin_from_db.add(isin)
    print(f"Dati recuperati per {isin}: {_daily.count()} righe di dati non-null prelevate.")
# Build the frame in one step (column-by-column insertion fragments it);
# dates without an observation count as a zero return.
final_df = pd.DataFrame(_returns_by_isin, index=all_dates).fillna(0)
# -------- H_min is always evaluated on the 5-year window (21 days = 1 month) --------
five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date]
# =========================
# LOOP OTTIMIZZAZIONI (PH1 tradizionale)
# =========================
# PH1: for every (years, target volatility) target, compute per-asset metrics,
# solve the constrained efficient-risk problem, then export the weights to
# Excel and save an asset-allocation pie chart.
optimized_weights = pd.DataFrame()
per_asset_metrics = {}

# ---- Inputs that do not depend on the target: computed once, not per iteration ----
# Caps on the total weight of each asset class.
asset_class_limits = {
    'Azionari': 0.75, 'Obbligazionari': 0.75,
    'Metalli Preziosi': 0.20, 'Materie Prime': 0.05,
    'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1
}
# Cap on each category = largest PesoMax found among its members.
categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
template_cols = list(template_df.columns)
# H_min only depends on the fixed 5-year window, so it is the same for every target.
hmin_5y_months_dict = {
    col: h_min_100(five_year_df[col], month_len=21)[1]
    for col in five_year_df.columns
}

for (years, target_vol), name in volatility_targets.items():
    period_start_date = end_date - pd.DateOffset(years=years)
    period_df = final_df.loc[period_start_date:end_date]
    daily_returns_mean = period_df.mean()
    annual_returns_mean = daily_returns_mean * days_per_year
    annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)

    # ---------- PER-ASSET METRICS ----------
    n_days = int(period_df.shape[0])
    years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
    asset_ann_return = daily_returns_mean * days_per_year
    asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year)
    gross = (1.0 + period_df).prod(skipna=True)
    asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
    asset_r2 = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
    maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
    aaw_dict, auw_dict, heal_dict = {}, {}, {}
    for col in period_df.columns:
        maxdd_dict[col], dddur_dict[col], ttr_dict[col] = drawdown_metrics(period_df[col], sentinel_ttr=1250)
        aaw_dict[col], auw_dict[col], heal_dict[col] = heal_index_metrics(period_df[col])
    asset_metrics_df = (
        pd.DataFrame({
            'ISIN': period_df.columns,
            'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values,
            'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values,
            'CAGR': asset_cagr.reindex(period_df.columns).values,
            'R2_Equity': asset_r2.reindex(period_df.columns).values,
            'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
            'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
            'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
            'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
            'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
            'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
            'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
        })
        .merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
        [['ISIN', 'Nome', 'Categoria', 'Asset Class',
          'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
          'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
          'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
        .sort_values('ISIN', kind='stable')
        .reset_index(drop=True)
    )
    per_asset_metrics[name] = asset_metrics_df

    # ---------- OPTIMISATION ----------
    ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)
    # Per-asset constraints: a fixed weight (PesoFisso) takes precedence over a cap (PesoMax).
    # Loop values are bound as lambda defaults to avoid late-binding closures.
    for _, row in df.iterrows():
        isin_i = row['ISIN']
        if isin_i in period_df.columns:
            idx = period_df.columns.get_loc(isin_i)
            pf = row.get('PesoFisso')
            pm = row.get('PesoMax')
            if pd.notna(pf):
                ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
            elif pd.notna(pm):
                ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)
    # Category caps
    for cat, maxw in categories_limits.items():
        # A category whose members define no PesoMax yields a NaN cap:
        # treat it as "no cap" instead of handing NaN to the solver.
        if pd.isna(maxw):
            continue
        isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
        idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
        if idxs:
            ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
    # Asset-class caps
    for ac, maxw in asset_class_limits.items():
        isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
        idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
        if idxs:
            ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)

    # ---------- Solve ----------
    try:
        ef.efficient_risk(target_volatility=target_vol)
        weights = ef.clean_weights()
    except (OptimizationError, ValueError) as e:
        # efficient_risk raises ValueError for an unreachable/invalid target volatility.
        print(f"Optimization failed for {name}: {e}")
        # Zero weights indexed by asset: a positionally indexed Series would not
        # align with the ISIN-indexed columns of optimized_weights.
        optimized_weights[name] = pd.Series(0.0, index=annual_returns_mean.index)
        continue

    optimized_weights[name] = pd.Series(weights)
    exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate)
    print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===")
    print(f"Expected annual return: {exp_ret:.2%}")
    print(f"Annual volatility: {exp_vol:.2%}")
    print(f"Sharpe Ratio: {sharpe:.2f}")

    # --- Diversification benefit: portfolio vol relative to the weighted average of asset vols ---
    w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns])
    indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values))
    weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols))
    diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
    print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")

    # --- Excel file for the import into the management system (one per portfolio) ---
    results_rows = []
    for isin, weight in weights.items():
        if weight > 0:
            r_sel = df.loc[df['ISIN'] == isin]
            codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else ""
            nome = r_sel['Nome'].iloc[0] if not r_sel.empty else ""
            row = {col: "" for col in template_cols}
            row['cod_por'] = f'PTFOPT{name}'
            row['cod_tit'] = codice_titolo
            row['des_tit'] = nome
            # NOTE(review): weights are scaled by 99, not 100 - presumably to leave
            # 1% unallocated; confirm this is intentional.
            row['peso'] = float(weight * 99)
            results_rows.append(row)
    results_full_df = pd.DataFrame(results_rows, columns=template_cols)
    # Concatenating with the empty template keeps the template's column layout.
    output_df = pd.concat([template_df.iloc[0:0], results_full_df], ignore_index=True)
    output_file_path = excel_path(f'PTFOPT{name}.xlsx')
    output_df.to_excel(output_file_path, index=False)
    print(f"File {output_file_path} saved successfully.")

    # --- Asset-allocation pie chart (only when some weight is positive) ---
    asset_allocations = {asset: 0 for asset in asset_class_limits}
    for isin, weight in weights.items():
        r_sel = df.loc[df['ISIN'] == isin]
        if r_sel.empty:
            continue
        asset_allocations.setdefault(r_sel['Asset Class'].iloc[0], 0)
        asset_allocations[r_sel['Asset Class'].iloc[0]] += weight
    if sum(asset_allocations.values()) > 0:
        plt.figure(figsize=(8, 6))
        plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
        plt.title(f'Asset Allocation for {name}')
        pie_path = plot_path(f'Asset_Allocation_{name}.png')
        plt.savefig(pie_path, dpi=150, bbox_inches='tight')
        plt.close()
# =========================
# RIEPILOGO METRICHE (PORTAFOGLI PH1)
# =========================
# One summary row per PH1 portfolio: model-based figures (mean/covariance)
# next to the path-based metrics of the realised portfolio series.
summary_data = []
for (years, target_vol), name in volatility_targets.items():
    if name not in optimized_weights.columns:
        continue
    period_start_date = end_date - pd.DateOffset(years=years)
    period_df = final_df.loc[period_start_date:end_date]
    asset_cols = list(period_df.columns)
    annual_returns_mean = period_df.mean() * days_per_year
    annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
    w_series = optimized_weights[name].reindex(period_df.columns).fillna(0.0)
    w_vec = w_series.values

    # Path-based metrics come from the shared helper (same formulas, one place).
    path = portfolio_path_metrics(period_df, five_year_df, w_vec, asset_cols, days_per_year)
    port_ann_return = path["AnnReturn"]
    port_ann_vol = path["AnnVol"]
    port_cagr = path["CAGR"]
    port_r2 = path["R2"]
    port_maxdd, port_dddur, port_ttr = path["MaxDD"], path["DD_Duration"], path["TTR"]
    port_aaw, port_auw, port_heal = path["AAW"], path["AUW"], path["Heal"]
    port_hmin_5y_months = path["Hmin_100m_5Y"]

    # Model-based figures
    cov_mat = annual_covariance_matrix.loc[asset_cols, asset_cols].values
    exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[asset_cols].values))
    exp_vol = float(np.sqrt(w_vec @ (cov_mat @ w_vec)))
    if exp_vol > 0:
        sharpe = (exp_ret - riskfree_rate) / exp_vol
    else:
        sharpe = np.nan
    indiv_ann_vols = np.sqrt(np.diag(cov_mat))
    weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols))
    if weighted_avg_vol > 0 and exp_vol > 0:
        diversification_benefit = (exp_vol / weighted_avg_vol) - 1
        diversification_ratio = weighted_avg_vol / exp_vol
    else:
        diversification_benefit = np.nan
        diversification_ratio = np.nan

    print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===")
    print(f"Expected annual return: {exp_ret:.2%}")
    print(f"Annual volatility: {exp_vol:.2%}")
    print(f"Sharpe Ratio: {sharpe:.2f}")
    if np.isnan(diversification_ratio):
        print("Diversification Ratio: NaN")
    else:
        print(f"Diversification Ratio: {diversification_ratio:.3f}")
    print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")

    summary_row = {
        "Portafoglio": name,
        "Years": years,
        "Target Vol": f"{target_vol:.2%}",
        "Expected annual return": f"{exp_ret:.2%}",
        "Annual volatility": f"{exp_vol:.2%}",
        "Sharpe Ratio": f"{sharpe:.2f}",
        "Beneficio di diversificazione": f"{diversification_benefit:.2%}",
        "Rendimento_Ann": f"{port_ann_return:.2%}" if pd.notna(port_ann_return) else "",
        "Volatilita_Ann": f"{port_ann_vol:.2%}" if pd.notna(port_ann_vol) else "",
        "CAGR": f"{port_cagr:.2%}" if pd.notna(port_cagr) else "",
        "R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan,
        "MaxDD": f"{port_maxdd:.2%}" if pd.notna(port_maxdd) else "",
        "DD_Duration_Max": int(port_dddur) if pd.notna(port_dddur) else "",
        "TTR_from_MDD": int(port_ttr) if pd.notna(port_ttr) else "",
        "AAW": float(port_aaw) if pd.notna(port_aaw) else np.nan,
        "AUW": float(port_auw) if pd.notna(port_auw) else np.nan,
        "Heal_Index": float(port_heal) if pd.notna(port_heal) else np.nan,
        "H_min_100m_5Y": int(port_hmin_5y_months) if pd.notna(port_hmin_5y_months) else ""
    }
    summary_data.append(summary_row)
# =========================
# PLOT EQUITY/UNDERWATER (PH1)
# =========================
def plot_equity_overlay_all(port_names=None):
    """Overlay the 5-year equity lines of the given PH1 portfolios in one chart.

    Reads the module-level ``final_df``, ``end_date`` and ``optimized_weights``;
    names missing from ``optimized_weights`` are reported and skipped. The
    chart is saved as ``Equity_ALL_PORTS.png`` via ``plot_path``; nothing is
    saved when no portfolio could be drawn.
    """
    names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'] if port_names is None else port_names
    window = final_df.loc[end_date - pd.DateOffset(years=5):end_date]
    known = set(optimized_weights.columns)

    fig, ax = plt.subplots(figsize=(11, 6))
    n_drawn = 0
    for pname in names:
        if pname not in known:
            print(f"[plot] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
            continue
        weights = optimized_weights[pname].reindex(window.columns).fillna(0.0)
        daily = (window[weights.index] * weights).sum(axis=1)
        curve = (1.0 + daily).cumprod()
        ax.plot(curve.index, curve.values, label=pname)
        n_drawn += 1

    if n_drawn == 0:
        print("[plot] Nessun portafoglio valido da plottare.")
        plt.close(fig)
        return

    ax.set_title("Equity line - Portafogli ottimizzati (ultimi 5 anni)")
    ax.set_xlabel("Data")
    ax.set_ylabel("Equity (base=1.0)")
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()
    out_png = plot_path("Equity_ALL_PORTS.png")
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[plot] Grafico sovrapposto salvato: {out_png}")
def plot_underwater_overlay_all(port_names=None, ylim=(-0.3, 0.0)):
    """Overlay the 5-year underwater (drawdown) curves of the given PH1 portfolios.

    Reads the module-level ``final_df``, ``end_date`` and ``optimized_weights``;
    names missing from ``optimized_weights`` are reported and skipped. ``ylim``
    fixes the y-axis range unless it is ``None``. The chart is saved as
    ``Underwater_ALL_PORTS.png`` via ``plot_path``; nothing is saved when no
    portfolio could be drawn.
    """
    names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'] if port_names is None else port_names
    window = final_df.loc[end_date - pd.DateOffset(years=5):end_date]
    known = set(optimized_weights.columns)

    fig, ax = plt.subplots(figsize=(11, 6))
    n_drawn = 0
    for pname in names:
        if pname not in known:
            print(f"[underwater] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
            continue
        weights = optimized_weights[pname].reindex(window.columns).fillna(0.0)
        daily = (window[weights.index] * weights).sum(axis=1)
        curve = (1.0 + daily).cumprod()
        drawdown = curve / curve.cummax() - 1.0
        ax.plot(drawdown.index, drawdown.values, label=pname)
        n_drawn += 1

    if n_drawn == 0:
        print("[underwater] Nessun portafoglio valido da plottare.")
        plt.close(fig)
        return

    ax.set_title("Underwater (Drawdown) - Portafogli ottimizzati (ultimi 5 anni)")
    ax.set_xlabel("Data")
    ax.set_ylabel("Drawdown")
    if ylim is not None:
        ax.set_ylim(*ylim)
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()
    out_png = plot_path("Underwater_ALL_PORTS.png")
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[underwater] Grafico sovrapposto salvato: {out_png}")
# Produce the two overlay charts (equity and underwater) for the PH1 portfolios listed explicitly.
plot_equity_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'])
plot_underwater_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'])
# =========================
# FASE 1 — Variante PH1+HealProxy
# =========================
# PH1+HealProxy: re-solve every PH1 target with the same constraints, an
# expected-return floor derived from the PH1 solution, and an extra objective
# term that rewards weight on assets with a high Heal Index.
try:
    import cvxpy as cp
except Exception:
    # Broad on purpose: the variant is optional and any import-time failure just skips it.
    print("[Phase1+Heal] Warning: cvxpy non disponibile. Salto questa variante.")
    cp = None
if cp is not None:
    # Inputs that do not depend on the target: computed once, not per iteration.
    categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
    asset_class_limits = {
        'Azionari': 0.75, 'Obbligazionari': 0.75,
        'Metalli Preziosi': 0.20, 'Materie Prime': 0.05,
        'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1
    }
    for (years, target_vol), name in volatility_targets.items():
        if name not in optimized_weights.columns or name not in per_asset_metrics:
            print(f"[Phase1+Heal] '{name}': niente PH1 o metriche per-asset -> skip")
            continue
        period_start_date = end_date - pd.DateOffset(years=years)
        period_df_p = final_df.loc[period_start_date:end_date]
        cols = list(period_df_p.columns)
        mu = period_df_p.mean().reindex(cols).fillna(0.0) * days_per_year
        Sigma = risk_models.sample_cov(period_df_p, returns_data=True).loc[cols, cols]
        w_base = optimized_weights[name].reindex(cols).fillna(0.0).values
        exp_ret_base = float(np.dot(w_base, mu.values))
        metr_df = per_asset_metrics[name]
        heal_map = metr_df.set_index('ISIN')['Heal_Index'].to_dict()
        # Heal Index per asset in column order; missing or NaN values count as 0.
        h_vec = np.array([heal_map.get(c, 0.0) if pd.notna(heal_map.get(c, np.nan)) else 0.0 for c in cols], dtype=float)
        ef_h = EfficientFrontier(mu, Sigma)
        # Per-asset constraints: a fixed weight (PesoFisso) takes precedence over a cap (PesoMax).
        for _, row in df.iterrows():
            isin_i = row['ISIN']
            if isin_i in period_df_p.columns:
                idx = period_df_p.columns.get_loc(isin_i)
                pf = row.get('PesoFisso')
                pm = row.get('PesoMax')
                if pd.notna(pf):
                    ef_h.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
                elif pd.notna(pm):
                    ef_h.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)
        # Category caps
        for cat, maxw in categories_limits.items():
            # A NaN cap (no PesoMax defined in the category) means "no cap".
            if pd.isna(maxw):
                continue
            isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
            idxs = [period_df_p.columns.get_loc(isin) for isin in isin_list if isin in period_df_p.columns]
            if idxs:
                ef_h.add_constraint(lambda w, idxs=idxs, maxw=maxw: cp.sum(w[idxs]) <= maxw)
        # Asset-class caps
        for ac, maxw in asset_class_limits.items():
            isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
            idxs = [period_df_p.columns.get_loc(isin) for isin in isin_list if isin in period_df_p.columns]
            if idxs:
                ef_h.add_constraint(lambda w, idxs=idxs, maxw=maxw: cp.sum(w[idxs]) <= maxw)
        # Expected-return floor: mu_heal_floor times the PH1 expected return.
        # For a negative baseline the plain product would sit ABOVE the baseline
        # (an infeasible demand), so the tolerance is mirrored below it instead.
        if exp_ret_base >= 0:
            ret_floor = mu_heal_floor * exp_ret_base
        else:
            ret_floor = (2.0 - mu_heal_floor) * exp_ret_base
        ef_h.add_constraint(lambda w, mu_vec=mu.values, floor=ret_floor: (mu_vec @ w) >= floor)
        # Extra objective term -h.w: favours weight on assets with a high Heal Index.
        ef_h.add_objective(lambda w, h=h_vec: -cp.sum(cp.multiply(h, w)))
        try:
            ef_h.efficient_risk(target_volatility=target_vol)
            w_heal = ef_h.clean_weights()
            optimized_weights_phase1_heal[name] = pd.Series(w_heal)
            # Model-based figures of the solution, kept for the summary table.
            w_arr = np.array([w_heal.get(isin, 0.0) for isin in cols], dtype=float)
            exp_ret = float(w_arr @ mu.values)
            exp_vol = float(np.sqrt(np.maximum(w_arr @ Sigma.values @ w_arr, 0.0)))
            sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan
            summary_data_phase1_heal.append({
                "Portafoglio": f"{name}_PH1_HEALPROXY",
                "Years": years,
                "Target Vol": f"{target_vol:.2%}",
                "Expected annual return": f"{exp_ret:.2%}",
                "Annual volatility": f"{exp_vol:.2%}",
                "Sharpe Ratio": f"{sharpe:.2f}"
            })
            print(f"[Phase1+Heal] {name}: ottimizzazione completata.")
        except Exception as e:
            # Best effort: a failed variant is reported and skipped, the run continues.
            print(f"[Phase1+Heal] {name}: fallita ({e}). Skipping.")
# =========================
# CONFRONTO PH1 vs PH1+HealProxy (Equity & Underwater)
# =========================
def _portfolio_returns_from_weights_generic(period_df: pd.DataFrame, w_series: pd.Series) -> pd.Series:
    """Return the per-row weighted sum of ``period_df``; assets missing from ``w_series`` get weight 0."""
    aligned = w_series.reindex(period_df.columns).fillna(0.0)
    weighted = period_df[aligned.index].mul(aligned, axis="columns")
    return weighted.sum(axis=1)
def _plot_equity_compare_generic(name: str,
                                 wA: pd.Series, labelA: str,
                                 wB: pd.Series, labelB: str,
                                 period_df: pd.DataFrame,
                                 out_prefix: str):
    """Plot the equity lines of two weightings of portfolio ``name`` and save the chart.

    The file name is built from ``out_prefix``, ``name`` and both labels
    ("/" replaced by "_") and resolved through ``plot_path``.
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    for weights, label in ((wA, labelA), (wB, labelB)):
        daily = _portfolio_returns_from_weights_generic(period_df, weights)
        curve = (1.0 + daily).cumprod()
        ax.plot(curve.index, curve.values, label=f"{name} {label}")
    ax.set_title(f"Equity line - {name} ({labelA} vs {labelB}) - ultimi 5 anni")
    ax.set_xlabel("Data")
    ax.set_ylabel("Equity (base=1.0)")
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()
    out_png = plot_path(f"{out_prefix}_{name}_{labelA}_vs_{labelB}.png".replace("/", "_"))
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[compare-equity-heal] Salvato: {out_png}")
def _plot_underwater_compare_generic(name: str,
                                     wA: pd.Series, labelA: str,
                                     wB: pd.Series, labelB: str,
                                     period_df: pd.DataFrame,
                                     ylim: tuple, out_prefix: str):
    """Plot the underwater (drawdown) curves of two weightings of portfolio ``name`` and save the chart.

    ``ylim`` fixes the y-axis range unless it is ``None``. The file name is
    built from ``out_prefix``, ``name`` and both labels ("/" replaced by "_")
    and resolved through ``plot_path``.
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    for weights, label in ((wA, labelA), (wB, labelB)):
        daily = _portfolio_returns_from_weights_generic(period_df, weights)
        curve = (1.0 + daily).cumprod()
        drawdown = curve / curve.cummax() - 1.0
        ax.plot(drawdown.index, drawdown.values, label=f"{name} {label}")
    ax.set_title(f"Underwater (Drawdown) - {name} ({labelA} vs {labelB}) - ultimi 5 anni")
    ax.set_xlabel("Data")
    ax.set_ylabel("Drawdown")
    if ylim is not None:
        ax.set_ylim(*ylim)
    ax.grid(True, alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()
    out_png = plot_path(f"{out_prefix}_{name}_{labelA}_vs_{labelB}.png".replace("/", "_"))
    fig.savefig(out_png, dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"[compare-underwater-heal] Salvato: {out_png}")
def plot_phase1_vs_phase1heal_all(port_names=None, underwater_ylim=(-0.5, 0.0)):
    """Save equity and underwater comparison charts (PH1 vs PH1+HealProxy) per portfolio.

    Reads the module-level ``final_df``, ``end_date``, ``optimized_weights``
    and ``optimized_weights_phase1_heal`` over the last 5 years. Does nothing
    when no HealProxy result exists; a portfolio missing from either weight
    table is reported and skipped.
    """
    names = port_names if port_names is not None else ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']
    if optimized_weights_phase1_heal.empty:
        print("[PH1 vs PH1+HealProxy] Nessun risultato HealProxy disponibile. Salto i plot.")
        return

    window = final_df.loc[end_date - pd.DateOffset(years=5):end_date]
    baseline_names = set(optimized_weights.columns)
    heal_names = set(optimized_weights_phase1_heal.columns)

    for name in names:
        if name not in baseline_names:
            print(f"[PH1 vs HealProxy] '{name}' assente in PH1. Skip.")
            continue
        if name not in heal_names:
            print(f"[PH1 vs HealProxy] '{name}' assente in PH1+HealProxy. Skip.")
            continue
        w_baseline = optimized_weights[name].reindex(window.columns).fillna(0.0)
        w_healproxy = optimized_weights_phase1_heal[name].reindex(window.columns).fillna(0.0)
        _plot_equity_compare_generic(
            name, w_baseline, "PH1", w_healproxy, "PH1_HEALPROXY",
            window, out_prefix="Equity_Compare_PH1_vs_PH1HEAL",
        )
        _plot_underwater_compare_generic(
            name, w_baseline, "PH1", w_healproxy, "PH1_HEALPROXY",
            window, ylim=underwater_ylim,
            out_prefix="Underwater_Compare_PH1_vs_PH1HEAL",
        )
# Produce the PH1 vs PH1+HealProxy comparison charts for the portfolios listed explicitly.
plot_phase1_vs_phase1heal_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'], underwater_ylim=(-0.5, 0.0))
# =========================
# RIEPILOGO TABELLARE: PH1 vs PH1+HealProxy (stesse metriche path)
# =========================
def _port_metrics_row(name, variant_label, years, target_vol, w_series, period_df, metrics_asset_df):
    """
    Build one row of the PH1 vs PH1+HealProxy comparison table.

    The row combines:
    - path-based metrics returned by ``portfolio_path_metrics`` (annualised
      return and volatility, CAGR, R^2, max drawdown, ...);
    - the diversification benefit: portfolio volatility from the sample
      covariance divided by the weighted average of the single-asset
      volatilities, minus one (typically negative);
    - the time benefit: portfolio horizon divided by the weight-averaged
      per-asset horizon, minus one (negative values mean a benefit).

    Parameters
    ----------
    name : portfolio label, stored under "Portafoglio".
    variant_label : variant label, stored under "Variante".
    years : value stored under "Years".
    target_vol : target volatility as a fraction; reported multiplied by 100.
    w_series : weights indexed by asset; re-aligned to ``period_df.columns``,
        assets without a weight get 0.
    period_df : returns table with one column per asset (passed to
        ``sample_cov`` with ``returns_data=True``).
    metrics_asset_df : per-asset metrics table providing the 'ISIN' and
        'H_min_100m_5Y' columns.

    Returns
    -------
    dict mapping the output column names to the values of this row.
    """

    def _pct(value):
        # Fraction -> percentage with two decimals; missing values become None.
        return round(value * 100, 2) if pd.notna(value) else None

    def _rounded2(value):
        # Two-decimal float; missing values stay NaN.
        return round(float(value), 2) if pd.notna(value) else np.nan

    def _int_or_blank(value):
        # Integer when available, empty string otherwise.
        return int(value) if pd.notna(value) else ""

    assets = list(period_df.columns)
    weights = w_series.reindex(assets).fillna(0.0).values

    # === model-based volatility ===
    cov = risk_models.sample_cov(period_df, returns_data=True).loc[assets, assets].values
    # Clamp at zero so numerical noise can never produce sqrt of a negative.
    exp_vol = float(np.sqrt(max(weights @ cov @ weights, 0.0)))

    # === diversification benefit ===
    asset_vols = np.sqrt(np.clip(np.diag(cov), 0.0, None))
    weighted_avg_vol = float(np.dot(weights, asset_vols))
    diversification_benefit = np.nan
    if weighted_avg_vol > 0 and exp_vol > 0:
        diversification_benefit = (exp_vol / weighted_avg_vol) - 1

    # === path-based metrics ===
    metr = portfolio_path_metrics(period_df, five_year_df, weights, assets, days_per_year)
    port_hmin = metr['Hmin_100m_5Y']

    # === time benefit ===
    # Weight-averaged per-asset horizon, restricted to assets that carry a
    # positive weight and have a finite horizon; weights are renormalised
    # over that subset.
    horizon_by_isin = metrics_asset_df.set_index('ISIN')['H_min_100m_5Y'].to_dict()
    asset_horizons = np.array([horizon_by_isin.get(a, np.nan) for a in assets], dtype=float)
    usable = np.isfinite(asset_horizons) & (weights > 0)
    h_wavg = np.nan
    if usable.any():
        sub_w = weights[usable].astype(float)
        sub_h = asset_horizons[usable].astype(float)
        total_w = sub_w.sum()
        if total_w > 0:
            h_wavg = float(np.dot(sub_w / total_w, sub_h))

    beneficio_temporale = np.nan
    if (
        np.isfinite(h_wavg) and h_wavg > 0
        and (port_hmin is not None) and np.isfinite(port_hmin)
    ):
        # Sign flipped so that a shorter portfolio horizon shows up as negative.
        beneficio_temporale = - (1.0 - (float(port_hmin) / float(h_wavg)))

    r2_value = metr['R2']
    return {
        "Portafoglio": name,
        "Variante": variant_label,
        "Years": years,
        "Target Vol": round(target_vol * 100, 2),
        "Volatilita Ann": _pct(metr['AnnVol']),
        "Rendimento Ann": _pct(metr['AnnReturn']),
        "CAGR": _pct(metr['CAGR']),
        "R^2 Equity": round(r2_value, 3) if pd.notna(r2_value) else np.nan,
        "MaxDD": _pct(metr['MaxDD']),
        "DD Duration Max": _int_or_blank(metr['DD_Duration']),
        "Time to Recovery": _int_or_blank(metr['TTR']),
        "AAW": _rounded2(metr['AAW']),
        "AUW": _rounded2(metr['AUW']),
        "Heal Index": _rounded2(metr['Heal']),
        "Horizon": _int_or_blank(port_hmin),
        "Horizon average": _rounded2(h_wavg),
        "Beneficio di diversificazione": _pct(diversification_benefit),
        "Beneficio temporale": _pct(beneficio_temporale),
    }
# Comparison table: for every configured target, one row for the PH1 weights
# and one for the PH1+HealProxy weights, both evaluated on the same window.
comparison_rows = []
for (years, target_vol), name in volatility_targets.items():
    # A comparison needs the portfolio in both weight tables.
    if (
        name not in optimized_weights.columns
        or optimized_weights_phase1_heal.empty
        or name not in optimized_weights_phase1_heal.columns
    ):
        continue

    # Look-back window of `years` ending at end_date.
    period_start_date = end_date - pd.DateOffset(years=years)
    period_df_cmp = final_df.loc[period_start_date:end_date]

    # Align both weight vectors to the assets of the window (missing -> 0).
    w_ph1 = optimized_weights[name].reindex(period_df_cmp.columns).fillna(0.0)
    w_h = optimized_weights_phase1_heal[name].reindex(period_df_cmp.columns).fillna(0.0)

    # Per-asset metrics table; _port_metrics_row reads its H_min_100m_5Y column.
    metrics_asset_df = per_asset_metrics[name]

    # PH1 row first, then the PH1+HealProxy row.
    comparison_rows.extend(
        _port_metrics_row(name, variant, years, target_vol, weights, period_df_cmp, metrics_asset_df)
        for variant, weights in (("PH1", w_ph1), ("PH1_HEALPROXY", w_h))
    )

comparison_df = pd.DataFrame(comparison_rows)
# =========================
# EXPORT — (1) ASSET METRICS SOLO
# =========================
# Export the per-asset metrics: one sheet per portfolio plus a consolidated
# sheet stacking all of them with a leading 'Periodo' column.
asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx')
with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer:
    for name, metrics_df in per_asset_metrics.items():
        metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False)

    consolidated = []
    for name, metrics_df in per_asset_metrics.items():
        tmp = metrics_df.copy()
        tmp.insert(0, 'Periodo', name)
        consolidated.append(tmp)
    consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame()

    if not consolidated_df.empty:
        consolidated_df.to_excel(writer, sheet_name='Metriche_Consolidate', index=False)
    elif not per_asset_metrics:
        # With no per-asset metrics at all, no sheet would be written and
        # openpyxl refuses to save a workbook without a visible sheet
        # (IndexError on close). Write a note sheet instead, as the weights
        # summary export does when its data is missing.
        pd.DataFrame({
            "Nota": ["Nessuna metrica per-asset disponibile."]
        }).to_excel(writer, sheet_name='Metriche_Consolidate', index=False)
print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.")
# =========================
# (NUOVO ORDINE) COSTRUZIONE "WITH NAMES" DOPO LA FASE HEAL
# =========================
def _etf_names_for(isins):
    """
    Return, for each ISIN in *isins*, the 'Nome' found in ``df``.

    The lookup table is built once per call instead of scanning the whole
    'ISIN' column twice for every ISIN. ``keep='first'`` preserves the rule
    of taking the first matching row, and ISINs that do not appear in ``df``
    map to an empty string. Rows with a missing ISIN are dropped because they
    could never match an equality comparison either.
    """
    nome_by_isin = (
        df.dropna(subset=['ISIN'])
        .drop_duplicates(subset='ISIN', keep='first')
        .set_index('ISIN')['Nome']
        .to_dict()
    )
    return [nome_by_isin.get(isin, "") for isin in isins]


# PH1 weights with an extra 'Nome ETF' column.
optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = _etf_names_for(optimized_weights.index)

# Same for PH1+HealProxy; left untouched when the Heal phase produced nothing.
optimized_weights_phase1_heal_with_names = optimized_weights_phase1_heal.copy()
if not optimized_weights_phase1_heal_with_names.empty:
    optimized_weights_phase1_heal_with_names['Nome ETF'] = _etf_names_for(
        optimized_weights_phase1_heal_with_names.index
    )
# =========================
# EXPORT — (2) RIEPILOGO PESI & CONFRONTI
# =========================
summary_path = excel_path('optimized_weights_summary_v2.5.xlsx')


def _write_sheet_or_note(writer, frame, sheet_name, note, keep_index):
    """Write *frame* to *sheet_name*; if it is empty, write a one-cell 'Nota' sheet instead."""
    if frame.empty:
        pd.DataFrame({"Nota": [note]}).to_excel(writer, sheet_name=sheet_name, index=False)
    else:
        frame.to_excel(writer, sheet_name=sheet_name, index=keep_index)


# Workbook with three sheets, in this order: PH1 weights, PH1+HealProxy
# weights and the PH1 vs HealProxy comparison table.
with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer:
    optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
    _write_sheet_or_note(
        writer,
        optimized_weights_phase1_heal_with_names,
        'Pesi PH1 + HealProxy',
        "Nessun risultato HealProxy (cvxpy assente o ottimizzazione fallita).",
        True,
    )
    _write_sheet_or_note(
        writer,
        comparison_df,
        'Confronto_PH1_vs_HEAL',
        "Confronto non disponibile (mancano risultati HealProxy).",
        False,
    )
print(f"File '{summary_path}' creato con 'Pesi Ottimizzati', 'Pesi PH1 + HealProxy' e 'Confronto_PH1_vs_HEAL'.")