592 lines
22 KiB
Python
592 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Equity/Reconciliation Builder from Audit Log [v2]
|
|
|
|
Modifiche rispetto a v1:
|
|
- Fix parser header CSV: rimosso split(",",1)[0] che troncava le colonne
|
|
- Fix mixed_rows: conta solo righe realmente miste (non tutti i CSV comma-separated)
|
|
- Fix EntryIndex: usa None come sentinel; fallback a searchsorted sempre se assente
|
|
- Fix euristica % vs decimale: soglia più robusta + check consistenza colonne
|
|
- Fix compatibilità Python 3.8: dict | dict sostituito con {**d1, **d2}
|
|
- Performance: iterrows() -> itertuples()
|
|
- Errori: uniformato FileNotFoundError/ValueError al posto di SystemExit raw
|
|
- Side effects: lettura config spostata in _load_globals() chiamata da main()
|
|
|
|
- Legge trades_audit_log.csv (OPEN/CLOSE; EntryAmount base=100; EntryIndex opzionale)
|
|
- Scarica rendimenti giornalieri via stored procedure (connection.txt)
|
|
- Converte i rendimenti in decimali coerenti (percentuali => /100; log-return => expm1)
|
|
- Ricostruisce i rendimenti giornalieri per strategia come MEDIA PONDERATA sui trade attivi
|
|
- Salva:
|
|
- daily_returns_by_strategy.csv
|
|
- equity_by_strategy.csv
|
|
- debug_daily_by_strategy.csv
|
|
- equity_by_strategy.png, drawdown_by_strategy.png
|
|
- Mostra anche a video i grafici
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
from pathlib import Path
|
|
import pandas as pd
|
|
import numpy as np
|
|
import shutil
|
|
|
|
from shared_utils import (
|
|
detect_column,
|
|
load_config,
|
|
read_connection_txt,
|
|
require_section,
|
|
)
|
|
|
|
# =============================================================================
|
|
# PATH & OUTPUT
|
|
# =============================================================================
|
|
# Folder containing this script; every relative location below hangs off it.
BASE_DIR = Path(__file__).resolve().parent

# Default locations -- _load_globals() reassigns all of them when main() starts.
OUTPUT_DIR = BASE_DIR.joinpath("output")
PLOT_DIR = BASE_DIR.joinpath("plot")
AUDIT_LOG_CSV = OUTPUT_DIR.joinpath("trades_audit_log.csv")
CONNECTION_TXT = BASE_DIR.joinpath("connection.txt")

# Generated CSV artefacts.
OUT_DAILY_CSV = OUTPUT_DIR.joinpath("daily_returns_by_strategy.csv")
OUT_EQUITY_CSV = OUTPUT_DIR.joinpath("equity_by_strategy.csv")
OUT_DEBUG_CSV = OUTPUT_DIR.joinpath("debug_daily_by_strategy.csv")

# Generated charts.
PLOT_EQUITY = PLOT_DIR.joinpath("equity_by_strategy.png")
PLOT_DD = PLOT_DIR.joinpath("drawdown_by_strategy.png")

# Shared folder that receives a copy of the charts.
DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF")

# Stored-procedure defaults (name, number of bars, portfolio currency).
SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL"
SP_N_DEFAULT = 1260
PTF_CURR_DEFAULT = "EUR"

