562 lines
22 KiB
Python
562 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Ottimizzatore Quant Best Europe - versione ripulita con confronto Wealth-overview.
|
|
|
|
Output principali:
|
|
1) Output/AAAAMMGG Pesi ottimizzati Quant Best Europe.xlsx
|
|
2) Output/10040912.538.xlsx
|
|
|
|
Il secondo file confronta i pesi target generati dall'ottimizzatore con il file
|
|
Input/AAAAMMGG_HHMM_Wealth-overview.xlsx piu' recente, ignorando le righe con
|
|
Asset class = Liquidity e usando solo la colonna "% of net assets".
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import cvxpy as cp
|
|
import numpy as np
|
|
import pandas as pd
|
|
import yaml
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from pypfopt import objective_functions as _obj
|
|
from pypfopt import risk_models
|
|
from pypfopt.efficient_frontier import EfficientFrontier
|
|
from pypfopt.exceptions import OptimizationError
|
|
|
|
|
|
# =========================
|
|
# PARAMETRI GENERALI
|
|
# =========================
|
|
# --- Filesystem layout: every path is resolved relative to this script's folder ---
BASE_DIR = Path(__file__).resolve().parent
CONFIG_FILE = BASE_DIR / "config.yaml"
CONNECTION_FILE = BASE_DIR / "connection.txt"
INPUT_DIR = BASE_DIR / "Input"
OUTPUT_DIR = BASE_DIR / "Output"
UNIVERSE_FILE = INPUT_DIR / "Universo per Quant Best Europe.xlsx"
TEMPLATE_GUARDIAN_FILE = INPUT_DIR / "Template_Guardian.xls"

# --- Portfolio identification ---
# Written to the "cod_por" column and used as sheet name of the target-weights file.
OUTPUT_PORTFOLIO_CODE = "Quant BE"
# Name of the optimized portfolio (a config.yaml volatility target) that is exported.
TARGET_PORTFOLIO_NAME = "VAR6_3Y"
# Passed as @PtfCurr to the returns stored procedure.
PTF_CURRENCY = "EUR"

# --- Market data / annualisation ---
DAYS_PER_YEAR = 252      # factor applied to mean daily returns to annualise them
RISKFREE_RATE = 0.02     # risk-free rate used when reporting the Sharpe ratio
DB_OBSERVATIONS = 1305   # passed as @n to the returns stored procedure

# The weights of the import file are expressed in percent: each optimizer
# weight (a fraction of 1) is multiplied by this factor before export.
# Example: optimizer weight 0.05 -> exported "peso" 4.75 = 4.75% = 0.0475.
# NOTE(review): with 95.0 the exported weights total 95, presumably leaving
# 5% of NAV uninvested -- confirm with the portfolio manager.
TARGET_WEIGHT_MULTIPLIER = 95.0

# Minimum weight difference that generates an order, to avoid trades that are
# only rounding noise.
MIN_TRADE_WEIGHT = 1e-6  # 0.0001% of NAV, in decimal form

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# Make sure both working folders exist before anything tries to read or write them.
OUTPUT_DIR.mkdir(exist_ok=True)
INPUT_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
# =========================
|
|
# PATCH PYPORTFOLIOOPT
|
|
# =========================
|
|
def portfolio_variance_psdwrap(w, cov_matrix):
    """Return the portfolio-variance objective using a PSD-wrapped covariance.

    Replacement for ``pypfopt.objective_functions.portfolio_variance``: the
    covariance matrix goes through ``cp.psd_wrap`` before ``cp.quad_form``.
    According to the original author's note this makes the objective more
    robust and avoids ARPACK errors raised inside CVXPY.

    Args:
        w: weight vector (CVXPY variable or array) handed over by PyPortfolioOpt.
        cov_matrix: covariance matrix of the assets.

    Returns:
        Whatever ``objective_functions._objective_value`` returns for the
        quadratic form ``w' * cov_matrix * w``.
    """
    wrapped_cov = cp.psd_wrap(cov_matrix)
    quadratic_form = cp.quad_form(w, wrapped_cov)
    return _obj._objective_value(w, quadratic_form)


# Monkey-patch: rebind the library attribute so that later lookups of
# ``objective_functions.portfolio_variance`` resolve to the wrapper above.
_obj.portfolio_variance = portfolio_variance_psdwrap
|
|
|
|
|
|
# =========================
|
|
# UTILITY DI BASE
|
|
# =========================
|
|
def read_key_value_file(path: Path) -> dict:
    """Read a text file made of ``key=value`` lines into a dictionary.

    Blank lines and lines starting with ``#`` are skipped. Only the first
    ``=`` splits a line, so values may themselves contain ``=``. Keys and
    values are stripped of surrounding whitespace.

    Lines that contain no ``=`` at all are skipped with a warning instead of
    aborting the run with a ``ValueError``. The warning reports only the line
    number, never the content, because the file may hold credentials.

    Args:
        path: location of the file to read.

    Returns:
        Mapping of key to value, both as strings.

    Raises:
        SystemExit: with code 1 (after logging an error) when the file is missing.
    """
    if not path.exists():
        logger.error("File mancante: %s", path)
        sys.exit(1)

    params = {}
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # some Windows editors add and which would otherwise corrupt the first key.
    with path.open("r", encoding="utf-8-sig") as f:
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue

            key, separator, value = line.partition("=")
            if not separator:
                logger.warning(
                    "Riga %d ignorata in %s: formato key=value non valido.", line_no, path
                )
                continue

            params[key.strip()] = value.strip()
    return params
