unificato output pesi su unico excel

This commit is contained in:
fredmaloggia
2025-11-26 15:01:27 +01:00
parent cb44ce7dc8
commit a05ccba8bb
3 changed files with 37 additions and 790 deletions

View File

@@ -1,774 +0,0 @@
# -*- coding: utf-8 -*-
"""
Created on 22 Oct 2025
@author: Federico
"""
# =========================
# IMPORT & PARAMETRI
# =========================
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yaml
import logging
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from pypfopt import risk_models
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt.exceptions import OptimizationError
# Cartelle di input/output/plot
OUTPUT_DIR = "Output"
PLOT_DIR = "Plot"
INPUT_DIR = "Input"
CONFIG_FILE = "config.yaml"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(PLOT_DIR, exist_ok=True)
os.makedirs(INPUT_DIR, exist_ok=True)
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def excel_path(filename: str) -> str:
"""Costruisce il percorso completo per un file Excel nella cartella di output."""
return os.path.join(OUTPUT_DIR, filename)
def plot_path(filename: str) -> str:
"""Costruisce il percorso completo per un file PNG nella cartella Plot."""
return os.path.join(PLOT_DIR, filename)
DEFAULT_VOL_TARGETS = [
{"years": 5, "target_vol": 0.06, "name": "VAR3_5Y"},
{"years": 1, "target_vol": 0.12, "name": "VAR6_1Y"},
{"years": 3, "target_vol": 0.12, "name": "VAR6_3Y"},
{"years": 5, "target_vol": 0.12, "name": "VAR6_5Y"},
{"years": 5, "target_vol": 0.18, "name": "VAR9_5Y"},
]
DEFAULT_ASSET_CLASS_LIMITS = {
'Azionari': 0.75, 'Obbligazionari': 0.75,
'Metalli Preziosi': 0.20, 'Materie Prime': 0.05,
'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1
}
def load_targets_and_limits(config_file: str):
"""Legge target di volatilità e limiti asset class dal file di configurazione."""
try:
with open(config_file, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
except FileNotFoundError:
logger.error("File di configurazione mancante: %s", config_file)
sys.exit(1)
vt_cfg = cfg.get("volatility_targets", {})
vt_list = []
if isinstance(vt_cfg, dict):
vt_list = vt_cfg.get("default") or []
elif isinstance(vt_cfg, list):
vt_list = vt_cfg
if not vt_list:
logger.error("Sezione 'volatility_targets' mancante o vuota nel file di configurazione.")
sys.exit(1)
volatility_targets_local = {
(int(item["years"]), float(item["target_vol"])): item["name"]
for item in vt_list
if "years" in item and "target_vol" in item and "name" in item
}
if not volatility_targets_local:
logger.error("Nessun target di volatilita valido trovato in configurazione.")
sys.exit(1)
asset_limits_cfg = cfg.get("asset_class_limits") or {}
if not asset_limits_cfg:
logger.error("Sezione 'asset_class_limits' mancante o vuota nel file di configurazione.")
sys.exit(1)
asset_class_limits_local = {k: float(v) for k, v in asset_limits_cfg.items()}
return volatility_targets_local, asset_class_limits_local
def validate_universe(df_universe: pd.DataFrame):
"""Verifica colonne obbligatorie e duplicati ISIN nel file universo."""
required_cols = ['ISIN', 'Nome', 'Categoria', 'Asset Class']
missing_cols = [c for c in required_cols if c not in df_universe.columns]
if missing_cols:
logger.error("Colonne mancanti nel file universo: %s", ", ".join(missing_cols))
sys.exit(1)
dup_isin = df_universe['ISIN'][df_universe['ISIN'].duplicated()].unique().tolist()
if dup_isin:
logger.warning("ISIN duplicati nel file universo: %s", dup_isin)
empty_isin = df_universe['ISIN'].isna().sum()
if empty_isin:
logger.warning("Righe con ISIN mancante nel file universo: %d", int(empty_isin))
def validate_returns_frame(df_returns: pd.DataFrame, threshold: float = 0.2):
"""Avvisa se i rendimenti hanno molte celle NaN prima del riempimento."""
if df_returns.empty:
logger.error("Nessun dato di rendimento recuperato: final_df vuoto.")
sys.exit(1)
na_ratio = df_returns.isna().mean()
high_na = na_ratio[na_ratio > threshold]
if not high_na.empty:
logger.warning(
"Colonne con >%.0f%% di NaN prima del fill: %s",
threshold * 100,
", ".join([f"{c} ({v:.0%})" for c, v in high_na.items()])
)
# ---------------------------------
# Utility per R^2 sullequity line
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
"""R^2 della regressione OLS di log(equity) sul tempo (con intercetta)."""
s = returns.dropna()
if s.size < 3:
return np.nan
equity = (1.0 + s).cumprod()
equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
if equity.size < 3:
return np.nan
y = np.log(equity.values)
if np.allclose(y.var(ddof=1), 0.0):
return 0.0
x = np.arange(y.size, dtype=float)
X = np.column_stack([np.ones_like(x), x])
beta, *_ = np.linalg.lstsq(X, y, rcond=None)
y_hat = X @ beta
ss_res = np.sum((y - y_hat) ** 2)
ss_tot = np.sum((y - y.mean()) ** 2)
r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
if np.isnan(r2):
return np.nan
return float(np.clip(r2, 0.0, 1.0))
# ---------------------------------
# Utility per metriche di drawdown
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
"""
Calcola:
- max_dd: profondità massima del drawdown (negativa o zero)
- max_dd_duration: durata massima (in giorni) di qualsiasi drawdown
- ttr_from_mdd: giorni dal minimo del Max DD al pieno recupero del picco precedente (sentinel se non recupera)
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
# Max Drawdown (valore più negativo)
max_dd = float(dd.min()) if dd.size else np.nan
# Durata massima di drawdown (giorni consecutivi sotto zero drawdown)
under_water = dd < 0
if under_water.any():
max_dd_duration = 0
current = 0
for flag in under_water.values:
if flag:
current += 1
if current > max_dd_duration:
max_dd_duration = current
else:
current = 0
else:
max_dd_duration = 0
# Time-to-Recovery dal Max DD
if dd.size:
trough_idx = int(np.argmin(dd.values))
if trough_idx > 0:
peak_idx = int(np.argmax(equity.values[:trough_idx+1]))
peak_level = float(equity.values[peak_idx])
rec_idx = None
for t in range(trough_idx + 1, equity.size):
if equity.values[t] >= peak_level:
rec_idx = t
break
if rec_idx is None:
ttr_from_mdd = sentinel_ttr # non recuperato
else:
ttr_from_mdd = rec_idx - trough_idx
else:
ttr_from_mdd = np.nan
else:
ttr_from_mdd = np.nan
return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
# ---------------------------------
# Utility per AAW, AUW e Heal Index
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
"""
Calcola:
- AAW: area sopra acqua (run-up vs minimo cumulato)
- AUW: area sotto acqua (drawdown vs massimo cumulato)
- Heal Index: (AAW - AUW) / AUW
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
run_min = equity.cummin()
ru = equity / run_min - 1.0
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
return AAW, AUW, heal
# ---------------------------------
# Utility per H_min (100% finestre positive)
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
"""
Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days
hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)).
"""
s = returns.dropna().astype(float)
n = s.size
if n == 0:
return np.nan, np.nan
log1p = np.log1p(s.values)
csum = np.cumsum(log1p)
def rolling_sum_k(k: int):
if k > n:
return np.array([])
head = csum[k - 1:]
tail = np.concatenate(([0.0], csum[:-k]))
return head - tail
for k in range(1, n + 1):
rs = rolling_sum_k(k)
if rs.size == 0:
break
roll_ret = np.exp(rs) - 1.0
if np.all(roll_ret >= 0):
h_days = k
h_months = int(np.ceil(h_days / month_len))
return h_days, h_months
return np.nan, np.nan
# ---------------------------------
# Utility di serie portafoglio e metriche path-based
# ---------------------------------
def portfolio_series_from_weights(period_df: pd.DataFrame, w: np.ndarray, cols: list) -> pd.Series:
w_series = pd.Series(w, index=cols)
return (period_df[cols] * w_series).sum(axis=1)
def portfolio_path_metrics(period_df: pd.DataFrame,
five_year_df: pd.DataFrame,
w: np.ndarray,
cols: list,
days_per_year: int) -> dict:
"""Metriche path-based del portafoglio su period_df + H_min_100m su 5Y."""
w = np.asarray(w, dtype=float)
cols = list(cols)
port_returns = portfolio_series_from_weights(period_df, w, cols)
n_days = int(port_returns.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan
ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan
gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan
if years_elapsed and years_elapsed > 0 and gross and gross > 0:
cagr = gross**(1.0 / years_elapsed) - 1.0
else:
cagr = np.nan
r2 = r2_equity_line(port_returns)
maxdd, dddur, ttr = drawdown_metrics(port_returns, sentinel_ttr=1250)
aaw, auw, heal = heal_index_metrics(port_returns)
common_cols = [c for c in cols if c in five_year_df.columns]
if len(common_cols) > 0:
w5 = pd.Series(w, index=cols).reindex(common_cols).fillna(0.0).values
port_returns_5y = portfolio_series_from_weights(five_year_df, w5, common_cols)
_, hmin_5y_months = h_min_100(port_returns_5y, month_len=21)
else:
hmin_5y_months = np.nan
return {
"AnnReturn": ann_return,
"AnnVol": ann_vol,
"CAGR": cagr,
"R2": r2,
"MaxDD": maxdd,
"DD_Duration": dddur,
"TTR": ttr,
"AAW": aaw,
"AUW": auw,
"Heal": heal,
"Hmin_100m_5Y": hmin_5y_months
}
# --- Lettura parametri dal file connection.txt ---
params = {}
with open("connection.txt", "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
key, value = line.split("=", 1)
params[key.strip()] = value.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
connection_string = (
f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
"?driver=ODBC+Driver+17+for+SQL+Server"
)
print("Connection string letta correttamente")
# =========================
# CONNESSIONE AL DB
# =========================
try:
engine = create_engine(connection_string)
with engine.connect() as connection:
_ = connection.execute(text("SELECT 1"))
print("Connessione al database riuscita.")
except SQLAlchemyError as e:
print("Errore durante la connessione al database:", e)
sys.exit()
# =========================
# INPUT / TEMPLATE
# =========================
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx')
df = pd.read_excel(
file_path,
usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'],
dtype={'Codice Titolo': str}
)
validate_universe(df)
# =========================
# SERIE STORICHE RENDIMENTI
# =========================
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize()
final_df = pd.DataFrame(index=all_dates)
isin_from_db = set()
for isin in df['ISIN'].unique():
print(f"Working on ISIN: {isin}")
procedure_call = f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', @n = 1305, @PtfCurr = EUR"
try:
temp_df = pd.read_sql_query(procedure_call, engine)
if temp_df.empty:
print(f"Nessun dato recuperato per {isin}, skipping...")
continue
temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize()
temp_df = temp_df.dropna(subset=['Px_Date'])
temp_df.set_index('Px_Date', inplace=True)
temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100
final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates)
isin_from_db.add(isin)
print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.")
except SQLAlchemyError as e:
print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)
validate_returns_frame(final_df)
final_df.fillna(0, inplace=True)
# -------- H_min sempre su 5 anni (21 gg = 1 mese) --------
five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date]
# =========================
# CONFIGURAZIONE OBIETTIVI
# =========================
volatility_targets, asset_class_limits = load_targets_and_limits(CONFIG_FILE)
days_per_year = 252
riskfree_rate = 0.02
# =========================
# LOOP OTTIMIZZAZIONI (PH1 tradizionale)
# =========================
optimized_weights = pd.DataFrame()
per_asset_metrics = {}
for (years, target_vol), name in volatility_targets.items():
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
daily_returns_mean = period_df.mean()
annual_returns_mean = daily_returns_mean * days_per_year
annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
# ---------- PER-ASSET METRICS ----------
n_days = int(period_df.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
asset_ann_return = daily_returns_mean * days_per_year
asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year)
gross = (1.0 + period_df).prod(skipna=True)
asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
asset_r2 = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
aaw_dict, auw_dict, heal_dict = {}, {}, {}
hmin_5y_months_dict = {}
for col in period_df.columns:
mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
aaw, auw, heal = heal_index_metrics(period_df[col])
aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
if col in five_year_df.columns:
_, h_months_5y = h_min_100(five_year_df[col], month_len=21)
else:
h_months_5y = np.nan
hmin_5y_months_dict[col] = h_months_5y
asset_metrics_df = (
pd.DataFrame({
'ISIN': period_df.columns,
'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values,
'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values,
'CAGR': asset_cagr.reindex(period_df.columns).values,
'R2_Equity': asset_r2.reindex(period_df.columns).values,
'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
})
.merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
[['ISIN', 'Nome', 'Categoria', 'Asset Class',
'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
.sort_values('ISIN', kind='stable')
.reset_index(drop=True)
)
per_asset_metrics[name] = asset_metrics_df
# ---------- OTTIMIZZAZIONE ----------
ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)
# Vincoli PesoFisso / PesoMax
for _, row in df.iterrows():
isin_i = row['ISIN']
if isin_i in period_df.columns:
idx = period_df.columns.get_loc(isin_i)
pf = row.get('PesoFisso')
pm = row.get('PesoMax')
if pd.notna(pf):
ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
elif pd.notna(pm):
ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)
# Vincoli per Categoria
categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
for cat, maxw in categories_limits.items():
isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
if idxs:
ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
# Vincoli per Asset Class
for ac, maxw in asset_class_limits.items():
isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
if idxs:
ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
# ---------- Risoluzione ----------
try:
ef.efficient_risk(target_volatility=target_vol)
weights = ef.clean_weights()
optimized_weights[name] = pd.Series(weights)
exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate)
print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===")
print(f"Expected annual return: {exp_ret:.2%}")
print(f"Annual volatility: {exp_vol:.2%}")
print(f"Sharpe Ratio: {sharpe:.2f}")
# --- Beneficio di diversificazione ---
w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns])
indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values))
weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols))
diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
# --- File Excel per import gestionale (uno per portafoglio) ---
template_cols = list(template_df.columns)
results_rows = []
for isin, weight in weights.items():
if weight > 0:
r_sel = df.loc[df['ISIN'] == isin]
codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else ""
nome = r_sel['Nome'].iloc[0] if not r_sel.empty else ""
row = {col: "" for col in template_cols}
row['cod_por'] = f'PTFOPT{name}'
row['cod_tit'] = codice_titolo
row['des_tit'] = nome
row['peso'] = float(weight * 99)
results_rows.append(row)
results_full_df = pd.DataFrame(results_rows, columns=template_cols)
if results_full_df.empty:
output_df = template_df.iloc[0:0].copy()
else:
output_df = results_full_df.reindex(columns=template_cols)
output_file_path = excel_path(f'PTFOPT{name}.xlsx')
output_df.to_excel(output_file_path, index=False)
print(f"File {output_file_path} saved successfully.")
# --- Pie chart asset allocation (se ci sono pesi > 0) ---
asset_allocations = {asset: 0 for asset in asset_class_limits}
for isin, weight in weights.items():
r_sel = df.loc[df['ISIN'] == isin]
if r_sel.empty:
continue
asset_allocations.setdefault(r_sel['Asset Class'].iloc[0], 0)
asset_allocations[r_sel['Asset Class'].iloc[0]] += weight
if sum(asset_allocations.values()) > 0:
plt.figure(figsize=(8, 6))
plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
plt.title(f'Asset Allocation for {name}')
pie_path = plot_path(f'Asset_Allocation_{name}.png')
if os.path.exists(pie_path):
os.remove(pie_path)
plt.savefig(pie_path, dpi=150, bbox_inches='tight')
plt.close()
except OptimizationError as e:
print(f"Optimization failed for {name}: {e}")
optimized_weights[name] = pd.Series([0] * len(annual_returns_mean))
# =========================
# RIEPILOGO METRICHE (PORTAFOGLI PH1)
# =========================
summary_data = []
for (years, target_vol), name in volatility_targets.items():
if name in optimized_weights.columns:
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
daily_returns_mean = period_df.mean()
annual_returns_mean = daily_returns_mean * days_per_year
annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
w_series = optimized_weights[name].reindex(period_df.columns).fillna(0.0)
w_vec = w_series.values
port_returns = (period_df[period_df.columns] * w_series).sum(axis=1)
n_days = int(port_returns.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
port_ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan
port_ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan
gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan
port_cagr = (gross**(1.0 / years_elapsed) - 1.0) if (years_elapsed and years_elapsed > 0 and gross and gross > 0) else np.nan
port_r2 = r2_equity_line(port_returns)
port_maxdd, port_dddur, port_ttr = drawdown_metrics(port_returns, sentinel_ttr=1250)
port_aaw, port_auw, port_heal = heal_index_metrics(port_returns)
common_cols = [c for c in w_series.index if c in five_year_df.columns]
if len(common_cols) > 0:
w_5y = w_series.reindex(common_cols).fillna(0.0)
port_returns_5y = (five_year_df[common_cols] * w_5y).sum(axis=1)
_, port_hmin_5y_months = h_min_100(port_returns_5y, month_len=21)
else:
port_hmin_5y_months = np.nan
exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[period_df.columns].values))
cov_mat = annual_covariance_matrix.loc[period_df.columns, period_df.columns].values
exp_vol = float(np.sqrt(np.dot(w_vec, np.dot(cov_mat, w_vec))))
sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan
indiv_ann_vols = np.sqrt(np.diag(cov_mat))
weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols))
diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
diversification_ratio = weighted_avg_vol / exp_vol if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===")
print(f"Expected annual return: {exp_ret:.2%}")
print(f"Annual volatility: {exp_vol:.2%}")
print(f"Sharpe Ratio: {sharpe:.2f}")
print(f"Diversification Ratio: {diversification_ratio:.3f}" if not np.isnan(diversification_ratio) else "Diversification Ratio: NaN")
print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
summary_data.append({
"Portafoglio": name,
"Years": years,
"Target Vol": f"{target_vol:.2%}",
"Expected annual return": f"{exp_ret:.2%}",
"Annual volatility": f"{exp_vol:.2%}",
"Sharpe Ratio": f"{sharpe:.2f}",
"Beneficio di diversificazione": f"{diversification_benefit:.2%}" if not np.isnan(diversification_benefit) else "",
"Rendimento_Ann": f"{port_ann_return:.2%}" if pd.notna(port_ann_return) else "",
"Volatilita_Ann": f"{port_ann_vol:.2%}" if pd.notna(port_ann_vol) else "",
"CAGR": f"{port_cagr:.2%}" if pd.notna(port_cagr) else "",
"R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan,
"MaxDD": f"{port_maxdd:.2%}" if pd.notna(port_maxdd) else "",
"DD_Duration_Max": int(port_dddur) if pd.notna(port_dddur) else "",
"TTR_from_MDD": int(port_ttr) if pd.notna(port_ttr) else "",
"AAW": float(port_aaw) if pd.notna(port_aaw) else np.nan,
"AUW": float(port_auw) if pd.notna(port_auw) else np.nan,
"Heal_Index": float(port_heal) if pd.notna(port_heal) else np.nan,
"H_min_100m_5Y": int(port_hmin_5y_months) if pd.notna(port_hmin_5y_months) else ""
})
# =========================
# PLOT EQUITY/UNDERWATER (PH1)
# =========================
def plot_equity_overlay_all(port_names=None):
if port_names is None:
port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']
period_start_date = end_date - pd.DateOffset(years=5)
period_df = final_df.loc[period_start_date:end_date]
available_cols = set(optimized_weights.columns)
plotted = 0
plt.figure(figsize=(11, 6))
for pname in port_names:
if pname not in available_cols:
print(f"[plot] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
continue
w_series = optimized_weights[pname].reindex(period_df.columns).fillna(0.0)
port_returns = (period_df[w_series.index] * w_series).sum(axis=1)
equity = (1.0 + port_returns).cumprod()
plt.plot(equity.index, equity.values, label=pname)
plotted += 1
if plotted == 0:
print("[plot] Nessun portafoglio valido da plottare.")
plt.close()
return
plt.title("Equity line - Portafogli ottimizzati (ultimi 5 anni)")
plt.xlabel("Data")
plt.ylabel("Equity (base=1.0)")
plt.grid(True, alpha=0.3)
plt.legend(loc="best")
plt.tight_layout()
out_png = plot_path("Equity_ALL_PORTS.png")
if os.path.exists(out_png):
os.remove(out_png)
plt.savefig(out_png, dpi=150, bbox_inches='tight')
plt.close()
print(f"[plot] Grafico sovrapposto salvato: {out_png}")
def plot_underwater_overlay_all(port_names=None, ylim=(-0.3, 0.0)):
if port_names is None:
port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']
period_start_date = end_date - pd.DateOffset(years=5)
period_df = final_df.loc[period_start_date:end_date]
available_cols = set(optimized_weights.columns)
plotted = 0
plt.figure(figsize=(11, 6))
for pname in port_names:
if pname not in available_cols:
print(f"[underwater] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
continue
w_series = optimized_weights[pname].reindex(period_df.columns).fillna(0.0)
port_returns = (period_df[w_series.index] * w_series).sum(axis=1)
equity = (1.0 + port_returns).cumprod()
run_max = equity.cummax()
dd = equity / run_max - 1.0
plt.plot(dd.index, dd.values, label=pname)
plotted += 1
if plotted == 0:
print("[underwater] Nessun portafoglio valido da plottare.")
plt.close()
return
plt.title("Underwater (Drawdown) - Portafogli ottimizzati (ultimi 5 anni)")
plt.xlabel("Data")
plt.ylabel("Drawdown")
if ylim is not None:
plt.ylim(*ylim)
plt.grid(True, alpha=0.3)
plt.legend(loc="best")
plt.tight_layout()
out_png = plot_path("Underwater_ALL_PORTS.png")
if os.path.exists(out_png):
os.remove(out_png)
plt.savefig(out_png, dpi=150, bbox_inches='tight')
plt.close()
print(f"[underwater] Grafico sovrapposto salvato: {out_png}")
plot_equity_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'])
plot_underwater_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'])
# =========================
# EXPORT - (1) ASSET METRICS SOLO
# =========================
asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx')
with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer:
for name, metrics_df in per_asset_metrics.items():
metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False)
consolidated = []
for name, metrics_df in per_asset_metrics.items():
tmp = metrics_df.copy()
tmp.insert(0, 'Periodo', name)
consolidated.append(tmp)
consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame()
if not consolidated_df.empty:
consolidated_df.to_excel(writer, sheet_name='Metriche_Consolidate', index=False)
print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.")
# =========================
# COSTRUZIONE "WITH NAMES" (solo PH1)
# =========================
optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = [
df.loc[df['ISIN'] == isin, 'Nome'].values[0] if (df['ISIN'] == isin).any() else ""
for isin in optimized_weights.index
]
# =========================
# EXPORT - (2) RIEPILOGO PESI & METRICHE
# =========================
summary_df = pd.DataFrame(summary_data)
summary_path = excel_path('optimized_weights_summary_v2.5.xlsx')
with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer:
optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
summary_df.to_excel(writer, sheet_name='metriche', index=False)
print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'metriche'.")

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
Ottimizzatore v2.5.2 - VERSIONE LITE
Ottimizzatore Lite
"""
# =========================
@@ -13,6 +13,7 @@ import pandas as pd
import matplotlib.pyplot as plt
import yaml
import logging
from datetime import datetime
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
@@ -295,7 +296,7 @@ except SQLAlchemyError as e:
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx')
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore.xlsx')
df = pd.read_excel(
file_path,
usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'],
@@ -349,6 +350,7 @@ riskfree_rate = 0.02
# =========================
optimized_weights = pd.DataFrame()
per_asset_metrics = {}
export_rows = []
for (years, target_vol), name in volatility_targets.items():
period_start_date = end_date - pd.DateOffset(years=years)
@@ -479,9 +481,7 @@ for (years, target_vol), name in volatility_targets.items():
output_df = template_df.iloc[0:0].copy()
else:
output_df = results_full_df.reindex(columns=template_cols)
output_file_path = excel_path(f'PTFOPT{name}.xlsx')
output_df.to_excel(output_file_path, index=False)
print(f"File {output_file_path} saved successfully.")
export_rows.append(output_df)
# --- Pie chart asset allocation: salva in Output senza mostrare ---
asset_allocations = {asset: 0 for asset in asset_class_limits}
@@ -583,7 +583,7 @@ for (years, target_vol), name in volatility_targets.items():
# =========================
# EXPORT — (1) ASSET METRICS SOLO
# =========================
asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx')
asset_metrics_path = excel_path('Asset metrics ITA.xlsx')
with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer:
for name, metrics_df in per_asset_metrics.items():
metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False)
@@ -613,9 +613,22 @@ optimized_weights_with_names['Nome ETF'] = [
# =========================
summary_df = pd.DataFrame(summary_data)
summary_path = excel_path('optimized_weights_summary_v2.5_LITE.xlsx')
summary_path = excel_path('Pesi ottimizzati ITA.xlsx')
with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer:
optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
summary_df.to_excel(writer, sheet_name='Riepilogo', index=False)
print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'Riepilogo' (solo PH1, versione LITE).")
print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'Riepilogo'.")
# =========================
# EXPORT UNICO PESI OTTIMIZZATI
# =========================
date_tag = datetime.now().strftime("%Y%m%d")
combined_path = excel_path(f"{date_tag}_Pesi_ottimizzati.xlsx")
with pd.ExcelWriter(combined_path, engine='openpyxl', mode='w') as writer:
if export_rows:
combined_df = pd.concat([template_df] + export_rows, ignore_index=True)
else:
combined_df = template_df.copy()
combined_df.to_excel(writer, sheet_name='Pesi Ottimizzati', index=False)
print(f"Pesi ottimizzati salvati in un unico file/sheet: '{combined_path}'.")

View File

@@ -7,6 +7,7 @@ from pypfopt.exceptions import OptimizationError
import matplotlib.pyplot as plt
import os
import sys
from datetime import datetime
# Cartelle di input/output/plot
OUTPUT_DIR = "Output"
@@ -105,6 +106,7 @@ riskfree_rate = 0.02
# Ottimizzazione per ciascun target di volatilità e salvataggio dei risultati
optimized_weights = pd.DataFrame()
summary_data = []
export_rows = []
for (years, target_vol), name in volatility_targets.items():
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
@@ -168,13 +170,8 @@ for (years, target_vol), name in volatility_targets.items():
# Creazione del DataFrame per i risultati
results_df = pd.DataFrame(results)
# Concatenazione dei risultati con il template_df
output_df = pd.concat([template_df, results_df], ignore_index=True)
# Salva il file
output_file_path = excel_path(f'PTFOPT{name}.xlsx')
output_df.to_excel(output_file_path, index=False)
print(f"File {output_file_path} saved successfully.")
# Accumula le righe per esportazione unica
export_rows.append(results_df)
# Grafico a torta per ciascun portafoglio ottimizzato
asset_allocations = {asset: 0 for asset in asset_class_limits}
@@ -197,11 +194,22 @@ optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = [df.loc[df['ISIN'] == isin, 'Nome'].values[0] for isin in optimized_weights.index]
summary_df = pd.DataFrame(summary_data)
output_path = excel_path("optimized_weights_GBP.xlsx")
output_path = excel_path("Riepilogo pesi GBP.xlsx")
with pd.ExcelWriter(output_path, engine='openpyxl', mode='w') as writer:
optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
summary_df.to_excel(writer, sheet_name='metriche', index=False)
print(f"All optimized weights saved to '{output_path}' (with metriche).")
# Export unico con tutti i pesi in un solo foglio
date_tag = datetime.now().strftime("%Y%m%d")
combined_path = excel_path(f"{date_tag} Pesi ottimizzati UK.xlsx")
with pd.ExcelWriter(combined_path, engine='openpyxl', mode='w') as writer:
if export_rows:
combined_df = pd.concat([template_df] + export_rows, ignore_index=True)
else:
combined_df = template_df.copy()
combined_df.to_excel(writer, sheet_name='Pesi Ottimizzati', index=False)
print(f"Pesi ottimizzati salvati in un unico file/sheet: '{combined_path}'.")