# -*- coding: utf-8 -*-
"""
Equity/Reconciliation Builder from Audit Log.

- Reads trades_audit_log.csv (OPEN/CLOSE rows; EntryAmount base=100;
  EntryIndex optional).
- Downloads daily returns through a stored procedure (connection.txt).
- Converts the returns to consistent decimals (percentages => /100;
  price-derived log-returns => expm1).
- Rebuilds the daily returns per strategy as the WEIGHTED AVERAGE over the
  active trades.
- Saves:
    - daily_returns_by_strategy.csv
    - equity_by_strategy.csv
    - debug_daily_by_strategy.csv
    - equity_by_strategy.png, drawdown_by_strategy.png
- Copies the two plots to the shared Dropbox folder (the figures are saved
  and closed; they are not shown on screen).
"""
from __future__ import annotations

import shutil
from pathlib import Path

import numpy as np
import pandas as pd

from shared_utils import (
    detect_column,
    load_config,
    read_connection_txt,
    require_section,
)

# =============================================================================
# PATH & OUTPUT
# =============================================================================
BASE_DIR = Path(__file__).resolve().parent

CONFIG = None
try:
    CONFIG = load_config()
    PATHS_CONFIG = require_section(CONFIG, "paths")
except Exception as exc:  # pragma: no cover - best effort
    print(f"[WARN] Config non disponibile ({exc}); uso i percorsi di default.")
    PATHS_CONFIG = {}

OUTPUT_DIR = BASE_DIR / PATHS_CONFIG.get("output_dir", "output")
PLOT_DIR = BASE_DIR / PATHS_CONFIG.get("plot_dir", "plot")
# Joining an absolute path onto BASE_DIR yields the absolute path unchanged,
# so the default below resolves to OUTPUT_DIR / "trades_audit_log.csv".
AUDIT_LOG_CSV = BASE_DIR / PATHS_CONFIG.get("audit_log_csv", OUTPUT_DIR / "trades_audit_log.csv")
CONNECTION_TXT = BASE_DIR / PATHS_CONFIG.get("connection_txt", "connection.txt")

OUT_DAILY_CSV = OUTPUT_DIR / "daily_returns_by_strategy.csv"
OUT_EQUITY_CSV = OUTPUT_DIR / "equity_by_strategy.csv"
OUT_DEBUG_CSV = OUTPUT_DIR / "debug_daily_by_strategy.csv"
PLOT_EQUITY = PLOT_DIR / "equity_by_strategy.png"
PLOT_DD = PLOT_DIR / "drawdown_by_strategy.png"

DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF")


def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR) -> bool:
    """Copy *src* into *dst_dir*, creating the directory when needed.

    This is a best-effort export: every failure is reported on stdout and
    turned into a ``False`` return value instead of an exception.

    Args:
        src: File to copy. ``None`` is tolerated.
        dst_dir: Destination directory. ``None`` is tolerated.

    Returns:
        ``True`` when the file was copied, ``False`` otherwise.
    """
    if not src or not dst_dir:
        return False
    if not src.exists():
        print(f"[WARN] file non trovato per copia Dropbox: {src}")
        return False
    try:
        dst_dir.mkdir(parents=True, exist_ok=True)
        dst = dst_dir / src.name
        shutil.copy2(src, dst)
        print(f"[DROPBOX] Copiato {src.name} in {dst_dir}")
        return True
    except Exception as exc:  # deliberate best effort: never break the run
        print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}")
        return False


# Stored procedure
SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL"
SP_N_DEFAULT = 1260
PTF_CURR_DEFAULT = "EUR"

try:
    DB_CONFIG = require_section(CONFIG, "db") if CONFIG else {}
except Exception as exc:  # pragma: no cover - best effort
    print(f"[WARN] Config DB non disponibile ({exc}); uso i default interni.")
    DB_CONFIG = {}
else:
    SP_NAME_DEFAULT = str(DB_CONFIG.get("stored_proc", SP_NAME_DEFAULT))
    SP_N_DEFAULT = int(DB_CONFIG.get("n_bars", SP_N_DEFAULT))
    PTF_CURR_DEFAULT = str(DB_CONFIG.get("ptf_curr", PTF_CURR_DEFAULT))

DEFAULT_STRATEGIES = ["Equal_Weight", "Risk_Parity"]
# Independent copy, so that mutating one list never silently changes the other.
VALID_STRATEGIES = list(DEFAULT_STRATEGIES)

EQUITY_CFG = CONFIG.get("equity_log", {}) if CONFIG else {}
raw_whitelist = EQUITY_CFG.get("strategy_whitelist") if isinstance(EQUITY_CFG, dict) else None
if isinstance(raw_whitelist, str):
    # A plain string would otherwise be iterated character by character.
    raw_whitelist = raw_whitelist.split(",")