# Strategies kept when the config supplies no whitelist. VALID_STRATEGIES is
# an independent copy so that replacing it never touches the defaults.
DEFAULT_STRATEGIES = ["Equal_Weight", "Risk_Parity"]
VALID_STRATEGIES = DEFAULT_STRATEGIES.copy()
|
|
|
|
|
|
def _load_globals() -> None:
    """Load the configuration and refresh the module-level settings.

    Called once by main(). Reading the config is best-effort: when the config
    (or its "paths" / "db" section) cannot be obtained, a warning is printed
    and the built-in defaults are used instead.

    The strategy whitelist (``equity_log.strategy_whitelist``) may be a
    sequence of names or a single comma-separated string.
    """
    global OUTPUT_DIR, PLOT_DIR, AUDIT_LOG_CSV, CONNECTION_TXT
    global OUT_DAILY_CSV, OUT_EQUITY_CSV, OUT_DEBUG_CSV, PLOT_EQUITY, PLOT_DD
    global SP_NAME_DEFAULT, SP_N_DEFAULT, PTF_CURR_DEFAULT, VALID_STRATEGIES

    try:
        config = load_config()
        paths_cfg = require_section(config, "paths")
    except Exception as exc:  # deliberate best-effort: fall back to defaults
        print(f"[WARN] Config non disponibile ({exc}); uso i percorsi di default.")
        config = None
        paths_cfg = {}

    OUTPUT_DIR = BASE_DIR / paths_cfg.get("output_dir", "output")
    PLOT_DIR = BASE_DIR / paths_cfg.get("plot_dir", "plot")
    # Joining an absolute path onto BASE_DIR yields that absolute path, so the
    # default below resolves to the freshly computed OUTPUT_DIR.
    AUDIT_LOG_CSV = BASE_DIR / paths_cfg.get("audit_log_csv", str(OUTPUT_DIR / "trades_audit_log.csv"))
    CONNECTION_TXT = BASE_DIR / paths_cfg.get("connection_txt", "connection.txt")

    # Derived artefact paths must follow the (possibly relocated) directories.
    OUT_DAILY_CSV = OUTPUT_DIR / "daily_returns_by_strategy.csv"
    OUT_EQUITY_CSV = OUTPUT_DIR / "equity_by_strategy.csv"
    OUT_DEBUG_CSV = OUTPUT_DIR / "debug_daily_by_strategy.csv"
    PLOT_EQUITY = PLOT_DIR / "equity_by_strategy.png"
    PLOT_DD = PLOT_DIR / "drawdown_by_strategy.png"

    try:
        db_cfg = require_section(config, "db") if config else {}
    except Exception as exc:  # deliberate best-effort: keep internal defaults
        print(f"[WARN] Config DB non disponibile ({exc}); uso i default interni.")
        db_cfg = {}

    if db_cfg:
        SP_NAME_DEFAULT = str(db_cfg.get("stored_proc", SP_NAME_DEFAULT))
        SP_N_DEFAULT = int(db_cfg.get("n_bars", SP_N_DEFAULT))
        PTF_CURR_DEFAULT = str(db_cfg.get("ptf_curr", PTF_CURR_DEFAULT))

    equity_cfg = config.get("equity_log", {}) if config and isinstance(config, dict) else {}
    raw_whitelist = equity_cfg.get("strategy_whitelist") if isinstance(equity_cfg, dict) else None
    if isinstance(raw_whitelist, str):
        # A bare string would otherwise be iterated character by character,
        # producing a whitelist of single letters.
        raw_whitelist = raw_whitelist.split(",")
    if raw_whitelist:
        whitelist = [str(x).strip() for x in raw_whitelist if str(x).strip()]
        if whitelist:
            VALID_STRATEGIES = whitelist
|
|
|
|
|
|
# =============================================================================
|
|
# UTILITY
|
|
# =============================================================================
|
|
def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR) -> bool:
    """Copy *src* into *dst_dir* and report whether the copy succeeded.

    The destination folder is created when missing. A missing source file or
    any error raised while creating the folder / copying is reported with a
    printed warning and turned into a ``False`` return value.
    """
    if not (src and dst_dir):
        return False

    if not src.exists():
        print(f"[WARN] file non trovato per copia Dropbox: {src}")
        return False

    try:
        dst_dir.mkdir(parents=True, exist_ok=True)
        target = dst_dir / src.name
        shutil.copy2(src, target)
        print(f"[DROPBOX] Copiato {src.name} in {dst_dir}")
    except Exception as exc:
        print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}")
        return False
    return True
|
|
|
|
|
|
# =============================================================================
|
|
# AUDIT LOG LOADER (FORMAT CHECKS)
|
|
# =============================================================================
|
|
# Columns without which the audit log is rejected.
REQUIRED_AUDIT_COLS = ["Strategy", "ISIN", "Action", "TradeDate"]

# Full column layout, used as the header when the file does not provide one.
# Kept as a list because the loader calls .copy() on it.
CANONICAL_AUDIT_COLS = [
    "Strategy", "ISIN", "Action", "TradeDate",
    "EntryIndex", "EntryAmount", "SizeWeight", "Price", "PnL_%",
    "ExitReason", "LinkedOpenDate", "Duration_bars", "Notes",
]

