diff --git a/equity_from_log_v2.py b/equity_from_log_v2.py new file mode 100644 index 0000000..bb26dfd --- /dev/null +++ b/equity_from_log_v2.py @@ -0,0 +1,591 @@ +# -*- coding: utf-8 -*- +""" +Equity/Reconciliation Builder from Audit Log [v2] + +Modifiche rispetto a v1: + - Fix parser header CSV: rimosso split(",",1)[0] che troncava le colonne + - Fix mixed_rows: conta solo righe realmente miste (non tutti i CSV comma-separated) + - Fix EntryIndex: usa None come sentinel; fallback a searchsorted sempre se assente + - Fix euristica % vs decimale: soglia più robusta + check consistenza colonne + - Fix compatibilità Python 3.8: dict | dict sostituito con {**d1, **d2} + - Performance: iterrows() -> itertuples() + - Errori: uniformato FileNotFoundError/ValueError al posto di SystemExit raw + - Side effects: lettura config spostata in _load_globals() chiamata da main() + +- Legge trades_audit_log.csv (OPEN/CLOSE; EntryAmount base=100; EntryIndex opzionale) +- Scarica rendimenti giornalieri via stored procedure (connection.txt) +- Converte i rendimenti in decimali coerenti (percentuali => /100; log-return => expm1) +- Ricostruisce i rendimenti giornalieri per strategia come MEDIA PONDERATA sui trade attivi +- Salva: + - daily_returns_by_strategy.csv + - equity_by_strategy.csv + - debug_daily_by_strategy.csv + - equity_by_strategy.png, drawdown_by_strategy.png +- Mostra anche a video i grafici +""" + +from __future__ import annotations +from pathlib import Path +import pandas as pd +import numpy as np +import shutil + +from shared_utils import ( + detect_column, + load_config, + read_connection_txt, + require_section, +) + +# ============================================================================= +# PATH & OUTPUT +# ============================================================================= +BASE_DIR = Path(__file__).resolve().parent + +# Defaults — verranno aggiornati da _load_globals() prima dell'uso +OUTPUT_DIR = BASE_DIR / "output" +PLOT_DIR = BASE_DIR / "plot" +AUDIT_LOG_CSV = BASE_DIR / "output" / "trades_audit_log.csv" +CONNECTION_TXT = BASE_DIR / "connection.txt" + +OUT_DAILY_CSV = OUTPUT_DIR / "daily_returns_by_strategy.csv" +OUT_EQUITY_CSV = OUTPUT_DIR / "equity_by_strategy.csv" +OUT_DEBUG_CSV = OUTPUT_DIR / "debug_daily_by_strategy.csv" +PLOT_EQUITY = PLOT_DIR / "equity_by_strategy.png" +PLOT_DD = PLOT_DIR / "drawdown_by_strategy.png" + +DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF") + +# Stored procedure defaults +SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL" +SP_N_DEFAULT = 1260 +PTF_CURR_DEFAULT = "EUR" + +DEFAULT_STRATEGIES = ["Equal_Weight", "Risk_Parity"] +VALID_STRATEGIES = list(DEFAULT_STRATEGIES) + + +def _load_globals() -> None: + """Carica config e aggiorna le variabili globali. Chiamata una sola volta da main().""" + global OUTPUT_DIR, PLOT_DIR, AUDIT_LOG_CSV, CONNECTION_TXT + global OUT_DAILY_CSV, OUT_EQUITY_CSV, OUT_DEBUG_CSV, PLOT_EQUITY, PLOT_DD + global SP_NAME_DEFAULT, SP_N_DEFAULT, PTF_CURR_DEFAULT, VALID_STRATEGIES + + try: + config = load_config() + paths_cfg = require_section(config, "paths") + except Exception as exc: + print(f"[WARN] Config non disponibile ({exc}); uso i percorsi di default.") + config = None + paths_cfg = {} + + OUTPUT_DIR = BASE_DIR / paths_cfg.get("output_dir", "output") + PLOT_DIR = BASE_DIR / paths_cfg.get("plot_dir", "plot") + AUDIT_LOG_CSV = BASE_DIR / paths_cfg.get("audit_log_csv", str(OUTPUT_DIR / "trades_audit_log.csv")) + CONNECTION_TXT = BASE_DIR / paths_cfg.get("connection_txt", "connection.txt") + + OUT_DAILY_CSV = OUTPUT_DIR / "daily_returns_by_strategy.csv" + OUT_EQUITY_CSV = OUTPUT_DIR / "equity_by_strategy.csv" + OUT_DEBUG_CSV = OUTPUT_DIR / "debug_daily_by_strategy.csv" + PLOT_EQUITY = PLOT_DIR / "equity_by_strategy.png" + PLOT_DD = PLOT_DIR / "drawdown_by_strategy.png" + + try: + db_cfg = require_section(config, "db") if config else {} + except Exception as exc: + print(f"[WARN] Config DB non disponibile ({exc}); uso i default interni.") + db_cfg = {} + + if db_cfg: + SP_NAME_DEFAULT = str(db_cfg.get("stored_proc", SP_NAME_DEFAULT)) + SP_N_DEFAULT = int(db_cfg.get("n_bars", SP_N_DEFAULT)) + PTF_CURR_DEFAULT = str(db_cfg.get("ptf_curr", PTF_CURR_DEFAULT)) + + equity_cfg = config.get("equity_log", {}) if config and isinstance(config, dict) else {} + raw_whitelist = equity_cfg.get("strategy_whitelist") if isinstance(equity_cfg, dict) else None + if raw_whitelist: + whitelist = [str(x).strip() for x in raw_whitelist if str(x).strip()] + if whitelist: + VALID_STRATEGIES = whitelist + + +# ============================================================================= +# UTILITY +# ============================================================================= +def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR) -> bool: + if not src or not dst_dir: + return False + if not src.exists(): + print(f"[WARN] file non trovato per copia Dropbox: {src}") + return False + try: + dst_dir.mkdir(parents=True, exist_ok=True) + dst = dst_dir / src.name + shutil.copy2(src, dst) + print(f"[DROPBOX] Copiato {src.name} in {dst_dir}") + return True + except Exception as exc: + print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}") + return False + + +# ============================================================================= +# AUDIT LOG LOADER (FORMAT CHECKS) +# ============================================================================= +REQUIRED_AUDIT_COLS = ["Strategy", "ISIN", "Action", "TradeDate"] +CANONICAL_AUDIT_COLS = [ + "Strategy", + "ISIN", + "Action", + "TradeDate", + "EntryIndex", + "EntryAmount", + "SizeWeight", + "Price", + "PnL_%", + "ExitReason", + "LinkedOpenDate", + "Duration_bars", + "Notes", +] +NUMERIC_COLS = [ + "EntryIndex", + "EntryAmount", + "SizeWeight", + "Price", + "PnL_%", + "Duration_bars", +] + + +def _clean_numeric_series(s: pd.Series) -> pd.Series: + if pd.api.types.is_numeric_dtype(s): + return s + txt = s.astype(str).str.strip() + txt = txt.str.replace("%", "", regex=False) + txt = txt.replace({"": np.nan, "nan": np.nan, "None": np.nan}) + + def _fix_one(val: str) -> str: + if val is None or (isinstance(val, float) and np.isnan(val)): + return val + v = str(val).strip() + if not v: + return v + dot_n = v.count(".") + comma_n = v.count(",") + + if dot_n > 1 and comma_n == 0: + # più punti senza virgole => punti come separatori migliaia + return v.replace(".", "") + if dot_n > 0 and comma_n > 0: + last_dot = v.rfind(".") + last_comma = v.rfind(",") + if last_comma > last_dot: + # virgola decimale, punti migliaia + return v.replace(".", "").replace(",", ".") + # punto decimale, virgole migliaia + return v.replace(",", "") + if comma_n > 0 and dot_n == 0: + # solo virgola => virgola decimale + return v.replace(",", ".") + return v + + cleaned = txt.map(_fix_one) + return pd.to_numeric(cleaned, errors="coerce") + + +def _parse_mixed_dates(series: pd.Series) -> pd.Series: + s = series.astype(str).str.strip() + s = s.replace({"": np.nan, "nan": np.nan, "None": np.nan}) + dt_iso = pd.to_datetime(s, format="%Y-%m-%d", errors="coerce") + dt_iso_ts = pd.to_datetime(s, format="%Y-%m-%d %H:%M:%S", errors="coerce") + dt_dmy = pd.to_datetime(s, format="%d/%m/%Y", errors="coerce") + dt_dmy_ts = pd.to_datetime(s, format="%d/%m/%Y %H:%M:%S", errors="coerce") + return dt_iso.fillna(dt_iso_ts).fillna(dt_dmy).fillna(dt_dmy_ts) + + +def load_audit_log(path: Path) -> pd.DataFrame: + if not path.exists(): + raise FileNotFoundError(f"Missing trades_audit_log.csv at {path}") + + raw = path.read_text(encoding="utf-8", errors="ignore") + if not raw.strip(): + raise ValueError("Audit log vuoto.") + + lines = raw.splitlines() + first_line = lines[0] + + # FIX v2: l'header viene splittato sul separatore reale senza troncature. + # In v1 c'era un errato split(",", 1)[0] che perdeva le colonne dopo la prima virgola. + if ";" in first_line: + header = [c.strip() for c in first_line.split(";")] + else: + header = [c.strip() for c in first_line.split(",")] + + # Rimuovi eventuale colonna indice vuota in testa (es. export pandas con index) + if header and header[0] == "": + header = header[1:] + + if not header or "TradeDate" not in header: + header = CANONICAL_AUDIT_COLS.copy() + + # Determina il separatore dominante per le righe dati + semicolon_file = ";" in first_line + + rows = [] + mixed_rows = 0 + for line in lines[1:]: + if not line or not line.strip(): + continue + + # FIX v2: "mixed" solo se il separatore della riga differisce dal file header. + # In v1 tutte le righe CSV venivano contate come "legacy/misto". + if semicolon_file: + if ";" in line and line.count(";") >= (len(header) - 1): + parts = line.split(";") + else: + parts = line.split(",") + mixed_rows += 1 + else: + if ";" in line and line.count(";") >= (len(header) - 1): + parts = line.split(";") + mixed_rows += 1 + else: + parts = line.split(",") + if parts and parts[0] == "": + parts = parts[1:] + + if len(parts) > len(header): + parts = parts[: len(header) - 1] + [",".join(parts[len(header) - 1:])] + elif len(parts) < len(header): + parts = parts + [""] * (len(header) - len(parts)) + rows.append(parts) + + df = pd.DataFrame(rows, columns=header) + if mixed_rows > 0: + print(f"[WARN] Audit log con {mixed_rows} righe in formato legacy/misto: normalizzate in lettura.") + + missing = [c for c in REQUIRED_AUDIT_COLS if c not in df.columns] + if missing: + raise ValueError( + f"Formato audit log non valido. Colonne mancanti: {missing}. " + f"Colonne trovate: {list(df.columns)}" + ) + + # Normalizza colonne chiave + df["Action"] = df["Action"].astype(str).str.upper().str.strip() + df["Strategy"] = df["Strategy"].astype(str).str.strip() + df["ISIN"] = df["ISIN"].astype(str).str.strip() + + # Date + df["TradeDate"] = _parse_mixed_dates(df["TradeDate"]) + if "LinkedOpenDate" in df.columns: + df["LinkedOpenDate"] = _parse_mixed_dates(df["LinkedOpenDate"]) + + # Rimuovi righe con date invalide + before = len(df) + df = df.dropna(subset=["TradeDate"]) + dropped = before - len(df) + if dropped > 0: + print(f"[WARN] Rimosse {dropped} righe con TradeDate non valido.") + + # Mantieni solo OPEN/CLOSE + if "Action" in df.columns: + before = len(df) + df = df[df["Action"].isin(["OPEN", "CLOSE"])] + dropped = before - len(df) + if dropped > 0: + print(f"[WARN] Rimosse {dropped} righe con Action non valida.") + + # Pulizia numerica + for col in NUMERIC_COLS: + if col in df.columns: + df[col] = _clean_numeric_series(df[col]) + + return df + + +# ============================================================================= +# FETCH RENDIMENTI DAL DB +# ============================================================================= +def fetch_returns_from_db(isins, start_date, end_date) -> pd.DataFrame: + import sqlalchemy as sa + from sqlalchemy import text as sql_text + + conn_str = read_connection_txt(CONNECTION_TXT) + engine = sa.create_engine(conn_str, fast_executemany=True) + + sp = SP_NAME_DEFAULT + nbar = SP_N_DEFAULT + ptf = PTF_CURR_DEFAULT + + sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") + + frames = [] + with engine.begin() as conn: + for isin in isins: + try: + df = pd.read_sql_query(sql_sp, conn, params={"isin": isin, "n": nbar, "ptf": ptf}) + except Exception as e: + print(f"[ERROR] SP {sp} fallita per {isin}: {e}") + continue + if df.empty: + continue + + col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) + col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) + col_px = detect_column(df, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"]) + + if not col_date: + continue + + df[col_date] = pd.to_datetime(df[col_date], errors="coerce") + df = df.dropna(subset=[col_date]).sort_values(col_date) + + if col_ret: + r = pd.to_numeric(df[col_ret], errors="coerce") + out = pd.DataFrame({"Date": df[col_date], "ISIN": isin, "Ret": r}) + elif col_px: + px = pd.to_numeric(df[col_px], errors="coerce").replace(0, np.nan) + log_r = np.log(px / px.shift(1)) + r = np.expm1(log_r) # log-return -> rendimento semplice decimale + out = pd.DataFrame({"Date": df[col_date], "ISIN": isin, "Ret": r}) + else: + continue + + frames.append(out) + + if not frames: + return pd.DataFrame(index=pd.DatetimeIndex([], name="Date")) + + long = pd.concat(frames, ignore_index=True).dropna(subset=["Date"]) + + mask = ( + (long["Date"].dt.date >= start_date) + & (long["Date"].dt.date <= end_date) + ) + long = long.loc[mask] + + wide = long.pivot(index="Date", columns="ISIN", values="Ret").sort_index() + + # FIX v2: euristica % vs decimale più robusta. + # Verifica la mediana dei valori assoluti per colonna, non solo il massimo globale. + # Se la maggioranza delle colonne ha mediana > 0.5, i dati sono probabilmente percentuali. + if not wide.empty: + col_medians = np.nanmedian(np.abs(wide.values), axis=0) + pct_cols = np.sum(col_medians > 0.5) + total_cols = len(col_medians) + if total_cols > 0 and (pct_cols / total_cols) > 0.5: + wide = wide / 100.0 + print(f"[INFO] Rendimenti convertiti da % a decimale ({pct_cols}/{total_cols} colonne con mediana >0.5).") + + return wide + + +# ============================================================================= +# RICOSTRUZIONE DAILY RETURNS +# ============================================================================= +def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> pd.DataFrame: + strategies = sorted(audit["Strategy"].dropna().astype(str).unique()) + if not strategies: + return pd.DataFrame(index=returns_wide.index, columns=[]) + + idx = returns_wide.index + + daily_num = pd.DataFrame(0.0, index=idx, columns=strategies) + daily_den = pd.DataFrame(0.0, index=idx, columns=strategies) + + closes = audit[audit["Action"] == "CLOSE"].copy() + if not closes.empty: + if "LinkedOpenDate" in closes.columns: + closes["trade_key"] = ( + closes["ISIN"].astype(str) + + "|" + + pd.to_datetime(closes["LinkedOpenDate"]).dt.strftime("%Y-%m-%d") + ) + else: + closes["trade_key"] = ( + closes["ISIN"].astype(str) + + "|" + + pd.to_datetime(closes["TradeDate"]).dt.strftime("%Y-%m-%d") + ) + closes["TradeDate"] = pd.to_datetime(closes["TradeDate"]) + closes_agg = closes.sort_values("TradeDate").groupby("trade_key", as_index=False)["TradeDate"].last() + close_map = closes_agg.set_index("trade_key") + else: + close_map = pd.DataFrame(columns=["TradeDate"]).set_index(pd.Index([], name="trade_key")) + + # debug counters + total_opens = 0 + used_opens = 0 + skipped_missing_isin = 0 + skipped_bad_amount = 0 + skipped_bad_window = 0 + + for strat in strategies: + aud_s = audit[audit["Strategy"] == strat] + opens = aud_s[aud_s["Action"] == "OPEN"].copy() + if opens.empty: + continue + + opens["trade_key"] = ( + opens["ISIN"].astype(str) + + "|" + + pd.to_datetime(opens["TradeDate"]).dt.strftime("%Y-%m-%d") + ) + + # FIX v2: itertuples() al posto di iterrows() — più veloce su dataset grandi. + for op in opens.itertuples(index=False): + total_opens += 1 + isin = op.ISIN + if isin not in returns_wide.columns: + skipped_missing_isin += 1 + continue + + ser = returns_wide[isin].astype(float) + entry_amount = float(op.EntryAmount if pd.notna(op.EntryAmount) else 0.0) + if entry_amount <= 0: + skipped_bad_amount += 1 + continue + + # FIX v2: EntryIndex usato solo se presente e valido; altrimenti sempre + # searchsorted sulla TradeDate. In v1 il default 0 era silenziosamente + # accettato come indice reale anche quando il campo era assente. + raw_idx = getattr(op, "EntryIndex", None) + if raw_idx is not None and pd.notna(raw_idx): + entry_idx = int(raw_idx) + if entry_idx < 0 or entry_idx >= len(ser): + d_open = pd.Timestamp(op.TradeDate).normalize() + entry_idx = int(ser.index.searchsorted(d_open, side="left")) + else: + d_open = pd.Timestamp(op.TradeDate).normalize() + entry_idx = int(ser.index.searchsorted(d_open, side="left")) + + key = op.trade_key + if key in close_map.index: + close_val = close_map.loc[key, "TradeDate"] + if isinstance(close_val, pd.Series): + close_val = close_val.iloc[-1] + d_close = pd.Timestamp(close_val).normalize() + exit_idx = int(ser.index.searchsorted(d_close, side="left")) + else: + exit_idx = len(ser) + + if exit_idx <= entry_idx: + skipped_bad_window += 1 + continue + + idx_seg = ser.index[entry_idx:exit_idx] + vals_seg = ser.values[entry_idx:exit_idx] + + daily_num.loc[idx_seg, strat] += entry_amount * vals_seg + daily_den.loc[idx_seg, strat] += entry_amount + used_opens += 1 + + daily = pd.DataFrame(0.0, index=idx, columns=strategies) + mask = daily_den > 0 + daily[mask] = daily_num[mask] / daily_den[mask] + + # FIX v2: {**d1, **d2, **d3} al posto di d1 | d2 | d3 (richiede Python 3.9+). + debug_dict = { + **{f"num_{c}": daily_num[c] for c in strategies}, + **{f"den_{c}": daily_den[c] for c in strategies}, + **{f"ret_{c}": daily[c] for c in strategies}, + } + debug = pd.concat(debug_dict, axis=1) + debug.to_csv(OUT_DEBUG_CSV, index_label="Date") + + print( + f"[DEBUG] OPEN totali: {total_opens}, usati: {used_opens}, " + f"mancano ISIN: {skipped_missing_isin}, " + f"EntryAmount<=0: {skipped_bad_amount}, " + f"finestra non valida: {skipped_bad_window}" + ) + + return daily + + +# ============================================================================= +# MAIN +# ============================================================================= +def main(): + # FIX v2: config e globals caricati qui, non a import-time. + _load_globals() + + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + PLOT_DIR.mkdir(parents=True, exist_ok=True) + + if not AUDIT_LOG_CSV.exists(): + raise FileNotFoundError(f"Missing trades_audit_log.csv at {AUDIT_LOG_CSV}") + + audit = load_audit_log(AUDIT_LOG_CSV) + + if audit.empty: + raise ValueError("Audit log vuoto.") + + if "Strategy" not in audit.columns: + raise ValueError("Colonna 'Strategy' mancante nell'audit log.") + + # Filtro whitelist + audit["Strategy"] = audit["Strategy"].astype(str) + before = len(audit) + audit = audit[audit["Strategy"].isin(VALID_STRATEGIES)] + removed = before - len(audit) + if removed > 0: + print(f"[INFO] Filtrate {removed} righe con strategie non incluse in {VALID_STRATEGIES}.") + if audit.empty: + raise ValueError(f"Nessuna riga con strategie in {VALID_STRATEGIES} nell'audit log.") + + start_date = (audit["TradeDate"].min() - pd.Timedelta(days=10)).date() + end_date = (audit["TradeDate"].max() + pd.Timedelta(days=10)).date() + + isins = sorted(audit["ISIN"].dropna().astype(str).unique()) + ret_wide = fetch_returns_from_db(isins, start_date, end_date) + if ret_wide.empty: + raise RuntimeError("Nessun rendimento recuperato dal DB nell'intervallo richiesto.") + + daily = rebuild_daily_from_log(audit, ret_wide).sort_index() + daily.to_csv(OUT_DAILY_CSV, index_label="Date") + + equity = (1.0 + daily.fillna(0.0)).cumprod() * 100.0 + equity.to_csv(OUT_EQUITY_CSV, index_label="Date") + + import matplotlib.pyplot as plt + + # Equity + plt.figure(figsize=(10, 6)) + for col in equity.columns: + plt.plot(equity.index, equity[col], label=col) + plt.title("Equity per Strategy") + plt.grid(True) + plt.legend() + plt.tight_layout() + plt.savefig(str(PLOT_EQUITY), dpi=150) + plt.close() + copy_to_dropbox(PLOT_EQUITY) + + # Drawdown + dd = equity / equity.cummax() - 1.0 + plt.figure(figsize=(10, 5)) + for col in dd.columns: + plt.plot(dd.index, dd[col], label=col) + plt.title("Drawdown per Strategy") + plt.grid(True) + plt.legend() + plt.tight_layout() + plt.savefig(str(PLOT_DD), dpi=150) + plt.close() + copy_to_dropbox(PLOT_DD) + + print("Salvati:") + print(" -", OUT_DAILY_CSV) + print(" -", OUT_EQUITY_CSV) + print(" -", OUT_DEBUG_CSV) + print(" -", PLOT_EQUITY) + print(" -", PLOT_DD) + print(" -", DROPBOX_EXPORT_DIR / PLOT_EQUITY.name) + print(" -", DROPBOX_EXPORT_DIR / PLOT_DD.name) + + +if __name__ == "__main__": + main()