# NOTE: duplicated file-viewer metadata ("216 lines / 8.7 KiB / Python") removed.
import os
import sys
from datetime import datetime
from urllib.parse import quote_plus

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pypfopt import EfficientFrontier, risk_models, expected_returns
from pypfopt.exceptions import OptimizationError
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
# Folder layout: spreadsheets are read from INPUT_DIR, workbooks are
# written to OUTPUT_DIR, and charts are saved under PLOT_DIR.
OUTPUT_DIR = "Output"
INPUT_DIR = "Input"
PLOT_DIR = "Plot"

# Create the folders up front so later writes never fail on a missing path.
for _folder in (OUTPUT_DIR, INPUT_DIR, PLOT_DIR):
    os.makedirs(_folder, exist_ok=True)


def excel_path(filename: str) -> str:
    """Return the full path for an Excel output file."""
    return os.path.join(OUTPUT_DIR, filename)


def plot_path(filename: str) -> str:
    """Return the full path for a chart image file."""
    return os.path.join(PLOT_DIR, filename)
# Database connection settings, read from connection.txt.
# Each non-comment line is a "key = value" pair; '#' marks a comment line.
params = {}
with open("connection.txt", "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if line and not line.startswith('#'):
            key, value = line.split('=', 1)
            params[key.strip()] = value.strip()

username = params.get('username')
password = params.get('password')
host = params.get('host')
port = params.get('port', '1433')  # default MS SQL Server port
database = params.get('database')

# URL-encode the credentials: characters such as '@', ':' or '/' in a
# username/password would otherwise corrupt the SQLAlchemy connection URL.
connection_string = (
    f"mssql+pyodbc://{quote_plus(username or '')}:{quote_plus(password or '')}"
    f"@{host}:{port}/{database}?driver=ODBC+Driver+17+for+SQL+Server"
)
# Verify database connectivity up front; abort if the DB is unreachable.
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        _ = connection.execute(text("SELECT 1"))
    print("Connessione al database riuscita.")
except SQLAlchemyError as e:
    print("Errore durante la connessione al database:", e)
    # Exit with a non-zero status so callers/schedulers can detect the
    # failure (bare sys.exit() would exit with status 0, i.e. "success").
    sys.exit(1)
# Load the Excel template used for the final combined export.
template_path = os.path.join(INPUT_DIR, 'Template_Guardian.xls')
template_df = pd.read_excel(template_path)

# Load the ETF universe: one row per ISIN with its name, category,
# asset class, maximum weight and security code (kept as a string).
file_path = os.path.join(INPUT_DIR, 'Universo ETF per ottimizzatore UK.xlsx')
universe_columns = ['ISIN', 'Nome', 'Categoria', 'Asset Class', 'PesoMax', 'Codice Titolo']
df = pd.read_excel(file_path, usecols=universe_columns, dtype={'Codice Titolo': str})
# Business-day calendar for the analysis window: the last 5 years up to
# and including yesterday (Saturdays and Sundays are excluded).
end_date = (pd.Timestamp.now() - pd.Timedelta(days=1)).normalize()
start_date = end_date - pd.DateOffset(years=5)
all_dates = pd.bdate_range(start=start_date, end=end_date).normalize()

# Empty frame keyed by business date; one return column per ISIN is added later.
final_df = pd.DataFrame(index=all_dates)
# Fetch the daily-return history for every ISIN in the universe and
# assemble the series as columns of final_df, aligned on the calendar.
isin_from_db = set()  # ISINs for which the DB actually returned data
for isin in df['ISIN'].unique():
    print(f"Working on ISIN: {isin}")
    # Bind the ISIN as a query parameter instead of interpolating it into
    # the SQL string, so unexpected characters in the spreadsheet cannot
    # break (or inject into) the statement.
    procedure_call = text("EXEC opt_RendimentoGiornaliero1_GBP @ISIN = :isin, @n = 1305")
    try:
        temp_df = pd.read_sql_query(procedure_call, engine, params={"isin": isin})
        if temp_df.empty:
            print(f"Nessun dato recuperato per {isin}, skipping...")
            continue
        temp_df['Px_Date'] = pd.to_datetime(temp_df['Px_Date'], format='%Y-%m-%d').dt.normalize()
        temp_df.set_index('Px_Date', inplace=True)
        # The stored procedure returns percentages; convert to fractions.
        temp_df['RendimentoGiornaliero'] = temp_df['RendimentoGiornaliero'] / 100
        final_df[isin] = temp_df['RendimentoGiornaliero'].reindex(all_dates)
        isin_from_db.add(isin)
        print(f"Dati recuperati per {isin}: {final_df[isin].count()} righe di dati non-null prelevate.")
    except SQLAlchemyError as e:
        print(f"Errore durante l'esecuzione della stored procedure per {isin}:", e)

# Dates with no quote for a given ISIN are treated as a zero return.
final_df.fillna(0, inplace=True)
# Volatility targets: (lookback years, annual vol target) -> portfolio name.
volatility_targets = {
    (5, 0.06): 'VAR3_GBP',
    #(1, 0.12): 'VAR6_1Y',
    #(3, 0.12): 'VAR6_3Y',
    (5, 0.12): 'VAR6_GBP',
    (5, 0.18): 'VAR9_GBP'
}

# Trading days per year, used to annualise daily statistics.
days_per_year = 252
# Risk-free rate used for the Sharpe-ratio computation.
riskfree_rate = 0.02
# Optimise one portfolio per volatility target and collect the results.
optimized_weights = pd.DataFrame()   # ISIN -> weight, one column per portfolio
summary_data = []                    # per-portfolio performance metrics
export_rows = []                     # per-portfolio rows for the combined export

# Constraint limits do not depend on the target, so build them once
# instead of recomputing them for every portfolio.
categories_limits = df.groupby('Categoria')['PesoMax'].max().to_dict()
asset_class_limits = {
    'Azionari': 0.75,
    'Obbligazionari': 0.75,
    'Metalli Preziosi': 0.20,
    'Materie Prime': 0.05,
    'Immobiliare': 0.05
}

for (years, target_vol), name in volatility_targets.items():
    period_start_date = end_date - pd.DateOffset(years=years)
    period_df = final_df.loc[period_start_date:end_date]

    # Annualised expected returns and sample covariance for the window.
    daily_returns_mean = period_df.mean()
    annual_returns_mean = daily_returns_mean * days_per_year
    annual_covariance_matrix = risk_models.sample_cov(period_df, returns_data=True)

    ef = EfficientFrontier(annual_returns_mean, annual_covariance_matrix)

    # Category constraints: cap (and floor at 0) the total weight per category.
    # NOTE: idx/limit are bound as lambda defaults so each constraint keeps its
    # own values even if the lambdas were ever evaluated lazily (avoids the
    # late-binding-closure pitfall, flake8 B023).
    for category, max_weight in categories_limits.items():
        isin_list = df[df['Categoria'] == category]['ISIN'].tolist()
        category_idx = [period_df.columns.get_loc(i) for i in isin_list if i in period_df.columns]
        ef.add_constraint(lambda w, idx=category_idx, limit=max_weight: sum(w[i] for i in idx) <= limit)
        ef.add_constraint(lambda w, idx=category_idx: sum(w[i] for i in idx) >= 0)

    # Asset-class constraints, same mechanism.
    for asset_class, max_weight in asset_class_limits.items():
        isin_list = df[df['Asset Class'] == asset_class]['ISIN'].tolist()
        asset_class_idx = [period_df.columns.get_loc(i) for i in isin_list if i in period_df.columns]
        ef.add_constraint(lambda w, idx=asset_class_idx, limit=max_weight: sum(w[i] for i in idx) <= limit)
        ef.add_constraint(lambda w, idx=asset_class_idx: sum(w[i] for i in idx) >= 0)

    try:
        ef.efficient_risk(target_volatility=target_vol)
        weights = ef.clean_weights()
        optimized_weights[name] = pd.Series(weights)
        exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=True, risk_free_rate=riskfree_rate)
        summary_data.append({
            "Portafoglio": name,
            "Years": years,
            "Target Vol": f"{target_vol:.2%}",
            "Expected annual return": f"{exp_ret:.2%}",
            "Annual volatility": f"{exp_vol:.2%}",
            "Sharpe Ratio": f"{sharpe:.2f}",
        })

        # Rows for the export template: only instruments with positive weight.
        results = []
        for isin, weight in weights.items():
            if weight > 0:
                codice_titolo = df.loc[df['ISIN'] == isin, 'Codice Titolo'].values[0]
                nome = df.loc[df['ISIN'] == isin, 'Nome'].values[0]
                results.append({
                    'cod_por': f'PTFOPT{name}',   # portfolio code
                    'cod_tit': codice_titolo,     # security code
                    'des_tit': nome,              # security name
                    'peso': weight * 99           # weight, scaled by 99
                })
        results_df = pd.DataFrame(results)

        # Accumulate the rows for the single combined export.
        export_rows.append(results_df)

        # Pie chart of the asset-class allocation of this portfolio.
        asset_allocations = {asset: 0 for asset in asset_class_limits}
        for isin, weight in weights.items():
            asset_class = df.loc[df['ISIN'] == isin, 'Asset Class'].values[0]
            asset_allocations[asset_class] += weight
        plt.figure(figsize=(8, 6))
        plt.pie(asset_allocations.values(), labels=asset_allocations.keys(), autopct='%1.1f%%')
        plt.title(f'Asset Allocation for {name}')
        pie_path = plot_path(f'Asset_Allocation_{name}.png')
        plt.savefig(pie_path, dpi=150, bbox_inches='tight')
        plt.close()

    except OptimizationError as e:
        print(f"Optimization failed for {name}: {e}")
        # Index the zero fallback by ISIN so the column aligns with the
        # successful portfolios; the original unindexed Series aligned on
        # integer labels and produced an all-NaN column instead of zeros.
        optimized_weights[name] = pd.Series(0.0, index=annual_returns_mean.index)