# Columns that the loader converts to numbers.
NUMERIC_COLS = [
    "EntryIndex", "EntryAmount", "SizeWeight",
    "Price", "PnL_%", "Duration_bars",
]
|
|
|
|
|
|
def _clean_numeric_series(s: pd.Series) -> pd.Series:
    """Convert a column of loosely formatted numbers to a numeric Series.

    Numeric input is returned untouched. Otherwise each value is stripped,
    "%" signs are removed, and decimal/thousands separators are normalised
    before ``pd.to_numeric(..., errors="coerce")`` is applied, so anything
    that still cannot be parsed becomes NaN.

    Separator rules:
      * both "." and ",": whichever appears last is the decimal separator;
      * several "." and no ",": dots are thousands separators;
      * several "," and no ".": commas are thousands separators;
      * a single "," and no ".": the comma is the decimal separator;
      * a single "." and no ",": left as is (decimal point).
    """
    if pd.api.types.is_numeric_dtype(s):
        return s

    txt = s.astype(str).str.strip()
    txt = txt.str.replace("%", "", regex=False)
    txt = txt.replace({"": np.nan, "nan": np.nan, "None": np.nan})

    def _fix_one(val):
        # Missing values pass through so that to_numeric yields NaN for them.
        if val is None or (isinstance(val, float) and np.isnan(val)):
            return val
        v = str(val).strip()
        if not v:
            return v
        dot_n = v.count(".")
        comma_n = v.count(",")

        if dot_n > 0 and comma_n > 0:
            if v.rfind(",") > v.rfind("."):
                # decimal comma, dots as thousands separators
                return v.replace(".", "").replace(",", ".")
            # decimal point, commas as thousands separators
            return v.replace(",", "")
        if dot_n > 1:
            # several dots, no comma => dots are thousands separators
            return v.replace(".", "")
        if comma_n > 1:
            # several commas, no dot => commas are thousands separators
            # (replacing them with dots would give an unparseable "1.234.567")
            return v.replace(",", "")
        if comma_n == 1:
            # a lone comma => decimal comma
            return v.replace(",", ".")
        return v

    cleaned = txt.map(_fix_one)
    return pd.to_numeric(cleaned, errors="coerce")
|
|
|
|
|
|
def _parse_mixed_dates(series: pd.Series) -> pd.Series:
    """Parse a column whose dates may use several textual layouts.

    Each value is tried, in order, against ISO date, ISO date-time,
    day/month/year and day/month/year date-time; the first layout that parses
    wins. Blank values, "nan", "None" and anything unparseable become NaT.
    """
    cleaned = (
        series.astype(str)
        .str.strip()
        .replace({"": np.nan, "nan": np.nan, "None": np.nan})
    )
    layouts = ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%d/%m/%Y", "%d/%m/%Y %H:%M:%S")
    attempts = [pd.to_datetime(cleaned, format=fmt, errors="coerce") for fmt in layouts]

    # Earlier layouts take precedence; later ones only fill the gaps.
    parsed = attempts[0]
    for fallback in attempts[1:]:
        parsed = parsed.fillna(fallback)
    return parsed