|
|
|
|
|
|
def load_targets_and_limits(config_file: Path):
    """Load volatility targets and asset-class limits from the YAML config.

    ``volatility_targets`` may be either a list of entries or a mapping whose
    ``default`` key holds that list. Only entries providing ``years``,
    ``target_vol`` and ``name`` are kept; the others are ignored.

    Args:
        config_file: path of the YAML configuration file.

    Returns:
        A pair ``(volatility_targets, asset_class_limits)`` where the first
        maps ``(years, target_vol)`` to the portfolio name and the second maps
        each asset class to its limit converted to ``float``.

    Raises:
        SystemExit: with code 1 (after logging an error) when the file is
            missing or one of the two sections is missing, empty or invalid.
    """

    def _abort(message, *args):
        # Same convention as the rest of the script: log, then stop with code 1.
        logger.error(message, *args)
        sys.exit(1)

    if not config_file.exists():
        _abort("File di configurazione mancante: %s", config_file)

    with config_file.open("r", encoding="utf-8") as handle:
        cfg = yaml.safe_load(handle) or {}

    raw_targets = cfg.get("volatility_targets", {})
    if isinstance(raw_targets, dict):
        entries = raw_targets.get("default", [])
    else:
        entries = raw_targets
    if not entries:
        _abort("Sezione 'volatility_targets' mancante o vuota in config.yaml.")

    mandatory_keys = {"years", "target_vol", "name"}
    volatility_targets = {}
    for entry in entries:
        if not mandatory_keys.issubset(entry):
            continue
        horizon = (int(entry["years"]), float(entry["target_vol"]))
        volatility_targets[horizon] = entry["name"]
    if not volatility_targets:
        _abort("Nessun target di volatilita' valido trovato in config.yaml.")

    raw_limits = cfg.get("asset_class_limits") or {}
    if not raw_limits:
        _abort("Sezione 'asset_class_limits' mancante o vuota in config.yaml.")

    limits = {asset_class: float(cap) for asset_class, cap in raw_limits.items()}
    return volatility_targets, limits
|
|
|
|
|
|
def build_engine(connection_file: Path):
    """Create the SQLAlchemy engine for the SQL Server database and verify it.

    Connection parameters (``username``, ``password``, ``host``, ``database``
    and the optional ``port``, default ``1433``) are read from a ``key=value``
    file. Username and password are percent-encoded before being placed in the
    URL, so that characters such as ``@``, ``:``, ``/`` or ``%`` in the
    credentials cannot be mistaken for URL delimiters.

    Args:
        connection_file: path of the ``key=value`` file with the parameters.

    Returns:
        The engine, after a successful ``SELECT 1`` round trip.

    Raises:
        SystemExit: with code 1 (after logging an error) when a mandatory
            parameter is missing or the connection attempt fails.
    """
    from urllib.parse import quote

    params = read_key_value_file(connection_file)

    # Fail early with a clear message instead of building a URL that contains
    # the literal text "None" for a missing parameter.
    missing = [key for key in ("username", "password", "host", "database") if key not in params]
    if missing:
        logger.error(
            "Parametri di connessione mancanti in %s: %s", connection_file, ", ".join(missing)
        )
        sys.exit(1)

    # safe="" encodes every reserved character; percent-encoding (rather than
    # "+" for spaces) decodes correctly across SQLAlchemy versions.
    username = quote(params["username"], safe="")
    password = quote(params["password"], safe="")
    host = params["host"]
    port = params.get("port", "1433")
    database = params["database"]

    connection_string = (
        f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}"
        "?driver=ODBC+Driver+17+for+SQL+Server"
    )

    try:
        engine = create_engine(connection_string)
        # create_engine is lazy: run a trivial query to prove the server is reachable.
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
        logger.info("Connessione al database riuscita.")
        return engine
    except SQLAlchemyError as exc:
        logger.error("Errore durante la connessione al database: %s", exc)
        sys.exit(1)
|
|
|
|
|
|
def regularize_covariance(cov_df: pd.DataFrame, ridge_factor: float = 1e-6) -> pd.DataFrame:
    """Symmetrize a covariance matrix and add a small ridge to its diagonal.

    The matrix is replaced by the average of itself and its transpose, then
    ``eps * I`` is added, where ``eps`` is ``ridge_factor`` times the mean of
    the diagonal. If the trace is not finite, ``ridge_factor`` itself is used
    as ``eps``.

    Args:
        cov_df: square covariance matrix with labelled rows and columns.
        ridge_factor: relative size of the diagonal ridge.

    Returns:
        A new DataFrame with the same labels; an empty input is returned
        unchanged (same object).
    """
    if cov_df.empty:
        return cov_df

    raw = cov_df.values.astype(float)
    symmetric = (raw + raw.T) * 0.5
    size = symmetric.shape[0]
    diagonal_sum = np.trace(symmetric)

    if np.isfinite(diagonal_sum) and size > 0:
        ridge = ridge_factor * (diagonal_sum / size)
    else:
        ridge = ridge_factor

    regularized = symmetric + ridge * np.eye(size)
    return pd.DataFrame(regularized, index=cov_df.index, columns=cov_df.columns)