if raw_whitelist:
    whitelist = [str(x).strip() for x in raw_whitelist if str(x).strip()]
    if whitelist:
        VALID_STRATEGIES = whitelist


# =============================================================================
# SMALL HELPERS
# =============================================================================
def _finite_float(value, default: float) -> float:
    """Return *value* as a finite float, or *default* when it is missing/invalid."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if np.isfinite(number) else default


# =============================================================================
# FETCH RETURNS FROM THE DB
# =============================================================================
def _returns_from_sp_frame(df: pd.DataFrame, isin) -> tuple[pd.DataFrame, bool] | None:
    """Turn one stored-procedure result into a long ``Date/ISIN/Ret`` frame.

    Returns:
        ``(frame, from_return_column)`` where the flag tells whether ``Ret``
        was read from a return column (unit unknown: decimal or percent) or
        derived from prices (always decimal). ``None`` when the result has no
        usable date column or neither a return nor a price column.
    """
    col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
    col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
    col_px = detect_column(df, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
    if not col_date or not (col_ret or col_px):
        return None

    df = df.copy()
    df[col_date] = pd.to_datetime(df[col_date], errors="coerce")
    df = df.dropna(subset=[col_date]).sort_values(col_date)

    if col_ret:
        ret = pd.to_numeric(df[col_ret], errors="coerce")
        from_return_column = True
    else:
        # Zero prices are treated as missing to avoid division by zero.
        px = pd.to_numeric(df[col_px], errors="coerce").astype(float).replace(0, np.nan)
        log_r = np.log(px / px.shift(1))
        ret = np.expm1(log_r)  # log-return -> simple decimal return
        from_return_column = False

    out = pd.DataFrame({"Date": df[col_date], "ISIN": isin, "Ret": ret})
    return out, from_return_column


def fetch_returns_from_db(isins, start_date, end_date) -> pd.DataFrame:
    """Download daily returns for *isins* and return them as a wide frame.

    The stored procedure ``SP_NAME_DEFAULT`` is executed once per ISIN. A
    failing ISIN is reported and skipped. Returns read from a return column
    are rescaled from percent to decimal when their largest absolute value
    exceeds 0.5; returns derived from prices are already decimal and are
    never rescaled.

    Args:
        isins: Iterable of ISIN codes.
        start_date: First ``datetime.date`` to keep (inclusive).
        end_date: Last ``datetime.date`` to keep (inclusive).

    Returns:
        DataFrame indexed by ``Date`` (sorted) with one column per ISIN,
        holding decimal simple returns. Empty when nothing was retrieved.
    """
    import sqlalchemy as sa
    from sqlalchemy import text as sql_text

    conn_str = read_connection_txt(CONNECTION_TXT)
    engine = sa.create_engine(conn_str, fast_executemany=True)

    sp = SP_NAME_DEFAULT
    nbar = SP_N_DEFAULT
    ptf = PTF_CURR_DEFAULT
    # NOTE: the procedure name comes from the local configuration and is
    # interpolated into the statement; the arguments are bound parameters.
    sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")

    frames = []
    unit_unknown = set()  # ISINs whose returns came from a return column
    try:
        with engine.begin() as conn:
            for isin in isins:
                try:
                    df = pd.read_sql_query(
                        sql_sp, conn, params={"isin": isin, "n": nbar, "ptf": ptf}
                    )
                except Exception as e:
                    print(f"[ERROR] SP {sp} fallita per {isin}: {e}")
                    continue
                if df.empty:
                    continue
                parsed = _returns_from_sp_frame(df, isin)
                if parsed is None:
                    continue
                out, from_return_column = parsed
                if from_return_column:
                    unit_unknown.add(isin)
                frames.append(out)
    finally:
        # Release the pooled connections even when something above raised.
        engine.dispose()

    if not frames:
        return pd.DataFrame(index=pd.DatetimeIndex([], name="Date"))

    long = pd.concat(frames, ignore_index=True).dropna(subset=["Date"])
    days = long["Date"].dt.date
    long = long.loc[(days >= start_date) & (days <= end_date)]
    # pivot() raises ValueError on duplicated (Date, ISIN) pairs: keep the last.
    long = long.drop_duplicates(subset=["Date", "ISIN"], keep="last")

    wide = long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()

    # Percent-vs-decimal heuristic, restricted to the columns whose unit is
    # actually unknown (price-derived returns are decimal by construction).
    pct_cols = [c for c in wide.columns if c in unit_unknown]
    if pct_cols:
        block = wide[pct_cols].to_numpy(dtype=float)
        finite = np.abs(block[np.isfinite(block)])
        if finite.size and finite.max() > 0.5:
            wide[pct_cols] = wide[pct_cols] / 100.0

    return wide


# =============================================================================
# DAILY RETURNS RECONSTRUCTION
# =============================================================================
def _build_close_lookup(audit: pd.DataFrame) -> dict:
    """Map ``(strategy, isin, open day 'YYYY-MM-DD')`` to the last close date.

    The open day is taken from ``LinkedOpenDate`` when the column exists and
    from the close ``TradeDate`` otherwise. The strategy is part of the key
    so that a close recorded for one strategy never ends another strategy's
    trade on the same ISIN and open day.
    """
    closes = audit[audit["Action"] == "CLOSE"].copy()
    if closes.empty:
        return {}

    closes["TradeDate"] = pd.to_datetime(closes["TradeDate"])
    link_col = "LinkedOpenDate" if "LinkedOpenDate" in closes.columns else "TradeDate"
    closes["_open_day"] = pd.to_datetime(closes[link_col]).dt.strftime("%Y-%m-%d")
    closes = closes.dropna(subset=["_open_day", "TradeDate"]).sort_values("TradeDate")

    lookup = {}
    rows = zip(
        closes["Strategy"].astype(str),
        closes["ISIN"].astype(str),
        closes["_open_day"],
        closes["TradeDate"],
    )
    for strat, isin, open_day, closed_on in rows:
        # Rows are sorted by TradeDate, so the latest close wins.
        lookup[(strat, isin, open_day)] = closed_on
    return lookup


def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> pd.DataFrame:
    """Rebuild per-strategy daily returns as an amount-weighted average.

    Each OPEN row is active from its entry bar (inclusive) up to the bar of
    its matching CLOSE (exclusive), or to the end of the series when no close
    is found. For every day the strategy return is
    ``sum(EntryAmount * ret) / sum(EntryAmount)`` over the active trades that
    have a return on that day; days without any contribution get ``0.0``.

    Side effect: writes numerators, denominators and returns to
    ``OUT_DEBUG_CSV``.

    Args:
        audit: Audit log with at least ``Strategy``, ``Action``, ``ISIN`` and
            ``TradeDate``; ``EntryAmount``, ``EntryIndex`` and
            ``LinkedOpenDate`` are used when present.
        returns_wide: Decimal daily returns, sorted date index, one column
            per ISIN.

    Returns:
        DataFrame indexed like *returns_wide* with one column per strategy.
    """
    strategies = sorted(audit["Strategy"].dropna().astype(str).unique())
    if not strategies:
        return pd.DataFrame(index=returns_wide.index, columns=[])

    dates = returns_wide.index
    n_bars = len(dates)
    # Positional accumulators: rows follow `dates`, columns follow `strategies`.
    num = np.zeros((n_bars, len(strategies)), dtype=float)
    den = np.zeros_like(num)

    close_dates = _build_close_lookup(audit)
    strategy_labels = audit["Strategy"].astype(str)
    is_open = audit["Action"] == "OPEN"

    for j, strat in enumerate(strategies):
        opens = audit[is_open & (strategy_labels == strat)]
        for _, op in opens.iterrows():
            isin = op["ISIN"]
            if isin not in returns_wide.columns:
                continue

            # Missing, non-numeric, NaN or non-positive amounts are skipped.
            entry_amount = _finite_float(op.get("EntryAmount"), 0.0)
            if entry_amount <= 0:
                continue

            trade_day = pd.to_datetime(op["TradeDate"])
            has_day = not pd.isna(trade_day)

            # A missing EntryIndex maps to -1 so that the TradeDate fallback
            # is used; defaulting to 0 would start the trade at the first bar.
            # NOTE(review): EntryIndex is used as a position in `returns_wide`
            # - confirm that the log producer uses the same index.
            entry_idx = int(_finite_float(op.get("EntryIndex"), -1.0))
            if entry_idx < 0 or entry_idx >= n_bars:
                if not has_day:
                    continue
                entry_idx = int(dates.searchsorted(trade_day.normalize(), side="left"))

            closed_on = None
            if has_day:
                key = (strat, str(isin), trade_day.strftime("%Y-%m-%d"))
                closed_on = close_dates.get(key)
            if closed_on is None:
                exit_idx = n_bars
            else:
                exit_idx = int(
                    dates.searchsorted(pd.Timestamp(closed_on).normalize(), side="left")
                )
            if exit_idx <= entry_idx:
                continue

            seg_vals = returns_wide[isin].to_numpy(dtype=float)[entry_idx:exit_idx]
            # Days without a return for this ISIN contribute to neither sum,
            # so one missing quote does not erase the whole strategy's day.
            has_ret = np.isfinite(seg_vals)
            num[entry_idx:exit_idx, j] += entry_amount * np.where(has_ret, seg_vals, 0.0)
            den[entry_idx:exit_idx, j] += entry_amount * has_ret

    with np.errstate(divide="ignore", invalid="ignore"):
        ratio = np.where(den > 0, num / den, 0.0)
    daily = pd.DataFrame(ratio, index=dates, columns=strategies)

    debug_cols = {}
    for prefix, block in (("num", num), ("den", den), ("ret", ratio)):
        for j, strat in enumerate(strategies):
            debug_cols[f"{prefix}_{strat}"] = block[:, j]
    debug = pd.DataFrame(debug_cols, index=dates)
    OUT_DEBUG_CSV.parent.mkdir(parents=True, exist_ok=True)
    debug.to_csv(OUT_DEBUG_CSV, index_label="Date")

    return daily


# =============================================================================
# MAIN
# =============================================================================
def _save_line_plot(plt, frame: pd.DataFrame, title: str, path: Path, figsize) -> None:
    """Draw one line per column of *frame* and save the figure to *path*."""
    plt.figure(figsize=figsize)
    for col in frame.columns:
        plt.plot(frame.index, frame[col], label=col)
    plt.title(title)
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.savefig(str(path), dpi=150)
    plt.close()


def main():
    """Run the whole pipeline: audit log -> returns -> equity -> CSVs/plots.

    Raises:
        FileNotFoundError: The audit log CSV does not exist.
        SystemExit: The audit log is empty, lacks ``Strategy``, has no valid
            ``TradeDate`` or has no row for the whitelisted strategies.
        RuntimeError: No return could be retrieved from the database.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    PLOT_DIR.mkdir(parents=True, exist_ok=True)

    if not AUDIT_LOG_CSV.exists():
        raise FileNotFoundError("Missing trades_audit_log.csv")

    # Robust parsing (LinkedOpenDate may be missing)
    try:
        audit = pd.read_csv(AUDIT_LOG_CSV, parse_dates=["TradeDate", "LinkedOpenDate"])
    except ValueError:
        audit = pd.read_csv(AUDIT_LOG_CSV, parse_dates=["TradeDate"])

    if audit.empty:
        raise SystemExit("Audit log vuoto.")

    if "Strategy" not in audit.columns:
        raise SystemExit("Colonna 'Strategy' mancante nell'audit log.")

    # read_csv leaves the column as plain strings when parsing fails; force a
    # datetime dtype so that the date arithmetic below cannot break.
    audit["TradeDate"] = pd.to_datetime(audit["TradeDate"], errors="coerce")

    # === whitelist filter: keep only the wanted strategies ===
    audit["Strategy"] = audit["Strategy"].astype(str)
    before = len(audit)
    audit = audit[audit["Strategy"].isin(VALID_STRATEGIES)].copy()
    removed = before - len(audit)
    if removed > 0:
        print(
            f"[INFO] Filtrate {removed} righe con strategie non incluse in {VALID_STRATEGIES}."
        )

    if audit.empty:
        raise SystemExit(f"Nessuna riga con strategie in {VALID_STRATEGIES} nell'audit log.")

    if audit["TradeDate"].isna().all():
        raise SystemExit("Nessuna TradeDate valida nell'audit log.")

    # Ten-day margin on both sides of the traded period.
    start_date = (audit["TradeDate"].min() - pd.Timedelta(days=10)).date()
    end_date = (audit["TradeDate"].max() + pd.Timedelta(days=10)).date()
    isins = sorted(audit["ISIN"].dropna().astype(str).unique())

    ret_wide = fetch_returns_from_db(isins, start_date, end_date)
    if ret_wide.empty:
        raise RuntimeError("Nessun rendimento recuperato dal DB nell'intervallo richiesto.")

    daily = rebuild_daily_from_log(audit, ret_wide).sort_index()
    daily.to_csv(OUT_DAILY_CSV, index_label="Date")

    # Equity curve rebased to 100.
    equity = (1.0 + daily.fillna(0.0)).cumprod() * 100.0
    equity.to_csv(OUT_EQUITY_CSV, index_label="Date")

    import matplotlib.pyplot as plt

    # Equity
    _save_line_plot(plt, equity, "Equity per Strategy", PLOT_EQUITY, (10, 6))
    equity_copied = copy_to_dropbox(PLOT_EQUITY)

    # Drawdown
    dd = equity / equity.cummax() - 1.0
    _save_line_plot(plt, dd, "Drawdown per Strategy", PLOT_DD, (10, 5))
    dd_copied = copy_to_dropbox(PLOT_DD)

    print("Salvati:")
    print(" -", OUT_DAILY_CSV)
    print(" -", OUT_EQUITY_CSV)
    print(" -", OUT_DEBUG_CSV)
    print(" -", PLOT_EQUITY)
    print(" -", PLOT_DD)
    # List the Dropbox copies only when they were really written.
    if equity_copied:
        print(" -", DROPBOX_EXPORT_DIR / PLOT_EQUITY.name)
    if dd_copied:
        print(" -", DROPBOX_EXPORT_DIR / PLOT_DD.name)


if __name__ == "__main__":
    main()