|
|
|
|
|
|
def load_audit_log(path: Path) -> pd.DataFrame:
    """Read the trades audit log and return a normalised DataFrame.

    The file may be comma- or semicolon-separated and may mix both styles
    from row to row. The first non-blank line is used as the header when it
    contains "TradeDate"; otherwise the file is treated as headerless and the
    canonical column layout is applied to every line, first one included.

    After loading, the key text columns are stripped, dates are parsed, rows
    with an invalid TradeDate or an Action other than OPEN/CLOSE are dropped
    (with a printed warning), and the numeric columns are converted.

    Raises:
        FileNotFoundError: *path* does not exist.
        ValueError: the file is empty or a required column is missing.
    """
    if not path.exists():
        raise FileNotFoundError(f"Missing trades_audit_log.csv at {path}")

    # "utf-8-sig" drops a leading byte-order mark, which would otherwise be
    # glued to the first header name and make that column look missing.
    raw = path.read_text(encoding="utf-8-sig", errors="ignore")
    if not raw.strip():
        raise ValueError("Audit log vuoto.")

    # Blank lines carry no data; dropping them up front also keeps a leading
    # empty line from being mistaken for the header.
    lines = [ln for ln in raw.splitlines() if ln.strip()]
    first_line = lines[0]

    # The header decides the dominant separator of the file.
    semicolon_file = ";" in first_line
    header = [c.strip() for c in first_line.split(";" if semicolon_file else ",")]

    # Drop a leading empty index column (e.g. a pandas export with index).
    if header and header[0] == "":
        header = header[1:]

    if header and "TradeDate" in header:
        data_lines = lines[1:]
    else:
        # No recognisable header: the first line is data and must be kept.
        # (If it was an unrecognised header it is dropped later because its
        # TradeDate does not parse.)
        header = CANONICAL_AUDIT_COLS.copy()
        data_lines = lines

    n_cols = len(header)
    rows = []
    mixed_rows = 0
    for line in data_lines:
        # A row counts as semicolon-separated only when it has enough
        # semicolons to fill the header; anything else is split on commas.
        row_is_semicolon = ";" in line and line.count(";") >= (n_cols - 1)
        sep = ";" if row_is_semicolon else ","
        if row_is_semicolon != semicolon_file:
            mixed_rows += 1

        parts = line.split(sep)
        if parts and parts[0] == "":
            parts = parts[1:]

        if len(parts) > n_cols:
            # Fold the overflow back into the last column using the separator
            # the row was actually split on, so free text is restored intact.
            parts = parts[: n_cols - 1] + [sep.join(parts[n_cols - 1:])]
        elif len(parts) < n_cols:
            parts = parts + [""] * (n_cols - len(parts))
        rows.append(parts)

    df = pd.DataFrame(rows, columns=header)
    if mixed_rows > 0:
        print(f"[WARN] Audit log con {mixed_rows} righe in formato legacy/misto: normalizzate in lettura.")

    missing = [c for c in REQUIRED_AUDIT_COLS if c not in df.columns]
    if missing:
        raise ValueError(
            f"Formato audit log non valido. Colonne mancanti: {missing}. "
            f"Colonne trovate: {list(df.columns)}"
        )

    # Normalise the key columns.
    df["Action"] = df["Action"].astype(str).str.upper().str.strip()
    df["Strategy"] = df["Strategy"].astype(str).str.strip()
    df["ISIN"] = df["ISIN"].astype(str).str.strip()

    # Dates.
    df["TradeDate"] = _parse_mixed_dates(df["TradeDate"])
    if "LinkedOpenDate" in df.columns:
        df["LinkedOpenDate"] = _parse_mixed_dates(df["LinkedOpenDate"])

    # Drop rows whose trade date could not be parsed.
    before = len(df)
    df = df.dropna(subset=["TradeDate"])
    dropped = before - len(df)
    if dropped > 0:
        print(f"[WARN] Rimosse {dropped} righe con TradeDate non valido.")

    # Keep OPEN/CLOSE only. The explicit copy makes the column assignments
    # below operate on an independent frame rather than on a filtered view.
    before = len(df)
    df = df[df["Action"].isin(["OPEN", "CLOSE"])].copy()
    dropped = before - len(df)
    if dropped > 0:
        print(f"[WARN] Rimosse {dropped} righe con Action non valida.")

    # Numeric clean-up.
    for col in NUMERIC_COLS:
        if col in df.columns:
            df[col] = _clean_numeric_series(df[col])

    return df