|
|
|
|
|
|
def validate_universe(df_universe: pd.DataFrame) -> None:
    """Check that the universe table has the expected structure.

    Missing mandatory columns and rows without an ISIN are fatal; duplicated
    ISINs are only reported with a warning.

    Args:
        df_universe: universe table as read from the Excel file.

    Raises:
        SystemExit: with code 1 (after logging an error) when a mandatory
            column is missing or at least one row has no ISIN.
    """
    expected_columns = (
        "ISIN", "Nome", "Categoria", "Asset Class", "Bloomberg Ticker", "PesoMax", "PesoFisso",
    )
    absent = [name for name in expected_columns if name not in df_universe.columns]
    if absent:
        logger.error("Colonne mancanti nel file universo: %s", ", ".join(absent))
        sys.exit(1)

    isin = df_universe["ISIN"]

    # Duplicates are tolerated here: they are reported, not rejected.
    repeated = isin[isin.duplicated()].dropna().unique().tolist()
    if repeated:
        logger.warning("ISIN duplicati nel file universo: %s", repeated)

    if isin.isna().any():
        logger.error("Il file universo contiene righe con ISIN mancante.")
        sys.exit(1)
|
|
|
|
|
|
# =========================
|
|
# LETTURA INPUT E RENDIMENTI
|
|
# =========================
|
|
def load_universe(path: Path) -> pd.DataFrame:
    """Read and normalise the investable-universe Excel file.

    Expected layout of the sheet:
    A ISIN | B Nome | C Categoria | D Asset Class | E Bloomberg Ticker |
    F PesoMax | G PesoFisso

    All columns are read as text. If no column is named "Bloomberg Ticker",
    the fifth column (E) is renamed to that name so that downstream code keeps
    working even if the header of column E is changed.

    Args:
        path: location of the universe workbook.

    Returns:
        The universe table with stripped ``ISIN`` and ``Bloomberg Ticker``
        strings and numeric ``PesoMax`` / ``PesoFisso`` (unparseable values
        become NaN).

    Raises:
        SystemExit: with code 1 when the file is missing, column E is absent,
            or ``validate_universe`` rejects the table.
    """
    ticker_col = "Bloomberg Ticker"

    if not path.exists():
        logger.error("File universo mancante: %s", path)
        sys.exit(1)

    sheet = pd.read_excel(path, dtype=str)
    sheet.columns = [str(label).strip() for label in sheet.columns]

    if ticker_col not in sheet.columns:
        if len(sheet.columns) < 5:
            logger.error("Il file universo non contiene la colonna E con il ticker Bloomberg.")
            sys.exit(1)
        # Positional fallback: column E is taken as the ticker whatever its header says.
        sheet = sheet.rename(columns={sheet.columns[4]: ticker_col})

    validate_universe(sheet)

    sheet["ISIN"] = sheet["ISIN"].astype(str).str.strip()
    sheet[ticker_col] = sheet[ticker_col].fillna("").astype(str).str.strip()
    for numeric_col in ("PesoMax", "PesoFisso"):
        sheet[numeric_col] = pd.to_numeric(sheet[numeric_col], errors="coerce")
    return sheet
|
|
|
|
|
|
def load_returns(engine, universe: pd.DataFrame, end_date: pd.Timestamp) -> pd.DataFrame:
    """Load the daily returns of every ISIN in the universe from the database.

    For each distinct ISIN the stored procedure
    ``opt_RendimentoGiornaliero1_ALL`` is executed. ISIN, number of
    observations and portfolio currency are sent as bound parameters rather
    than being pasted into the SQL text, so a malformed ISIN coming from the
    spreadsheet cannot alter the statement.

    The ``RendimentoGiornaliero`` column is divided by 100 and aligned on the
    business-day calendar covering the five years up to ``end_date``.
    ISINs whose query fails or returns no rows are skipped with a warning.

    Args:
        engine: SQLAlchemy engine (or connection) used to run the procedure.
        universe: universe table; only its ``ISIN`` column is used.
        end_date: last date of the calendar.

    Returns:
        DataFrame indexed by business day with one column per loaded ISIN;
        missing observations are filled with 0.0.

    Raises:
        SystemExit: with code 1 when no ISIN could be loaded at all.
    """
    start_date = end_date - pd.DateOffset(years=5)
    all_dates = pd.date_range(start=start_date, end=end_date, freq="B").normalize()
    returns = pd.DataFrame(index=all_dates)

    # Built once: the statement is identical for every ISIN, only the bound values change.
    procedure_call = text(
        "EXEC opt_RendimentoGiornaliero1_ALL @ISIN = :isin, @n = :n, @PtfCurr = :ptf_curr"
    )

    for isin in universe["ISIN"].dropna().unique():
        logger.info("Caricamento rendimenti: %s", isin)
        query_params = {"isin": str(isin), "n": DB_OBSERVATIONS, "ptf_curr": PTF_CURRENCY}
        try:
            tmp = pd.read_sql_query(procedure_call, engine, params=query_params)
        except SQLAlchemyError as exc:
            logger.warning("Errore nella stored procedure per %s: %s", isin, exc)
            continue

        if tmp.empty:
            logger.warning("Nessun rendimento recuperato per %s.", isin)
            continue

        tmp["Px_Date"] = pd.to_datetime(tmp["Px_Date"], errors="coerce").dt.normalize()
        tmp = tmp.dropna(subset=["Px_Date"]).set_index("Px_Date")
        # reindex() refuses duplicated labels: keep the last row of any repeated date.
        tmp = tmp[~tmp.index.duplicated(keep="last")]

        returns[isin] = (tmp["RendimentoGiornaliero"] / 100.0).reindex(all_dates)
        logger.info("%s: %d osservazioni valide.", isin, returns[isin].count())

    # A frame with an index but no columns is "empty": nothing was loaded.
    if returns.empty:
        logger.error("Nessun dato di rendimento recuperato dal database.")
        sys.exit(1)

    high_na = returns.isna().mean()[lambda s: s > 0.20]
    if not high_na.empty:
        logger.warning("Colonne con oltre 20%% di NaN prima del fill: %s", high_na.to_dict())

    return returns.fillna(0.0)
