Refined the stable version and renamed the file
Signals Daily kNN.py (new file, +773 lines)
@@ -0,0 +1,773 @@
# -*- coding: utf-8 -*-
"""
Daily Signals Generator (kNN) – PRODUCTION (consistent with backtest v3.1.5)

Main changes:
- Daily cap MAX_OPEN=15: single buy ranking and revision-by-difference for each strategy
- Risk Parity with a per-asset cap (RP_MAX_WEIGHT), aligned with backtest v3.1.5
- OPEN prices fetched only once for the ISINs involved in OPEN/CLOSE (cache shared across strategies)
- Audit log for ALL operating strategies (Equal_Weight, Risk_Parity)

Pipeline (day D, EOD -> t+1 OPEN):
1) Load the universe and the return series from the DB (stored procedure)
2) kNN pattern matching (WP=60, HA=10, K=25); Signal=1 if EstOutcome > THETA (in decimals)
3) Single buy ranking and Top-N selection (MAX_OPEN) as the daily target
4) Revision by difference: close positions that fell out of the top, open new entries in the top (+ risk exits)
5) Sizing (Equal Weight / Risk Parity with cap)
6) Fetch OPEN prices ONCE for the ISINs involved (OPEN/CLOSE) and fill the orders
7) Order log and dated Excel snapshot with sheets: Open_Equal_Weight / Open_Risk_Parity
"""
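
# Typical invocation: run the script with no arguments; the entry point at the bottom calls
# main_run() for the current date (decision at EOD of day D, execution at the t+1 open).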

from __future__ import annotations

import os
import ssl
import json
import time
import shutil
import warnings
import datetime as dt
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Iterable, Set

import numpy as np
import pandas as pd
from urllib.request import urlopen
from urllib.error import URLError, HTTPError

# DB
import sqlalchemy as sa
from sqlalchemy import text as sql_text

from shared_utils import (
    build_hurst_map,
    build_pattern_library,
    characterize_window,
    detect_column,
    load_config,
    predict_from_library,
    read_connection_txt,
    require_section,
    require_value,
    z_norm,
)

# =========================
# CONFIG
# =========================
CONFIG = load_config()
DB_CONFIG = require_section(CONFIG, "db")
PATTERN_CONFIG = require_section(CONFIG, "pattern")
TAGGING_CONFIG = require_section(CONFIG, "tagging")
RANKING_CONFIG = require_section(CONFIG, "ranking")
SIGNALS_CONFIG = require_section(CONFIG, "signals")
PATHS_CONFIG = require_section(CONFIG, "paths")
HURST_CONFIG = CONFIG.get("hurst", {})
PRICES_CONFIG = CONFIG.get("prices", {})
RUN_CONFIG = CONFIG.get("run", {})

BASE_DIR = Path(PATHS_CONFIG.get("base_dir", ".")).resolve()
OUTPUT_DIR = BASE_DIR / PATHS_CONFIG.get("output_dir", "output")
PLOT_DIR = BASE_DIR / PATHS_CONFIG.get("plot_dir", "plot")
# Universe now expected inside Input folder
UNIVERSO_XLSX = BASE_DIR / PATHS_CONFIG.get("input_universe", "Input/Universo per Trading System.xlsx")
CONNECTION_TXT = BASE_DIR / PATHS_CONFIG.get("connection_txt", "connection.txt")
AUDIT_LOG_CSV = BASE_DIR / PATHS_CONFIG.get("audit_log_csv", OUTPUT_DIR / "trades_audit_log.csv")
OPEN_TRADES_DIR = BASE_DIR / PATHS_CONFIG.get("open_trades_dir", "open_trades")
DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF")

def _dated_signals_filename() -> Path:
    date_prefix = pd.Timestamp.today().strftime("%Y%m%d")
    return OUTPUT_DIR / f"{date_prefix}_signals.xlsx"

# Stored procedure / DB parameters
SP_NAME_DEFAULT = str(require_value(DB_CONFIG, "stored_proc", "db"))
SP_N_DEFAULT = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR_DEFAULT = str(require_value(DB_CONFIG, "ptf_curr", "db"))

# Pattern recognition (same as the backtest)
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern"))  # 0.005% in decimals (identical to the backtest)
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))

# Exit rules (identical to the backtest)
SL_BPS = float(require_value(SIGNALS_CONFIG, "sl_bps", "signals"))
TP_BPS = float(require_value(SIGNALS_CONFIG, "tp_bps", "signals"))
TRAIL_BPS = float(require_value(SIGNALS_CONFIG, "trail_bps", "signals"))
TIME_STOP_BARS = int(require_value(SIGNALS_CONFIG, "time_stop_bars", "signals"))
THETA_EXIT = float(require_value(SIGNALS_CONFIG, "theta_exit", "signals"))  # weakness threshold
WEAK_DAYS_EXIT = require_value(SIGNALS_CONFIG, "weak_days_exit", "signals")  # IMMEDIATE exit on weakness (as in the backtest)

# Ranking and Top-N selection for OPENS
MAX_OPEN = int(require_value(SIGNALS_CONFIG, "max_open", "signals"))  # cap on instruments opened today (as in the backtest)

# Alignment with backtest v3.1.5 for the Risk Parity cap
TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))
RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking")  # ≈ 0.1333 = 13.33% per single asset
if RP_MAX_WEIGHT is None:
    RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
else:
    RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)

# Sizing
BASE_CAPITAL_PER_STRATEGY = float(require_value(SIGNALS_CONFIG, "base_capital_per_strategy", "signals"))
MIN_TRADE_NOTIONAL = float(require_value(SIGNALS_CONFIG, "min_trade_notional", "signals"))
RISK_PARITY_LOOKBACK = int(require_value(SIGNALS_CONFIG, "risk_parity_lookback", "signals"))
HURST_LOOKBACK = HURST_CONFIG.get("lookback", None)
HURST_MIN_LENGTH = int(HURST_CONFIG.get("min_length", 200))
OPEN_PRICE_BASE_URL = str(PRICES_CONFIG.get("base_url", "https://fin.scorer.app/finance/euronext/price"))
OPEN_MAX_RETRY = int(PRICES_CONFIG.get("max_retry", 3))
OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1))
OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10))
BUSINESS_DAYS_ONLY = bool(RUN_CONFIG.get("business_days_only", True))
SEED = int(RUN_CONFIG.get("seed", 42))

warnings.filterwarnings("ignore")
np.random.seed(SEED)
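
# Expected configuration layout (a sketch inferred from the reads above; the concrete file
# format is whatever shared_utils.load_config returns):
#   db:      stored_proc, n_bars, ptf_curr
#   pattern: wp, ha, knn_k, theta
#   tagging: z_rev, z_vol, std_comp_pct
#   ranking: top_n_max, rp_max_weight
#   signals: sl_bps, tp_bps, trail_bps, time_stop_bars, theta_exit, weak_days_exit,
#            max_open, base_capital_per_strategy, min_trade_notional, risk_parity_lookback
#   paths:   base_dir, output_dir, plot_dir, input_universe, connection_txt,
#            audit_log_csv, open_trades_dir
#   hurst (optional):  lookback, min_length
#   prices (optional): base_url, max_retry, sleep_sec, timeout
#   run (optional):    business_days_only, seed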

# =========================
# UTILS
# =========================
def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR):
    if not src or not dst_dir:
        return
    if not src.exists():
        return
    try:
        ensure_dir(dst_dir)
        dst = dst_dir / src.name
        shutil.copy2(src, dst)
    except Exception as exc:
        print(f"[WARN] unable to copy {src} to {dst_dir}: {exc}")

def next_business_day(d: dt.date) -> dt.date:
    nd = d + dt.timedelta(days=1)
    if not BUSINESS_DAYS_ONLY:
        return nd
    while nd.weekday() >= 5:  # 5=Sat, 6=Sun
        nd += dt.timedelta(days=1)
    return nd
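
# Example: with BUSINESS_DAYS_ONLY=True, next_business_day(dt.date(2024, 5, 31)) (a Friday)
# returns dt.date(2024, 6, 3) (the following Monday); exchange holidays are not skipped.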

def _safe_to_float(x) -> Optional[float]:
    try:
        return float(x)
    except Exception:
        return None

def _db_fetch_returns(conn_str: str,
                      isins: List[str],
                      sp_name: Optional[str] = None,
                      n_bars: Optional[int] = None,
                      ptf_curr: Optional[str] = None) -> pd.DataFrame:
    engine = sa.create_engine(conn_str, fast_executemany=True)
    sp = sp_name or os.environ.get("SP_NAME", SP_NAME_DEFAULT)
    n_val = n_bars if n_bars is not None else int(os.environ.get("SP_N", SP_N_DEFAULT))
    ptf = (ptf_curr or os.environ.get("PTF_CURR", PTF_CURR_DEFAULT) or "").strip() or PTF_CURR_DEFAULT

    sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
    frames: List[pd.DataFrame] = []

    with engine.begin() as conn:
        for i, isin in enumerate(isins, start=1):
            print(f"[DB] ({i}/{len(isins)}) downloading historical series for {isin} ...", flush=True)
            try:
                df = pd.read_sql_query(sql_sp, conn, params={"isin": str(isin), "n": int(n_val), "ptf": ptf})
            except Exception as e:
                print(f"[ERROR] SP {sp} failed for {isin}: {e}")
                continue

            if df.empty:
                print(f"[WARN] No data for {isin}")
                continue

            col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
            col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
            if not col_date or not col_ret:
                print(f"[WARN] Missing columns for {isin}")
                continue

            out = df[[col_date, col_ret]].copy()
            out.columns = ["Date", "Ret"]
            out["Date"] = pd.to_datetime(out["Date"], errors="coerce").dt.tz_localize(None)
            out["ISIN"] = str(isin)

            # Heuristic: if the median absolute return is > 1.5 the series is in percent; convert to decimals
            med = pd.to_numeric(out["Ret"], errors="coerce").abs().median()
            if pd.notnull(med) and med > 1.5:
                out["Ret"] = out["Ret"].astype(float) / 100.0

            print(f"    ↳ rows downloaded: {len(out)}")
            frames.append(out[["Date", "ISIN", "Ret"]])

    if not frames:
        return pd.DataFrame(columns=["Date", "ISIN", "Ret"])

    out_all = pd.concat(frames, ignore_index=True)
    out_all = out_all.dropna(subset=["Date"]).sort_values(["ISIN", "Date"]).reset_index(drop=True)
    return out_all[["Date", "ISIN", "Ret"]]
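
# The frame returned above is long-format, one row per (ISIN, Date), with columns
# ["Date", "ISIN", "Ret"] and Ret expressed in decimals (e.g. 0.0123 for +1.23%).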

# =========================
# UNIVERSE + OPEN PRICE API (checker-style schema)
# =========================
def load_universe(path: Path) -> pd.DataFrame:
    df = pd.read_excel(path)
    if "ISIN" not in df.columns:
        raise KeyError("The Universe file is missing the 'ISIN' column")
    df["ISIN"] = df["ISIN"].astype(str).str.strip()
    for col in ["Asset Class", "Mercato", "TickerOpen"]:
        if col not in df.columns:
            df[col] = ""
        df[col] = df[col].astype(str).str.strip()
    return df

def _build_symbol_euronext(row: pd.Series) -> Tuple[str, str]:
    isin = str(row.get("ISIN", "")).strip()
    venue = str(row.get("Mercato", "")).strip()
    tok = str(row.get("TickerOpen", "") or "").strip()
    base = OPEN_PRICE_BASE_URL
    if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper():
        return base, tok
    if isin and venue:
        return base, f"{isin}-{venue}"
    return base, isin
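
# Example (hypothetical ISIN/venue): a row with ISIN "IE00B4L5Y983" and Mercato "ETF"
# maps to the symbol "IE00B4L5Y983-ETF", i.e. the request URL
# f"{OPEN_PRICE_BASE_URL}/IE00B4L5Y983-ETF"; a TickerOpen already of the form
# "<ISIN>-<venue>" is used verbatim.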

def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]:
    """
    Euronext endpoint price/ISIN-Mercato with retry/backoff.
    Detailed checker-style logging: [TRY]/[RETRY]/[OK]/[FAIL].
    """
    try:
        row = universe.loc[universe["ISIN"] == str(isin)].iloc[0]
    except Exception:
        print(f"[WARN] ISIN {isin} not found in the universe.")
        return None

    base, symbol = _build_symbol_euronext(row)
    url = f"{base}/{symbol}"
    print(f"[DOWNLOAD] {symbol:<30s} -> [TRY 1/{OPEN_MAX_RETRY}]", flush=True)

    for attempt in range(1, OPEN_MAX_RETRY + 1):
        if attempt > 1:
            print(f"[DOWNLOAD] {symbol:<30s} -> [TRY {attempt}/{OPEN_MAX_RETRY}]", flush=True)
        try:
            with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp:
                data = json.loads(resp.read().decode("utf-8"))
            if not isinstance(data, list) or not data:
                print("    ↳ NO DATA")
                continue

            d = (data[0] or {}).get("data") or {}
            px = d.get("open") if d.get("open") is not None else d.get("prevClose")
            if px is None:
                print("    ↳ WARN: both 'open' and 'prevClose' missing")
                continue

            px = float(px)
            print(f"    ↳ OK open={d.get('open')} close={d.get('close')} (returned price={px})")
            return px

        except (HTTPError, URLError, ssl.SSLError) as e:
            if attempt < OPEN_MAX_RETRY:
                print(f"    ↳ ERR {e}\nretrying in {OPEN_SLEEP_SEC}s")
                time.sleep(OPEN_SLEEP_SEC)
            else:
                print(f"    ↳ ERR {e}")

    print(f"[ERROR] no price for {symbol} after {OPEN_MAX_RETRY} attempts")
    return None
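
# Response shape assumed by the parser above (illustrative values, not taken from API docs):
#   [{"data": {"open": 101.2, "close": 101.9, "prevClose": 100.8}}]
# "open" is preferred; "prevClose" is the fallback when "open" is null.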

# =========================
# HURST ESTIMATOR & MAP (see shared_utils.build_hurst_map)
# =========================
# =========================
# SIGNAL GENERATION (EOD on day D)
# =========================
def generate_signals_today(universe: pd.DataFrame,
                           returns_long: pd.DataFrame,
                           today: dt.date,
                           hurst_map: Optional[Dict[str, float]] = None) -> pd.DataFrame:
    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
    decision_date = ret_wide.index[-1].date()

    rows = []
    for isin in ret_wide.columns:
        r = ret_wide[isin].dropna().astype(float)
        # THETA = HURST AS A PERCENTAGE (H = 0.50 -> theta_entry = 0.005 = 0.5%)
        theta_entry = THETA  # global fallback
        H_val = None
        if hurst_map is not None:
            H_val = hurst_map.get(str(isin))
            if H_val is not None and not pd.isna(H_val):
                theta_entry = float(H_val) / 100.0

        if len(r) < max(200, WP + HA + 10):
            rows.append({"Date": decision_date, "ISIN": isin,
                         "Signal": 0, "EstOutcome": np.nan, "AvgDist": np.nan,
                         "PatternType": None, "Confidence": np.nan})
            continue

        lib_wins, lib_out = build_pattern_library(r, WP, HA)
        if lib_wins is None or len(r) < WP + HA:
            est_out, avg_dist, sig = np.nan, np.nan, 0
            ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
        else:
            curr = r.values[-WP:]
            curr_zn = z_norm(curr)
            if curr_zn is None:
                est_out, avg_dist, sig = np.nan, np.nan, 0
                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
            else:
                est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
                sig = 1 if (pd.notna(est_out) and float(est_out) > float(theta_entry)) else 0
                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)

        rows.append({
            "Date": decision_date, "ISIN": isin,
            "Signal": int(sig),
            "EstOutcome": (None if pd.isna(est_out) else float(est_out)),
            "AvgDist": (None if pd.isna(avg_dist) else float(avg_dist)),
            "PatternType": ptype,
            "Confidence": (None if pconf is None else float(max(0.0, min(1.0, pconf)))),
            "Hurst": (None if H_val is None else float(H_val)),
            "Theta": float(theta_entry),
        })

    sig_df = pd.DataFrame(rows).set_index(["Date", "ISIN"]).sort_index()
    return sig_df
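
# Example: an ISIN whose estimated Hurst exponent is 0.55 gets theta_entry = 0.55 / 100 = 0.0055,
# so Signal = 1 only when the kNN-estimated outcome exceeds +0.55%; ISINs without a Hurst value
# fall back to the global THETA from the config.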

# =========================
# TOP-N SELECTION & PRICE CACHE
# =========================
def _rank_buy(signals_today: pd.DataFrame, decision_date: dt.date) -> pd.DataFrame:
    """
    Return a DataFrame with only today's buy signals, sorted by:
    EstOutcome desc, Confidence desc, AvgDist asc.
    Columns: ['ISIN', 'EstOutcome', 'Confidence', 'AvgDist']
    """
    if signals_today.empty:
        return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"])
    day_df = signals_today.reset_index().query("Date == @decision_date")
    if day_df.empty:
        return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"])

    buy = (day_df[day_df["Signal"] == 1]
           .assign(
               EstOutcome=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "EstOutcome"], errors="coerce"),
               Confidence=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "Confidence"], errors="coerce"),
               AvgDist=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "AvgDist"], errors="coerce"),
           )
           .sort_values(by=["EstOutcome", "Confidence", "AvgDist"], ascending=[False, False, True], na_position="last"))
    return buy[["ISIN", "EstOutcome", "Confidence", "AvgDist"]].reset_index(drop=True)

def _select_top_signals(buy_rank_df: pd.DataFrame, max_open: int) -> List[str]:
    if buy_rank_df.empty:
        return []
    return buy_rank_df["ISIN"].head(max_open).tolist()
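
# Example: with MAX_OPEN = 15 and, say, 40 buy signals today, only the 15 ISINs with the
# highest EstOutcome (ties broken by higher Confidence, then by lower AvgDist) form the
# daily target shared by all strategies.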

def _fetch_open_prices_once(isins: Iterable[str], universe: pd.DataFrame) -> Dict[str, Optional[float]]:
    cache: Dict[str, Optional[float]] = {}
    for isin in sorted(set(isins)):
        cache[isin] = get_open_price(isin, universe)
    return cache

# =========================
# POSITIONS / ORDERS / LOG
# =========================
@dataclass
class OpenPos:
    Strategy: str
    ISIN: str
    EntryDate: dt.date
    EntryIndex: int
    EntryAmount: float
    SizeWeight: float
    PeakPnL: float = 0.0
    Notes: str = ""

def open_trades_path(strategy: str) -> Path:
    ensure_dir(OPEN_TRADES_DIR)
    return OPEN_TRADES_DIR / f"open_{strategy}.csv"

def load_open_trades(strategy: str) -> pd.DataFrame:
    p = open_trades_path(strategy)
    if not p.exists():
        return pd.DataFrame(columns=[
            "Strategy", "ISIN", "AssetName", "EntryDate", "EntryIndex", "EntryAmount", "SizeWeight", "PeakPnL", "WeakDays", "Notes"
        ])
    df = pd.read_csv(p)
    if "EntryDate" in df.columns:
        df["EntryDate"] = pd.to_datetime(df["EntryDate"], errors="coerce").dt.date
    if "WeakDays" not in df.columns:
        df["WeakDays"] = 0
    if "AssetName" not in df.columns:
        df["AssetName"] = ""
    df["Strategy"] = strategy
    return df

def save_open_trades(strategy: str, df: pd.DataFrame):
    p = open_trades_path(strategy)
    df.to_csv(p, index=False)

def append_audit_rows(rows: List[Dict]):
    if not rows:
        return
    ensure_dir(AUDIT_LOG_CSV.parent)
    log = pd.DataFrame(rows)
    if AUDIT_LOG_CSV.exists():
        old = pd.read_csv(AUDIT_LOG_CSV)
        log = pd.concat([old, log], ignore_index=True)
    log.to_csv(AUDIT_LOG_CSV, index=False)

# Sizing
def compute_current_capital_from_log(strategy: str,
                                     returns_wide: pd.DataFrame,
                                     asof_date: dt.date) -> float:
    # Currently a placeholder: every strategy sizes against the fixed base capital.
    return BASE_CAPITAL_PER_STRATEGY

def size_equal_weight(candidates: List[str]) -> Dict[str, float]:
    if not candidates:
        return {}
    w = 1.0 / max(1, len(candidates))
    return {isin: w for isin in candidates}
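
# Example: four target ISINs -> each gets weight 0.25, i.e. 25% of the strategy capital.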

def size_risk_parity(candidates: List[str],
                     returns_wide: pd.DataFrame,
                     asof_idx: int) -> Dict[str, float]:
    """
    Risk Parity over the candidates only, with a per-asset cap (RP_MAX_WEIGHT),
    aligned with the logic used in backtest v3.1.5.
    Any unallocated portion stays in cash (weights are not renormalised after the cap).
    """
    if not candidates:
        return {}
    L = RISK_PARITY_LOOKBACK
    start = max(0, asof_idx - L + 1)
    sub = returns_wide.iloc[start:asof_idx + 1][candidates]
    vol = sub.std().replace(0, np.nan)
    inv_vol = 1.0 / vol
    inv_vol = inv_vol.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    if inv_vol.sum() == 0:
        return size_equal_weight(candidates)
    w = (inv_vol / inv_vol.sum()).to_dict()
    # Cap each individual weight, as RP_MAX_WEIGHT does in the Pattern Recon backtest
    if RP_MAX_WEIGHT is not None:
        w = {k: min(RP_MAX_WEIGHT, float(v)) for k, v in w.items()}
    return w
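
# Worked example (hypothetical volatilities): three candidates with daily vols 1%, 2% and 4%
# give inverse-vol weights 100:50:25 -> 0.571 / 0.286 / 0.143 before the cap; with
# RP_MAX_WEIGHT = 0.1333 all three are capped at 0.1333 and the remaining ~60% stays in cash.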

def _risk_exit_flags(isin: str,
                     entry_date: Optional[dt.date],
                     ret_wide: pd.DataFrame,
                     decision_date: dt.date,
                     est_map_today: pd.Series) -> List[str]:
    """Compute the exit reasons (SL/TP/Trailing/Time/Weak) for a position."""
    reasons: List[str] = []
    pnl_if_stay = 0.0
    peak_dd = 0.0
    if entry_date is not None and isin in ret_wide.columns:
        sub = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)]
        if not sub.empty and sub.index[0].date() == entry_date:
            sub = sub.iloc[1:]
        if not sub.empty:
            eq_path = (1.0 + sub.fillna(0.0)).cumprod()
            pnl_if_stay = float(eq_path.iloc[-1] - 1.0)
            peak_curve = eq_path.cummax()
            dd_series = eq_path / peak_curve - 1.0
            peak_dd = float(-dd_series.min())

    if SL_BPS is not None and pnl_if_stay <= -SL_BPS / 10000.0:
        reasons.append("SL")
    if TP_BPS is not None and pnl_if_stay >= TP_BPS / 10000.0:
        reasons.append("TP")
    if TRAIL_BPS is not None and peak_dd >= TRAIL_BPS / 10000.0:
        reasons.append("TRAIL")

    if TIME_STOP_BARS is not None and entry_date is not None and isin in ret_wide.columns:
        sub_all = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)]
        bars = len(sub_all) - (1 if (not sub_all.empty and sub_all.index[0].date() == entry_date) else 0)
        if bars >= int(TIME_STOP_BARS):
            reasons.append("TIME")

    if THETA_EXIT is not None and est_map_today is not None and isin in est_map_today.index:
        est_out_today = est_map_today.loc[isin]
        if pd.notna(est_out_today) and float(est_out_today) <= float(THETA_EXIT):
            reasons.append("WEAK")
    return reasons
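
# Example (hypothetical thresholds): with SL_BPS = 300 and TP_BPS = 600, a position whose
# cumulative return since entry is -3.2% is flagged "SL" and one at +6.5% is flagged "TP";
# thresholds in basis points are converted to decimals by dividing by 10,000.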

def update_positions_and_build_orders(universe: pd.DataFrame,
                                      returns_long: pd.DataFrame,
                                      signals_today: pd.DataFrame,
                                      today: dt.date,
                                      buy_rank_df: Optional[pd.DataFrame],
                                      allowed_open_isins: Optional[List[str]] = None,
                                      asset_name_map: Optional[pd.Series] = None) -> Tuple[pd.DataFrame, List[Dict]]:
    """
    - decision_date = last available date (EOD)
    - daily target = first MAX_OPEN ISINs of the buy ranking (the same for all strategies)
    - for each strategy: revision by difference vs the positions already open
    - CLOSE also on risk exits (SL/TP/Trailing/Time/Weak)
    - Execution at the t+1 open; prices fetched ONCE for all ISINs with orders

    NB: the Aggressive/Crypto strategy has been removed. Only the following remain:
    - Equal_Weight
    - Risk_Parity (with per-asset cap)
    """
    strategies = ["Equal_Weight", "Risk_Parity"]

    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
    decision_date: dt.date = ret_wide.index[-1].date()
    asof_idx = len(ret_wide.index) - 1
    next_open_date = next_business_day(decision_date)

    # Today's ranking and maps
    est_map_all = None
    if buy_rank_df is not None and not buy_rank_df.empty:
        est_map_all = buy_rank_df.set_index("ISIN")["EstOutcome"]
    else:
        # fallback: use all of today's signals
        day_df = signals_today.reset_index().query("Date == @decision_date")
        if not day_df.empty:
            est_map_all = day_df.set_index("ISIN")["EstOutcome"]

    target_isins: List[str] = allowed_open_isins or []
    target_set: Set[str] = set(target_isins)

    # Per-strategy sizing based on the target
    size_eq = size_equal_weight(target_isins)
    size_rp = size_risk_parity(target_isins, ret_wide, asof_idx)
    sizing_by_strategy = {"Equal_Weight": size_eq, "Risk_Parity": size_rp}

    open_concat: List[pd.DataFrame] = []
    audit_rows_all: List[Dict] = []
    isins_for_open_fetch: Set[str] = set()
    isins_for_close_fetch: Set[str] = set()

    for strat in strategies:
        df_open = load_open_trades(strat)
        current_set: Set[str] = set(df_open["ISIN"].astype(str).tolist())

        # --- risk exits on current positions ---
        closers: List[Tuple[str, str]] = []
        if not df_open.empty:
            if "WeakDays" not in df_open.columns:
                df_open["WeakDays"] = 0

            for _, row_open in df_open.iterrows():
                isin = str(row_open["ISIN"])
                entry_date = pd.to_datetime(row_open["EntryDate"]).date() if pd.notna(row_open["EntryDate"]) else None
                reasons = _risk_exit_flags(isin, entry_date, ret_wide, decision_date, est_map_all if est_map_all is not None else pd.Series(dtype=float))
                if reasons:
                    closers.append((isin, "+".join(sorted(set(reasons)))))

        # --- closes driven by the ranking (slipped out of the top-N) ---
        drop_by_rank = list(current_set - target_set)
        for isin in drop_by_rank:
            closers.append((isin, "RANK"))

        # --- apply closes (accumulate orders) ---
        for isin, reason in closers:
            row_open = df_open.loc[df_open["ISIN"] == isin]
            linked_date = row_open["EntryDate"].iloc[0] if not row_open.empty else None

            isins_for_close_fetch.add(isin)
            audit_rows_all.append({
                "Strategy": strat,
                "ISIN": isin,
                "Action": "CLOSE",
                "TradeDate": next_open_date,
                "EntryIndex": np.nan,
                "EntryAmount": np.nan,
                "SizeWeight": np.nan,
                "Price": None,  # filled after the price fetch
                "PnL_%": np.nan,
                "ExitReason": reason,
                "LinkedOpenDate": linked_date,
                "Duration_bars": np.nan,
                "Notes": f"DecisionDate={decision_date}"
            })
            df_open = df_open[df_open["ISIN"] != isin]
            current_set.discard(isin)

        # --- opens by difference towards the target ---
        add_list = [isin for isin in target_isins if isin not in current_set]  # preserves the ranking order
        w_dict = sizing_by_strategy[strat]
        if add_list and w_dict:
            cap = compute_current_capital_from_log(strat, ret_wide, decision_date)
            for isin in add_list:
                w = float(w_dict.get(isin, 0.0))
                if w <= 0:
                    continue
                notional = max(MIN_TRADE_NOTIONAL, cap * w)
                entry_idx = asof_idx + 1

                isins_for_open_fetch.add(isin)
                audit_rows_all.append({
                    "Strategy": strat,
                    "ISIN": isin,
                    "Action": "OPEN",
                    "TradeDate": next_open_date,
                    "EntryIndex": entry_idx,
                    "EntryAmount": float(notional),
                    "SizeWeight": float(w),
                    "Price": None,  # filled after the price fetch
                    "PnL_%": np.nan,
                    "ExitReason": "",
                    "LinkedOpenDate": "",
                    "Duration_bars": 0,
                    "Notes": f"DecisionDate={decision_date}"
                })
                df_open = pd.concat([df_open, pd.DataFrame([{
                    "Strategy": strat,
                    "ISIN": isin,
                    "EntryDate": decision_date,
                    "EntryIndex": entry_idx,
                    "EntryAmount": float(notional),
                    "SizeWeight": float(w),
                    "PeakPnL": 0.0,
                    "WeakDays": 0,
                    "Notes": ""
                }])], ignore_index=True)
                current_set.add(isin)

        if asset_name_map is not None:
            df_open["AssetName"] = df_open["ISIN"].astype(str).map(asset_name_map).fillna("")
        else:
            if "AssetName" not in df_open.columns:
                df_open["AssetName"] = ""
        if "AssetName" in df_open.columns:
            cols = list(df_open.columns)
            if "ISIN" in cols and "AssetName" in cols:
                cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
                df_open = df_open[cols]

        save_open_trades(strat, df_open)
        df_open["Strategy"] = strat
        open_concat.append(df_open)

    # ---- FETCH ONCE (OPEN + CLOSE) ----
    fetch_isins = sorted(isins_for_open_fetch.union(isins_for_close_fetch))
    if fetch_isins:
        print(f"[PRICE] fetching opens for {len(fetch_isins)} ISINs (open={len(isins_for_open_fetch)}, close={len(isins_for_close_fetch)})", flush=True)
        px_cache = _fetch_open_prices_once(fetch_isins, universe)
        for r in audit_rows_all:
            if r.get("Price") is None:
                r["Price"] = _safe_to_float(px_cache.get(r["ISIN"]))

    # ---- SUMMARY ----
    n_open = sum(1 for r in audit_rows_all if r.get("Action") == "OPEN")
    n_close = sum(1 for r in audit_rows_all if r.get("Action") == "CLOSE")
    print(f"[SUMMARY] decision_date={decision_date} opens={n_open} closes={n_close} target={len(target_isins)} (cap={MAX_OPEN})", flush=True)

    open_all = pd.concat(open_concat, ignore_index=True) if open_concat else pd.DataFrame()
    return open_all, audit_rows_all

# =========================
# MAIN RUN
# =========================
def main_run(run_date: Optional[dt.date] = None):
    today = run_date or dt.date.today()
    ensure_dir(OUTPUT_DIR)

    # 1) Universe
    universe = load_universe(UNIVERSO_XLSX)
    asset_name_col = detect_column(universe, [
        "Nome", "Name", "Asset", "Asset Name", "Descrizione", "Description"
    ])
    if not asset_name_col:
        print("[WARN] Asset name column not found in the universe.")
    asset_name_map: Optional[pd.Series] = None
    if asset_name_col:
        asset_name_map = (
            universe[["ISIN", asset_name_col]]
            .dropna(subset=["ISIN"])
            .assign(ISIN=lambda df: df["ISIN"].astype(str).str.strip())
        )
        asset_name_map = asset_name_map.set_index("ISIN")[asset_name_col].astype(str).str.strip()

    # 2) Returns (DB)
    conn_str = read_connection_txt(CONNECTION_TXT)
    isins = universe["ISIN"].tolist()
    returns_long = _db_fetch_returns(
        conn_str,
        isins,
        sp_name=SP_NAME_DEFAULT,
        n_bars=SP_N_DEFAULT,
        ptf_curr=PTF_CURR_DEFAULT,
    )
    if returns_long.empty:
        raise RuntimeError("No returns available from the DB (empty stored procedure?).")

    # 2b) Hurst map per ISIN (same conceptual logic as the backtest)
    hurst_map = build_hurst_map(
        returns_long,
        lookback=HURST_LOOKBACK or SP_N_DEFAULT,
        min_length=HURST_MIN_LENGTH,
    )

    # 3) EOD signals on day D with THETA = Hurst/100 per ISIN
    sig_df = generate_signals_today(universe, returns_long, today, hurst_map=hurst_map)

    # 3b) Single ranking and Top-N selection
    decision_date = returns_long["Date"].max().date()
    buy_rank_df = _rank_buy(sig_df, decision_date)             # all buys, sorted
    allowed_open = _select_top_signals(buy_rank_df, MAX_OPEN)  # top-N ISINs

    # 4) Positions + audit (OPEN/CLOSE) with the Top-N target
    open_df, audit_rows = update_positions_and_build_orders(
        universe, returns_long, sig_df, today,
        buy_rank_df=buy_rank_df,
        allowed_open_isins=allowed_open,
        asset_name_map=asset_name_map,
    )

    # 5) Append to the audit log (ALL operating strategies)
    if audit_rows:
        append_audit_rows(audit_rows)

    # 6) Dated Excel snapshot: sheets with full names
    ensure_dir(OPEN_TRADES_DIR)
    signals_path = _dated_signals_filename()
    signals_sheet = sig_df.reset_index()
    if asset_name_map is not None:
        signals_sheet["AssetName"] = signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("")
    else:
        signals_sheet["AssetName"] = ""

    # insert the column right after ISIN
    if "AssetName" in signals_sheet.columns:
        cols = list(signals_sheet.columns)
        cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
        signals_sheet = signals_sheet[cols]

    with pd.ExcelWriter(signals_path) as xw:
        signals_sheet.to_excel(xw, sheet_name="Signals", index=False)
        if not open_df.empty:
            for strat, g in open_df.groupby("Strategy"):
                sheet_name_map = {
                    "Equal_Weight": "Open_Equal_Weight",
                    "Risk_Parity": "Open_Risk_Parity",
                }
                sheet_name = sheet_name_map.get(strat, f"Open_{strat}")[:31]
                g.to_excel(xw, sheet_name=sheet_name, index=False)

    copy_to_dropbox(signals_path)
    for strat in ["Equal_Weight", "Risk_Parity"]:
        csv_path = open_trades_path(strat)
        if csv_path.exists():
            copy_to_dropbox(csv_path)

    print(f"✅ Signals generated for {today}. Saved to {signals_path}")
    print(f"Open trades saved in {OPEN_TRADES_DIR}")
    print(f"Audit log updated at {AUDIT_LOG_CSV}")

# =========================
# ENTRY POINT
# =========================
if __name__ == "__main__":
    main_run()
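
# main_run() also accepts an explicit run date, e.g. main_run(dt.date(2024, 1, 2)) with a
# hypothetical date; note that the decision date is still driven by the last date available
# in the DB series, not by this argument.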