Versione iniziale

This commit is contained in:
fredmaloggia
2025-11-20 14:47:51 +01:00
commit fcb9660c26
43 changed files with 3561 additions and 0 deletions

View File

@@ -0,0 +1,190 @@
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from pypfopt import EfficientFrontier, risk_models, expected_returns
from pypfopt.exceptions import OptimizationError
import matplotlib.pyplot as plt
import os
import sys
# Cartelle di input/output/plot
OUTPUT_DIR = "Output"
INPUT_DIR = "Input"
PLOT_DIR = "Plot"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(PLOT_DIR, exist_ok=True)
def excel_path(filename: str) -> str:
"""Percorso completo per i file Excel di output."""
return os.path.join(OUTPUT_DIR, filename)
def plot_path(filename: str) -> str:
"""Percorso completo per i file di grafico."""
return os.path.join(PLOT_DIR, filename)
# Configurazione della connessione al database
username = 'readonly'
password = 'e8nqtSa39L4Le3'
host = '26.69.45.60'
database = 'FirstSolutionDB'
port = 1433
connection_string = f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver=ODBC+Driver+17+for+SQL+Server"
try:
# Crea l'Engine
engine = create_engine(connection_string)
# Usa il contesto con la connessione e il metodo text() per eseguire la query
with engine.connect() as connection:
result = connection.execute(text("SELECT 1"))
print("Connessione al database riuscita.")
except SQLAlchemyError as e:
print("Errore durante la connessione al database:", e)
sys.exit()
# Caricamento del template Excel
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)
# Caricamento dati degli ISIN
file_path = os.path.join(INPUT_DIR, 'Universo ETF per ottimizzatore UK.xlsx')
df = pd.read_excel(file_path, usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'Codice Titolo'],dtype={'Codice Titolo':str})
# Intervallo di date degli ultimi 5 anni, escludendo sabati e domeniche
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize()
# DataFrame vuoto con le date come indice
final_df = pd.DataFrame(index=all_dates)
# Iterazione sugli ISIN e recupero dei dati
isin_from_db = set()
for isin in df['ISIN'].unique():
print(f"Working on ISIN: {isin}")
procedure_call = f"EXEC opt_RendimentoGiornaliero1_GBP @ISIN = '{isin}', @n = 1305"
try:
temp_df = pd.read_sql_query(procedure_call, engine)
if temp_df.empty:
print(f"Nessun dato recuperato per {isin}, skipping...")
continue
temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d').dt.normalize()
temp_df.set_index('Px_Date', inplace=True)
temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100
final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates)
isin_from_db.add(isin)
print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.")
except SQLAlchemyError as e:
print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)
final_df.fillna(0, inplace=True)
# Configurazione degli obiettivi di volatilità
volatility_targets = {
(5, 0.06): 'VAR3_GBP',
#(1, 0.12): 'VAR6_1Y',
#(3, 0.12): 'VAR6_3Y',
(5, 0.12): 'VAR6_GBP',
(5, 0.18): 'VAR9_GBP'
}
# Definizione del numero di giorni lavorativi per anno
days_per_year = 252
riskfree_rate = 0.02
# Ottimizzazione per ciascun target di volatilità e salvataggio dei risultati
optimized_weights = pd.DataFrame()
for (years, target_vol), name in volatility_targets.items():
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
# Calcolo dei parametri per l'ottimizzazione
daily_returns_mean = period_df.mean()
annual_returns_mean = daily_returns_mean * days_per_year
annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)
# Aggiunta dei vincoli per le categorie e le asset class
categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
asset_class_limits = {
'Azionari': 0.75,
'Obbligazionari': 0.75,
'Metalli Preziosi': 0.20,
'Materie Prime': 0.05,
'Immobiliare': 0.05
}
for category, max_weight in categories_limits.items():
isin_list = df[df['Categoria'] == category]['ISIN'].tolist()
category_idx = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
ef.add_constraint(lambda w: sum(w[i] for i in category_idx) <= max_weight)
ef.add_constraint(lambda w: sum(w[i] for i in category_idx) >= 0)
for asset_class, max_weight in asset_class_limits.items():
isin_list = df[df['Asset Class'] == asset_class]['ISIN'].tolist()
asset_class_idx = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
ef.add_constraint(lambda w: sum(w[i] for i in asset_class_idx) <= max_weight)
ef.add_constraint(lambda w: sum(w[i] for i in asset_class_idx) >= 0)
try:
ef.efficient_risk(target_volatility=target_vol)
weights = ef.clean_weights()
optimized_weights[name] = pd.Series(weights)
ef.portfolio_performance(verbose=True, risk_free_rate=riskfree_rate)
# Creazione del DataFrame per i risultati
results = []
for isin, weight in weights.items():
if weight > 0:
codice_titolo = df.loc[df['ISIN'] == isin, 'Codice Titolo'].values[0]
nome = df.loc[df['ISIN'] == isin, 'Nome'].values[0]
results.append({
'cod_por': f'PTFOPT{name}', # Nome del portafoglio
'cod_tit': codice_titolo, # Codice titolo
'des_tit': nome, # Nome titolo
'peso': weight * 99 # Peso, moltiplicato per 99
})
# Creazione del DataFrame per i risultati
results_df = pd.DataFrame(results)
# Concatenazione dei risultati con il template_df
output_df = pd.concat([template_df, results_df], ignore_index=True)
# Salva il file
output_file_path = excel_path(f'PTFOPT{name}.xlsx')
output_df.to_excel(output_file_path, index=False)
print(f"File {output_file_path} saved successfully.")
# Grafico a torta per ciascun portafoglio ottimizzato
asset_allocations = {asset: 0 for asset in asset_class_limits}
for isin, weight in weights.items():
asset_class = df.loc[df['ISIN'] == isin, 'Asset Class'].values[0]
asset_allocations[asset_class] += weight
plt.figure(figsize=(8, 6))
plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
plt.title(f'Asset Allocation for {name}')
pie_path = plot_path(f'Asset_Allocation_{name}.png')
plt.savefig(pie_path, dpi=150, bbox_inches='tight')
plt.close()
except OptimizationError as e:
print(f"Optimization failed for {name}: {e}")
optimized_weights[name] = pd.Series([0] * len(annual_returns_mean))
# Aggiunta del nome dell'ETF nel DataFrame optimized_weights
optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = [df.loc[df['ISIN'] == isin, 'Nome'].values[0] for isin in optimized_weights.index]
# Esportazione dei pesi ottimizzati in un file Excel con il nome dell'ETF
output_path = excel_path("optimized_weights_GBP.xlsx")
optimized_weights_with_names.to_excel(output_path)
print(f"All optimized weights saved to '{output_path}'.")