|
|
|
|
|
|
# =========================
|
|
# OTTIMIZZAZIONE
|
|
# =========================
|
|
def add_weight_constraints(ef: EfficientFrontier, universe: pd.DataFrame, columns: pd.Index, asset_class_limits: dict) -> None:
    """Register the weight constraints of the universe on the optimizer.

    Three families of constraints are added, in this order:

    1. per instrument: weight equal to ``PesoFisso`` when it is set,
       otherwise weight at most ``PesoMax`` when that is set;
    2. per ``Categoria``: the summed weight is capped by the largest
       ``PesoMax`` found among the instruments of the category;
    3. per ``Asset Class``: the summed weight is capped by the configured limit.

    Instruments whose ISIN is not in ``columns`` are ignored everywhere.

    Args:
        ef: optimizer receiving the constraints through ``add_constraint``.
        universe: universe table with ISIN, Categoria, Asset Class,
            PesoMax and PesoFisso columns.
        columns: ISINs in the order of the optimizer's weight vector.
        asset_class_limits: maximum total weight per asset class.
    """

    def positions_of(mask):
        # Positions, inside the weight vector, of the selected instruments.
        return [columns.get_loc(code) for code in universe.loc[mask, "ISIN"] if code in columns]

    # 1) Single-instrument constraints.
    for _, asset in universe.iterrows():
        code = asset["ISIN"]
        if code not in columns:
            continue
        position = columns.get_loc(code)
        pinned = asset.get("PesoFisso")
        ceiling = asset.get("PesoMax")

        # Values are bound as lambda defaults so each closure keeps its own copy.
        if pd.notna(pinned):
            ef.add_constraint(lambda w, position=position, level=float(pinned): w[position] == level)
            continue
        if pd.notna(ceiling):
            ef.add_constraint(lambda w, position=position, level=float(ceiling): w[position] <= level)

    # 2) Category caps.
    category_caps = universe.groupby("Categoria")["PesoMax"].max().dropna()
    for category, cap in category_caps.items():
        members = positions_of(universe["Categoria"] == category)
        if members:
            ef.add_constraint(lambda w, members=members, level=float(cap): sum(w[i] for i in members) <= level)

    # 3) Asset-class caps.
    for asset_class, cap in asset_class_limits.items():
        members = positions_of(universe["Asset Class"] == asset_class)
        if members:
            ef.add_constraint(lambda w, members=members, level=float(cap): sum(w[i] for i in members) <= level)
|
|
|
|
|
|
def run_optimizations(returns: pd.DataFrame, universe: pd.DataFrame, volatility_targets: dict, asset_class_limits: dict, end_date: pd.Timestamp) -> pd.DataFrame:
    """Run one constrained optimization per configured volatility target.

    For every ``(years, target_vol) -> name`` entry the returns of the last
    ``years`` years are annualised, the sample covariance is regularised and
    ``EfficientFrontier.efficient_risk`` is solved under the universe
    constraints.

    A target that cannot be solved never aborts the run: its column is set to
    0.0 and a warning is logged. Besides ``OptimizationError`` this also
    covers the ``ValueError`` that ``efficient_risk`` raises when the
    requested volatility is below the global minimum volatility, and
    CVXPY's ``SolverError``.

    Args:
        returns: daily returns, one column per ISIN, indexed by date.
        universe: universe table used to build the constraints.
        volatility_targets: mapping ``(years, target_vol) -> portfolio name``.
        asset_class_limits: maximum total weight per asset class.
        end_date: last date of every estimation window.

    Returns:
        DataFrame indexed by ISIN with one column of weights per target name.
    """
    optimized_weights = pd.DataFrame(index=returns.columns)

    for (years, target_vol), name in volatility_targets.items():
        period_start = end_date - pd.DateOffset(years=years)
        period_returns = returns.loc[period_start:end_date]

        annual_returns = period_returns.mean() * DAYS_PER_YEAR
        annual_cov = regularize_covariance(
            risk_models.sample_cov(period_returns, returns_data=True)
        )

        ef = EfficientFrontier(annual_returns, annual_cov)
        add_weight_constraints(ef, universe, period_returns.columns, asset_class_limits)

        try:
            ef.efficient_risk(target_volatility=target_vol)
            weights = pd.Series(ef.clean_weights()).reindex(returns.columns).fillna(0.0)
            exp_ret, exp_vol, sharpe = ef.portfolio_performance(
                verbose=False, risk_free_rate=RISKFREE_RATE
            )
        except (OptimizationError, ValueError, cp.error.SolverError) as exc:
            logger.warning("Ottimizzazione fallita per %s: %s", name, exc)
            optimized_weights[name] = 0.0
            continue

        # Stored only once the whole solve-and-report sequence has succeeded.
        optimized_weights[name] = weights
        logger.info("%s: return %.2f%% | vol %.2f%% | sharpe %.2f", name, exp_ret * 100, exp_vol * 100, sharpe)

    return optimized_weights