# Attach the human-readable ETF name to each optimised-weights row.
etf_names = [
    df.loc[df['ISIN'] == isin, 'Nome'].values[0]
    for isin in optimized_weights.index
]
optimized_weights_with_names = optimized_weights.copy()
optimized_weights_with_names['Nome ETF'] = etf_names

summary_df = pd.DataFrame(summary_data)
output_path = excel_path("Riepilogo pesi GBP.xlsx")

# One workbook: weights on the first sheet, performance metrics on the second.
with pd.ExcelWriter(output_path, engine='openpyxl', mode='w') as writer:
    optimized_weights_with_names.to_excel(writer, sheet_name='Pesi Ottimizzati', index=True)
    summary_df.to_excel(writer, sheet_name='metriche', index=False)

print(f"All optimized weights saved to '{output_path}' (with metriche).")
# Single combined export: the template rows followed by every portfolio's
# weight rows, all in one sheet, date-tagged file name.
date_tag = datetime.now().strftime("%Y%m%d")
combined_path = excel_path(f"{date_tag} Pesi ottimizzati UK.xlsx")

if export_rows:
    combined_df = pd.concat([template_df] + export_rows, ignore_index=True)
else:
    combined_df = template_df.copy()

with pd.ExcelWriter(combined_path, engine='openpyxl', mode='w') as writer:
    combined_df.to_excel(writer, sheet_name='Pesi Ottimizzati', index=False)

print(f"Pesi ottimizzati salvati in un unico file/sheet: '{combined_path}'.")