# -*- coding: utf-8 -*-
"""
Ottimizzatore ITA
"""
# =========================
# IMPORTS & PARAMETERS
# =========================
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yaml
import logging
from datetime import datetime
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from pypfopt import risk_models
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt.exceptions import OptimizationError
# ---------------------------------------------------
# PyPortfolioOpt patch: use cp.psd_wrap on the covariance matrix
# ---------------------------------------------------
import cvxpy as cp
from pypfopt import objective_functions as _obj
def portfolio_variance_psdwrap(w, cov_matrix):
    """
    Patched version of portfolio_variance:
    uses cp.psd_wrap(cov_matrix) so that CVXPY does not run
    the spectral (PSD) check via ARPACK.
    Behaviour is identical to the original, but more robust.
    """
    variance = cp.quad_form(w, cp.psd_wrap(cov_matrix))
    return _obj._objective_value(w, variance)

# Global monkey patch: from here on EfficientFrontier uses this version
_obj.portfolio_variance = portfolio_variance_psdwrap
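
# Minimal sanity-check sketch for the patch (illustrative only, not executed by this script):
# with a symmetric covariance S the patched objective builds the same quadratic form as the
# stock implementation, only wrapped so CVXPY trusts S to be PSD, e.g.
#   w_chk = cp.Variable(3)
#   S_chk = np.eye(3) * 0.04
#   portfolio_variance_psdwrap(w_chk, S_chk)   # cvxpy expression, equivalent to quad_form(w_chk, S_chk)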
# Input/output/plot folders
OUTPUT_DIR = "Output"
INPUT_DIR = "Input"
PLOT_DIR = "Plot"
CONFIG_FILE = "config.yaml"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(PLOT_DIR, exist_ok=True)
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def excel_path(filename: str) -> str:
    """Builds the full path for an Excel file in the output folder."""
    return os.path.join(OUTPUT_DIR, filename)

def plot_path(filename: str) -> str:
    """Builds the full path for a PNG file in the Plot folder."""
    return os.path.join(PLOT_DIR, filename)
def load_targets_and_limits(config_file: str):
    """Reads volatility targets and asset-class limits from the configuration file (no fallback)."""
    try:
        with open(config_file, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f) or {}
    except FileNotFoundError:
        logger.error("File di configurazione mancante: %s", config_file)
        sys.exit(1)
    vt_cfg = cfg.get("volatility_targets", {})
    vt_list = []
    if isinstance(vt_cfg, dict):
        vt_list = vt_cfg.get("default") or []
    elif isinstance(vt_cfg, list):
        vt_list = vt_cfg
    if not vt_list:
        logger.error("Sezione 'volatility_targets' mancante o vuota nel file di configurazione.")
        sys.exit(1)
    volatility_targets_local = {
        (int(item["years"]), float(item["target_vol"])): item["name"]
        for item in vt_list
        if "years" in item and "target_vol" in item and "name" in item
    }
    if not volatility_targets_local:
        logger.error("Nessun target di volatilita valido trovato in configurazione.")
        sys.exit(1)
    asset_limits_cfg = cfg.get("asset_class_limits") or {}
    if not asset_limits_cfg:
        logger.error("Sezione 'asset_class_limits' mancante o vuota nel file di configurazione.")
        sys.exit(1)
    asset_class_limits_local = {k: float(v) for k, v in asset_limits_cfg.items()}
    return volatility_targets_local, asset_class_limits_local
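
# Expected config.yaml layout, inferred from the parsing above (the names and numbers
# below are illustrative placeholders, not the production values):
#
#   volatility_targets:
#     default:
#       - {years: 3, target_vol: 0.05, name: "Prudente"}
#       - {years: 5, target_vol: 0.10, name: "Dinamico"}
#   asset_class_limits:
#     Azionario: 0.60
#     Obbligazionario: 1.00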
def validate_universe(df_universe: pd.DataFrame):
    """Checks required columns and duplicate ISINs in the universe file."""
    required_cols = ['ISIN', 'Nome', 'Categoria', 'Asset Class']
    missing_cols = [c for c in required_cols if c not in df_universe.columns]
    if missing_cols:
        print(f"[warn] Colonne mancanti nel file universo: {', '.join(missing_cols)}")
    dup_isin = df_universe['ISIN'][df_universe['ISIN'].duplicated()].unique().tolist()
    if dup_isin:
        print(f"[warn] ISIN duplicati nel file universo: {dup_isin}")
    empty_isin = df_universe['ISIN'].isna().sum()
    if empty_isin:
        print(f"[warn] Righe con ISIN mancante nel file universo: {int(empty_isin)}")
def validate_returns_frame(df_returns: pd.DataFrame, threshold: float = 0.2):
    """Warns if the returns have many NaN cells before filling."""
    if df_returns.empty:
        print("[errore] Nessun dato di rendimento recuperato: final_df vuoto.")
        sys.exit(1)
    na_ratio = df_returns.isna().mean()
    high_na = na_ratio[na_ratio > threshold]
    if not high_na.empty:
        cols = ", ".join([f"{c} ({v:.0%})" for c, v in high_na.items()])
        print(f"[warn] Colonne con >{threshold:.0%} di NaN prima del fill: {cols}")
# ---------------------------------
# Utility for R^2 on the equity line
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
    """R^2 of the OLS regression of log(equity) on time (with intercept)."""
    s = returns.dropna()
    if s.size < 3:
        return np.nan
    equity = (1.0 + s).cumprod()
    equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
    if equity.size < 3:
        return np.nan
    y = np.log(equity.values)
    if np.allclose(y.var(ddof=1), 0.0):
        return 0.0
    x = np.arange(y.size, dtype=float)
    X = np.column_stack([np.ones_like(x), x])
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    y_hat = X @ beta
    ss_res = np.sum((y - y_hat) ** 2)
    ss_tot = np.sum((y - y.mean()) ** 2)
    r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
    if np.isnan(r2):
        return np.nan
    return float(np.clip(r2, 0.0, 1.0))
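
# Illustrative check (a sketch, not run by the script): a constant positive daily return
# produces a perfectly straight log-equity line, so the fit is exact:
#   r2_equity_line(pd.Series([0.001] * 300))   # -> 1.0 (up to floating-point noise)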
# ---------------------------------
# Utility for drawdown metrics
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
    """
    Computes:
    - max_dd: maximum drawdown depth (negative or zero)
    - max_dd_duration: maximum duration (in days) of any drawdown
    - ttr_from_mdd: days from the Max DD trough to full recovery of the previous peak (sentinel if it never recovers)
    """
    s = returns.fillna(0.0).astype(float)
    if s.size == 0:
        return np.nan, np.nan, np.nan
    equity = (1.0 + s).cumprod()
    if equity.size == 0:
        return np.nan, np.nan, np.nan
    run_max = equity.cummax()
    dd = equity / run_max - 1.0
    # Max drawdown (most negative value)
    max_dd = float(dd.min()) if dd.size else np.nan
    # Longest drawdown spell (consecutive days with negative drawdown)
    under_water = dd < 0
    if under_water.any():
        max_dd_duration = 0
        current = 0
        for flag in under_water.values:
            if flag:
                current += 1
                if current > max_dd_duration:
                    max_dd_duration = current
            else:
                current = 0
    else:
        max_dd_duration = 0
    # Time-to-recovery from the Max DD
    if dd.size:
        trough_idx = int(np.argmin(dd.values))
        if trough_idx > 0:
            peak_idx = int(np.argmax(equity.values[:trough_idx+1]))
            peak_level = float(equity.values[peak_idx])
            rec_idx = None
            for t in range(trough_idx + 1, equity.size):
                if equity.values[t] >= peak_level:
                    rec_idx = t
                    break
            if rec_idx is None:
                ttr_from_mdd = sentinel_ttr  # never recovered
            else:
                ttr_from_mdd = rec_idx - trough_idx
        else:
            ttr_from_mdd = np.nan
    else:
        ttr_from_mdd = np.nan
    return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
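
# Illustrative check (a sketch, not run by the script): one -10% day followed by a +12% day
# recovers the previous peak in a single step:
#   drawdown_metrics(pd.Series([0.0, -0.10, 0.12]))   # -> (~-0.10, 1, 1)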
# ---------------------------------
# Utility for AAW, AUW and the Heal Index
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
    """
    Computes:
    - AAW: area above water (run-up vs the running minimum)
    - AUW: area under water (drawdown vs the running maximum)
    - Heal Index: (AAW - AUW) / AUW
    """
    s = returns.fillna(0.0).astype(float)
    if s.size == 0:
        return np.nan, np.nan, np.nan
    equity = (1.0 + s).cumprod()
    if equity.size == 0:
        return np.nan, np.nan, np.nan
    run_max = equity.cummax()
    dd = equity / run_max - 1.0
    AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
    run_min = equity.cummin()
    ru = equity / run_min - 1.0
    AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
    heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
    return AAW, AUW, heal
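
# Illustrative check (a sketch, not run by the script): for returns [0.0, -0.10, 0.12]
# the equity line is [1.0, 0.9, 1.008], so AUW = 0.10 (one under-water day),
# AAW = 0.12 (run-up vs the running minimum) and Heal = (0.12 - 0.10) / 0.10 = 0.2:
#   heal_index_metrics(pd.Series([0.0, -0.10, 0.12]))   # -> (~0.12, ~0.10, ~0.2)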
# ---------------------------------
# Utility for H_min (100% positive windows)
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
    """
    Minimum horizon h_days such that ALL rolling windows of width h_days
    have cumulative return >= 0. Returns (h_days, ceil(h_days / month_len)).
    """
    s = returns.dropna().astype(float)
    n = s.size
    if n == 0:
        return np.nan, np.nan
    log1p = np.log1p(s.values)
    csum = np.cumsum(log1p)

    def rolling_sum_k(k: int):
        if k > n:
            return np.array([])
        head = csum[k - 1:]
        tail = np.concatenate(([0.0], csum[:-k]))
        return head - tail

    for k in range(1, n + 1):
        rs = rolling_sum_k(k)
        if rs.size == 0:
            break
        roll_ret = np.exp(rs) - 1.0
        if np.all(roll_ret >= 0):
            h_days = k
            h_months = int(np.ceil(h_days / month_len))
            return h_days, h_months
    return np.nan, np.nan
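
# Illustrative check (a sketch, not run by the script): with alternating daily returns
# [-0.01, +0.03, -0.01, +0.03, ...] some 1-day windows are negative, but every 2-day
# window compounds to about +1.97%, so the minimum all-positive horizon is 2 days:
#   h_min_100(pd.Series([-0.01, 0.03] * 10))   # -> (2, 1)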
# ---------------------------------
# Utility to make the covariance PSD/robust
# ---------------------------------
def regularize_covariance(cov_df: pd.DataFrame, ridge_factor: float = 1e-6) -> pd.DataFrame:
    """
    Makes the covariance matrix numerically more robust:
    - symmetrizes it
    - adds a small ridge term on the diagonal
    Returns a new DataFrame with the same index/columns.
    """
    if cov_df.empty:
        return cov_df
    Sigma = cov_df.values.astype(float)
    # symmetrize (removes small numerical asymmetries)
    Sigma = 0.5 * (Sigma + Sigma.T)
    # small ridge proportional to the average scale of the variances
    n = Sigma.shape[0]
    trace = np.trace(Sigma)
    if np.isfinite(trace) and n > 0:
        eps = ridge_factor * (trace / n)
    else:
        eps = ridge_factor
    Sigma_reg = Sigma + eps * np.eye(n)
    return pd.DataFrame(Sigma_reg, index=cov_df.index, columns=cov_df.columns)
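
# Illustrative check (a sketch, not run by the script): for a 2x2 covariance the ridge added
# on the diagonal is ridge_factor * mean(variances):
#   S_chk = pd.DataFrame([[0.04, 0.01], [0.01, 0.09]])
#   regularize_covariance(S_chk).iloc[0, 0]   # -> 0.04 + 1e-6 * (0.04 + 0.09) / 2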
# --- Read parameters from the connection.txt file ---
params = {}
with open("connection.txt", "r") as f:
    for line in f:
        line = line.strip()
        if line and not line.startswith("#"):
            key, value = line.split("=", 1)
            params[key.strip()] = value.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
connection_string = (
    f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)
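
# Expected connection.txt layout (key=value lines; lines starting with '#' are ignored;
# the values below are placeholders, not real credentials):
#   username=my_user
#   password=my_password
#   host=sqlserver.example.local
#   port=1433
#   database=MyMarketDB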
print("Connection string letta correttamente")
# =========================
# DATABASE CONNECTION
# =========================
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        _ = connection.execute(text("SELECT 1"))
    print("Connessione al database riuscita.")
except SQLAlchemyError as e:
    print("Errore durante la connessione al database:", e)
    sys.exit()
# =========================
# INPUT / TEMPLATE
# =========================
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore.xlsx')
df = pd.read_excel(
    file_path,
    usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'],
    dtype={'Codice Titolo': str}
)
validate_universe(df)
# =========================
# HISTORICAL RETURN SERIES
# =========================
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize()
final_df = pd.DataFrame(index=all_dates)
isin_from_db = set()
for isin in df['ISIN'].unique():
    print(f"Working on ISIN: {isin}")
    procedure_call = f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', @n = 1305, @PtfCurr = EUR"
    try:
        temp_df = pd.read_sql_query(procedure_call, engine)
        if temp_df.empty:
            print(f"Nessun dato recuperato per {isin}, skipping...")
            continue
        temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize()
        temp_df = temp_df.dropna(subset=['Px_Date'])
        temp_df.set_index('Px_Date', inplace=True)
        temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100
        final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates)
        isin_from_db.add(isin)
        print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.")
    except SQLAlchemyError as e:
        print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)
validate_returns_frame(final_df)
final_df.fillna(0, inplace=True)
# -------- H_min always computed over 5 years (21 days = 1 month) --------
five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date]
# =========================
# TARGET CONFIGURATION
# =========================
volatility_targets, asset_class_limits = load_targets_and_limits(CONFIG_FILE)
days_per_year = 252
riskfree_rate = 0.02
# =========================
# OPTIMIZATION LOOP (traditional PH1)
# =========================
optimized_weights = pd.DataFrame()
per_asset_metrics = {}
export_rows = []
for (years, target_vol), name in volatility_targets.items():
    period_start_date = end_date - pd.DateOffset(years=years)
    period_df = final_df.loc[period_start_date:end_date]
    daily_returns_mean = period_df.mean()
    annual_returns_mean = daily_returns_mean * days_per_year
    annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
    # --- Covariance regularization for the optimizer ---
    annual_covariance_matrix = regularize_covariance(annual_covariance_matrix)

    # ---------- PER-ASSET METRICS ----------
    n_days = int(period_df.shape[0])
    years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
    asset_ann_return = daily_returns_mean * days_per_year
    asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year)
    gross = (1.0 + period_df).prod(skipna=True)
    asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
    asset_r2 = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
    maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
    aaw_dict, auw_dict, heal_dict = {}, {}, {}
    hmin_5y_months_dict = {}
    for col in period_df.columns:
        mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
        maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
        aaw, auw, heal = heal_index_metrics(period_df[col])
        aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
        if col in five_year_df.columns:
            _, h_months_5y = h_min_100(five_year_df[col], month_len=21)
        else:
            h_months_5y = np.nan
        hmin_5y_months_dict[col] = h_months_5y
    asset_metrics_df = (
        pd.DataFrame({
            'ISIN': period_df.columns,
            'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values,
            'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values,
            'CAGR': asset_cagr.reindex(period_df.columns).values,
            'R2_Equity': asset_r2.reindex(period_df.columns).values,
            'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
            'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
            'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
            'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
            'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
            'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
            'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
        })
        .merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
        [['ISIN', 'Nome', 'Categoria', 'Asset Class',
          'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
          'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
          'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
        .sort_values('ISIN', kind='stable')
        .reset_index(drop=True)
    )
    per_asset_metrics[name] = asset_metrics_df
    # ---------- OPTIMIZATION ----------
    ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)
    # PesoFisso / PesoMax constraints
    for _, row in df.iterrows():
        isin_i = row['ISIN']
        if isin_i in period_df.columns:
            idx = period_df.columns.get_loc(isin_i)
            pf = row.get('PesoFisso')
            pm = row.get('PesoMax')
            if pd.notna(pf):
                ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
            elif pd.notna(pm):
                ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)
    # Per-Categoria constraints
    categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
    for cat, maxw in categories_limits.items():
        isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
        idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
        if idxs:
            ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
    # Per-Asset Class constraints
    for ac, maxw in asset_class_limits.items():
        isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
        idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
        if idxs:
            ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
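
    # Note on the lambdas above: the keyword defaults (idx=idx, idxs=idxs, maxw=...) freeze the
    # current loop values inside each constraint; without them Python's late-binding closures
    # would make every constraint see only the last iteration's values. Sketch of the pitfall:
    #   fs = [lambda: i for i in range(3)]        # all three return 2
    #   fs = [lambda i=i: i for i in range(3)]    # return 0, 1, 2 as intended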
    # ---------- Solve ----------
    try:
        ef.efficient_risk(target_volatility=target_vol)
        weights = ef.clean_weights()
        optimized_weights[name] = pd.Series(weights)
        exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate)
        print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===")
        print(f"Expected annual return: {exp_ret:.2%}")
        print(f"Annual volatility: {exp_vol:.2%}")
        print(f"Sharpe Ratio: {sharpe:.2f}")
        # --- Diversification benefit ---
        w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns])
        indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values))
        weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols))
        diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
        print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
        # --- Excel file for the portfolio-management import (one per portfolio) ---
        template_cols = list(template_df.columns)
        results_rows = []
        for isin, weight in weights.items():
            if weight > 0:
                r_sel = df.loc[df['ISIN'] == isin]
                codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else ""
                nome = r_sel['Nome'].iloc[0] if not r_sel.empty else ""
                row = {col: "" for col in template_cols}
                row['cod_por'] = f'PTFOPT{name}'
                row['cod_tit'] = codice_titolo
                row['des_tit'] = nome
                row['peso'] = float(weight * 99)
                results_rows.append(row)
        results_full_df = pd.DataFrame(results_rows, columns=template_cols)
        if results_full_df.empty:
            output_df = template_df.iloc[0:0].copy()
        else:
            output_df = results_full_df.reindex(columns=template_cols)
        export_rows.append(output_df)
        # --- Asset allocation pie chart: saved to the Plot folder without being shown ---
        asset_allocations = {asset: 0 for asset in asset_class_limits}
        for isin, weight in weights.items():
            r_sel = df.loc[df['ISIN'] == isin]
            if r_sel.empty:
                continue
            asset_class = r_sel['Asset Class'].iloc[0]
            asset_allocations.setdefault(asset_class, 0)
            asset_allocations[asset_class] += weight
        total_alloc = sum(asset_allocations.values())
        if total_alloc > 0:
            plt.figure(figsize=(8, 6))
            plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
            plt.title(f'Asset Allocation for {name}')
            pie_path = plot_path(f'Asset_Allocation_{name}.png')
            plt.savefig(pie_path, dpi=150, bbox_inches='tight')
            plt.close()
    except OptimizationError as e:
        print(f"Optimization failed for {name}: {e}")
        # Zero weights indexed by ISIN so the column aligns with the successful runs
        optimized_weights[name] = pd.Series(0.0, index=annual_returns_mean.index)
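
# Note: EfficientFrontier.efficient_risk(target_volatility=...) maximizes expected return
# subject to the portfolio volatility not exceeding the target, on top of the PesoFisso,
# PesoMax, Categoria and Asset Class constraints added above.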
# =========================
# METRICS SUMMARY (PH1 PORTFOLIOS)
# =========================
summary_data = []
for (years, target_vol), name in volatility_targets.items():
    if name in optimized_weights.columns:
        period_start_date = end_date - pd.DateOffset(years=years)
        period_df = final_df.loc[period_start_date:end_date]
        daily_returns_mean = period_df.mean()
        annual_returns_mean = daily_returns_mean * days_per_year
        annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
        # --- Covariance regularization for the optimizer ---
        annual_covariance_matrix = regularize_covariance(annual_covariance_matrix)
        w_series = optimized_weights[name].reindex(period_df.columns).fillna(0.0)
        w_vec = w_series.values
        port_returns = (period_df[period_df.columns] * w_series).sum(axis=1)
        n_days = int(port_returns.shape[0])
        years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
        port_ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan
        port_ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan
        gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan
        port_cagr = (gross**(1.0 / years_elapsed) - 1.0) if (years_elapsed and years_elapsed > 0 and gross and gross > 0) else np.nan
        port_r2 = r2_equity_line(port_returns)
        port_maxdd, port_dddur, port_ttr = drawdown_metrics(port_returns, sentinel_ttr=1250)
        port_aaw, port_auw, port_heal = heal_index_metrics(port_returns)
        common_cols = [c for c in w_series.index if c in five_year_df.columns]
        if len(common_cols) > 0:
            w_5y = w_series.reindex(common_cols).fillna(0.0)
            port_returns_5y = (five_year_df[common_cols] * w_5y).sum(axis=1)
            _, port_hmin_5y_months = h_min_100(port_returns_5y, month_len=21)
        else:
            port_hmin_5y_months = np.nan
        exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[period_df.columns].values))
        cov_mat = annual_covariance_matrix.loc[period_df.columns, period_df.columns].values
        exp_vol = float(np.sqrt(np.dot(w_vec, np.dot(cov_mat, w_vec))))
        sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan
        indiv_ann_vols = np.sqrt(np.diag(cov_mat))
        weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols))
        diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
        diversification_ratio = weighted_avg_vol / exp_vol if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
        print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===")
        print(f"Expected annual return: {exp_ret:.2%}")
        print(f"Annual volatility: {exp_vol:.2%}")
        print(f"Sharpe Ratio: {sharpe:.2f}")
        print(f"Diversification Ratio: {diversification_ratio:.3f}" if not np.isnan(diversification_ratio) else "Diversification Ratio: NaN")
        print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
        summary_data.append({
            "Portafoglio": name,
            "Years": years,
            "Target Vol": f"{target_vol:.2%}",
            "Expected annual return": f"{exp_ret:.2%}",
            "Annual volatility": f"{exp_vol:.2%}",
            "Sharpe Ratio": f"{sharpe:.2f}",
            "Beneficio di diversificazione": f"{diversification_benefit:.2%}" if not np.isnan(diversification_benefit) else "",
            "Rendimento_Ann": f"{port_ann_return:.2%}" if pd.notna(port_ann_return) else "",
            "Volatilita_Ann": f"{port_ann_vol:.2%}" if pd.notna(port_ann_vol) else "",
            "CAGR": f"{port_cagr:.2%}" if pd.notna(port_cagr) else "",
            "R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan,
            "MaxDD": f"{port_maxdd:.2%}" if pd.notna(port_maxdd) else "",
            "DD_Duration_Max": int(port_dddur) if pd.notna(port_dddur) else "",
            "TTR_from_MDD": int(port_ttr) if pd.notna(port_ttr) else "",
            "AAW": float(port_aaw) if pd.notna(port_aaw) else np.nan,
            "AUW": float(port_auw) if pd.notna(port_auw) else np.nan,
            "Heal_Index": float(port_heal) if pd.notna(port_heal) else np.nan,
            "H_min_100m_5Y": int(port_hmin_5y_months) if pd.notna(port_hmin_5y_months) else ""
        })
# =========================
# EXPORT — (1) ASSET METRICS ONLY
# =========================
asset_metrics_path = excel_path('Asset metrics ITA.xlsx')
with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer:
    for name, metrics_df in per_asset_metrics.items():
        metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False)
    consolidated = []
    for name, metrics_df in per_asset_metrics.items():
        tmp = metrics_df.copy()
        tmp.insert(0, 'Periodo', name)
        consolidated.append(tmp)
    consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame()
    if not consolidated_df.empty:
        consolidated_df.to_excel(writer, sheet_name='Metriche_Consolidate', index=False)
print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.")
# =========================
# BUILD THE "WITH NAMES" VIEW
# =========================
optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = [
    df.loc[df['ISIN'] == isin, 'Nome'].values[0] if (df['ISIN'] == isin).any() else ""
    for isin in optimized_weights.index
]
# =========================
# EXPORT — (2) WEIGHTS SUMMARY (PH1 ONLY)
# =========================
summary_df = pd.DataFrame(summary_data)
summary_path = excel_path('Riepilogo pesi ITA.xlsx')
with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer:
    optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
    summary_df.to_excel(writer, sheet_name='Riepilogo', index=False)
print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'Riepilogo'.")
# =========================
# SINGLE EXPORT OF OPTIMIZED WEIGHTS
# =========================
date_tag = datetime.now().strftime("%Y%m%d")
combined_path = excel_path(f"{date_tag} Pesi ottimizzati ITA.xlsx")
with pd.ExcelWriter(combined_path, engine='openpyxl', mode='w') as writer:
    if export_rows:
        combined_df = pd.concat([template_df] + export_rows, ignore_index=True)
    else:
        combined_df = template_df.copy()
    combined_df.to_excel(writer, sheet_name='Pesi Ottimizzati', index=False)
print(f"Pesi ottimizzati salvati in un unico file/sheet: '{combined_path}'.")