|
|
|
|
|
|
# =========================
|
|
# EXPORT PESI TARGET
|
|
# =========================
|
|
def build_target_weights_export(optimized_weights: pd.DataFrame, universe: pd.DataFrame, target_name: str) -> pd.DataFrame:
    """Build the table for the 'Pesi ottimizzati Quant Best Europe' file.

    Only instruments with a strictly positive weight are exported. Each
    weight is multiplied by ``TARGET_WEIGHT_MULTIPLIER`` to obtain the
    ``peso`` column.

    Two failure modes are handled explicitly:

    * duplicated ISINs in the universe (which validation only warns about)
      no longer break the name/ticker lookup: the first occurrence wins;
    * a target with no positive weight at all (a failed optimization stores
      0.0 everywhere) stops the run, because an empty target would
      otherwise be turned into orders that close every current position.

    Args:
        optimized_weights: weights indexed by ISIN, one column per portfolio.
        universe: universe table with ISIN, Nome and Bloomberg Ticker.
        target_name: column of ``optimized_weights`` to export.

    Returns:
        DataFrame with columns cod_por, ISIN, des_tit, Bloomberg Ticker, peso.

    Raises:
        SystemExit: with code 1 when ``target_name`` is not among the results
            or the selected portfolio has no positive weight.
    """
    if target_name not in optimized_weights.columns:
        logger.error("Target portfolio '%s' non presente fra i risultati: %s", target_name, list(optimized_weights.columns))
        sys.exit(1)

    target_weights = optimized_weights[target_name]
    positive_weights = target_weights[target_weights > 0]
    if positive_weights.empty:
        logger.error(
            "Il portafoglio target '%s' non contiene pesi positivi: ottimizzazione fallita?",
            target_name,
        )
        sys.exit(1)

    # to_dict("index") needs a unique index, hence the de-duplication.
    universe_info = (
        universe.drop_duplicates(subset="ISIN", keep="first")
        .set_index("ISIN")[["Nome", "Bloomberg Ticker"]]
        .to_dict("index")
    )

    rows = []
    for isin, weight in positive_weights.items():
        info = universe_info.get(isin, {})
        rows.append({
            "cod_por": OUTPUT_PORTFOLIO_CODE,
            "ISIN": isin,
            "des_tit": info.get("Nome", ""),
            "Bloomberg Ticker": info.get("Bloomberg Ticker", ""),
            "peso": float(weight * TARGET_WEIGHT_MULTIPLIER),
        })

    return pd.DataFrame(rows, columns=["cod_por", "ISIN", "des_tit", "Bloomberg Ticker", "peso"])
|
|
|
|
|
|
def export_target_weights(target_df: pd.DataFrame, output_dir: Path, date_tag: str) -> Path:
    """Write the target weights to '<date_tag> Pesi ottimizzati Quant Best Europe.xlsx'.

    Only the columns cod_por, ISIN, des_tit and peso are exported, on a single
    sheet named after ``OUTPUT_PORTFOLIO_CODE``. An existing file is overwritten.

    Args:
        target_df: table of target weights.
        output_dir: folder receiving the workbook.
        date_tag: prefix of the file name.

    Returns:
        Path of the workbook that was written.
    """
    destination = output_dir / f"{date_tag} Pesi ottimizzati Quant Best Europe.xlsx"
    # The Bloomberg ticker is deliberately left out of this file.
    exported = target_df.reindex(columns=["cod_por", "ISIN", "des_tit", "peso"])

    with pd.ExcelWriter(destination, engine="openpyxl", mode="w") as writer:
        exported.to_excel(writer, sheet_name=OUTPUT_PORTFOLIO_CODE, index=False)

    logger.info("File pesi target salvato: %s", destination)
    return destination
