diff --git a/Ottimizzatore Quant Best Europe.py b/Ottimizzatore Quant Best Europe.py new file mode 100644 index 0000000..502e0dd --- /dev/null +++ b/Ottimizzatore Quant Best Europe.py @@ -0,0 +1,711 @@ +# -*- coding: utf-8 -*- +""" +Ottimizzatore ITA +""" + +# ========================= +# IMPORT & PARAMETRI +# ========================= +import sys +import os +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +import yaml +import logging +from datetime import datetime + +from sqlalchemy import create_engine, text +from sqlalchemy.exc import SQLAlchemyError + +from pypfopt import risk_models +from pypfopt.efficient_frontier import EfficientFrontier +from pypfopt.exceptions import OptimizationError + +# --------------------------------------------------- +# Patch PyPortfolioOpt: usa cp.psd_wrap sulla covarianza +# --------------------------------------------------- +import cvxpy as cp +from pypfopt import objective_functions as _obj + +def portfolio_variance_psdwrap(w, cov_matrix): + """ + Versione patchata di portfolio_variance: + usa cp.psd_wrap(cov_matrix) per evitare che CVXPY + faccia il controllo spettrale con ARPACK. + Comportamento identico all'originale, ma più robusto. + """ + variance = cp.quad_form(w, cp.psd_wrap(cov_matrix)) + return _obj._objective_value(w, variance) + +# Monkey patch globale: da qui in poi EfficientFrontier usa questa versione +_obj.portfolio_variance = portfolio_variance_psdwrap + + +# Cartelle di input/output/plot +OUTPUT_DIR = "Output" +INPUT_DIR = "Input" +PLOT_DIR = "Plot" +CONFIG_FILE = "config.yaml" + +os.makedirs(OUTPUT_DIR, exist_ok=True) +os.makedirs(INPUT_DIR, exist_ok=True) +os.makedirs(PLOT_DIR, exist_ok=True) +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +def excel_path(filename: str) -> str: + """Costruisce il percorso completo per un file Excel nella cartella di output.""" + return os.path.join(OUTPUT_DIR, filename) + +def plot_path(filename: str) -> str: + """Costruisce il percorso completo per un file PNG nella cartella Plot.""" + return os.path.join(PLOT_DIR, filename) + + +def load_targets_and_limits(config_file: str): + """Legge target di volatilità e limiti asset class dal file di configurazione (nessun fallback).""" + try: + with open(config_file, "r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) or {} + except FileNotFoundError: + logger.error("File di configurazione mancante: %s", config_file) + sys.exit(1) + + vt_cfg = cfg.get("volatility_targets", {}) + vt_list = [] + if isinstance(vt_cfg, dict): + vt_list = vt_cfg.get("default") or [] + elif isinstance(vt_cfg, list): + vt_list = vt_cfg + if not vt_list: + logger.error("Sezione 'volatility_targets' mancante o vuota nel file di configurazione.") + sys.exit(1) + + volatility_targets_local = { + (int(item["years"]), float(item["target_vol"])): item["name"] + for item in vt_list + if "years" in item and "target_vol" in item and "name" in item + } + if not volatility_targets_local: + logger.error("Nessun target di volatilita valido trovato in configurazione.") + sys.exit(1) + + asset_limits_cfg = cfg.get("asset_class_limits") or {} + if not asset_limits_cfg: + logger.error("Sezione 'asset_class_limits' mancante o vuota nel file di configurazione.") + sys.exit(1) + asset_class_limits_local = {k: float(v) for k, v in asset_limits_cfg.items()} + + return volatility_targets_local, asset_class_limits_local + + +def validate_universe(df_universe: pd.DataFrame): + """Verifica colonne obbligatorie e duplicati ISIN nel file universo.""" + required_cols = ['ISIN', 'Nome', 'Categoria', 'Asset Class'] + missing_cols = [c for c in required_cols if c not in df_universe.columns] + if missing_cols: + print(f"[warn] Colonne mancanti nel file universo: {', '.join(missing_cols)}") + dup_isin = df_universe['ISIN'][df_universe['ISIN'].duplicated()].unique().tolist() + if dup_isin: + print(f"[warn] ISIN duplicati nel file universo: {dup_isin}") + empty_isin = df_universe['ISIN'].isna().sum() + if empty_isin: + print(f"[warn] Righe con ISIN mancante nel file universo: {int(empty_isin)}") + + +def validate_returns_frame(df_returns: pd.DataFrame, threshold: float = 0.2): + """Avvisa se i rendimenti hanno molte celle NaN prima del riempimento.""" + if df_returns.empty: + print("[errore] Nessun dato di rendimento recuperato: final_df vuoto.") + sys.exit(1) + na_ratio = df_returns.isna().mean() + high_na = na_ratio[na_ratio > threshold] + if not high_na.empty: + cols = ", ".join([f"{c} ({v:.0%})" for c, v in high_na.items()]) + print(f"[warn] Colonne con >{threshold:.0%} di NaN prima del fill: {cols}") + +# --------------------------------- +# Utility per R^2 sull’equity line +# --------------------------------- +def r2_equity_line(returns: pd.Series) -> float: + """R^2 della regressione OLS di log(equity) sul tempo (con intercetta).""" + s = returns.dropna() + if s.size < 3: + return np.nan + equity = (1.0 + s).cumprod() + equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna() + if equity.size < 3: + return np.nan + y = np.log(equity.values) + if np.allclose(y.var(ddof=1), 0.0): + return 0.0 + x = np.arange(y.size, dtype=float) + X = np.column_stack([np.ones_like(x), x]) + beta, *_ = np.linalg.lstsq(X, y, rcond=None) + y_hat = X @ beta + ss_res = np.sum((y - y_hat) ** 2) + ss_tot = np.sum((y - y.mean()) ** 2) + r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan + if np.isnan(r2): + return np.nan + return float(np.clip(r2, 0.0, 1.0)) + +# --------------------------------- +# Utility per metriche di drawdown +# --------------------------------- +def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): + """ + Calcola: + - max_dd: profondità massima del drawdown (negativa o zero) + - max_dd_duration: durata massima (in giorni) di qualsiasi drawdown + - ttr_from_mdd: giorni dal minimo del Max DD al pieno recupero del picco precedente (sentinel se non recupera) + """ + s = returns.fillna(0.0).astype(float) + if s.size == 0: + return np.nan, np.nan, np.nan + + equity = (1.0 + s).cumprod() + if equity.size == 0: + return np.nan, np.nan, np.nan + + run_max = equity.cummax() + dd = equity / run_max - 1.0 + + # Max Drawdown (valore più negativo) + max_dd = float(dd.min()) if dd.size else np.nan + + # Durata massima di drawdown (giorni consecutivi sotto zero drawdown) + under_water = dd < 0 + if under_water.any(): + max_dd_duration = 0 + current = 0 + for flag in under_water.values: + if flag: + current += 1 + if current > max_dd_duration: + max_dd_duration = current + else: + current = 0 + else: + max_dd_duration = 0 + + # Time-to-Recovery dal Max DD + if dd.size: + trough_idx = int(np.argmin(dd.values)) + if trough_idx > 0: + peak_idx = int(np.argmax(equity.values[:trough_idx+1])) + peak_level = float(equity.values[peak_idx]) + rec_idx = None + for t in range(trough_idx + 1, equity.size): + if equity.values[t] >= peak_level: + rec_idx = t + break + if rec_idx is None: + ttr_from_mdd = sentinel_ttr # non recuperato + else: + ttr_from_mdd = rec_idx - trough_idx + else: + ttr_from_mdd = np.nan + else: + ttr_from_mdd = np.nan + + return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan) + +# --------------------------------- +# Utility per AAW, AUW e Heal Index +# --------------------------------- +def heal_index_metrics(returns: pd.Series): + """ + Calcola: + - AAW: area sopra acqua (run-up vs minimo cumulato) + - AUW: area sotto acqua (drawdown vs massimo cumulato) + - Heal Index: (AAW - AUW) / AUW + """ + s = returns.fillna(0.0).astype(float) + if s.size == 0: + return np.nan, np.nan, np.nan + + equity = (1.0 + s).cumprod() + if equity.size == 0: + return np.nan, np.nan, np.nan + + run_max = equity.cummax() + dd = equity / run_max - 1.0 + AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan + + run_min = equity.cummin() + ru = equity / run_min - 1.0 + AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan + + heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan + return AAW, AUW, heal + +# --------------------------------- +# Utility per H_min (100% finestre positive) +# --------------------------------- +def h_min_100(returns: pd.Series, month_len: int = 21): + """ + Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days + hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)). + """ + s = returns.dropna().astype(float) + n = s.size + if n == 0: + return np.nan, np.nan + + log1p = np.log1p(s.values) + csum = np.cumsum(log1p) + + def rolling_sum_k(k: int): + if k > n: + return np.array([]) + head = csum[k - 1:] + tail = np.concatenate(([0.0], csum[:-k])) + return head - tail + + for k in range(1, n + 1): + rs = rolling_sum_k(k) + if rs.size == 0: + break + roll_ret = np.exp(rs) - 1.0 + if np.all(roll_ret >= 0): + h_days = k + h_months = int(np.ceil(h_days / month_len)) + return h_days, h_months + + return np.nan, np.nan + +# --------------------------------- +# Utility per rendere PSD/robusta la covarianza +# --------------------------------- +def regularize_covariance(cov_df: pd.DataFrame, ridge_factor: float = 1e-6) -> pd.DataFrame: + """ + Rende la matrice di covarianza numericamente piu' robusta: + - la simmetrizza + - aggiunge un piccolo termine di ridge sulla diagonale + Ritorna un nuovo DataFrame con stessa index/columns. + """ + if cov_df.empty: + return cov_df + + Sigma = cov_df.values.astype(float) + + # simmetrizza (elimina piccole asimmetrie numeriche) + Sigma = 0.5 * (Sigma + Sigma.T) + + # piccolo ridge proporzionato alla scala media delle varianze + n = Sigma.shape[0] + trace = np.trace(Sigma) + if np.isfinite(trace) and n > 0: + eps = ridge_factor * (trace / n) + else: + eps = ridge_factor + + Sigma_reg = Sigma + eps * np.eye(n) + + return pd.DataFrame(Sigma_reg, index=cov_df.index, columns=cov_df.columns) + + +# --- Lettura parametri dal file connection.txt --- +params = {} +with open("connection.txt", "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + key, value = line.split("=", 1) + params[key.strip()] = value.strip() + +username = params.get("username") +password = params.get("password") +host = params.get("host") +port = params.get("port", "1433") +database = params.get("database") + +connection_string = ( + f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}" + "?driver=ODBC+Driver+17+for+SQL+Server" +) + +print("Connection string letta correttamente") + +# ========================= +# CONNESSIONE AL DB +# ========================= +try: + engine = create_engine(connection_string) + with engine.connect() as connection: + _ = connection.execute(text("SELECT 1")) + print("Connessione al database riuscita.") +except SQLAlchemyError as e: + print("Errore durante la connessione al database:", e) + sys.exit() + +# ========================= +# INPUT / TEMPLATE +# ========================= +template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls') +template_df = pd.read_excel(template_path) + +file_path = os.path.join(INPUT_DIR, 'Universo per Quant Best Europe.xlsx') +df = pd.read_excel( + file_path, + usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'], + dtype={'Codice Titolo': str} +) +validate_universe(df) + +# ========================= +# SERIE STORICHE RENDIMENTI +# ========================= +end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1) +start_date = end_date - pd.DateOffset(years=5) +all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize() + +final_df = pd.DataFrame(index=all_dates) + +isin_from_db = set() +for isin in df['ISIN'].unique(): + print(f"Working on ISIN: {isin}") + procedure_call = f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', @n = 1305, @PtfCurr = EUR" + try: + temp_df = pd.read_sql_query(procedure_call, engine) + if temp_df.empty: + print(f"Nessun dato recuperato per {isin}, skipping...") + continue + temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize() + temp_df = temp_df.dropna(subset=['Px_Date']) + temp_df.set_index('Px_Date', inplace=True) + temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100 + final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates) + isin_from_db.add(isin) + print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.") + except SQLAlchemyError as e: + print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e) + +validate_returns_frame(final_df) +final_df.fillna(0, inplace=True) + +# -------- H_min sempre su 5 anni (21 gg = 1 mese) -------- +five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date] + +# ========================= +# CONFIGURAZIONE OBIETTIVI +# ========================= +volatility_targets, asset_class_limits = load_targets_and_limits(CONFIG_FILE) +days_per_year = 252 +riskfree_rate = 0.02 + +# ========================= +# LOOP OTTIMIZZAZIONI (PH1 tradizionale) +# ========================= +optimized_weights = pd.DataFrame() +per_asset_metrics = {} +export_rows = [] +target_portfolio_name = "VAR6_3Y" +output_portfolio_code = "Quant BE" + +for (years, target_vol), name in volatility_targets.items(): + period_start_date = end_date - pd.DateOffset(years=years) + period_df = final_df.loc[period_start_date:end_date] + + daily_returns_mean = period_df.mean() + annual_returns_mean = daily_returns_mean * days_per_year + annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True) + # --- Regularizzazione covarianza per l'ottimizzatore --- + annual_covariance_matrix = regularize_covariance(annual_covariance_matrix) + + + # ---------- PER-ASSET METRICS ---------- + n_days = int(period_df.shape[0]) + years_elapsed = n_days / days_per_year if n_days > 0 else np.nan + + asset_ann_return = daily_returns_mean * days_per_year + asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year) + + gross = (1.0 + period_df).prod(skipna=True) + asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns) + + asset_r2 = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns) + + maxdd_dict, dddur_dict, ttr_dict = {}, {}, {} + aaw_dict, auw_dict, heal_dict = {}, {}, {} + hmin_5y_months_dict = {} + + for col in period_df.columns: + mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250) + maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr + aaw, auw, heal = heal_index_metrics(period_df[col]) + aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal + if col in five_year_df.columns: + _, h_months_5y = h_min_100(five_year_df[col], month_len=21) + else: + h_months_5y = np.nan + hmin_5y_months_dict[col] = h_months_5y + + asset_metrics_df = ( + pd.DataFrame({ + 'ISIN': period_df.columns, + 'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values, + 'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values, + 'CAGR': asset_cagr.reindex(period_df.columns).values, + 'R2_Equity': asset_r2.reindex(period_df.columns).values, + 'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values, + 'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values, + 'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values, + 'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values, + 'AUW': pd.Series(auw_dict).reindex(period_df.columns).values, + 'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values, + 'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values + }) + .merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left') + [['ISIN', 'Nome', 'Categoria', 'Asset Class', + 'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity', + 'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD', + 'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']] + .sort_values('ISIN', kind='stable') + .reset_index(drop=True) + ) + per_asset_metrics[name] = asset_metrics_df + + # ---------- OTTIMIZZAZIONE ---------- + ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix) + + # Vincoli PesoFisso / PesoMax + for _, row in df.iterrows(): + isin_i = row['ISIN'] + if isin_i in period_df.columns: + idx = period_df.columns.get_loc(isin_i) + pf = row.get('PesoFisso') + pm = row.get('PesoMax') + if pd.notna(pf): + ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val) + elif pd.notna(pm): + ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw) + + # Vincoli per Categoria + categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict() + for cat, maxw in categories_limits.items(): + isin_list = df[df['Categoria'] == cat]['ISIN'].tolist() + idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns] + if idxs: + ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw) + + # Vincoli per Asset Class + for ac, maxw in asset_class_limits.items(): + isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist() + idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns] + if idxs: + ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw) + + # ---------- Risoluzione ---------- + try: + ef.efficient_risk(target_volatility=target_vol) + weights = ef.clean_weights() + optimized_weights[name] = pd.Series(weights) + exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate) + + print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===") + print(f"Expected annual return: {exp_ret:.2%}") + print(f"Annual volatility: {exp_vol:.2%}") + print(f"Sharpe Ratio: {sharpe:.2f}") + + # --- Beneficio di diversificazione --- + w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns]) + indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values)) + weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols)) + diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan + print(f"Beneficio di diversificazione: {diversification_benefit:.2%}") + + # --- File Excel per import gestionale (uno per portafoglio) --- + template_cols = list(template_df.columns) + results_rows = [] + for isin, weight in weights.items(): + if weight > 0: + r_sel = df.loc[df['ISIN'] == isin] + codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else "" + nome = r_sel['Nome'].iloc[0] if not r_sel.empty else "" + row = {col: "" for col in template_cols} + row['cod_por'] = output_portfolio_code + row['cod_tit'] = isin + row['des_tit'] = nome + row['peso'] = float(weight * 99) + results_rows.append(row) + + results_full_df = pd.DataFrame(results_rows, columns=template_cols) + if results_full_df.empty: + output_df = template_df.iloc[0:0].copy() + else: + output_df = results_full_df.reindex(columns=template_cols) + if name == target_portfolio_name: + export_rows.append(output_df) + + # --- Pie chart asset allocation: salva in Output senza mostrare --- + asset_allocations = {asset: 0 for asset in asset_class_limits} + for isin, weight in weights.items(): + r_sel = df.loc[df['ISIN'] == isin] + if r_sel.empty: + continue + asset_class = r_sel['Asset Class'].iloc[0] + asset_allocations.setdefault(asset_class, 0) + asset_allocations[asset_class] += weight + + total_alloc = sum(asset_allocations.values()) + if total_alloc > 0: + plt.figure(figsize=(8, 6)) + plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%') + plt.title(f'Asset Allocation for {name}') + pie_path = plot_path(f'Asset_Allocation_{name}.png') + plt.savefig(pie_path, dpi=150, bbox_inches='tight') + plt.close() + + except OptimizationError as e: + print(f"Optimization failed for {name}: {e}") + optimized_weights[name] = pd.Series([0] * len(annual_returns_mean)) + +# ========================= +# RIEPILOGO METRICHE (PORTAFOGLI PH1) +# ========================= +summary_data = [] +for (years, target_vol), name in volatility_targets.items(): + if name in optimized_weights.columns: + period_start_date = end_date - pd.DateOffset(years=years) + period_df = final_df.loc[period_start_date:end_date] + + daily_returns_mean = period_df.mean() + annual_returns_mean = daily_returns_mean * days_per_year + annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True) + + # --- Regularizzazione covarianza per l'ottimizzatore --- + annual_covariance_matrix = regularize_covariance(annual_covariance_matrix) + + w_series = optimized_weights[name].reindex(period_df.columns).fillna(0.0) + w_vec = w_series.values + + port_returns = (period_df[period_df.columns] * w_series).sum(axis=1) + n_days = int(port_returns.shape[0]) + years_elapsed = n_days / days_per_year if n_days > 0 else np.nan + port_ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan + port_ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan + gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan + port_cagr = (gross**(1.0 / years_elapsed) - 1.0) if (years_elapsed and years_elapsed > 0 and gross and gross > 0) else np.nan + + port_r2 = r2_equity_line(port_returns) + port_maxdd, port_dddur, port_ttr = drawdown_metrics(port_returns, sentinel_ttr=1250) + port_aaw, port_auw, port_heal = heal_index_metrics(port_returns) + + common_cols = [c for c in w_series.index if c in five_year_df.columns] + if len(common_cols) > 0: + w_5y = w_series.reindex(common_cols).fillna(0.0) + port_returns_5y = (five_year_df[common_cols] * w_5y).sum(axis=1) + _, port_hmin_5y_months = h_min_100(port_returns_5y, month_len=21) + else: + port_hmin_5y_months = np.nan + + exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[period_df.columns].values)) + cov_mat = annual_covariance_matrix.loc[period_df.columns, period_df.columns].values + exp_vol = float(np.sqrt(np.dot(w_vec, np.dot(cov_mat, w_vec)))) + sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan + + indiv_ann_vols = np.sqrt(np.diag(cov_mat)) + weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols)) + diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan + diversification_ratio = weighted_avg_vol / exp_vol if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan + + print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===") + print(f"Expected annual return: {exp_ret:.2%}") + print(f"Annual volatility: {exp_vol:.2%}") + print(f"Sharpe Ratio: {sharpe:.2f}") + print(f"Diversification Ratio: {diversification_ratio:.3f}" if not np.isnan(diversification_ratio) else "Diversification Ratio: NaN") + print(f"Beneficio di diversificazione: {diversification_benefit:.2%}") + + summary_data.append({ + "Portafoglio": name, + "Years": years, + "Target Vol": f"{target_vol:.2%}", + "Expected annual return": f"{exp_ret:.2%}", + "Annual volatility": f"{exp_vol:.2%}", + "Sharpe Ratio": f"{sharpe:.2f}", + "Beneficio di diversificazione": f"{diversification_benefit:.2%}" if not np.isnan(diversification_benefit) else "", + "Rendimento_Ann": f"{port_ann_return:.2%}" if pd.notna(port_ann_return) else "", + "Volatilita_Ann": f"{port_ann_vol:.2%}" if pd.notna(port_ann_vol) else "", + "CAGR": f"{port_cagr:.2%}" if pd.notna(port_cagr) else "", + "R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan, + "MaxDD": f"{port_maxdd:.2%}" if pd.notna(port_maxdd) else "", + "DD_Duration_Max": int(port_dddur) if pd.notna(port_dddur) else "", + "TTR_from_MDD": int(port_ttr) if pd.notna(port_ttr) else "", + "AAW": float(port_aaw) if pd.notna(port_aaw) else np.nan, + "AUW": float(port_auw) if pd.notna(port_auw) else np.nan, + "Heal_Index": float(port_heal) if pd.notna(port_heal) else np.nan, + "H_min_100m_5Y": int(port_hmin_5y_months) if pd.notna(port_hmin_5y_months) else "" + }) + +# ========================= +# EXPORT — (1) ASSET METRICS SOLO +# ========================= +asset_metrics_path = excel_path('Asset metrics Quant Best Europe.xlsx') +with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer: + for name, metrics_df in per_asset_metrics.items(): + metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False) + + consolidated = [] + for name, metrics_df in per_asset_metrics.items(): + tmp = metrics_df.copy() + tmp.insert(0, 'Periodo', name) + consolidated.append(tmp) + consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame() + if not consolidated_df.empty: + consolidated_df.to_excel(writer, sheet_name='Metriche_Consolidate', index=False) + +print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.") + +# ========================= +# COSTRUZIONE "WITH NAMES" +# ========================= +optimized_weights_with_names = optimized_weights.copy() +optimized_weights_with_names['Nome ETF'] = [ + df.loc[df['ISIN'] == isin, 'Nome'].values[0] if (df['ISIN'] == isin).any() else "" + for isin in optimized_weights.index +] + +# ========================= +# EXPORT — (2) RIEPILOGO PESI (SOLO PH1) +# ========================= +summary_df = pd.DataFrame(summary_data) + +summary_path = excel_path('Riepilogo pesi Quant Best Europe.xlsx') +with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer: + optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True) + summary_df.to_excel(writer, sheet_name='Riepilogo', index=False) + +print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'Riepilogo'.") + +# ========================= +# EXPORT UNICO PESI OTTIMIZZATI +# ========================= +date_tag = datetime.now().strftime("%Y%m%d") +combined_path = excel_path(f"{date_tag} Pesi ottimizzati Quant Best Europe.xlsx") +with pd.ExcelWriter(combined_path, engine='openpyxl', mode='w') as writer: + if export_rows: + combined_df = pd.concat([template_df] + export_rows, ignore_index=True) + else: + combined_df = template_df.copy() + combined_df = combined_df.drop( + columns=[ + 'des_por', + 'cod_div_tit', + 'cod_isin', + 'cod_bloomberg', + 'cod_esterno', + 'cod_tit1', + 'cod_tit2', + 'cod_tit3', + 'divisore', + 'moltiplicatore', + 'leva', + ], + errors='ignore', + ) + combined_df = combined_df.rename(columns={'cod_tit': 'ISIN'}) + combined_df.to_excel(writer, sheet_name='Quant BE', index=False) +print(f"Pesi ottimizzati salvati in un unico file/sheet: '{combined_path}'.")