|
|
|
|
|
|
# =============================================================================
|
|
# FETCH RENDIMENTI DAL DB
|
|
# =============================================================================
|
|
def fetch_returns_from_db(isins, start_date, end_date) -> pd.DataFrame:
    """Download daily returns for *isins* and return them as a wide table.

    The configured stored procedure is executed once per ISIN. From each
    result a date column and either a return column or, failing that, a price
    column are picked via ``detect_column``; prices are converted to simple
    returns. ISINs whose call fails, returns no rows, or exposes no usable
    columns are skipped (failures are printed).

    Args:
        isins: iterable of ISIN codes.
        start_date, end_date: inclusive bounds compared against
            ``Date.dt.date`` (main() passes ``datetime.date`` objects).

    Returns:
        DataFrame indexed by Date with one column per ISIN holding decimal
        returns; an empty frame with an empty ``Date`` index when nothing was
        retrieved.
    """
    import sqlalchemy as sa
    from sqlalchemy import text as sql_text

    conn_str = read_connection_txt(CONNECTION_TXT)
    engine = sa.create_engine(conn_str, fast_executemany=True)

    sp = SP_NAME_DEFAULT
    nbar = SP_N_DEFAULT
    ptf = PTF_CURR_DEFAULT

    # NOTE(review): the procedure name is interpolated into the statement; it
    # comes from the config file and must be a trusted identifier. The actual
    # arguments are bound parameters.
    sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")

    frames = []
    try:
        with engine.begin() as conn:
            for isin in isins:
                try:
                    df = pd.read_sql_query(sql_sp, conn, params={"isin": isin, "n": nbar, "ptf": ptf})
                except Exception as e:  # one failing ISIN must not abort the rest
                    print(f"[ERROR] SP {sp} fallita per {isin}: {e}")
                    continue
                if df.empty:
                    continue

                # presumably detect_column returns the first matching column
                # name or a falsy value -- confirm against shared_utils.
                col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
                col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
                col_px = detect_column(df, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])

                if not col_date:
                    continue

                df[col_date] = pd.to_datetime(df[col_date], errors="coerce")
                df = df.dropna(subset=[col_date]).sort_values(col_date)

                if col_ret:
                    r = pd.to_numeric(df[col_ret], errors="coerce")
                elif col_px:
                    # Zero prices are treated as missing to avoid log(0).
                    px = pd.to_numeric(df[col_px], errors="coerce").replace(0, np.nan)
                    log_r = np.log(px / px.shift(1))
                    r = np.expm1(log_r)  # log-return -> simple decimal return
                else:
                    continue

                frames.append(pd.DataFrame({"Date": df[col_date], "ISIN": isin, "Ret": r}))
    finally:
        # Release pooled connections; the engine is local to this call.
        engine.dispose()

    if not frames:
        return pd.DataFrame(index=pd.DatetimeIndex([], name="Date"))

    long = pd.concat(frames, ignore_index=True).dropna(subset=["Date"])

    mask = (
        (long["Date"].dt.date >= start_date)
        & (long["Date"].dt.date <= end_date)
    )
    long = long.loc[mask]

    # DataFrame.pivot raises on duplicate (Date, ISIN) pairs; keep the last
    # row of each pair so a repeated date in the source cannot abort the run.
    long = long.drop_duplicates(subset=["Date", "ISIN"], keep="last")

    wide = long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()

    # Percent-vs-decimal heuristic: look at the median absolute value of each
    # column; when most columns exceed 0.5 the data is taken to be in percent.
    # Columns without any value carry no evidence and are left out of the vote.
    if not wide.empty:
        col_medians = wide.abs().median(axis=0).dropna()
        total_cols = len(col_medians)
        pct_cols = int((col_medians > 0.5).sum())
        if total_cols > 0 and (pct_cols / total_cols) > 0.5:
            wide = wide / 100.0
            print(f"[INFO] Rendimenti convertiti da % a decimale ({pct_cols}/{total_cols} colonne con mediana >0.5).")

    return wide
|
|
|
|
|
|
# =============================================================================
|
|
# RICOSTRUZIONE DAILY RETURNS
|
|
# =============================================================================
|
|
def _last_close_by_trade(audit: pd.DataFrame) -> dict:
    """Map "Strategy|ISIN|open-date" to the latest CLOSE TradeDate of that trade."""
    closes = audit[audit["Action"] == "CLOSE"]
    if closes.empty:
        return {}
    # A CLOSE is linked to its OPEN through LinkedOpenDate when that column
    # exists; otherwise its own TradeDate is used, as before.
    link_col = "LinkedOpenDate" if "LinkedOpenDate" in closes.columns else "TradeDate"
    keys = (
        closes["Strategy"].astype(str)
        + "|"
        + closes["ISIN"].astype(str)
        + "|"
        + pd.to_datetime(closes[link_col]).dt.strftime("%Y-%m-%d")
    )
    # Rows without a link date get a NaN key, which groupby discards.
    return pd.to_datetime(closes["TradeDate"]).groupby(keys).max().to_dict()