|
|
|
|
|
|
# =========================
|
|
# CONFRONTO CON WEALTH-OVERVIEW
|
|
# =========================
|
|
def find_latest_wealth_overview(input_dir: Path) -> Path:
    """Return the most recent Wealth-overview workbook found in ``input_dir``.

    Candidates must be named ``YYYYMMDD_HHMM_Wealth-overview.xlsx``; the file
    with the greatest (date, time) stamp is selected. Because both parts are
    fixed-width digit strings, comparing them as text orders them by time.

    Args:
        input_dir: folder to scan (not recursively).

    Returns:
        Path of the selected workbook.

    Raises:
        SystemExit: with code 1 (after logging an error) when no file matches.
    """
    name_rule = re.compile(r"^(\d{8})_(\d{4})_Wealth-overview\.xlsx$", re.IGNORECASE)

    stamped = []
    for candidate in input_dir.glob("*_Wealth-overview.xlsx"):
        parsed = name_rule.match(candidate.name)
        if parsed is None:
            continue
        day, clock = parsed.groups()
        stamped.append((day, clock, candidate))

    if not stamped:
        logger.error("Nessun file AAAAMMGG_HHMM_Wealth-overview.xlsx trovato in %s", input_dir)
        sys.exit(1)

    newest = max(stamped, key=lambda item: (item[0], item[1]))[2]
    logger.info("Wealth-overview selezionato: %s", newest)
    return newest
|
|
|
|
|
|
def load_current_portfolio_from_wealth(path: Path) -> pd.DataFrame:
    """Read the current holdings from the first sheet of a Wealth-overview file.

    Rows whose asset class is "Liquidity" (case-insensitive) and rows without
    an ISIN / IBAN are discarded. The remaining rows are grouped by ISIN:
    the "% of net assets" values are summed, the other attributes keep the
    first value met.

    NOTE(review): the weights are taken as stored in the workbook; the order
    builder treats them as decimal fractions -- confirm the unit of
    "% of net assets".

    Args:
        path: Wealth-overview workbook to read.

    Returns:
        DataFrame with columns ISIN, current_weight, Description,
        TradingCurrency and AssetClass, one row per ISIN.

    Raises:
        SystemExit: with code 1 (after logging an error) when a required
            column is missing.
    """
    sheet = pd.read_excel(path, sheet_name=0)
    sheet.columns = [str(label).strip() for label in sheet.columns]

    needed = ("Asset class", "Description", "ISIN / IBAN", "Position currency", "% of net assets")
    absent = [name for name in needed if name not in sheet.columns]
    if absent:
        logger.error("Colonne mancanti nel Wealth-overview %s: %s", path.name, absent)
        sys.exit(1)

    holdings = sheet.copy()
    holdings["Asset class"] = holdings["Asset class"].astype(str).str.strip()
    is_security = (
        holdings["Asset class"].str.casefold().ne("liquidity")
        & holdings["ISIN / IBAN"].notna()
    )
    holdings = holdings.loc[is_security].copy()

    holdings["ISIN"] = holdings["ISIN / IBAN"].astype(str).str.strip()
    # Unparseable or missing weights count as zero.
    holdings["current_weight"] = pd.to_numeric(holdings["% of net assets"], errors="coerce").fillna(0.0)
    holdings["Description"] = holdings["Description"].fillna("").astype(str)
    # A missing or blank currency falls back to EUR.
    currency = holdings["Position currency"].fillna("EUR").astype(str).str.strip()
    holdings["TradingCurrency"] = currency.replace("", "EUR")

    per_isin = holdings.groupby("ISIN", as_index=False).agg(
        current_weight=pd.NamedAgg(column="current_weight", aggfunc="sum"),
        Description=pd.NamedAgg(column="Description", aggfunc="first"),
        TradingCurrency=pd.NamedAgg(column="TradingCurrency", aggfunc="first"),
        AssetClass=pd.NamedAgg(column="Asset class", aggfunc="first"),
    )
    return per_isin
