diff --git a/Ottimizzatore Quant Best Europe v.2.py b/Ottimizzatore Quant Best Europe v.2.py new file mode 100644 index 0000000..5b4e53e --- /dev/null +++ b/Ottimizzatore Quant Best Europe v.2.py @@ -0,0 +1,561 @@ +# -*- coding: utf-8 -*- +""" +Ottimizzatore Quant Best Europe - versione ripulita con confronto Wealth-overview. + +Output principali: +1) Output/AAAAMMGG Pesi ottimizzati Quant Best Europe.xlsx +2) Output/10040912.538.xlsx + +Il secondo file confronta i pesi target generati dall'ottimizzatore con il file +Input/AAAAMMGG_HHMM_Wealth-overview.xlsx piu' recente, ignorando le righe con +Asset class = Liquidity e usando solo la colonna "% of net assets". +""" + +import os +import re +import sys +import logging +from datetime import datetime +from pathlib import Path + +import cvxpy as cp +import numpy as np +import pandas as pd +import yaml +from sqlalchemy import create_engine, text +from sqlalchemy.exc import SQLAlchemyError + +from pypfopt import objective_functions as _obj +from pypfopt import risk_models +from pypfopt.efficient_frontier import EfficientFrontier +from pypfopt.exceptions import OptimizationError + + +# ========================= +# PARAMETRI GENERALI +# ========================= +BASE_DIR = Path(__file__).resolve().parent +INPUT_DIR = BASE_DIR / "Input" +OUTPUT_DIR = BASE_DIR / "Output" +CONFIG_FILE = BASE_DIR / "config.yaml" +CONNECTION_FILE = BASE_DIR / "connection.txt" + +UNIVERSE_FILE = INPUT_DIR / "Universo per Quant Best Europe.xlsx" +TEMPLATE_GUARDIAN_FILE = INPUT_DIR / "Template_Guardian.xls" + +OUTPUT_PORTFOLIO_CODE = "Quant BE" +TARGET_PORTFOLIO_NAME = "VAR6_3Y" +DAYS_PER_YEAR = 252 +RISKFREE_RATE = 0.02 +DB_OBSERVATIONS = 1305 +PTF_CURRENCY = "EUR" + +# I pesi del file import gestionale sono espressi in percentuale. +# Esempio: peso target 4.75 = 4.75% = 0.0475. +TARGET_WEIGHT_MULTIPLIER = 95.0 + +# Soglia minima per non produrre ordini irrilevanti per arrotondamenti. +MIN_TRADE_WEIGHT = 0.000001 # 0.0001% del NAV in forma decimale + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + +OUTPUT_DIR.mkdir(exist_ok=True) +INPUT_DIR.mkdir(exist_ok=True) + + +# ========================= +# PATCH PYPORTFOLIOOPT +# ========================= +def portfolio_variance_psdwrap(w, cov_matrix): + """Versione piu' robusta di portfolio_variance: evita errori ARPACK in CVXPY.""" + variance = cp.quad_form(w, cp.psd_wrap(cov_matrix)) + return _obj._objective_value(w, variance) + + +_obj.portfolio_variance = portfolio_variance_psdwrap + + +# ========================= +# UTILITY DI BASE +# ========================= +def read_key_value_file(path: Path) -> dict: + """Legge file testuali nel formato key=value.""" + if not path.exists(): + logger.error("File mancante: %s", path) + sys.exit(1) + + params = {} + with path.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + key, value = line.split("=", 1) + params[key.strip()] = value.strip() + return params + + +def load_targets_and_limits(config_file: Path): + """Legge target di volatilita' e limiti per asset class da config.yaml.""" + if not config_file.exists(): + logger.error("File di configurazione mancante: %s", config_file) + sys.exit(1) + + with config_file.open("r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) or {} + + vt_cfg = cfg.get("volatility_targets", {}) + vt_list = vt_cfg.get("default", []) if isinstance(vt_cfg, dict) else vt_cfg + if not vt_list: + logger.error("Sezione 'volatility_targets' mancante o vuota in config.yaml.") + sys.exit(1) + + volatility_targets = { + (int(item["years"]), float(item["target_vol"])): item["name"] + for item in vt_list + if {"years", "target_vol", "name"}.issubset(item) + } + if not volatility_targets: + logger.error("Nessun target di volatilita' valido trovato in config.yaml.") + sys.exit(1) + + asset_class_limits = cfg.get("asset_class_limits") or {} + if not asset_class_limits: + logger.error("Sezione 'asset_class_limits' mancante o vuota in config.yaml.") + sys.exit(1) + + return volatility_targets, {k: float(v) for k, v in asset_class_limits.items()} + + +def build_engine(connection_file: Path): + """Crea e verifica la connessione al database.""" + params = read_key_value_file(connection_file) + username = params.get("username") + password = params.get("password") + host = params.get("host") + port = params.get("port", "1433") + database = params.get("database") + + connection_string = ( + f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}" + "?driver=ODBC+Driver+17+for+SQL+Server" + ) + + try: + engine = create_engine(connection_string) + with engine.connect() as connection: + connection.execute(text("SELECT 1")) + logger.info("Connessione al database riuscita.") + return engine + except SQLAlchemyError as exc: + logger.error("Errore durante la connessione al database: %s", exc) + sys.exit(1) + + +def regularize_covariance(cov_df: pd.DataFrame, ridge_factor: float = 1e-6) -> pd.DataFrame: + """Simmetrizza la covarianza e aggiunge un piccolo ridge diagonale.""" + if cov_df.empty: + return cov_df + + sigma = cov_df.values.astype(float) + sigma = 0.5 * (sigma + sigma.T) + n = sigma.shape[0] + trace = np.trace(sigma) + eps = ridge_factor * (trace / n) if np.isfinite(trace) and n > 0 else ridge_factor + return pd.DataFrame(sigma + eps * np.eye(n), index=cov_df.index, columns=cov_df.columns) + + +def validate_universe(df_universe: pd.DataFrame) -> None: + required_cols = ["ISIN", "Nome", "Categoria", "Asset Class", "Bloomberg Ticker", "PesoMax", "PesoFisso"] + missing = [c for c in required_cols if c not in df_universe.columns] + if missing: + logger.error("Colonne mancanti nel file universo: %s", ", ".join(missing)) + sys.exit(1) + + duplicated = df_universe["ISIN"][df_universe["ISIN"].duplicated()].dropna().unique().tolist() + if duplicated: + logger.warning("ISIN duplicati nel file universo: %s", duplicated) + + if df_universe["ISIN"].isna().any(): + logger.error("Il file universo contiene righe con ISIN mancante.") + sys.exit(1) + + +# ========================= +# LETTURA INPUT E RENDIMENTI +# ========================= +def load_universe(path: Path) -> pd.DataFrame: + if not path.exists(): + logger.error("File universo mancante: %s", path) + sys.exit(1) + + # Legge tutte le colonne per consentire l'uso della colonna E come Bloomberg Ticker. + # Il file atteso contiene ora: + # A ISIN | B Nome | C Categoria | D Asset Class | E Bloomberg Ticker | F PesoMax | G PesoFisso + df = pd.read_excel(path, dtype=str) + df.columns = [str(c).strip() for c in df.columns] + + # Se in futuro il nome della colonna E venisse modificato, la normalizziamo comunque + # a "Bloomberg Ticker" per evitare rotture a valle. + if "Bloomberg Ticker" not in df.columns: + if len(df.columns) >= 5: + df = df.rename(columns={df.columns[4]: "Bloomberg Ticker"}) + else: + logger.error("Il file universo non contiene la colonna E con il ticker Bloomberg.") + sys.exit(1) + + validate_universe(df) + + df["ISIN"] = df["ISIN"].astype(str).str.strip() + df["Bloomberg Ticker"] = df["Bloomberg Ticker"].fillna("").astype(str).str.strip() + df["PesoMax"] = pd.to_numeric(df["PesoMax"], errors="coerce") + df["PesoFisso"] = pd.to_numeric(df["PesoFisso"], errors="coerce") + return df + + +def load_returns(engine, universe: pd.DataFrame, end_date: pd.Timestamp) -> pd.DataFrame: + """Carica i rendimenti giornalieri dei titoli dell'universo dal database.""" + start_date = end_date - pd.DateOffset(years=5) + all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize() + returns = pd.DataFrame(index=all_dates) + + for isin in universe["ISIN"].dropna().unique(): + logger.info("Caricamento rendimenti: %s", isin) + procedure_call = f"EXEC opt_RendimentoGiornaliero1_ALL @ISIN = '{isin}', @n = {DB_OBSERVATIONS}, @PtfCurr = {PTF_CURRENCY}" + try: + tmp = pd.read_sql_query(procedure_call, engine) + except SQLAlchemyError as exc: + logger.warning("Errore nella stored procedure per %s: %s", isin, exc) + continue + + if tmp.empty: + logger.warning("Nessun rendimento recuperato per %s.", isin) + continue + + tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize() + tmp = tmp.dropna(subset=["Px_Date"]).set_index("Px_Date") + returns[isin] = (tmp["RendimentoGiornaliero"] / 100.0).reindex(all_dates) + logger.info("%s: %d osservazioni valide.", isin, returns[isin].count()) + + if returns.empty: + logger.error("Nessun dato di rendimento recuperato dal database.") + sys.exit(1) + + high_na = returns.isna().mean()[lambda s: s > 0.20] + if not high_na.empty: + logger.warning("Colonne con oltre 20%% di NaN prima del fill: %s", high_na.to_dict()) + + return returns.fillna(0.0) + + +# ========================= +# OTTIMIZZAZIONE +# ========================= +def add_weight_constraints(ef: EfficientFrontier, universe: pd.DataFrame, columns: pd.Index, asset_class_limits: dict) -> None: + """Applica vincoli per PesoFisso/PesoMax, Categoria e Asset Class.""" + for _, row in universe.iterrows(): + isin = row["ISIN"] + if isin not in columns: + continue + idx = columns.get_loc(isin) + fixed_weight = row.get("PesoFisso") + max_weight = row.get("PesoMax") + + if pd.notna(fixed_weight): + ef.add_constraint(lambda w, idx=idx, val=float(fixed_weight): w[idx] == val) + elif pd.notna(max_weight): + ef.add_constraint(lambda w, idx=idx, val=float(max_weight): w[idx] <= val) + + for category, max_weight in universe.groupby("Categoria")["PesoMax"].max().dropna().items(): + idxs = [columns.get_loc(isin) for isin in universe.loc[universe["Categoria"] == category, "ISIN"] if isin in columns] + if idxs: + ef.add_constraint(lambda w, idxs=idxs, val=float(max_weight): sum(w[i] for i in idxs) <= val) + + for asset_class, max_weight in asset_class_limits.items(): + idxs = [columns.get_loc(isin) for isin in universe.loc[universe["Asset Class"] == asset_class, "ISIN"] if isin in columns] + if idxs: + ef.add_constraint(lambda w, idxs=idxs, val=float(max_weight): sum(w[i] for i in idxs) <= val) + + +def run_optimizations(returns: pd.DataFrame, universe: pd.DataFrame, volatility_targets: dict, asset_class_limits: dict, end_date: pd.Timestamp) -> pd.DataFrame: + """Esegue le ottimizzazioni per tutti i target configurati.""" + optimized_weights = pd.DataFrame(index=returns.columns) + + for (years, target_vol), name in volatility_targets.items(): + period_start = end_date - pd.DateOffset(years=years) + period_returns = returns.loc[period_start:end_date] + + annual_returns = period_returns.mean() * DAYS_PER_YEAR + annual_cov = risk_models.sample_cov(period_returns, returns_data=True) + annual_cov = regularize_covariance(annual_cov) + + ef = EfficientFrontier(annual_returns, annual_cov) + add_weight_constraints(ef, universe, period_returns.columns, asset_class_limits) + + try: + ef.efficient_risk(target_volatility=target_vol) + weights = pd.Series(ef.clean_weights()).reindex(returns.columns).fillna(0.0) + optimized_weights[name] = weights + + exp_ret, exp_vol, sharpe = ef.portfolio_performance(verbose=False, risk_free_rate=RISKFREE_RATE) + logger.info("%s: return %.2f%% | vol %.2f%% | sharpe %.2f", name, exp_ret * 100, exp_vol * 100, sharpe) + except OptimizationError as exc: + logger.warning("Ottimizzazione fallita per %s: %s", name, exc) + optimized_weights[name] = 0.0 + + return optimized_weights + + +# ========================= +# EXPORT PESI TARGET +# ========================= +def build_target_weights_export(optimized_weights: pd.DataFrame, universe: pd.DataFrame, target_name: str) -> pd.DataFrame: + """Costruisce il DataFrame del file 'Pesi ottimizzati Quant Best Europe'.""" + if target_name not in optimized_weights.columns: + logger.error("Target portfolio '%s' non presente fra i risultati: %s", target_name, list(optimized_weights.columns)) + sys.exit(1) + + universe_info = universe.set_index("ISIN")[["Nome", "Bloomberg Ticker"]].to_dict("index") + rows = [] + for isin, weight in optimized_weights[target_name].items(): + if weight > 0: + info = universe_info.get(isin, {}) + rows.append({ + "cod_por": OUTPUT_PORTFOLIO_CODE, + "ISIN": isin, + "des_tit": info.get("Nome", ""), + "Bloomberg Ticker": info.get("Bloomberg Ticker", ""), + "peso": float(weight * TARGET_WEIGHT_MULTIPLIER), + }) + + return pd.DataFrame(rows, columns=["cod_por", "ISIN", "des_tit", "Bloomberg Ticker", "peso"]) + + +def export_target_weights(target_df: pd.DataFrame, output_dir: Path, date_tag: str) -> Path: + path = output_dir / f"{date_tag} Pesi ottimizzati Quant Best Europe.xlsx" + export_columns = ["cod_por", "ISIN", "des_tit", "peso"] + with pd.ExcelWriter(path, engine="openpyxl", mode="w") as writer: + target_df.reindex(columns=export_columns).to_excel(writer, sheet_name=OUTPUT_PORTFOLIO_CODE, index=False) + logger.info("File pesi target salvato: %s", path) + return path + + +# ========================= +# CONFRONTO CON WEALTH-OVERVIEW +# ========================= +def find_latest_wealth_overview(input_dir: Path) -> Path: + """Trova il Wealth-overview piu' recente nella cartella Input.""" + pattern = re.compile(r"^(\d{8})_(\d{4})_Wealth-overview\.xlsx$", re.IGNORECASE) + candidates = [] + + for path in input_dir.glob("*_Wealth-overview.xlsx"): + match = pattern.match(path.name) + if match: + candidates.append((match.group(1), match.group(2), path)) + + if not candidates: + logger.error("Nessun file AAAAMMGG_HHMM_Wealth-overview.xlsx trovato in %s", input_dir) + sys.exit(1) + + candidates.sort(key=lambda x: (x[0], x[1]), reverse=True) + selected = candidates[0][2] + logger.info("Wealth-overview selezionato: %s", selected) + return selected + + +def load_current_portfolio_from_wealth(path: Path) -> pd.DataFrame: + """Legge il Wealth-overview, esclude Liquidity e usa la colonna '% of net assets'.""" + raw = pd.read_excel(path, sheet_name=0) + raw.columns = [str(c).strip() for c in raw.columns] + + required = ["Asset class", "Description", "ISIN / IBAN", "Position currency", "% of net assets"] + missing = [c for c in required if c not in raw.columns] + if missing: + logger.error("Colonne mancanti nel Wealth-overview %s: %s", path.name, missing) + sys.exit(1) + + df = raw.copy() + df["Asset class"] = df["Asset class"].astype(str).str.strip() + df = df[df["Asset class"].str.casefold() != "liquidity"] + df = df[df["ISIN / IBAN"].notna()].copy() + + df["ISIN"] = df["ISIN / IBAN"].astype(str).str.strip() + df["current_weight"] = pd.to_numeric(df["% of net assets"], errors="coerce").fillna(0.0) + df["Description"] = df["Description"].fillna("").astype(str) + df["TradingCurrency"] = df["Position currency"].fillna("EUR").astype(str).str.strip().replace("", "EUR") + + grouped = ( + df.groupby("ISIN", as_index=False) + .agg( + current_weight=("current_weight", "sum"), + Description=("Description", "first"), + TradingCurrency=("TradingCurrency", "first"), + AssetClass=("Asset class", "first"), + ) + ) + return grouped + + +def build_orders(target_df: pd.DataFrame, current_df: pd.DataFrame) -> pd.DataFrame: + """Costruisce ordini Buy/Sell confrontando target e portafoglio corrente.""" + target = target_df.copy() + target["target_weight"] = pd.to_numeric(target["peso"], errors="coerce").fillna(0.0) / 100.0 + if "Bloomberg Ticker" not in target.columns: + target["Bloomberg Ticker"] = "" + target["Bloomberg Ticker"] = target["Bloomberg Ticker"].fillna("").astype(str).str.strip() + target = target[["ISIN", "des_tit", "Bloomberg Ticker", "target_weight"]] + + merged = current_df.merge(target, on="ISIN", how="outer") + merged["current_weight"] = merged["current_weight"].fillna(0.0) + merged["target_weight"] = merged["target_weight"].fillna(0.0) + merged["Description"] = merged["des_tit"].combine_first(merged["Description"]).fillna("") + merged["TradingCurrency"] = merged["TradingCurrency"].fillna("EUR").replace("", "EUR") + merged["Bloomberg Ticker"] = merged["Bloomberg Ticker"].fillna("").astype(str).str.strip() + merged["delta_weight"] = merged["target_weight"] - merged["current_weight"] + + order_columns = [ + "Order", "ISIN", "TradingCurrency", "Description", "Bloomberg Ticker", + "Limit Price", "Quantity", "Close Position", "Amount", "Weight", "Notes", + ] + + rows = [] + for _, row in merged.iterrows(): + delta = float(row["delta_weight"]) + current_weight = float(row["current_weight"]) + target_weight = float(row["target_weight"]) + + if abs(delta) <= MIN_TRADE_WEIGHT: + continue + + base = { + "ISIN": row["ISIN"], + "TradingCurrency": row["TradingCurrency"] or "EUR", + "Description": row["Description"], + "Bloomberg Ticker": "", + "Limit Price": "", + "Quantity": "", + "Close Position": "", + "Amount": "", + "Weight": "", + "Notes": "", + } + + if delta > 0: + base["Order"] = "Buy" + base["Weight"] = round(delta, 10) + + # Per i BUY di titoli non gia' presenti in portafoglio, valorizza la colonna E + # del file 10040912.538.xlsx con il Bloomberg Ticker letto dalla colonna E + # dell'universo. Per i BUY di titoli gia' presenti, la colonna resta vuota. + if current_weight <= MIN_TRADE_WEIGHT: + base["Bloomberg Ticker"] = row["Bloomberg Ticker"] + elif target_weight <= MIN_TRADE_WEIGHT and current_weight > MIN_TRADE_WEIGHT: + base["Order"] = "Sell" + base["Close Position"] = "Yes" + else: + base["Order"] = "Sell" + base["Weight"] = round(abs(delta), 10) + + rows.append(base) + + orders = pd.DataFrame(rows, columns=order_columns) + if orders.empty: + return orders + + order_rank = {("Sell", "Yes"): 0, ("Sell", ""): 1, ("Buy", ""): 2} + orders["_rank"] = orders.apply(lambda r: order_rank.get((r["Order"], r["Close Position"]), 9), axis=1) + orders = orders.sort_values(["_rank", "ISIN"], kind="stable").drop(columns="_rank").reset_index(drop=True) + return orders + + +def instruction_rows() -> list: + return [ + ["IMPORTANTE", "NON CAMBIARE IL NOME DEL FILE!!"], + ["", ""], + ["Campi obbligatori:", ""], + ["Order", "Selezionare dal menu a tendina 'Buy' o 'Sell'"], + ["TradingCurrency", "Selezionare la currency dello strumento"], + ["Description", "Compilare con il nome del titolo - serve per controllo"], + ["", ""], + ["Campi alternativi obbligatori:", ""], + ["ISIN", "Indicare il codice ISIN del titolo"], + ["Bloomberg Ticker", "Compilare con il ticker Bloomberg completo"], + ["", ""], + ["Campi facoltativi:", ""], + ["Limit Price", "Se vuoto, si intende ordine a mercato"], + ["Quantity", "Se vuoto, compilare uno fra Close Position, Amount o Weight"], + ["Close Position", "Usare 'Yes' solo per chiudere integralmente una posizione"], + ["Amount", "Compilare solo se Quantity e Weight restano vuoti"], + ["Weight", "Peso aggiuntivo rispetto al portafoglio gia' presente, espresso in forma decimale"], + ["Notes", "Eventuali annotazioni per il Middle Office"], + ] + + +def example_rows() -> list: + return [ + ["Order", "ISIN", "TradingCurrency", "Description", "Bloomberg Ticker", "Limit Price", "Quantity", "Close Position", "Amount", "Weight", "Notes"], + ["Sell", "CA00217Y1043", "CAD", "ATS Corporation", "", "", "", "Yes", "", "", ""], + ["Buy", "FR0000121014", "EUR", "LVMH", "", "", "", "", "", 0.10, ""], + ["Buy", "", "USD", "Apple", "AAPL US Equity", "", "", "", 10000, "", ""], + ] + + +def caption_rows() -> list: + return [ + ["Order Type", "", "In Portfolio", "", "Trading Currency", "", "Close Position"], + ["Buy", "", "Yes", "", "EUR", "", "Yes"], + ["Sell", "", "No", "", "USD", "", ""], + ["", "", "", "", "GBP", "", ""], + ["", "", "", "", "CAD", "", ""], + ["", "", "", "", "CHF", "", ""], + ["", "", "", "", "JPY", "", ""], + ] + + +def export_orders_workbook(orders_df: pd.DataFrame, output_dir: Path, date_tag: str) -> Path: + """Esporta un file ordini analogo a 10040912.538.xlsx.""" + output_path = output_dir / "10040912.538.xlsx" + + with pd.ExcelWriter(output_path, engine="openpyxl", mode="w") as writer: + orders_df.to_excel(writer, sheet_name="Orders", index=False) + pd.DataFrame(instruction_rows()).to_excel(writer, sheet_name="Instructions", index=False, header=False) + pd.DataFrame(example_rows()[1:], columns=example_rows()[0]).to_excel(writer, sheet_name="Example", index=False) + pd.DataFrame(caption_rows()).to_excel(writer, sheet_name="Caption", index=False, header=False) + + workbook = writer.book + for ws in workbook.worksheets: + ws.freeze_panes = "A2" if ws.title in {"Orders", "Example"} else None + for column_cells in ws.columns: + max_len = max(len(str(cell.value)) if cell.value is not None else 0 for cell in column_cells) + ws.column_dimensions[column_cells[0].column_letter].width = min(max(max_len + 2, 10), 45) + + logger.info("File ordini salvato: %s", output_path) + return output_path + + +# ========================= +# MAIN +# ========================= +def main() -> None: + date_tag = datetime.now().strftime("%Y%m%d") + end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1) + + universe = load_universe(UNIVERSE_FILE) + volatility_targets, asset_class_limits = load_targets_and_limits(CONFIG_FILE) + engine = build_engine(CONNECTION_FILE) + returns = load_returns(engine, universe, end_date) + + optimized_weights = run_optimizations(returns, universe, volatility_targets, asset_class_limits, end_date) + target_df = build_target_weights_export(optimized_weights, universe, TARGET_PORTFOLIO_NAME) + export_target_weights(target_df, OUTPUT_DIR, date_tag) + + wealth_path = find_latest_wealth_overview(INPUT_DIR) + current_df = load_current_portfolio_from_wealth(wealth_path) + orders_df = build_orders(target_df, current_df) + export_orders_workbook(orders_df, OUTPUT_DIR, date_tag) + + logger.info("Processo completato. Ordini generati: %d", len(orders_df)) + + +if __name__ == "__main__": + main()