View File

@@ -0,0 +1,692 @@
# -*- coding: utf-8 -*-
"""
Created on 22 Oct 2025
@author: Federico
"""
# =========================
# IMPORT & PARAMETRI
# =========================
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from pypfopt import risk_models
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt.exceptions import OptimizationError
# Cartelle di input/output/plot
OUTPUT_DIR = "Output"
PLOT_DIR = "Plot"
INPUT_DIR = "Input"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(PLOT_DIR, exist_ok=True)
os.makedirs(INPUT_DIR, exist_ok=True)
def excel_path(filename: str) -> str:
"""Costruisce il percorso completo per un file Excel nella cartella di output."""
return os.path.join(OUTPUT_DIR, filename)
def plot_path(filename: str) -> str:
"""Costruisce il percorso completo per un file PNG nella cartella Plot."""
return os.path.join(PLOT_DIR, filename)
# ---------------------------------
# Utility per R^2 sullequity line
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
"""R^2 della regressione OLS di log(equity) sul tempo (con intercetta)."""
s = returns.dropna()
if s.size < 3:
return np.nan
equity = (1.0 + s).cumprod()
equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
if equity.size < 3:
return np.nan
y = np.log(equity.values)
if np.allclose(y.var(ddof=1), 0.0):
return 0.0
x = np.arange(y.size, dtype=float)
X = np.column_stack([np.ones_like(x), x])
beta, *_ = np.linalg.lstsq(X, y, rcond=None)
y_hat = X @ beta
ss_res = np.sum((y - y_hat) ** 2)
ss_tot = np.sum((y - y.mean()) ** 2)
r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
if np.isnan(r2):
return np.nan
return float(np.clip(r2, 0.0, 1.0))
# ---------------------------------
# Utility per metriche di drawdown
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
"""
Calcola:
- max_dd: profondità massima del drawdown (negativa o zero)
- max_dd_duration: durata massima (in giorni) di qualsiasi drawdown
- ttr_from_mdd: giorni dal minimo del Max DD al pieno recupero del picco precedente (sentinel se non recupera)
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
# Max Drawdown (valore più negativo)
max_dd = float(dd.min()) if dd.size else np.nan
# Durata massima di drawdown (giorni consecutivi sotto zero drawdown)
under_water = dd < 0
if under_water.any():
max_dd_duration = 0
current = 0
for flag in under_water.values:
if flag:
current += 1
if current > max_dd_duration:
max_dd_duration = current
else:
current = 0
else:
max_dd_duration = 0
# Time-to-Recovery dal Max DD
if dd.size:
trough_idx = int(np.argmin(dd.values))
if trough_idx > 0:
peak_idx = int(np.argmax(equity.values[:trough_idx+1]))
peak_level = float(equity.values[peak_idx])
rec_idx = None
for t in range(trough_idx + 1, equity.size):
if equity.values[t] >= peak_level:
rec_idx = t
break
if rec_idx is None:
ttr_from_mdd = sentinel_ttr # non recuperato
else:
ttr_from_mdd = rec_idx - trough_idx
else:
ttr_from_mdd = np.nan
else:
ttr_from_mdd = np.nan
return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
# ---------------------------------
# Utility per AAW, AUW e Heal Index
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
"""
Calcola:
- AAW: area sopra acqua (run-up vs minimo cumulato)
- AUW: area sotto acqua (drawdown vs massimo cumulato)
- Heal Index: (AAW - AUW) / AUW
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
run_min = equity.cummin()
ru = equity / run_min - 1.0
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
return AAW, AUW, heal
# ---------------------------------
# Utility per H_min (100% finestre positive)
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
"""
Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days
hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)).
"""
s = returns.dropna().astype(float)
n = s.size
if n == 0:
return np.nan, np.nan
log1p = np.log1p(s.values)
csum = np.cumsum(log1p)
def rolling_sum_k(k: int):
if k > n:
return np.array([])
head = csum[k - 1:]
tail = np.concatenate(([0.0], csum[:-k]))
return head - tail
for k in range(1, n + 1):
rs = rolling_sum_k(k)
if rs.size == 0:
break
roll_ret = np.exp(rs) - 1.0
if np.all(roll_ret >= 0):
h_days = k
h_months = int(np.ceil(h_days / month_len))
return h_days, h_months
return np.nan, np.nan
# ---------------------------------
# Utility di serie portafoglio e metriche path-based
# ---------------------------------
def portfolio_series_from_weights(period_df: pd.DataFrame, w: np.ndarray, cols: list) -> pd.Series:
w_series = pd.Series(w, index=cols)
return (period_df[cols] * w_series).sum(axis=1)
def portfolio_path_metrics(period_df: pd.DataFrame,
five_year_df: pd.DataFrame,
w: np.ndarray,
cols: list,
days_per_year: int) -> dict:
"""Metriche path-based del portafoglio su period_df + H_min_100m su 5Y."""
w = np.asarray(w, dtype=float)
cols = list(cols)
port_returns = portfolio_series_from_weights(period_df, w, cols)
n_days = int(port_returns.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan
ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan
gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan
if years_elapsed and years_elapsed > 0 and gross and gross > 0:
cagr = gross**(1.0 / years_elapsed) - 1.0
else:
cagr = np.nan
r2 = r2_equity_line(port_returns)
maxdd, dddur, ttr = drawdown_metrics(port_returns, sentinel_ttr=1250)
aaw, auw, heal = heal_index_metrics(port_returns)
common_cols = [c for c in cols if c in five_year_df.columns]
if len(common_cols) > 0:
w5 = pd.Series(w, index=cols).reindex(common_cols).fillna(0.0).values
port_returns_5y = portfolio_series_from_weights(five_year_df, w5, common_cols)
_, hmin_5y_months = h_min_100(port_returns_5y, month_len=21)
else:
hmin_5y_months = np.nan
return {
"AnnReturn": ann_return,
"AnnVol": ann_vol,
"CAGR": cagr,
"R2": r2,
"MaxDD": maxdd,
"DD_Duration": dddur,
"TTR": ttr,
"AAW": aaw,
"AUW": auw,
"Heal": heal,
"Hmin_100m_5Y": hmin_5y_months
}
# --- Lettura parametri dal file connection.txt ---
params = {}
with open("connection.txt", "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
key, value = line.split("=", 1)
params[key.strip()] = value.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
connection_string = (
f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
"?driver=ODBC+Driver+17+for+SQL+Server"
)
print("Connection string letta correttamente")
# =========================
# CONNESSIONE AL DB
# =========================
try:
engine = create_engine(connection_string)
with engine.connect() as connection:
_ = connection.execute(text("SELECT 1"))
print("Connessione al database riuscita.")
except SQLAlchemyError as e:
print("Errore durante la connessione al database:", e)
sys.exit()
# =========================
# INPUT / TEMPLATE
# =========================
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx')
df = pd.read_excel(
file_path,
usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'],
dtype={'Codice Titolo': str}
)
# =========================
# SERIE STORICHE RENDIMENTI
# =========================
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize()
final_df = pd.DataFrame(index=all_dates)
isin_from_db = set()
for isin in df['ISIN'].unique():
print(f"Working on ISIN: {isin}")
procedure_call = f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', @n = 1305, @PtfCurr = EUR"
try:
temp_df = pd.read_sql_query(procedure_call, engine)
if temp_df.empty:
print(f"Nessun dato recuperato per {isin}, skipping...")
continue
temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize()
temp_df = temp_df.dropna(subset=['Px_Date'])
temp_df.set_index('Px_Date', inplace=True)
temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100
final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates)
isin_from_db.add(isin)
print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.")
except SQLAlchemyError as e:
print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)
final_df.fillna(0, inplace=True)
# -------- H_min sempre su 5 anni (21 gg = 1 mese) --------
five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date]
# =========================
# CONFIGURAZIONE OBIETTIVI
# =========================
volatility_targets = {
# (1, 0.06): 'VAR3_1Y',
# (3, 0.06): 'VAR3_3Y',
(5, 0.06): 'VAR3_5Y',
(1, 0.12): 'VAR6_1Y',
(3, 0.12): 'VAR6_3Y',
(5, 0.12): 'VAR6_5Y',
# (1, 0.18): 'VAR9_1Y',
# (3, 0.18): 'VAR9_3Y',
(5, 0.18): 'VAR9_5Y'
}
days_per_year = 252
riskfree_rate = 0.02
# =========================
# LOOP OTTIMIZZAZIONI (PH1 tradizionale)
# =========================
optimized_weights = pd.DataFrame()
per_asset_metrics = {}
for (years, target_vol), name in volatility_targets.items():
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
daily_returns_mean = period_df.mean()
annual_returns_mean = daily_returns_mean * days_per_year
annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
# ---------- PER-ASSET METRICS ----------
n_days = int(period_df.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
asset_ann_return = daily_returns_mean * days_per_year
asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year)
gross = (1.0 + period_df).prod(skipna=True)
asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
asset_r2 = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
aaw_dict, auw_dict, heal_dict = {}, {}, {}
hmin_5y_months_dict = {}
for col in period_df.columns:
mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
aaw, auw, heal = heal_index_metrics(period_df[col])
aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
if col in five_year_df.columns:
_, h_months_5y = h_min_100(five_year_df[col], month_len=21)
else:
h_months_5y = np.nan
hmin_5y_months_dict[col] = h_months_5y
asset_metrics_df = (
pd.DataFrame({
'ISIN': period_df.columns,
'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values,
'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values,
'CAGR': asset_cagr.reindex(period_df.columns).values,
'R2_Equity': asset_r2.reindex(period_df.columns).values,
'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
})
.merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
[['ISIN', 'Nome', 'Categoria', 'Asset Class',
'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
.sort_values('ISIN', kind='stable')
.reset_index(drop=True)
)
per_asset_metrics[name] = asset_metrics_df
# ---------- OTTIMIZZAZIONE ----------
ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)
# Vincoli PesoFisso / PesoMax
for _, row in df.iterrows():
isin_i = row['ISIN']
if isin_i in period_df.columns:
idx = period_df.columns.get_loc(isin_i)
pf = row.get('PesoFisso')
pm = row.get('PesoMax')
if pd.notna(pf):
ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
elif pd.notna(pm):
ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)
# Vincoli per Categoria
categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
for cat, maxw in categories_limits.items():
isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
if idxs:
ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
# Vincoli per Asset Class
asset_class_limits = {
'Azionari': 0.75, 'Obbligazionari': 0.75,
'Metalli Preziosi': 0.20, 'Materie Prime': 0.05,
'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1
}
for ac, maxw in asset_class_limits.items():
isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
if idxs:
ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
# ---------- Risoluzione ----------
try:
ef.efficient_risk(target_volatility=target_vol)
weights = ef.clean_weights()
optimized_weights[name] = pd.Series(weights)
exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate)
print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===")
print(f"Expected annual return: {exp_ret:.2%}")
print(f"Annual volatility: {exp_vol:.2%}")
print(f"Sharpe Ratio: {sharpe:.2f}")
# --- Beneficio di diversificazione ---
w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns])
indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values))
weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols))
diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
# --- File Excel per import gestionale (uno per portafoglio) ---
template_cols = list(template_df.columns)
results_rows = []
for isin, weight in weights.items():
if weight > 0:
r_sel = df.loc[df['ISIN'] == isin]
codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else ""
nome = r_sel['Nome'].iloc[0] if not r_sel.empty else ""
row = {col: "" for col in template_cols}
row['cod_por'] = f'PTFOPT{name}'
row['cod_tit'] = codice_titolo
row['des_tit'] = nome
row['peso'] = float(weight * 99)
results_rows.append(row)
results_full_df = pd.DataFrame(results_rows, columns=template_cols)
output_df = pd.concat([template_df.iloc[0:0], results_full_df], ignore_index=True)
output_file_path = excel_path(f'PTFOPT{name}.xlsx')
output_df.to_excel(output_file_path, index=False)
print(f"File {output_file_path} saved successfully.")
# --- Pie chart asset allocation (se ci sono pesi > 0) ---
asset_allocations = {asset: 0 for asset in asset_class_limits}
for isin, weight in weights.items():
r_sel = df.loc[df['ISIN'] == isin]
if r_sel.empty:
continue
asset_allocations.setdefault(r_sel['Asset Class'].iloc[0], 0)
asset_allocations[r_sel['Asset Class'].iloc[0]] += weight
if sum(asset_allocations.values()) > 0:
plt.figure(figsize=(8, 6))
plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
plt.title(f'Asset Allocation for {name}')
pie_path = plot_path(f'Asset_Allocation_{name}.png')
plt.savefig(pie_path, dpi=150, bbox_inches='tight')
plt.close()
except OptimizationError as e:
print(f"Optimization failed for {name}: {e}")
optimized_weights[name] = pd.Series([0] * len(annual_returns_mean))
# =========================
# RIEPILOGO METRICHE (PORTAFOGLI PH1)
# =========================
summary_data = []
for (years, target_vol), name in volatility_targets.items():
if name in optimized_weights.columns:
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
daily_returns_mean = period_df.mean()
annual_returns_mean = daily_returns_mean * days_per_year
annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
w_series = optimized_weights[name].reindex(period_df.columns).fillna(0.0)
w_vec = w_series.values
port_returns = (period_df[period_df.columns] * w_series).sum(axis=1)
n_days = int(port_returns.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
port_ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan
port_ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan
gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan
port_cagr = (gross**(1.0 / years_elapsed) - 1.0) if (years_elapsed and years_elapsed > 0 and gross and gross > 0) else np.nan
port_r2 = r2_equity_line(port_returns)
port_maxdd, port_dddur, port_ttr = drawdown_metrics(port_returns, sentinel_ttr=1250)
port_aaw, port_auw, port_heal = heal_index_metrics(port_returns)
common_cols = [c for c in w_series.index if c in five_year_df.columns]
if len(common_cols) > 0:
w_5y = w_series.reindex(common_cols).fillna(0.0)
port_returns_5y = (five_year_df[common_cols] * w_5y).sum(axis=1)
_, port_hmin_5y_months = h_min_100(port_returns_5y, month_len=21)
else:
port_hmin_5y_months = np.nan
exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[period_df.columns].values))
cov_mat = annual_covariance_matrix.loc[period_df.columns, period_df.columns].values
exp_vol = float(np.sqrt(np.dot(w_vec, np.dot(cov_mat, w_vec))))
sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan
indiv_ann_vols = np.sqrt(np.diag(cov_mat))
weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols))
diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
diversification_ratio = weighted_avg_vol / exp_vol if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===")
print(f"Expected annual return: {exp_ret:.2%}")
print(f"Annual volatility: {exp_vol:.2%}")
print(f"Sharpe Ratio: {sharpe:.2f}")
print(f"Diversification Ratio: {diversification_ratio:.3f}" if not np.isnan(diversification_ratio) else "Diversification Ratio: NaN")
print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
summary_data.append({
"Portafoglio": name,
"Years": years,
"Target Vol": f"{target_vol:.2%}",
"Expected annual return": f"{exp_ret:.2%}",
"Annual volatility": f"{exp_vol:.2%}",
"Sharpe Ratio": f"{sharpe:.2f}",
"Beneficio di diversificazione": f"{diversification_benefit:.2%}" if not np.isnan(diversification_benefit) else "",
"Rendimento_Ann": f"{port_ann_return:.2%}" if pd.notna(port_ann_return) else "",
"Volatilita_Ann": f"{port_ann_vol:.2%}" if pd.notna(port_ann_vol) else "",
"CAGR": f"{port_cagr:.2%}" if pd.notna(port_cagr) else "",
"R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan,
"MaxDD": f"{port_maxdd:.2%}" if pd.notna(port_maxdd) else "",
"DD_Duration_Max": int(port_dddur) if pd.notna(port_dddur) else "",
"TTR_from_MDD": int(port_ttr) if pd.notna(port_ttr) else "",
"AAW": float(port_aaw) if pd.notna(port_aaw) else np.nan,
"AUW": float(port_auw) if pd.notna(port_auw) else np.nan,
"Heal_Index": float(port_heal) if pd.notna(port_heal) else np.nan,
"H_min_100m_5Y": int(port_hmin_5y_months) if pd.notna(port_hmin_5y_months) else ""
})
# =========================
# PLOT EQUITY/UNDERWATER (PH1)
# =========================
def plot_equity_overlay_all(port_names=None):
if port_names is None:
port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']
period_start_date = end_date - pd.DateOffset(years=5)
period_df = final_df.loc[period_start_date:end_date]
available_cols = set(optimized_weights.columns)
plotted = 0
plt.figure(figsize=(11, 6))
for pname in port_names:
if pname not in available_cols:
print(f"[plot] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
continue
w_series = optimized_weights[pname].reindex(period_df.columns).fillna(0.0)
port_returns = (period_df[w_series.index] * w_series).sum(axis=1)
equity = (1.0 + port_returns).cumprod()
plt.plot(equity.index, equity.values, label=pname)
plotted += 1
if plotted == 0:
print("[plot] Nessun portafoglio valido da plottare.")
plt.close()
return
plt.title("Equity line - Portafogli ottimizzati (ultimi 5 anni)")
plt.xlabel("Data")
plt.ylabel("Equity (base=1.0)")
plt.grid(True, alpha=0.3)
plt.legend(loc="best")
plt.tight_layout()
out_png = plot_path("Equity_ALL_PORTS.png")
plt.savefig(out_png, dpi=150, bbox_inches='tight')
plt.close()
print(f"[plot] Grafico sovrapposto salvato: {out_png}")
def plot_underwater_overlay_all(port_names=None, ylim=(-0.3, 0.0)):
if port_names is None:
port_names = ['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y']
period_start_date = end_date - pd.DateOffset(years=5)
period_df = final_df.loc[period_start_date:end_date]
available_cols = set(optimized_weights.columns)
plotted = 0
plt.figure(figsize=(11, 6))
for pname in port_names:
if pname not in available_cols:
print(f"[underwater] Portafoglio '{pname}' non trovato in optimized_weights. Skipping.")
continue
w_series = optimized_weights[pname].reindex(period_df.columns).fillna(0.0)
port_returns = (period_df[w_series.index] * w_series).sum(axis=1)
equity = (1.0 + port_returns).cumprod()
run_max = equity.cummax()
dd = equity / run_max - 1.0
plt.plot(dd.index, dd.values, label=pname)
plotted += 1
if plotted == 0:
print("[underwater] Nessun portafoglio valido da plottare.")
plt.close()
return
plt.title("Underwater (Drawdown) - Portafogli ottimizzati (ultimi 5 anni)")
plt.xlabel("Data")
plt.ylabel("Drawdown")
if ylim is not None:
plt.ylim(*ylim)
plt.grid(True, alpha=0.3)
plt.legend(loc="best")
plt.tight_layout()
out_png = plot_path("Underwater_ALL_PORTS.png")
plt.savefig(out_png, dpi=150, bbox_inches='tight')
plt.close()
print(f"[underwater] Grafico sovrapposto salvato: {out_png}")
plot_equity_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'])
plot_underwater_overlay_all(['VAR3_5Y', 'VAR6_1Y', 'VAR6_3Y', 'VAR6_5Y', 'VAR9_5Y'])
# =========================
# EXPORT - (1) ASSET METRICS SOLO
# =========================
asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx')
with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer:
for name, metrics_df in per_asset_metrics.items():
metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False)
consolidated = []
for name, metrics_df in per_asset_metrics.items():
tmp = metrics_df.copy()
tmp.insert(0, 'Periodo', name)
consolidated.append(tmp)
consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame()
if not consolidated_df.empty:
consolidated_df.to_excel(writer, sheet_name='Metriche_Consolidate', index=False)
print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.")
# =========================
# COSTRUZIONE "WITH NAMES" (solo PH1)
# =========================
optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = [
df.loc[df['ISIN'] == isin, 'Nome'].values[0] if (df['ISIN'] == isin).any() else ""
for isin in optimized_weights.index
]
# =========================
# EXPORT - (2) RIEPILOGO PESI & METRICHE
# =========================
summary_df = pd.DataFrame(summary_data)
summary_path = excel_path('optimized_weights_summary_v2.5.xlsx')
with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer:
optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
summary_df.to_excel(writer, sheet_name='metriche', index=False)
print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'metriche'.")