def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> pd.DataFrame:
    """Rebuild each strategy's daily return as an amount-weighted mean.

    For every OPEN row the trade is active from its entry position up to (but
    excluding) the position of its CLOSE date, or to the end of the data when
    no CLOSE is found. On each active day the trade contributes
    ``EntryAmount * return`` to the numerator and ``EntryAmount`` to the
    denominator of its strategy. Days on which the instrument has no return
    (NaN) contribute to neither, so one missing quote does not blank out the
    whole strategy for that day.

    Side effects: writes the numerator/denominator/return table to
    ``OUT_DEBUG_CSV`` and prints a summary of used and skipped OPEN rows.

    Args:
        audit: audit log with at least Strategy, ISIN, Action, TradeDate.
        returns_wide: decimal returns indexed by date, one column per ISIN.

    Returns:
        DataFrame indexed like *returns_wide* with one column per strategy;
        0.0 where no trade with a valid return is active.
    """
    strategies = sorted(audit["Strategy"].dropna().astype(str).unique())
    if not strategies:
        return pd.DataFrame(index=returns_wide.index, columns=[])

    idx = returns_wide.index
    n_days = len(idx)

    # Accumulate in plain arrays and slice by position: cheaper than a
    # label-based .loc update for every trade.
    num = np.zeros((n_days, len(strategies)), dtype=float)
    den = np.zeros((n_days, len(strategies)), dtype=float)

    # The key includes the strategy: two strategies may open the same ISIN on
    # the same day and close it on different days.
    close_dates = _last_close_by_trade(audit)

    # Float view of each instrument's returns, built once per ISIN.
    returns_cache = {}

    # debug counters
    total_opens = 0
    used_opens = 0
    skipped_missing_isin = 0
    skipped_bad_amount = 0
    skipped_bad_window = 0

    strategy_col = audit["Strategy"].astype(str)
    for col_pos, strat in enumerate(strategies):
        opens = audit[(strategy_col == strat) & (audit["Action"] == "OPEN")]
        if opens.empty:
            continue

        open_keys = (
            strat
            + "|"
            + opens["ISIN"].astype(str)
            + "|"
            + pd.to_datetime(opens["TradeDate"]).dt.strftime("%Y-%m-%d")
        )

        for op, key in zip(opens.itertuples(index=False), open_keys):
            total_opens += 1
            isin = op.ISIN
            if isin not in returns_wide.columns:
                skipped_missing_isin += 1
                continue

            if isin not in returns_cache:
                returns_cache[isin] = returns_wide[isin].astype(float).to_numpy()
            vals = returns_cache[isin]

            # A missing EntryAmount column or value is treated like an
            # invalid amount instead of raising AttributeError.
            raw_amount = getattr(op, "EntryAmount", None)
            entry_amount = float(raw_amount) if raw_amount is not None and pd.notna(raw_amount) else 0.0
            if entry_amount <= 0:
                skipped_bad_amount += 1
                continue

            # EntryIndex is used only when present and inside the data;
            # otherwise the entry position is looked up from TradeDate.
            # NOTE(review): EntryIndex is read as a position in
            # returns_wide's index -- confirm the log producer counts bars
            # on the same calendar.
            raw_idx = getattr(op, "EntryIndex", None)
            entry_idx = int(raw_idx) if raw_idx is not None and pd.notna(raw_idx) else -1
            if entry_idx < 0 or entry_idx >= n_days:
                d_open = pd.Timestamp(op.TradeDate).normalize()
                entry_idx = int(idx.searchsorted(d_open, side="left"))

            close_val = close_dates.get(key)
            if close_val is not None and pd.notna(close_val):
                d_close = pd.Timestamp(close_val).normalize()
                exit_idx = int(idx.searchsorted(d_close, side="left"))
            else:
                # Still open: active until the end of the available data.
                exit_idx = n_days

            if exit_idx <= entry_idx:
                skipped_bad_window += 1
                continue

            seg = vals[entry_idx:exit_idx]
            has_ret = ~np.isnan(seg)
            num[entry_idx:exit_idx, col_pos] += np.where(has_ret, entry_amount * seg, 0.0)
            den[entry_idx:exit_idx, col_pos] += np.where(has_ret, entry_amount, 0.0)
            used_opens += 1

    daily_num = pd.DataFrame(num, index=idx, columns=strategies)
    daily_den = pd.DataFrame(den, index=idx, columns=strategies)
    # Weighted mean where something is invested, 0.0 elsewhere.
    ratio = np.divide(num, den, out=np.zeros_like(num), where=den > 0)
    daily = pd.DataFrame(ratio, index=idx, columns=strategies)

    # {**d1, **d2, **d3} instead of d1 | d2 | d3, which needs Python 3.9+.
    debug_dict = {
        **{f"num_{c}": daily_num[c] for c in strategies},
        **{f"den_{c}": daily_den[c] for c in strategies},
        **{f"ret_{c}": daily[c] for c in strategies},
    }
    debug = pd.concat(debug_dict, axis=1)
    debug.to_csv(OUT_DEBUG_CSV, index_label="Date")

    print(
        f"[DEBUG] OPEN totali: {total_opens}, usati: {used_opens}, "
        f"mancano ISIN: {skipped_missing_isin}, "
        f"EntryAmount<=0: {skipped_bad_amount}, "
        f"finestra non valida: {skipped_bad_window}"
    )

    return daily
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN
|
|
# =============================================================================
|
|
def _save_line_plot(frame: pd.DataFrame, title: str, figsize, target: Path) -> None:
    """Draw one line per column of *frame* and save the figure to *target*."""
    import matplotlib.pyplot as plt

    plt.figure(figsize=figsize)
    for col in frame.columns:
        plt.plot(frame.index, frame[col], label=col)
    plt.title(title)
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.savefig(str(target), dpi=150)
    plt.close()


