From a05ccba8bb2fdccf9ca4e161500908f5182e977a Mon Sep 17 00:00:00 2001 From: fredmaloggia Date: Wed, 26 Nov 2025 15:01:27 +0100 Subject: [PATCH] unificato output pesi su unico excel --- Ottimizzatore Full.py | 774 ------------------ Ottimizzatore Lite.py => Ottimizzatore ITA.py | 29 +- ...izzatore Lite UK.py => Ottimizzatore UK.py | 24 +- 3 files changed, 37 insertions(+), 790 deletions(-) delete mode 100644 Ottimizzatore Full.py rename Ottimizzatore Lite.py => Ottimizzatore ITA.py (96%) rename Ottimizzatore Lite UK.py => Ottimizzatore UK.py (91%) diff --git a/Ottimizzatore Full.py b/Ottimizzatore Full.py deleted file mode 100644 index b225cf9..0000000 --- a/Ottimizzatore Full.py +++ /dev/null @@ -1,774 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Created on 22 Oct 2025 - -@author: Federico -""" - -# ========================= -# IMPORT & PARAMETRI -# ========================= -import sys -import os -import numpy as np -import pandas as pd -import matplotlib.pyplot as plt -import yaml -import logging - -from sqlalchemy import create_engine, text -from sqlalchemy.exc import SQLAlchemyError - -from pypfopt import risk_models -from pypfopt.efficient_frontier import EfficientFrontier -from pypfopt.exceptions import OptimizationError - -# Cartelle di input/output/plot -OUTPUT_DIR = "Output" -PLOT_DIR = "Plot" -INPUT_DIR = "Input" -CONFIG_FILE = "config.yaml" - -os.makedirs(OUTPUT_DIR, exist_ok=True) -os.makedirs(PLOT_DIR, exist_ok=True) -os.makedirs(INPUT_DIR, exist_ok=True) -logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") -logger = logging.getLogger(__name__) - -def excel_path(filename: str) -> str: - """Costruisce il percorso completo per un file Excel nella cartella di output.""" - return os.path.join(OUTPUT_DIR, filename) - -def plot_path(filename: str) -> str: - """Costruisce il percorso completo per un file PNG nella cartella Plot.""" - return os.path.join(PLOT_DIR, filename) - -DEFAULT_VOL_TARGETS = [ - {"years": 5, "target_vol": 0.06, "name": "VAR3_5Y"}, - {"years": 1, "target_vol": 0.12, "name": "VAR6_1Y"}, - {"years": 3, "target_vol": 0.12, "name": "VAR6_3Y"}, - {"years": 5, "target_vol": 0.12, "name": "VAR6_5Y"}, - {"years": 5, "target_vol": 0.18, "name": "VAR9_5Y"}, -] - -DEFAULT_ASSET_CLASS_LIMITS = { - 'Azionari': 0.75, 'Obbligazionari': 0.75, - 'Metalli Preziosi': 0.20, 'Materie Prime': 0.05, - 'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1 -} - - -def load_targets_and_limits(config_file: str): - """Legge target di volatilità e limiti asset class dal file di configurazione.""" - try: - with open(config_file, "r", encoding="utf-8") as f: - cfg = yaml.safe_load(f) or {} - except FileNotFoundError: - logger.error("File di configurazione mancante: %s", config_file) - sys.exit(1) - - vt_cfg = cfg.get("volatility_targets", {}) - vt_list = [] - if isinstance(vt_cfg, dict): - vt_list = vt_cfg.get("default") or [] - elif isinstance(vt_cfg, list): - vt_list = vt_cfg - if not vt_list: - logger.error("Sezione 'volatility_targets' mancante o vuota nel file di configurazione.") - sys.exit(1) - - volatility_targets_local = { - (int(item["years"]), float(item["target_vol"])): item["name"] - for item in vt_list - if "years" in item and "target_vol" in item and "name" in item - } - if not volatility_targets_local: - logger.error("Nessun target di volatilita valido trovato in configurazione.") - sys.exit(1) - - asset_limits_cfg = cfg.get("asset_class_limits") or {} - if not asset_limits_cfg: - logger.error("Sezione 'asset_class_limits' mancante o vuota nel file di configurazione.") - sys.exit(1) - asset_class_limits_local = {k: float(v) for k, v in asset_limits_cfg.items()} - - return volatility_targets_local, asset_class_limits_local - - -def validate_universe(df_universe: pd.DataFrame): - """Verifica colonne obbligatorie e duplicati ISIN nel file universo.""" - required_cols = ['ISIN', 'Nome', 'Categoria', 'Asset Class'] - missing_cols = [c for c in required_cols if c not in df_universe.columns] - if missing_cols: - logger.error("Colonne mancanti nel file universo: %s", ", ".join(missing_cols)) - sys.exit(1) - dup_isin = df_universe['ISIN'][df_universe['ISIN'].duplicated()].unique().tolist() - if dup_isin: - logger.warning("ISIN duplicati nel file universo: %s", dup_isin) - empty_isin = df_universe['ISIN'].isna().sum() - if empty_isin: - logger.warning("Righe con ISIN mancante nel file universo: %d", int(empty_isin)) - - -def validate_returns_frame(df_returns: pd.DataFrame, threshold: float = 0.2): - """Avvisa se i rendimenti hanno molte celle NaN prima del riempimento.""" - if df_returns.empty: - logger.error("Nessun dato di rendimento recuperato: final_df vuoto.") - sys.exit(1) - na_ratio = df_returns.isna().mean() - high_na = na_ratio[na_ratio > threshold] - if not high_na.empty: - logger.warning( - "Colonne con >%.0f%% di NaN prima del fill: %s", - threshold * 100, - ", ".join([f"{c} ({v:.0%})" for c, v in high_na.items()]) - ) - -# --------------------------------- -# Utility per R^2 sull’equity line -# --------------------------------- -def r2_equity_line(returns: pd.Series) -> float: - """R^2 della regressione OLS di log(equity) sul tempo (con intercetta).""" - s = returns.dropna() - if s.size < 3: - return np.nan - equity = (1.0 + s).cumprod() - equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna() - if equity.size < 3: - return np.nan - y = np.log(equity.values) - if np.allclose(y.var(ddof=1), 0.0): - return 0.0 - x = np.arange(y.size, dtype=float) - X = np.column_stack([np.ones_like(x), x]) - beta, *_ = np.linalg.lstsq(X, y, rcond=None) - y_hat = X @ beta - ss_res = np.sum((y - y_hat) ** 2) - ss_tot = np.sum((y - y.mean()) ** 2) - r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan - if np.isnan(r2): - return np.nan - return float(np.clip(r2, 0.0, 1.0)) - -# --------------------------------- -# Utility per metriche di drawdown -# --------------------------------- -def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250): - """ - Calcola: - - max_dd: profondità massima del drawdown (negativa o zero) - - max_dd_duration: durata massima (in giorni) di qualsiasi drawdown - - ttr_from_mdd: giorni dal minimo del Max DD al pieno recupero del picco precedente (sentinel se non recupera) - """ - s = returns.fillna(0.0).astype(float) - if s.size == 0: - return np.nan, np.nan, np.nan - - equity = (1.0 + s).cumprod() - if equity.size == 0: - return np.nan, np.nan, np.nan - - run_max = equity.cummax() - dd = equity / run_max - 1.0 - - # Max Drawdown (valore più negativo) - max_dd = float(dd.min()) if dd.size else np.nan - - # Durata massima di drawdown (giorni consecutivi sotto zero drawdown) - under_water = dd < 0 - if under_water.any(): - max_dd_duration = 0 - current = 0 - for flag in under_water.values: - if flag: - current += 1 - if current > max_dd_duration: - max_dd_duration = current - else: - current = 0 - else: - max_dd_duration = 0 - - # Time-to-Recovery dal Max DD - if dd.size: - trough_idx = int(np.argmin(dd.values)) - if trough_idx > 0: - peak_idx = int(np.argmax(equity.values[:trough_idx+1])) - peak_level = float(equity.values[peak_idx]) - rec_idx = None - for t in range(trough_idx + 1, equity.size): - if equity.values[t] >= peak_level: - rec_idx = t - break - if rec_idx is None: - ttr_from_mdd = sentinel_ttr # non recuperato - else: - ttr_from_mdd = rec_idx - trough_idx - else: - ttr_from_mdd = np.nan - else: - ttr_from_mdd = np.nan - - return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan) - -# --------------------------------- -# Utility per AAW, AUW e Heal Index -# --------------------------------- -def heal_index_metrics(returns: pd.Series): - """ - Calcola: - - AAW: area sopra acqua (run-up vs minimo cumulato) - - AUW: area sotto acqua (drawdown vs massimo cumulato) - - Heal Index: (AAW - AUW) / AUW - """ - s = returns.fillna(0.0).astype(float) - if s.size == 0: - return np.nan, np.nan, np.nan - - equity = (1.0 + s).cumprod() - if equity.size == 0: - return np.nan, np.nan, np.nan - - run_max = equity.cummax() - dd = equity / run_max - 1.0 - AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan - - run_min = equity.cummin() - ru = equity / run_min - 1.0 - AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan - - heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan - return AAW, AUW, heal - -# --------------------------------- -# Utility per H_min (100% finestre positive) -# --------------------------------- -def h_min_100(returns: pd.Series, month_len: int = 21): - """ - Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days - hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)). - """ - s = returns.dropna().astype(float) - n = s.size - if n == 0: - return np.nan, np.nan - - log1p = np.log1p(s.values) - csum = np.cumsum(log1p) - - def rolling_sum_k(k: int): - if k > n: - return np.array([]) - head = csum[k - 1:] - tail = np.concatenate(([0.0], csum[:-k])) - return head - tail - - for k in range(1, n + 1): - rs = rolling_sum_k(k) - if rs.size == 0: - break - roll_ret = np.exp(rs) - 1.0 - if np.all(roll_ret >= 0): - h_days = k - h_months = int(np.ceil(h_days / month_len)) - return h_days, h_months - - return np.nan, np.nan - -# --------------------------------- -# Utility di serie portafoglio e metriche path-based -# --------------------------------- -def portfolio_series_from_weights(period_df: pd.DataFrame, w: np.ndarray, cols: list) -> pd.Series: - w_series = pd.Series(w, index=cols) - return (period_df[cols] * w_series).sum(axis=1) - -def portfolio_path_metrics(period_df: pd.DataFrame, - five_year_df: pd.DataFrame, - w: np.ndarray, - cols: list, - days_per_year: int) -> dict: - """Metriche path-based del portafoglio su period_df + H_min_100m su 5Y.""" - w = np.asarray(w, dtype=float) - cols = list(cols) - - port_returns = portfolio_series_from_weights(period_df, w, cols) - - n_days = int(port_returns.shape[0]) - years_elapsed = n_days / days_per_year if n_days > 0 else np.nan - - ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan - ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan - - gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan - if years_elapsed and years_elapsed > 0 and gross and gross > 0: - cagr = gross**(1.0 / years_elapsed) - 1.0 - else: - cagr = np.nan - - r2 = r2_equity_line(port_returns) - maxdd, dddur, ttr = drawdown_metrics(port_returns, sentinel_ttr=1250) - aaw, auw, heal = heal_index_metrics(port_returns) - - common_cols = [c for c in cols if c in five_year_df.columns] - if len(common_cols) > 0: - w5 = pd.Series(w, index=cols).reindex(common_cols).fillna(0.0).values - port_returns_5y = portfolio_series_from_weights(five_year_df, w5, common_cols) - _, hmin_5y_months = h_min_100(port_returns_5y, month_len=21) - else: - hmin_5y_months = np.nan - - return { - "AnnReturn": ann_return, - "AnnVol": ann_vol, - "CAGR": cagr, - "R2": r2, - "MaxDD": maxdd, - "DD_Duration": dddur, - "TTR": ttr, - "AAW": aaw, - "AUW": auw, - "Heal": heal, - "Hmin_100m_5Y": hmin_5y_months - } - -# --- Lettura parametri dal file connection.txt --- -params = {} -with open("connection.txt", "r") as f: - for line in f: - line = line.strip() - if line and not line.startswith("#"): - key, value = line.split("=", 1) - params[key.strip()] = value.strip() - -username = params.get("username") -password = params.get("password") -host = params.get("host") -port = params.get("port", "1433") -database = params.get("database") - -connection_string = ( - f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}" - "?driver=ODBC+Driver+17+for+SQL+Server" -) - -print("Connection string letta correttamente") - -# ========================= -# CONNESSIONE AL DB -# ========================= -try: - engine = create_engine(connection_string) - with engine.connect() as connection: - _ = connection.execute(text("SELECT 1")) - print("Connessione al database riuscita.") -except SQLAlchemyError as e: - print("Errore durante la connessione al database:", e) - sys.exit() - -# ========================= -# INPUT / TEMPLATE -# ========================= -template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls') -template_df = pd.read_excel(template_path) - -file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx') -df = pd.read_excel( - file_path, - usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'], - dtype={'Codice Titolo': str} -) -validate_universe(df) - -# ========================= -# SERIE STORICHE RENDIMENTI -# ========================= -end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1) -start_date = end_date - pd.DateOffset(years=5) -all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize() - -final_df = pd.DataFrame(index=all_dates) - -isin_from_db = set() -for isin in df['ISIN'].unique(): - print(f"Working on ISIN: {isin}") - procedure_call = f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', @n = 1305, @PtfCurr = EUR" - try: - temp_df = pd.read_sql_query(procedure_call, engine) - if temp_df.empty: - print(f"Nessun dato recuperato per {isin}, skipping...") - continue - temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize() - temp_df = temp_df.dropna(subset=['Px_Date']) - temp_df.set_index('Px_Date', inplace=True) - temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100 - final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates) - isin_from_db.add(isin) - print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.") - except SQLAlchemyError as e: - print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e) - -validate_returns_frame(final_df) -final_df.fillna(0, inplace=True) - -# -------- H_min sempre su 5 anni (21 gg = 1 mese) -------- -five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date] - -# ========================= -# CONFIGURAZIONE OBIETTIVI -# ========================= -volatility_targets, asset_class_limits = load_targets_and_limits(CONFIG_FILE) -days_per_year = 252 -riskfree_rate = 0.02 - -# ========================= -# LOOP OTTIMIZZAZIONI (PH1 tradizionale) -# ========================= -optimized_weights = pd.DataFrame() -per_asset_metrics = {} - -for (years, target_vol), name in volatility_targets.items(): - period_start_date = end_date - pd.DateOffset(years=years) - period_df = final_df.loc[period_start_date:end_date] - - daily_returns_mean = period_df.mean() - annual_returns_mean = daily_returns_mean * days_per_year - annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True) - - # ---------- PER-ASSET METRICS ---------- - n_days = int(period_df.shape[0]) - years_elapsed = n_days / days_per_year if n_days > 0 else np.nan - - asset_ann_return = daily_returns_mean * days_per_year - asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year) - - gross = (1.0 + period_df).prod(skipna=True) - asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns) - - asset_r2 = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns) - - maxdd_dict, dddur_dict, ttr_dict = {}, {}, {} - aaw_dict, auw_dict, heal_dict = {}, {}, {} - hmin_5y_months_dict = {} - - for col in period_df.columns: - mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250) - maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr - aaw, auw, heal = heal_index_metrics(period_df[col]) - aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal - if col in five_year_df.columns: - _, h_months_5y = h_min_100(five_year_df[col], month_len=21) - else: - h_months_5y = np.nan - hmin_5y_months_dict[col] = h_months_5y - - asset_metrics_df = ( - pd.DataFrame({ - 'ISIN': period_df.columns, - 'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values, - 'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values, - 'CAGR': asset_cagr.reindex(period_df.columns).values, - 'R2_Equity': asset_r2.reindex(period_df.columns).values, - 'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values, - 'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values, - 'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values, - 'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values, - 'AUW': pd.Series(auw_dict).reindex(period_df.columns).values, - 'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values, - 'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values - }) - .merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left') - [['ISIN', 'Nome', 'Categoria', 'Asset Class', - 'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity', - 'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD', - 'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']] - .sort_values('ISIN', kind='stable') - .reset_index(drop=True) - ) - per_asset_metrics[name] = asset_metrics_df - - # ---------- OTTIMIZZAZIONE ---------- - ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix) - - # Vincoli PesoFisso / PesoMax - for _, row in df.iterrows(): - isin_i = row['ISIN'] - if isin_i in period_df.columns: - idx = period_df.columns.get_loc(isin_i) - pf = row.get('PesoFisso') - pm = row.get('PesoMax') - if pd.notna(pf): - ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val) - elif pd.notna(pm): - ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw) - - # Vincoli per Categoria - categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict() - for cat, maxw in categories_limits.items(): - isin_list = df[df['Categoria'] == cat]['ISIN'].tolist() - idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns] - if idxs: - ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw) - - # Vincoli per Asset Class - for ac, maxw in asset_class_limits.items(): - isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist() - idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns] - if idxs: - ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw) - - # ---------- Risoluzione ---------- - try: - ef.efficient_risk(target_volatility=target_vol) - weights = ef.clean_weights() - optimized_weights[name] = pd.Series(weights) - exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate) - - print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===") - print(f"Expected annual return: {exp_ret:.2%}") - print(f"Annual volatility: {exp_vol:.2%}") - print(f"Sharpe Ratio: {sharpe:.2f}") - - # --- Beneficio di diversificazione --- - w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns]) - indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values)) - weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols)) - diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan - print(f"Beneficio di diversificazione: {diversification_benefit:.2%}") - - # --- File Excel per import gestionale (uno per portafoglio) --- - template_cols = list(template_df.columns) - results_rows = [] - for isin, weight in weights.items(): - if weight > 0: - r_sel = df.loc[df['ISIN'] == isin] - codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else "" - nome = r_sel['Nome'].iloc[0] if not r_sel.empty else "" - row = {col: "" for col in template_cols} - row['cod_por'] = f'PTFOPT{name}' - row['cod_tit'] = codice_titolo - row['des_tit'] = nome - row['peso'] = float(weight * 99) - results_rows.append(row) - - results_full_df = pd.DataFrame(results_rows, columns=template_cols) - if results_full_df.empty: - output_df = template_df.iloc[0:0].copy() - else: - output_df = results_full_df.reindex(columns=template_cols) - output_file_path = excel_path(f'PTFOPT{name}.xlsx') - output_df.to_excel(output_file_path, index=False) - print(f"File {output_file_path} saved successfully.") - - # --- Pie chart asset allocation (se ci sono pesi > 0) --- - asset_allocations = {asset: 0 for asset in asset_class_limits} - for isin, weight in weights.items(): - r_sel = df.loc[df['ISIN'] == isin] - if r_sel.empty: - continue - asset_allocations.setdefault(r_sel['Asset Class'].iloc[0], 0) - asset_allocations[r_sel['Asset Class'].iloc[0]] += weight - - if sum(asset_allocations.values()) > 0: - plt.figure(figsize=(8, 6)) - plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%') - plt.title(f'Asset Allocation for {name}') - pie_path = plot_path(f'Asset_Allocation_{name}.png') - if os.path.exists(pie_path): - os.remove(pie_path) - plt.savefig(pie_path, dpi=150, bbox_inches='tight') - plt.close() - - except OptimizationError as e: - print(f"Optimization failed for {name}: {e}") - optimized_weights[name] = pd.Series([0] * len(annual_returns_mean)) - -# ========================= -# RIEPILOGO METRICHE (PORTAFOGLI PH1) -# ========================= -summary_data = [] -for (years, target_vol), name in volatility_targets.items(): - if name in optimized_weights.columns: - period_start_date = end_date - pd.DateOffset(years=years) - period_df = final_df.loc[period_start_date:end_date] - - daily_returns_mean = period_df.mean() - annual_returns_mean = daily_returns_mean * days_per_year - annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True) - - w_series = optimized_weights[name].reindex(period_df.columns).fillna(0.0) - w_vec = w_series.values - - port_returns = (period_df[period_df.columns] * w_series).sum(axis=1) - n_days = int(port_returns.shape[0]) - years_elapsed = n_days / days_per_year if n_days > 0 else np.nan - port_ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan - port_ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan - gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan - port_cagr = (gross**(1.0 / years_elapsed) - 1.0) if (years_elapsed and years_elapsed > 0 and gross and gross > 0) else np.nan - - port_r2 = r2_equity_line(port_returns) - port_maxdd, port_dddur, port_ttr = drawdown_metrics(port_returns, sentinel_ttr=1250) - port_aaw, port_auw, port_heal = heal_index_metrics(port_returns) - - common_cols = [c for c in w_series.index if c in five_year_df.columns] - if len(common_cols) > 0: - w_5y = w_series.reindex(common_cols).fillna(0.0) - port_returns_5y = (five_year_df[common_cols] * w_5y).sum(axis=1) - _, port_hmin_5y_months = h_min_100(port_returns_5y, month_len=21) - else: - port_hmin_5y_months = np.nan - - exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[period_df.columns].values)) - cov_mat = annual_covariance_matrix.loc[period_df.columns, period_df.columns].values - exp_vol = float(np.sqrt(np.dot(w_vec, np.dot(cov_mat, w_vec)))) - sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan - - indiv_ann_vols = np.sqrt(np.diag(cov_mat)) - weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols)) - diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan - diversification_ratio = weighted_avg_vol / exp_vol if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan - - print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===") - print(f"Expected annual return: {exp_ret:.2%}") - print(f"Annual volatility: {exp_vol:.2%}") - print(f"Sharpe Ratio: {sharpe:.2f}") - print(f"Diversification Ratio: {diversification_ratio:.3f}" if not np.isnan(diversification_ratio) else "Diversification Ratio: NaN") - print(f"Beneficio di diversificazione: {diversification_benefit:.2%}") - - summary_data.append({ - "Portafoglio": name, - "Years": years, - "Target Vol": f"{target_vol:.2%}", - "Expected annual return": f"{exp_ret:.2%}", - "Annual volatility": f"{exp_vol:.2%}", - "Sharpe Ratio": f"{sharpe:.2f}", - "Beneficio di diversificazione": f"{diversification_benefit:.2%}" if not np.isnan(diversification_benefit) else "", - "Rendimento_Ann": f"{port_ann_return:.2%}" if pd.notna(port_ann_return) else "", - "Volatilita_Ann": f"{port_ann_vol:.2%}" if pd.notna(port_ann_vol) else "", - "CAGR": f"{port_cagr:.2%}" if pd.notna(port_cagr) else "", - "R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan, - "MaxDD": f"{port_maxdd:.2%}" if pd.notna(port_maxdd) else "", - "DD_Duration_Max": int(port_dddur) if pd.notna(port_dddur) else "", - "TTR_from_MDD": int(port_ttr) if pd.notna(port_ttr) else "", - "AAW": float(port_aaw) if pd.notna(port_aaw) else np.nan, - "AUW": float(port_auw) if pd.notna(port_auw) else np.nan, - "Heal_Index": float(port_heal) if pd.notna(port_heal) else np.nan, - "H_min_100m_5Y": int(port_hmin_5y_months) if pd.notna(port_hmin_5y_months) else "" - }) - -# ========================= -# PLOT EQUITY/UNDERWATER (PH1) -# ========================= -def plot_equity_overlay_all(port_names=None): - if port_names is None: - port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'] - period_start_date = end_date - pd.DateOffset(years=5) - period_df = final_df.loc[period_start_date:end_date] - available_cols = set(optimized_weights.columns) - plotted = 0 - plt.figure(figsize=(11, 6)) - for pname in port_names: - if pname not in available_cols: - print(f"[plot] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.") - continue - w_series = optimized_weights[pname].reindex(period_df.columns).fillna(0.0) - port_returns = (period_df[w_series.index] * w_series).sum(axis=1) - equity = (1.0 + port_returns).cumprod() - plt.plot(equity.index, equity.values, label=pname) - plotted += 1 - if plotted == 0: - print("[plot] Nessun portafoglio valido da plottare.") - plt.close() - return - plt.title("Equity line - Portafogli ottimizzati (ultimi 5 anni)") - plt.xlabel("Data") - plt.ylabel("Equity (base=1.0)") - plt.grid(True, alpha=0.3) - plt.legend(loc="best") - plt.tight_layout() - out_png = plot_path("Equity_ALL_PORTS.png") - if os.path.exists(out_png): - os.remove(out_png) - plt.savefig(out_png, dpi=150, bbox_inches='tight') - plt.close() - print(f"[plot] Grafico sovrapposto salvato: {out_png}") - -def plot_underwater_overlay_all(port_names=None, ylim=(-0.3, 0.0)): - if port_names is None: - port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'] - period_start_date = end_date - pd.DateOffset(years=5) - period_df = final_df.loc[period_start_date:end_date] - available_cols = set(optimized_weights.columns) - plotted = 0 - plt.figure(figsize=(11, 6)) - for pname in port_names: - if pname not in available_cols: - print(f"[underwater] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.") - continue - w_series = optimized_weights[pname].reindex(period_df.columns).fillna(0.0) - port_returns = (period_df[w_series.index] * w_series).sum(axis=1) - equity = (1.0 + port_returns).cumprod() - run_max = equity.cummax() - dd = equity / run_max - 1.0 - plt.plot(dd.index, dd.values, label=pname) - plotted += 1 - if plotted == 0: - print("[underwater] Nessun portafoglio valido da plottare.") - plt.close() - return - plt.title("Underwater (Drawdown) - Portafogli ottimizzati (ultimi 5 anni)") - plt.xlabel("Data") - plt.ylabel("Drawdown") - if ylim is not None: - plt.ylim(*ylim) - plt.grid(True, alpha=0.3) - plt.legend(loc="best") - plt.tight_layout() - out_png = plot_path("Underwater_ALL_PORTS.png") - if os.path.exists(out_png): - os.remove(out_png) - plt.savefig(out_png, dpi=150, bbox_inches='tight') - plt.close() - print(f"[underwater] Grafico sovrapposto salvato: {out_png}") - -plot_equity_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']) -plot_underwater_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']) - -# ========================= -# EXPORT - (1) ASSET METRICS SOLO -# ========================= -asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx') -with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer: - for name, metrics_df in per_asset_metrics.items(): - metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False) - - consolidated = [] - for name, metrics_df in per_asset_metrics.items(): - tmp = metrics_df.copy() - tmp.insert(0, 'Periodo', name) - consolidated.append(tmp) - consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame() - if not consolidated_df.empty: - consolidated_df.to_excel(writer, sheet_name='Metriche_Consolidate', index=False) - -print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.") - -# ========================= -# COSTRUZIONE "WITH NAMES" (solo PH1) -# ========================= -optimized_weights_with_names = optimized_weights.copy() -optimized_weights_with_names['Nome ETF'] = [ - df.loc[df['ISIN'] == isin, 'Nome'].values[0] if (df['ISIN'] == isin).any() else "" - for isin in optimized_weights.index -] - -# ========================= -# EXPORT - (2) RIEPILOGO PESI & METRICHE -# ========================= -summary_df = pd.DataFrame(summary_data) -summary_path = excel_path('optimized_weights_summary_v2.5.xlsx') -with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer: - optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True) - summary_df.to_excel(writer, sheet_name='metriche', index=False) - -print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'metriche'.") diff --git a/Ottimizzatore Lite.py b/Ottimizzatore ITA.py similarity index 96% rename from Ottimizzatore Lite.py rename to Ottimizzatore ITA.py index f95f504..c5a2cee 100644 --- a/Ottimizzatore Lite.py +++ b/Ottimizzatore ITA.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -Ottimizzatore v2.5.2 - VERSIONE LITE +Ottimizzatore Lite """ # ========================= @@ -13,6 +13,7 @@ import pandas as pd import matplotlib.pyplot as plt import yaml import logging +from datetime import datetime from sqlalchemy import create_engine, text from sqlalchemy.exc import SQLAlchemyError @@ -295,7 +296,7 @@ except SQLAlchemyError as e: template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls') template_df = pd.read_excel(template_path) -file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx') +file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore.xlsx') df = pd.read_excel( file_path, usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'], @@ -349,6 +350,7 @@ riskfree_rate = 0.02 # ========================= optimized_weights = pd.DataFrame() per_asset_metrics = {} +export_rows = [] for (years, target_vol), name in volatility_targets.items(): period_start_date = end_date - pd.DateOffset(years=years) @@ -479,9 +481,7 @@ for (years, target_vol), name in volatility_targets.items(): output_df = template_df.iloc[0:0].copy() else: output_df = results_full_df.reindex(columns=template_cols) - output_file_path = excel_path(f'PTFOPT{name}.xlsx') - output_df.to_excel(output_file_path, index=False) - print(f"File {output_file_path} saved successfully.") + export_rows.append(output_df) # --- Pie chart asset allocation: salva in Output senza mostrare --- asset_allocations = {asset: 0 for asset in asset_class_limits} @@ -583,7 +583,7 @@ for (years, target_vol), name in volatility_targets.items(): # ========================= # EXPORT — (1) ASSET METRICS SOLO # ========================= -asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx') +asset_metrics_path = excel_path('Asset metrics ITA.xlsx') with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer: for name, metrics_df in per_asset_metrics.items(): metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False) @@ -613,9 +613,22 @@ optimized_weights_with_names['Nome ETF'] = [ # ========================= summary_df = pd.DataFrame(summary_data) -summary_path = excel_path('optimized_weights_summary_v2.5_LITE.xlsx') +summary_path = excel_path('Pesi ottimizzati ITA.xlsx') with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer: optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True) summary_df.to_excel(writer, sheet_name='Riepilogo', index=False) -print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'Riepilogo' (solo PH1, versione LITE).") +print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'Riepilogo'.") + +# ========================= +# EXPORT UNICO PESI OTTIMIZZATI +# ========================= +date_tag = datetime.now().strftime("%Y%m%d") +combined_path = excel_path(f"{date_tag}_Pesi_ottimizzati.xlsx") +with pd.ExcelWriter(combined_path, engine='openpyxl', mode='w') as writer: + if export_rows: + combined_df = pd.concat([template_df] + export_rows, ignore_index=True) + else: + combined_df = template_df.copy() + combined_df.to_excel(writer, sheet_name='Pesi Ottimizzati', index=False) +print(f"Pesi ottimizzati salvati in un unico file/sheet: '{combined_path}'.") diff --git a/Ottimizzatore Lite UK.py b/Ottimizzatore UK.py similarity index 91% rename from Ottimizzatore Lite UK.py rename to Ottimizzatore UK.py index 68c2512..341232f 100644 --- a/Ottimizzatore Lite UK.py +++ b/Ottimizzatore UK.py @@ -7,6 +7,7 @@ from pypfopt.exceptions import OptimizationError import matplotlib.pyplot as plt import os import sys +from datetime import datetime # Cartelle di input/output/plot OUTPUT_DIR = "Output" @@ -105,6 +106,7 @@ riskfree_rate = 0.02 # Ottimizzazione per ciascun target di volatilità e salvataggio dei risultati optimized_weights = pd.DataFrame() summary_data = [] +export_rows = [] for (years, target_vol), name in volatility_targets.items(): period_start_date = end_date - pd.DateOffset(years=years) period_df = final_df.loc[period_start_date:end_date] @@ -168,13 +170,8 @@ for (years, target_vol), name in volatility_targets.items(): # Creazione del DataFrame per i risultati results_df = pd.DataFrame(results) - # Concatenazione dei risultati con il template_df - output_df = pd.concat([template_df, results_df], ignore_index=True) - - # Salva il file - output_file_path = excel_path(f'PTFOPT{name}.xlsx') - output_df.to_excel(output_file_path, index=False) - print(f"File {output_file_path} saved successfully.") + # Accumula le righe per esportazione unica + export_rows.append(results_df) # Grafico a torta per ciascun portafoglio ottimizzato asset_allocations = {asset: 0 for asset in asset_class_limits} @@ -197,11 +194,22 @@ optimized_weights_with_names = optimized_weights.copy() optimized_weights_with_names['Nome ETF'] = [df.loc[df['ISIN'] == isin, 'Nome'].values[0] for isin in optimized_weights.index] summary_df = pd.DataFrame(summary_data) -output_path = excel_path("optimized_weights_GBP.xlsx") +output_path = excel_path("Riepilogo pesi GBP.xlsx") with pd.ExcelWriter(output_path, engine='openpyxl', mode='w') as writer: optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True) summary_df.to_excel(writer, sheet_name='metriche', index=False) print(f"All optimized weights saved to '{output_path}' (with metriche).") +# Export unico con tutti i pesi in un solo foglio +date_tag = datetime.now().strftime("%Y%m%d") +combined_path = excel_path(f"{date_tag} Pesi ottimizzati UK.xlsx") +with pd.ExcelWriter(combined_path, engine='openpyxl', mode='w') as writer: + if export_rows: + combined_df = pd.concat([template_df] + export_rows, ignore_index=True) + else: + combined_df = template_df.copy() + combined_df.to_excel(writer, sheet_name='Pesi Ottimizzati', index=False) +print(f"Pesi ottimizzati salvati in un unico file/sheet: '{combined_path}'.") +