|
|
|
|
|
|
def build_orders(target_df: pd.DataFrame, current_df: pd.DataFrame) -> pd.DataFrame:
    """Derive Buy/Sell orders from the gap between target and current weights.

    Target weights come from the ``peso`` column (percent, divided by 100);
    current weights from ``current_weight``. Instruments present on only one
    side are treated as having weight 0 on the other. Gaps not larger than
    ``MIN_TRADE_WEIGHT`` in absolute value produce no order.

    Order rules:

    * positive gap -> "Buy" with ``Weight`` equal to the gap; the Bloomberg
      ticker is filled only when the instrument is not yet held;
    * negative gap with target at or below the threshold and a held position
      -> "Sell" with ``Close Position`` = "Yes" and no weight;
    * any other negative gap -> "Sell" with ``Weight`` equal to the absolute gap.

    The result is sorted: closing sells, partial sells, then buys, each group
    ordered by ISIN.

    Args:
        target_df: target table with ISIN, des_tit, peso and optionally
            Bloomberg Ticker.
        current_df: current holdings with ISIN, current_weight, Description
            and TradingCurrency.

    Returns:
        DataFrame with the columns of the order import sheet.
    """
    ticker_col = "Bloomberg Ticker"

    # --- Target side: percent -> decimal, normalised ticker ---
    wanted = target_df.copy()
    wanted["target_weight"] = pd.to_numeric(wanted["peso"], errors="coerce").fillna(0.0) / 100.0
    if ticker_col not in wanted.columns:
        wanted[ticker_col] = ""
    wanted[ticker_col] = wanted[ticker_col].fillna("").astype(str).str.strip()
    wanted = wanted[["ISIN", "des_tit", ticker_col, "target_weight"]]

    # --- Full outer join so that positions missing on either side are kept ---
    book = current_df.merge(wanted, on="ISIN", how="outer")
    for weight_col in ("current_weight", "target_weight"):
        book[weight_col] = book[weight_col].fillna(0.0)
    # The universe name wins over the description found in the holdings file.
    book["Description"] = book["des_tit"].combine_first(book["Description"]).fillna("")
    book["TradingCurrency"] = book["TradingCurrency"].fillna("EUR").replace("", "EUR")
    book[ticker_col] = book[ticker_col].fillna("").astype(str).str.strip()
    book["delta_weight"] = book["target_weight"] - book["current_weight"]

    order_columns = [
        "Order", "ISIN", "TradingCurrency", "Description", "Bloomberg Ticker",
        "Limit Price", "Quantity", "Close Position", "Amount", "Weight", "Notes",
    ]
    blank_by_default = (
        "Bloomberg Ticker", "Limit Price", "Quantity", "Close Position", "Amount", "Weight", "Notes",
    )

    rows = []
    for position in book.to_dict("records"):
        gap = float(position["delta_weight"])
        held = float(position["current_weight"])
        goal = float(position["target_weight"])

        if abs(gap) <= MIN_TRADE_WEIGHT:
            continue

        order = dict.fromkeys(blank_by_default, "")
        order["ISIN"] = position["ISIN"]
        order["TradingCurrency"] = position["TradingCurrency"] or "EUR"
        order["Description"] = position["Description"]

        if gap > 0:
            order["Order"] = "Buy"
            order["Weight"] = round(gap, 10)
            # Only brand-new positions carry the Bloomberg ticker; for a buy of
            # an instrument already held the column stays empty.
            if held <= MIN_TRADE_WEIGHT:
                order[ticker_col] = position[ticker_col]
        else:
            order["Order"] = "Sell"
            if goal <= MIN_TRADE_WEIGHT and held > MIN_TRADE_WEIGHT:
                order["Close Position"] = "Yes"
            else:
                order["Weight"] = round(abs(gap), 10)

        rows.append(order)

    orders = pd.DataFrame(rows, columns=order_columns)
    if orders.empty:
        return orders

    # Closing sells first, then partial sells, then buys; anything else last.
    priority = {("Sell", "Yes"): 0, ("Sell", ""): 1, ("Buy", ""): 2}
    orders["_rank"] = [
        priority.get(pair, 9) for pair in zip(orders["Order"], orders["Close Position"])
    ]
    ranked = orders.sort_values(["_rank", "ISIN"], kind="stable")
    return ranked.drop(columns="_rank").reset_index(drop=True)
|
|
|
|
|
|
def instruction_rows() -> list:
    """Return the two-column rows written to the 'Instructions' sheet.

    Each row is a ``[field, explanation]`` pair; blank pairs separate the
    sections (mandatory, alternative mandatory and optional fields).
    """
    spacer = ("", "")
    layout = (
        ("IMPORTANTE", "NON CAMBIARE IL NOME DEL FILE!!"),
        spacer,
        ("Campi obbligatori:", ""),
        ("Order", "Selezionare dal menu a tendina 'Buy' o 'Sell'"),
        ("TradingCurrency", "Selezionare la currency dello strumento"),
        ("Description", "Compilare con il nome del titolo - serve per controllo"),
        spacer,
        ("Campi alternativi obbligatori:", ""),
        ("ISIN", "Indicare il codice ISIN del titolo"),
        ("Bloomberg Ticker", "Compilare con il ticker Bloomberg completo"),
        spacer,
        ("Campi facoltativi:", ""),
        ("Limit Price", "Se vuoto, si intende ordine a mercato"),
        ("Quantity", "Se vuoto, compilare uno fra Close Position, Amount o Weight"),
        ("Close Position", "Usare 'Yes' solo per chiudere integralmente una posizione"),
        ("Amount", "Compilare solo se Quantity e Weight restano vuoti"),
        ("Weight", "Peso aggiuntivo rispetto al portafoglio gia' presente, espresso in forma decimale"),
        ("Notes", "Eventuali annotazioni per il Middle Office"),
    )
    # Fresh lists on every call, so callers may modify the result freely.
    return [list(pair) for pair in layout]
|
|
|
|
|
|
def example_rows() -> list:
    """Return header plus sample orders for the 'Example' sheet.

    The first row is the header; the following rows illustrate a closing
    sell, a buy by weight identified by ISIN and a buy by amount identified
    by Bloomberg ticker. Cells that a sample does not fill are empty strings.
    """
    header = [
        "Order", "ISIN", "TradingCurrency", "Description", "Bloomberg Ticker",
        "Limit Price", "Quantity", "Close Position", "Amount", "Weight", "Notes",
    ]
    samples = (
        {"Order": "Sell", "ISIN": "CA00217Y1043", "TradingCurrency": "CAD",
         "Description": "ATS Corporation", "Close Position": "Yes"},
        {"Order": "Buy", "ISIN": "FR0000121014", "TradingCurrency": "EUR",
         "Description": "LVMH", "Weight": 0.10},
        {"Order": "Buy", "TradingCurrency": "USD", "Description": "Apple",
         "Bloomberg Ticker": "AAPL US Equity", "Amount": 10000},
    )
    return [header] + [[sample.get(column, "") for column in header] for sample in samples]