def main():
    """Run the whole pipeline: load the log, fetch returns, rebuild, export.

    Steps: load the configuration, read and whitelist-filter the audit log,
    download returns for the ISINs involved (10 days of padding on both
    sides of the traded period), rebuild daily returns per strategy, then
    write the CSV files and the equity / drawdown charts and copy the charts
    to the Dropbox folder.

    Raises:
        FileNotFoundError: the audit log file is missing.
        ValueError: the audit log is empty, lacks the Strategy column, or has
            no row for the whitelisted strategies.
        RuntimeError: no returns were retrieved from the database.
    """
    # Config and globals are loaded here, not at import time.
    _load_globals()

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    PLOT_DIR.mkdir(parents=True, exist_ok=True)

    if not AUDIT_LOG_CSV.exists():
        raise FileNotFoundError(f"Missing trades_audit_log.csv at {AUDIT_LOG_CSV}")

    audit = load_audit_log(AUDIT_LOG_CSV)

    if audit.empty:
        raise ValueError("Audit log vuoto.")

    if "Strategy" not in audit.columns:
        raise ValueError("Colonna 'Strategy' mancante nell'audit log.")

    # Whitelist filter.
    audit["Strategy"] = audit["Strategy"].astype(str)
    before = len(audit)
    audit = audit[audit["Strategy"].isin(VALID_STRATEGIES)]
    removed = before - len(audit)
    if removed > 0:
        print(f"[INFO] Filtrate {removed} righe con strategie non incluse in {VALID_STRATEGIES}.")
    if audit.empty:
        raise ValueError(f"Nessuna riga con strategie in {VALID_STRATEGIES} nell'audit log.")

    start_date = (audit["TradeDate"].min() - pd.Timedelta(days=10)).date()
    end_date = (audit["TradeDate"].max() + pd.Timedelta(days=10)).date()

    isins = sorted(audit["ISIN"].dropna().astype(str).unique())
    ret_wide = fetch_returns_from_db(isins, start_date, end_date)
    if ret_wide.empty:
        raise RuntimeError("Nessun rendimento recuperato dal DB nell'intervallo richiesto.")

    daily = rebuild_daily_from_log(audit, ret_wide).sort_index()
    daily.to_csv(OUT_DAILY_CSV, index_label="Date")

    # Equity curve on a base of 100.
    equity = (1.0 + daily.fillna(0.0)).cumprod() * 100.0
    equity.to_csv(OUT_EQUITY_CSV, index_label="Date")

    # Drawdown relative to the running maximum of the equity.
    dd = equity / equity.cummax() - 1.0

    saved = [OUT_DAILY_CSV, OUT_EQUITY_CSV, OUT_DEBUG_CSV]
    dropbox_copies = []
    charts = (
        (equity, "Equity per Strategy", (10, 6), PLOT_EQUITY),
        (dd, "Drawdown per Strategy", (10, 5), PLOT_DD),
    )
    for frame, title, figsize, target in charts:
        _save_line_plot(frame, title, figsize, target)
        saved.append(target)
        # Only list the Dropbox copy as saved when the copy really succeeded.
        if copy_to_dropbox(target):
            dropbox_copies.append(DROPBOX_EXPORT_DIR / target.name)

    print("Salvati:")
    for written in saved + dropbox_copies:
        print(" -", written)


if __name__ == "__main__":
    main()
|