View File

@@ -0,0 +1,563 @@
# -*- coding: utf-8 -*-
"""
Ottimizzatore v2.5.2 - VERSIONE LITE
"""
# =========================
# IMPORT & PARAMETRI
# =========================
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from pypfopt import risk_models
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt.exceptions import OptimizationError
# Cartelle di input/output/plot
OUTPUT_DIR = "Output"
INPUT_DIR = "Input"
PLOT_DIR = "Plot"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(PLOT_DIR, exist_ok=True)
def excel_path(filename: str) -> str:
"""Costruisce il percorso completo per un file Excel nella cartella di output."""
return os.path.join(OUTPUT_DIR, filename)
def plot_path(filename: str) -> str:
"""Costruisce il percorso completo per un file PNG nella cartella Plot."""
return os.path.join(PLOT_DIR, filename)
# ---------------------------------
# Utility per R^2 sullequity line
# ---------------------------------
def r2_equity_line(returns: pd.Series) -> float:
"""R^2 della regressione OLS di log(equity) sul tempo (con intercetta)."""
s = returns.dropna()
if s.size < 3:
return np.nan
equity = (1.0 + s).cumprod()
equity = equity.replace([0, np.inf, -np.inf], np.nan).dropna()
if equity.size < 3:
return np.nan
y = np.log(equity.values)
if np.allclose(y.var(ddof=1), 0.0):
return 0.0
x = np.arange(y.size, dtype=float)
X = np.column_stack([np.ones_like(x), x])
beta, *_ = np.linalg.lstsq(X, y, rcond=None)
y_hat = X @ beta
ss_res = np.sum((y - y_hat) ** 2)
ss_tot = np.sum((y - y.mean()) ** 2)
r2 = 1.0 - (ss_res / ss_tot) if ss_tot > 0 else np.nan
if np.isnan(r2):
return np.nan
return float(np.clip(r2, 0.0, 1.0))
# ---------------------------------
# Utility per metriche di drawdown
# ---------------------------------
def drawdown_metrics(returns: pd.Series, sentinel_ttr: int = 1250):
"""
Calcola:
- max_dd: profondità massima del drawdown (negativa o zero)
- max_dd_duration: durata massima (in giorni) di qualsiasi drawdown
- ttr_from_mdd: giorni dal minimo del Max DD al pieno recupero del picco precedente (sentinel se non recupera)
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
# Max Drawdown (valore più negativo)
max_dd = float(dd.min()) if dd.size else np.nan
# Durata massima di drawdown (giorni consecutivi sotto zero drawdown)
under_water = dd < 0
if under_water.any():
max_dd_duration = 0
current = 0
for flag in under_water.values:
if flag:
current += 1
if current > max_dd_duration:
max_dd_duration = current
else:
current = 0
else:
max_dd_duration = 0
# Time-to-Recovery dal Max DD
if dd.size:
trough_idx = int(np.argmin(dd.values))
if trough_idx > 0:
peak_idx = int(np.argmax(equity.values[:trough_idx+1]))
peak_level = float(equity.values[peak_idx])
rec_idx = None
for t in range(trough_idx + 1, equity.size):
if equity.values[t] >= peak_level:
rec_idx = t
break
if rec_idx is None:
ttr_from_mdd = sentinel_ttr # non recuperato
else:
ttr_from_mdd = rec_idx - trough_idx
else:
ttr_from_mdd = np.nan
else:
ttr_from_mdd = np.nan
return max_dd, int(max_dd_duration), (int(ttr_from_mdd) if not np.isnan(ttr_from_mdd) else np.nan)
# ---------------------------------
# Utility per AAW, AUW e Heal Index
# ---------------------------------
def heal_index_metrics(returns: pd.Series):
"""
Calcola:
- AAW: area sopra acqua (run-up vs minimo cumulato)
- AUW: area sotto acqua (drawdown vs massimo cumulato)
- Heal Index: (AAW - AUW) / AUW
"""
s = returns.fillna(0.0).astype(float)
if s.size == 0:
return np.nan, np.nan, np.nan
equity = (1.0 + s).cumprod()
if equity.size == 0:
return np.nan, np.nan, np.nan
run_max = equity.cummax()
dd = equity / run_max - 1.0
AUW = float((-dd[dd < 0]).sum()) if dd.size else np.nan
run_min = equity.cummin()
ru = equity / run_min - 1.0
AAW = float((ru[ru > 0]).sum()) if ru.size else np.nan
heal = ((AAW - AUW) / AUW) if (AUW is not None and np.isfinite(AUW) and AUW > 0) else np.nan
return AAW, AUW, heal
# ---------------------------------
# Utility per H_min (100% finestre positive)
# ---------------------------------
def h_min_100(returns: pd.Series, month_len: int = 21):
"""
Orizzonte minimo h_days tale che TUTTE le finestre rolling di ampiezza h_days
hanno rendimento cumulato >= 0. Restituisce (h_days, ceil(h_days/21)).
"""
s = returns.dropna().astype(float)
n = s.size
if n == 0:
return np.nan, np.nan
log1p = np.log1p(s.values)
csum = np.cumsum(log1p)
def rolling_sum_k(k: int):
if k > n:
return np.array([])
head = csum[k - 1:]
tail = np.concatenate(([0.0], csum[:-k]))
return head - tail
for k in range(1, n + 1):
rs = rolling_sum_k(k)
if rs.size == 0:
break
roll_ret = np.exp(rs) - 1.0
if np.all(roll_ret >= 0):
h_days = k
h_months = int(np.ceil(h_days / month_len))
return h_days, h_months
return np.nan, np.nan
# --- Lettura parametri dal file connection.txt ---
params = {}
with open("connection.txt", "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
key, value = line.split("=", 1)
params[key.strip()] = value.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
connection_string = (
f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
"?driver=ODBC+Driver+17+for+SQL+Server"
)
print("Connection string letta correttamente")
# =========================
# CONNESSIONE AL DB
# =========================
try:
engine = create_engine(connection_string)
with engine.connect() as connection:
_ = connection.execute(text("SELECT 1"))
print("Connessione al database riuscita.")
except SQLAlchemyError as e:
print("Errore durante la connessione al database:", e)
sys.exit()
# =========================
# INPUT / TEMPLATE
# =========================
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)
file_path = os.path.join(INPUT_DIR, 'Universo per ottimizzatore v.2.4.xlsx')
df = pd.read_excel(
file_path,
usecols=['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'PesoFisso', 'Codice Titolo'],
dtype={'Codice Titolo': str}
)
# =========================
# SERIE STORICHE RENDIMENTI
# =========================
end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.date_range(start=start_date, end=end_date, freq='B').normalize()
final_df = pd.DataFrame(index=all_dates)
isin_from_db = set()
for isin in df['ISIN'].unique():
print(f"Working on ISIN: {isin}")
procedure_call = f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', @n = 1305, @PtfCurr = EUR"
try:
temp_df = pd.read_sql_query(procedure_call, engine)
if temp_df.empty:
print(f"Nessun dato recuperato per {isin}, skipping...")
continue
temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d', errors='coerce').dt.normalize()
temp_df = temp_df.dropna(subset=['Px_Date'])
temp_df.set_index('Px_Date', inplace=True)
temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100
final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates)
isin_from_db.add(isin)
print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.")
except SQLAlchemyError as e:
print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)
final_df.fillna(0, inplace=True)
# -------- H_min sempre su 5 anni (21 gg = 1 mese) --------
five_year_df = final_df.loc[end_date - pd.DateOffset(years=5): end_date]
# =========================
# CONFIGURAZIONE OBIETTIVI
# =========================
volatility_targets = {
# (1, 0.06): 'VAR3_1Y',
# (3, 0.06): 'VAR3_3Y',
(5, 0.06): 'VAR3_5Y',
(1, 0.12): 'VAR6_1Y',
(3, 0.12): 'VAR6_3Y',
(5, 0.12): 'VAR6_5Y',
# (1, 0.18): 'VAR9_1Y',
# (3, 0.18): 'VAR9_3Y',
(5, 0.18): 'VAR9_5Y'
}
days_per_year = 252
riskfree_rate = 0.02
# =========================
# LOOP OTTIMIZZAZIONI (PH1 tradizionale)
# =========================
optimized_weights = pd.DataFrame()
per_asset_metrics = {}
for (years, target_vol), name in volatility_targets.items():
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
daily_returns_mean = period_df.mean()
annual_returns_mean = daily_returns_mean * days_per_year
annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
# ---------- PER-ASSET METRICS ----------
n_days = int(period_df.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
asset_ann_return = daily_returns_mean * days_per_year
asset_ann_vol = period_df.std(ddof=1) * np.sqrt(days_per_year)
gross = (1.0 + period_df).prod(skipna=True)
asset_cagr = gross.pow(1.0 / years_elapsed) - 1.0 if years_elapsed and years_elapsed > 0 else pd.Series(np.nan, index=period_df.columns)
asset_r2 = pd.Series({col: r2_equity_line(period_df[col]) for col in period_df.columns}, index=period_df.columns)
maxdd_dict, dddur_dict, ttr_dict = {}, {}, {}
aaw_dict, auw_dict, heal_dict = {}, {}, {}
hmin_5y_months_dict = {}
for col in period_df.columns:
mdd, dddur, ttr = drawdown_metrics(period_df[col], sentinel_ttr=1250)
maxdd_dict[col], dddur_dict[col], ttr_dict[col] = mdd, dddur, ttr
aaw, auw, heal = heal_index_metrics(period_df[col])
aaw_dict[col], auw_dict[col], heal_dict[col] = aaw, auw, heal
if col in five_year_df.columns:
_, h_months_5y = h_min_100(five_year_df[col], month_len=21)
else:
h_months_5y = np.nan
hmin_5y_months_dict[col] = h_months_5y
asset_metrics_df = (
pd.DataFrame({
'ISIN': period_df.columns,
'Rendimento_Ann': asset_ann_return.reindex(period_df.columns).values,
'Volatilita_Ann': asset_ann_vol.reindex(period_df.columns).values,
'CAGR': asset_cagr.reindex(period_df.columns).values,
'R2_Equity': asset_r2.reindex(period_df.columns).values,
'MaxDD': pd.Series(maxdd_dict).reindex(period_df.columns).values,
'DD_Duration_Max': pd.Series(dddur_dict).reindex(period_df.columns).values,
'TTR_from_MDD': pd.Series(ttr_dict).reindex(period_df.columns).values,
'AAW': pd.Series(aaw_dict).reindex(period_df.columns).values,
'AUW': pd.Series(auw_dict).reindex(period_df.columns).values,
'Heal_Index': pd.Series(heal_dict).reindex(period_df.columns).values,
'H_min_100m_5Y': pd.Series(hmin_5y_months_dict).reindex(period_df.columns).values
})
.merge(df[['ISIN', 'Nome', 'Categoria', 'Asset Class']], on='ISIN', how='left')
[['ISIN', 'Nome', 'Categoria', 'Asset Class',
'Rendimento_Ann', 'Volatilita_Ann', 'CAGR', 'R2_Equity',
'MaxDD', 'DD_Duration_Max', 'TTR_from_MDD',
'AAW', 'AUW', 'Heal_Index', 'H_min_100m_5Y']]
.sort_values('ISIN', kind='stable')
.reset_index(drop=True)
)
per_asset_metrics[name] = asset_metrics_df
# ---------- OTTIMIZZAZIONE ----------
ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)
# Vincoli PesoFisso / PesoMax
for _, row in df.iterrows():
isin_i = row['ISIN']
if isin_i in period_df.columns:
idx = period_df.columns.get_loc(isin_i)
pf = row.get('PesoFisso')
pm = row.get('PesoMax')
if pd.notna(pf):
ef.add_constraint(lambda w, idx=idx, val=pf: w[idx] == val)
elif pd.notna(pm):
ef.add_constraint(lambda w, idx=idx, maxw=pm: w[idx] <= maxw)
# Vincoli per Categoria
categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
for cat, maxw in categories_limits.items():
isin_list = df[df['Categoria'] == cat]['ISIN'].tolist()
idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
if idxs:
ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
# Vincoli per Asset Class
asset_class_limits = {
'Azionari': 0.75, 'Obbligazionari': 0.75,
'Metalli Preziosi': 0.20, 'Materie Prime': 0.05,
'Immobiliare': 0.05, 'Criptovalute': 0.05, 'Monetari': 0.1
}
for ac, maxw in asset_class_limits.items():
isin_list = df[df['Asset Class'] == ac]['ISIN'].tolist()
idxs = [period_df.columns.get_loc(isin) for isin in isin_list if isin in period_df.columns]
if idxs:
ef.add_constraint(lambda w, idxs=idxs, maxw=maxw: sum(w[i] for i in idxs) <= maxw)
# ---------- Risoluzione ----------
try:
ef.efficient_risk(target_volatility=target_vol)
weights = ef.clean_weights()
optimized_weights[name] = pd.Series(weights)
exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=riskfree_rate)
print(f"=== Ottimizzazione: {name} (anni={years}, target_vol={target_vol}) ===")
print(f"Expected annual return: {exp_ret:.2%}")
print(f"Annual volatility: {exp_vol:.2%}")
print(f"Sharpe Ratio: {sharpe:.2f}")
# --- Beneficio di diversificazione ---
w_vec_tmp = np.array([weights.get(isin, 0) for isin in period_df.columns])
indiv_ann_vols = np.sqrt(np.diag(annual_covariance_matrix.loc[period_df.columns, period_df.columns].values))
weighted_avg_vol = float(np.dot(w_vec_tmp, indiv_ann_vols))
diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
# --- File Excel per import gestionale (uno per portafoglio) ---
template_cols = list(template_df.columns)
results_rows = []
for isin, weight in weights.items():
if weight > 0:
r_sel = df.loc[df['ISIN'] == isin]
codice_titolo = r_sel['Codice Titolo'].iloc[0] if not r_sel.empty else ""
nome = r_sel['Nome'].iloc[0] if not r_sel.empty else ""
row = {col: "" for col in template_cols}
row['cod_por'] = f'PTFOPT{name}'
row['cod_tit'] = codice_titolo
row['des_tit'] = nome
row['peso'] = float(weight * 99)
results_rows.append(row)
results_full_df = pd.DataFrame(results_rows, columns=template_cols)
output_df = pd.concat([template_df.iloc[0:0], results_full_df], ignore_index=True)
output_file_path = excel_path(f'PTFOPT{name}.xlsx')
output_df.to_excel(output_file_path, index=False)
print(f"File {output_file_path} saved successfully.")
# --- Pie chart asset allocation: salva in Output senza mostrare ---
asset_allocations = {asset: 0 for asset in ['Azionari', 'Obbligazionari', 'Metalli Preziosi', 'Materie Prime', 'Immobiliare', 'Criptovalute', 'Monetari']}
for isin, weight in weights.items():
r_sel = df.loc[df['ISIN'] == isin]
if r_sel.empty:
continue
asset_class = r_sel['Asset Class'].iloc[0]
asset_allocations.setdefault(asset_class, 0)
asset_allocations[asset_class] += weight
total_alloc = sum(asset_allocations.values())
if total_alloc > 0:
plt.figure(figsize=(8, 6))
plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
plt.title(f'Asset Allocation for {name}')
pie_path = plot_path(f'Asset_Allocation_{name}.png')
plt.savefig(pie_path, dpi=150, bbox_inches='tight')
plt.close()
except OptimizationError as e:
print(f"Optimization failed for {name}: {e}")
optimized_weights[name] = pd.Series([0] * len(annual_returns_mean))
# =========================
# RIEPILOGO METRICHE (PORTAFOGLI PH1)
# =========================
summary_data = []
for (years, target_vol), name in volatility_targets.items():
if name in optimized_weights.columns:
period_start_date = end_date - pd.DateOffset(years=years)
period_df = final_df.loc[period_start_date:end_date]
daily_returns_mean = period_df.mean()
annual_returns_mean = daily_returns_mean * days_per_year
annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)
w_series = optimized_weights[name].reindex(period_df.columns).fillna(0.0)
w_vec = w_series.values
port_returns = (period_df[period_df.columns] * w_series).sum(axis=1)
n_days = int(port_returns.shape[0])
years_elapsed = n_days / days_per_year if n_days > 0 else np.nan
port_ann_return = float(port_returns.mean() * days_per_year) if n_days > 0 else np.nan
port_ann_vol = float(port_returns.std(ddof=1) * np.sqrt(days_per_year)) if n_days > 1 else np.nan
gross = float((1.0 + port_returns).prod()) if n_days > 0 else np.nan
port_cagr = (gross**(1.0 / years_elapsed) - 1.0) if (years_elapsed and years_elapsed > 0 and gross and gross > 0) else np.nan
port_r2 = r2_equity_line(port_returns)
port_maxdd, port_dddur, port_ttr = drawdown_metrics(port_returns, sentinel_ttr=1250)
port_aaw, port_auw, port_heal = heal_index_metrics(port_returns)
common_cols = [c for c in w_series.index if c in five_year_df.columns]
if len(common_cols) > 0:
w_5y = w_series.reindex(common_cols).fillna(0.0)
port_returns_5y = (five_year_df[common_cols] * w_5y).sum(axis=1)
_, port_hmin_5y_months = h_min_100(port_returns_5y, month_len=21)
else:
port_hmin_5y_months = np.nan
exp_ret = float(np.dot(w_vec, annual_returns_mean.loc[period_df.columns].values))
cov_mat = annual_covariance_matrix.loc[period_df.columns, period_df.columns].values
exp_vol = float(np.sqrt(np.dot(w_vec, np.dot(cov_mat, w_vec))))
sharpe = (exp_ret - riskfree_rate) / exp_vol if exp_vol > 0 else np.nan
indiv_ann_vols = np.sqrt(np.diag(cov_mat))
weighted_avg_vol = float(np.dot(w_vec, indiv_ann_vols))
diversification_benefit = (exp_vol / weighted_avg_vol) - 1 if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
diversification_ratio = weighted_avg_vol / exp_vol if (weighted_avg_vol > 0 and exp_vol > 0) else np.nan
print(f"=== Riepilogo: {name} (anni={years}, target_vol={target_vol}) ===")
print(f"Expected annual return: {exp_ret:.2%}")
print(f"Annual volatility: {exp_vol:.2%}")
print(f"Sharpe Ratio: {sharpe:.2f}")
print(f"Diversification Ratio: {diversification_ratio:.3f}" if not np.isnan(diversification_ratio) else "Diversification Ratio: NaN")
print(f"Beneficio di diversificazione: {diversification_benefit:.2%}")
summary_data.append({
"Portafoglio": name,
"Years": years,
"Target Vol": f"{target_vol:.2%}",
"Expected annual return": f"{exp_ret:.2%}",
"Annual volatility": f"{exp_vol:.2%}",
"Sharpe Ratio": f"{sharpe:.2f}",
"Beneficio di diversificazione": f"{diversification_benefit:.2%}" if not np.isnan(diversification_benefit) else "",
"Rendimento_Ann": f"{port_ann_return:.2%}" if pd.notna(port_ann_return) else "",
"Volatilita_Ann": f"{port_ann_vol:.2%}" if pd.notna(port_ann_vol) else "",
"CAGR": f"{port_cagr:.2%}" if pd.notna(port_cagr) else "",
"R2_Equity": round(port_r2, 3) if pd.notna(port_r2) else np.nan,
"MaxDD": f"{port_maxdd:.2%}" if pd.notna(port_maxdd) else "",
"DD_Duration_Max": int(port_dddur) if pd.notna(port_dddur) else "",
"TTR_from_MDD": int(port_ttr) if pd.notna(port_ttr) else "",
"AAW": float(port_aaw) if pd.notna(port_aaw) else np.nan,
"AUW": float(port_auw) if pd.notna(port_auw) else np.nan,
"Heal_Index": float(port_heal) if pd.notna(port_heal) else np.nan,
"H_min_100m_5Y": int(port_hmin_5y_months) if pd.notna(port_hmin_5y_months) else ""
})
# =========================
# EXPORT — (1) ASSET METRICS SOLO
# =========================
asset_metrics_path = excel_path('asset_metrics_v2.5.xlsx')
with pd.ExcelWriter(asset_metrics_path, engine='openpyxl', mode='w') as writer:
for name, metrics_df in per_asset_metrics.items():
metrics_df.to_excel(writer, sheet_name=f'Metriche_{name}', index=False)
consolidated = []
for name, metrics_df in per_asset_metrics.items():
tmp = metrics_df.copy()
tmp.insert(0, 'Periodo', name)
consolidated.append(tmp)
consolidated_df = pd.concat(consolidated, ignore_index=True) if consolidated else pd.DataFrame()
if not consolidated_df.empty:
consolidated_df.to_excel(writer, sheet_name='Metriche_Consolidate', index=False)
print(f"File '{asset_metrics_path}' creato con soli fogli Metriche_* e Metriche_Consolidate.")
# =========================
# COSTRUZIONE "WITH NAMES"
# =========================
optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = [
df.loc[df['ISIN'] == isin, 'Nome'].values[0] if (df['ISIN'] == isin).any() else ""
for isin in optimized_weights.index
]
# =========================
# EXPORT — (2) RIEPILOGO PESI (SOLO PH1)
# =========================
summary_df = pd.DataFrame(summary_data)
summary_path = excel_path('optimized_weights_summary_v2.5_LITE.xlsx')
with pd.ExcelWriter(summary_path, engine='openpyxl', mode='w') as writer:
optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
summary_df.to_excel(writer, sheet_name='Riepilogo', index=False)
print(f"File '{summary_path}' creato con 'Pesi Ottimizzati' e 'Riepilogo' (solo PH1, versione LITE).")

BIN
Input/Template_Guardian.xls Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
Output/PTFOPTVAR3_5Y.xlsx Normal file

Binary file not shown.

BIN
Output/PTFOPTVAR3_GBP.xlsx Normal file

Binary file not shown.

BIN
Output/PTFOPTVAR6_1Y.xlsx Normal file

Binary file not shown.

BIN
Output/PTFOPTVAR6_3Y.xlsx Normal file

Binary file not shown.

BIN
Output/PTFOPTVAR6_5Y.xlsx Normal file

Binary file not shown.

BIN
Output/PTFOPTVAR6_GBP.xlsx Normal file

Binary file not shown.

BIN
Output/PTFOPTVAR9_5Y.xlsx Normal file

Binary file not shown.

BIN
Output/PTFOPTVAR9_GBP.xlsx Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

BIN
Plot/Equity_ALL_PORTS.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 193 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 103 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 108 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 209 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 82 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

5
connection.txt Normal file
View File

@@ -0,0 +1,5 @@
username=readonly
password=e8nqtSa39L4Le3
host=26.69.45.60
port=1433
database=FirstSolutionDB