|
|
|
|
|
|
def caption_rows() -> list:
    """Return the rows of the 'Caption' sheet listing the allowed values.

    The sheet has four value lists (order type, in-portfolio flag, trading
    currency, close-position flag), each separated by an empty column. Lists
    shorter than the currency list are padded with empty strings.
    """
    header = ["Order Type", "", "In Portfolio", "", "Trading Currency", "", "Close Position"]
    order_types = ["Buy", "Sell"]
    in_portfolio = ["Yes", "No"]
    currencies = ["EUR", "USD", "GBP", "CAD", "CHF", "JPY"]
    close_flags = ["Yes"]

    def pick(options, row_number):
        # Value of a list at the given row, or blank once the list is exhausted.
        return options[row_number] if row_number < len(options) else ""

    body = [
        [pick(order_types, i), "", pick(in_portfolio, i), "", currency, "", pick(close_flags, i)]
        for i, currency in enumerate(currencies)
    ]
    return [header] + body
|
|
|
|
|
|
def export_orders_workbook(orders_df: pd.DataFrame, output_dir: Path, date_tag: str) -> Path:
    """Write the order workbook '10040912.538.xlsx' into ``output_dir``.

    The workbook has four sheets: Orders (the generated orders), Instructions,
    Example and Caption. The header row of Orders and Example is frozen, and
    every column is sized to its longest cell, between 10 and 45 characters.
    The file name is fixed; ``date_tag`` is accepted but not used.

    Args:
        orders_df: orders to export.
        output_dir: folder receiving the workbook.
        date_tag: unused, kept for a uniform export signature.

    Returns:
        Path of the workbook that was written.
    """
    output_path = output_dir / "10040912.538.xlsx"
    sheets_with_frozen_header = {"Orders", "Example"}

    with pd.ExcelWriter(output_path, engine="openpyxl", mode="w") as writer:
        orders_df.to_excel(writer, sheet_name="Orders", index=False)

        instructions = pd.DataFrame(instruction_rows())
        instructions.to_excel(writer, sheet_name="Instructions", index=False, header=False)

        example_body = example_rows()[1:]
        example_header = example_rows()[0]
        example = pd.DataFrame(example_body, columns=example_header)
        example.to_excel(writer, sheet_name="Example", index=False)

        caption = pd.DataFrame(caption_rows())
        caption.to_excel(writer, sheet_name="Caption", index=False, header=False)

        # Cosmetics, applied through the underlying openpyxl workbook.
        for ws in writer.book.worksheets:
            ws.freeze_panes = "A2" if ws.title in sheets_with_frozen_header else None
            for column_cells in ws.columns:
                longest = max(
                    0 if cell.value is None else len(str(cell.value)) for cell in column_cells
                )
                width = min(max(longest + 2, 10), 45)
                ws.column_dimensions[column_cells[0].column_letter].width = width

    logger.info("File ordini salvato: %s", output_path)
    return output_path
|
|
|
|
|
|
# =========================
|
|
# MAIN
|
|
# =========================
|
|
def main() -> None:
    """Run the whole pipeline: optimize, export target weights, build orders.

    Steps:
      1. read universe, configuration and database connection;
      2. load returns up to yesterday and optimize every configured target;
      3. export the weights of ``TARGET_PORTFOLIO_NAME``;
      4. compare them with the latest Wealth-overview and export the orders.
    """
    # File names carry today's date; the data window ends the day before.
    date_tag = datetime.now().strftime("%Y%m%d")
    end_date = pd.Timestamp.now().normalize() - pd.Timedelta(days=1)

    # 1) Inputs
    universe = load_universe(UNIVERSE_FILE)
    volatility_targets, asset_class_limits = load_targets_and_limits(CONFIG_FILE)
    engine = build_engine(CONNECTION_FILE)
    returns = load_returns(engine, universe, end_date)

    # 2) Optimization and 3) target-weights export
    optimized_weights = run_optimizations(
        returns, universe, volatility_targets, asset_class_limits, end_date
    )
    target_df = build_target_weights_export(optimized_weights, universe, TARGET_PORTFOLIO_NAME)
    export_target_weights(target_df, OUTPUT_DIR, date_tag)

    # 4) Orders against the current portfolio
    wealth_path = find_latest_wealth_overview(INPUT_DIR)
    current_df = load_current_portfolio_from_wealth(wealth_path)
    orders_df = build_orders(target_df, current_df)
    export_orders_workbook(orders_df, OUTPUT_DIR, date_tag)

    logger.info("Processo completato. Ordini generati: %d", len(orders_df))


if __name__ == "__main__":
    main()
|