# Trading/equity_from_log.py
# -*- coding: utf-8 -*-
"""
Equity/Reconciliation Builder from Audit Log
- Legge trades_audit_log.csv (OPEN/CLOSE; EntryAmount base=100; EntryIndex opzionale)
- Scarica rendimenti giornalieri via stored procedure (connection.txt)
- Converte i rendimenti in decimali coerenti (percentuali => /100; log-return => expm1)
- Ricostruisce i rendimenti giornalieri per strategia come MEDIA PONDERATA sui trade attivi
- Salva:
- daily_returns_by_strategy.csv
- equity_by_strategy.csv
- debug_daily_by_strategy.csv
- equity_by_strategy.png, drawdown_by_strategy.png
- Mostra anche a video i grafici
"""
from __future__ import annotations
from pathlib import Path
import pandas as pd
import numpy as np
import shutil
from shared_utils import (
    detect_column,
    load_config,
    read_connection_txt,
    require_section,
)
# =============================================================================
# PATH & OUTPUT
# =============================================================================
BASE_DIR = Path(__file__).resolve().parent
CONFIG = None
try:
    CONFIG = load_config()
    PATHS_CONFIG = require_section(CONFIG, "paths")
except Exception as exc:  # pragma: no cover - best effort
    print(f"[WARN] Config not available ({exc}); falling back to default paths.")
    PATHS_CONFIG = {}
OUTPUT_DIR = BASE_DIR / PATHS_CONFIG.get("output_dir", "output")
PLOT_DIR = BASE_DIR / PATHS_CONFIG.get("plot_dir", "plot")
AUDIT_LOG_CSV = BASE_DIR / PATHS_CONFIG.get("audit_log_csv", OUTPUT_DIR / "trades_audit_log.csv")
CONNECTION_TXT = BASE_DIR / PATHS_CONFIG.get("connection_txt", "connection.txt")
OUT_DAILY_CSV = OUTPUT_DIR / "daily_returns_by_strategy.csv"
OUT_EQUITY_CSV = OUTPUT_DIR / "equity_by_strategy.csv"
OUT_DEBUG_CSV = OUTPUT_DIR / "debug_daily_by_strategy.csv"
PLOT_EQUITY = PLOT_DIR / "equity_by_strategy.png"
PLOT_DD = PLOT_DIR / "drawdown_by_strategy.png"
DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF")
def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR) -> bool:
    if not src or not dst_dir:
        return False
    if not src.exists():
        print(f"[WARN] file not found for Dropbox copy: {src}")
        return False
    try:
        dst_dir.mkdir(parents=True, exist_ok=True)
        dst = dst_dir / src.name
        shutil.copy2(src, dst)
        print(f"[DROPBOX] Copied {src.name} to {dst_dir}")
        return True
    except Exception as exc:
        print(f"[WARN] could not copy {src} to {dst_dir}: {exc}")
        return False
# Stored procedure
SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL"
SP_N_DEFAULT = 1260
PTF_CURR_DEFAULT = "EUR"
try:
    DB_CONFIG = require_section(CONFIG, "db") if CONFIG else {}
except Exception as exc:  # pragma: no cover - best effort
    print(f"[WARN] DB config not available ({exc}); using built-in defaults.")
    DB_CONFIG = {}
else:
    SP_NAME_DEFAULT = str(DB_CONFIG.get("stored_proc", SP_NAME_DEFAULT))
    SP_N_DEFAULT = int(DB_CONFIG.get("n_bars", SP_N_DEFAULT))
    PTF_CURR_DEFAULT = str(DB_CONFIG.get("ptf_curr", PTF_CURR_DEFAULT))
DEFAULT_STRATEGIES = ["Equal_Weight", "Risk_Parity"]
VALID_STRATEGIES = DEFAULT_STRATEGIES
EQUITY_CFG = CONFIG.get("equity_log", {}) if CONFIG else {}
raw_whitelist = EQUITY_CFG.get("strategy_whitelist") if isinstance(EQUITY_CFG, dict) else None
if raw_whitelist:
    whitelist = [str(x).strip() for x in raw_whitelist if str(x).strip()]
    if whitelist:
        VALID_STRATEGIES = whitelist
# =============================================================================
# FETCH RETURNS FROM THE DB
# =============================================================================
def fetch_returns_from_db(isins, start_date, end_date) -> pd.DataFrame:
    import sqlalchemy as sa
    from sqlalchemy import text as sql_text

    conn_str = read_connection_txt(CONNECTION_TXT)
    engine = sa.create_engine(conn_str, fast_executemany=True)
    sp = SP_NAME_DEFAULT
    nbar = SP_N_DEFAULT
    ptf = PTF_CURR_DEFAULT
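    # The stored procedure is invoked once per ISIN, with the values passed as
    # SQLAlchemy named bind parameters (:isin, :n, :ptf) rather than interpolated
    # into the SQL string.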
    sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
    frames = []
    with engine.begin() as conn:
        for isin in isins:
            try:
                df = pd.read_sql_query(sql_sp, conn, params={"isin": isin, "n": nbar, "ptf": ptf})
            except Exception as e:
                print(f"[ERROR] SP {sp} failed for {isin}: {e}")
                continue
            if df.empty:
                continue
            col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
            col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
            col_px = detect_column(df, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
            if not col_date:
                continue
            df[col_date] = pd.to_datetime(df[col_date], errors="coerce")
            df = df.dropna(subset=[col_date]).sort_values(col_date)
            if col_ret:
                r = pd.to_numeric(df[col_ret], errors="coerce")
                out = pd.DataFrame({"Date": df[col_date], "ISIN": isin, "Ret": r})
            elif col_px:
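                # No return column available: derive daily log returns from prices,
                # then convert them to simple decimal returns via expm1(log(p_t / p_{t-1})).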
                px = pd.to_numeric(df[col_px], errors="coerce").astype(float).replace(0, np.nan)
                log_r = np.log(px / px.shift(1))
                r = np.expm1(log_r)  # log return -> simple decimal return
                out = pd.DataFrame({"Date": df[col_date], "ISIN": isin, "Ret": r})
            else:
                continue
            frames.append(out)
    if not frames:
        return pd.DataFrame(index=pd.DatetimeIndex([], name="Date"))
    long = pd.concat(frames, ignore_index=True).dropna(subset=["Date"])
    mask = (
        (long["Date"].dt.date >= start_date)
        & (long["Date"].dt.date <= end_date)
    )
    long = long.loc[mask]
    wide = long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
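    # Heuristic: daily returns expressed as decimals should stay well below 0.5 in
    # absolute value; if the maximum exceeds that, the series is assumed to be in
    # percent and is rescaled to decimals.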
    if not wide.empty:
        max_abs = np.nanmax(np.abs(wide.values))
        if np.isfinite(max_abs) and max_abs > 0.5:
            wide = wide / 100.0
    return wide
# =============================================================================
# DAILY RETURNS RECONSTRUCTION
# =============================================================================
def rebuild_daily_from_log(audit: pd.DataFrame, returns_wide: pd.DataFrame) -> pd.DataFrame:
    strategies = sorted(audit["Strategy"].dropna().astype(str).unique())
    if not strategies:
        return pd.DataFrame(index=returns_wide.index, columns=[])
    idx = returns_wide.index
    daily_num = pd.DataFrame(0.0, index=idx, columns=strategies)
    daily_den = pd.DataFrame(0.0, index=idx, columns=strategies)
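    # Each CLOSE row is keyed by "ISIN|open date" (LinkedOpenDate when present,
    # otherwise its own TradeDate) so that every OPEN can look up its closing date.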
    closes = audit[audit["Action"] == "CLOSE"].copy()
    if not closes.empty:
        if "LinkedOpenDate" in closes.columns:
            closes["_key"] = (
                closes["ISIN"].astype(str)
                + "|"
                + pd.to_datetime(closes["LinkedOpenDate"]).dt.strftime("%Y-%m-%d")
            )
        else:
            closes["_key"] = (
                closes["ISIN"].astype(str)
                + "|"
                + pd.to_datetime(closes["TradeDate"]).dt.strftime("%Y-%m-%d")
            )
        closes["TradeDate"] = pd.to_datetime(closes["TradeDate"])
        closes_agg = closes.sort_values("TradeDate").groupby("_key", as_index=False)["TradeDate"].last()
        close_map = closes_agg.set_index("_key")
    else:
        close_map = pd.DataFrame().set_index(pd.Index([], name="_key"))
    for strat in strategies:
        aud_s = audit[audit["Strategy"] == strat]
        opens = aud_s[aud_s["Action"] == "OPEN"].copy()
        if opens.empty:
            continue
        opens["_key"] = (
            opens["ISIN"].astype(str)
            + "|"
            + pd.to_datetime(opens["TradeDate"]).dt.strftime("%Y-%m-%d")
        )
        for _, op in opens.iterrows():
            isin = op["ISIN"]
            if isin not in returns_wide.columns:
                continue
            ser = returns_wide[isin].astype(float)
            entry_amount = float(op.get("EntryAmount", 0.0) or 0.0)
            if entry_amount <= 0:
                continue
            entry_idx = int(op.get("EntryIndex", 0) or 0)
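            # If EntryIndex is missing or out of range, fall back to the first
            # return date on or after the trade date.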
            if entry_idx < 0 or entry_idx >= len(ser):
                d_open = pd.Timestamp(op["TradeDate"]).normalize()
                entry_idx = int(ser.index.searchsorted(d_open, side="left"))
            key = op["_key"]
            if key in close_map.index:
                close_val = close_map.loc[key, "TradeDate"]
                if isinstance(close_val, pd.Series):
                    close_val = close_val.iloc[-1]
                d_close = pd.Timestamp(close_val).normalize()
                exit_idx = int(ser.index.searchsorted(d_close, side="left"))
            else:
                exit_idx = len(ser)
            if exit_idx <= entry_idx:
                continue
            idx_seg = ser.index[entry_idx:exit_idx]
            vals_seg = ser.values[entry_idx:exit_idx]
            daily_num.loc[idx_seg, strat] += entry_amount * vals_seg
            daily_den.loc[idx_seg, strat] += entry_amount
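    # Strategy return for each day = sum(EntryAmount * return) / sum(EntryAmount)
    # over the trades active that day, i.e. a weighted average; days with no open
    # trades stay at 0.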
    daily = pd.DataFrame(0.0, index=idx, columns=strategies)
    mask = daily_den > 0
    daily[mask] = daily_num[mask] / daily_den[mask]
    debug = pd.concat(
        {f"num_{c}": daily_num[c] for c in strategies}
        | {f"den_{c}": daily_den[c] for c in strategies}
        | {f"ret_{c}": daily[c] for c in strategies},
        axis=1,
    )
    debug.to_csv(OUT_DEBUG_CSV, index_label="Date")
    return daily
# =============================================================================
# MAIN
# =============================================================================
def main():
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    PLOT_DIR.mkdir(parents=True, exist_ok=True)
    if not AUDIT_LOG_CSV.exists():
        raise FileNotFoundError("Missing trades_audit_log.csv")
    # robust parsing (LinkedOpenDate may be missing)
    try:
        audit = pd.read_csv(AUDIT_LOG_CSV, parse_dates=["TradeDate", "LinkedOpenDate"])
    except ValueError:
        audit = pd.read_csv(AUDIT_LOG_CSV, parse_dates=["TradeDate"])
    if audit.empty:
        raise SystemExit("Audit log is empty.")
    if "Strategy" not in audit.columns:
        raise SystemExit("Column 'Strategy' is missing from the audit log.")
    # === whitelist filter: keep only the desired strategies ===
    audit["Strategy"] = audit["Strategy"].astype(str)
    before = len(audit)
    audit = audit[audit["Strategy"].isin(VALID_STRATEGIES)]
    removed = before - len(audit)
    if removed > 0:
        print(
            f"[INFO] Filtered out {removed} rows whose strategy is not in {VALID_STRATEGIES}."
        )
    if audit.empty:
        raise SystemExit(f"No rows with strategies in {VALID_STRATEGIES} in the audit log.")
    start_date = (audit["TradeDate"].min() - pd.Timedelta(days=10)).date()
    end_date = (audit["TradeDate"].max() + pd.Timedelta(days=10)).date()
    isins = sorted(audit["ISIN"].dropna().astype(str).unique())
    ret_wide = fetch_returns_from_db(isins, start_date, end_date)
    if ret_wide.empty:
        raise RuntimeError("No returns retrieved from the DB for the requested date range.")
    daily = rebuild_daily_from_log(audit, ret_wide).sort_index()
    daily.to_csv(OUT_DAILY_CSV, index_label="Date")
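    # Equity curves are rebased to 100 and compounded from the daily returns;
    # missing days are treated as flat (0.0) before compounding.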
    equity = (1.0 + daily.fillna(0.0)).cumprod() * 100.0
    equity.to_csv(OUT_EQUITY_CSV, index_label="Date")
    import matplotlib.pyplot as plt

    # Equity
    plt.figure(figsize=(10, 6))
    for col in equity.columns:
        plt.plot(equity.index, equity[col], label=col)
    plt.title("Equity per Strategy")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.savefig(str(PLOT_EQUITY), dpi=150)
    plt.close()
    copy_to_dropbox(PLOT_EQUITY)
    # Drawdown
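    # Drawdown at each date = equity / running peak - 1 (0 at a new high,
    # negative while below the previous peak).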
    dd = equity / equity.cummax() - 1.0
    plt.figure(figsize=(10, 5))
    for col in dd.columns:
        plt.plot(dd.index, dd[col], label=col)
    plt.title("Drawdown per Strategy")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.savefig(str(PLOT_DD), dpi=150)
    plt.close()
    copy_to_dropbox(PLOT_DD)
    print("Saved:")
    print(" -", OUT_DAILY_CSV)
    print(" -", OUT_EQUITY_CSV)
    print(" -", OUT_DEBUG_CSV)
    print(" -", PLOT_EQUITY)
    print(" -", PLOT_DD)
    print(" -", DROPBOX_EXPORT_DIR / PLOT_EQUITY.name)
    print(" -", DROPBOX_EXPORT_DIR / PLOT_DD.name)


if __name__ == "__main__":
    main()