Daily signals

fredmaloggia
2025-11-17 08:11:27 +01:00
committed by GitHub
parent 9522877271
commit ecf03b7775


@@ -0,0 +1,809 @@
# -*- coding: utf-8 -*-
"""
Daily Signals Generator (kNN), PRODUCTION (consistent with backtest v3.1.5)
Main changes:
- Daily cap MAX_OPEN=15: single ranking of the buys and diff-based revision for every strategy
- Risk Parity with a per-asset cap (RP_MAX_WEIGHT), aligned with backtest v3.1.5
- OPEN prices fetched once for the ISINs involved in OPEN/CLOSE (cache shared across strategies)
- Audit log for ALL operational strategies (Equal_Weight, Risk_Parity)
Pipeline (day D, EOD -> t+1 OPEN):
1) Load the universe and the return series from the DB (stored procedure)
2) kNN pattern matching (WP=60, HA=10, K=25), Signal=1 if EstOutcome > THETA (decimals)
3) Single ranking of the buys and Top-N selection (MAX_OPEN) as the daily target
4) Diff-based revision: close positions that fell out of the top, open new entrants (+ risk exits)
5) Sizing (Equal Weight / Risk Parity with cap)
6) Fetch OPEN prices ONCE for the ISINs involved (OPEN/CLOSE) and fill the orders
7) Order log and Excel snapshot with sheets: Open_Equal_Weight / Open_Risk_Parity
"""
from __future__ import annotations
import os
import ssl
import json
import time
import warnings
import datetime as dt
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Iterable, Set
import numpy as np
import pandas as pd
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
# DB
import sqlalchemy as sa
from sqlalchemy import text as sql_text
import pyodbc
# =========================
# CONFIG
# =========================
BASE_DIR = Path(".")
UNIVERSO_XLSX = BASE_DIR / "Universo per Trading System.xlsx"
CONNECTION_TXT = BASE_DIR / "connection.txt"
AUDIT_LOG_CSV = BASE_DIR / "trades_audit_log.csv"
OPEN_TRADES_DIR = BASE_DIR / "open_trades"
def _dated_signals_filename() -> Path:
date_prefix = pd.Timestamp.today().strftime("%Y%m%d")
return BASE_DIR / f"{date_prefix}_signals.xlsx"
# Stored procedure / DB parameters
SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL"
SP_N_DEFAULT = 1305
PTF_CURR_DEFAULT = "EUR"
# Pattern recognition (as in the backtest)
WP = 60
HA = 10
KNN_K = 25
THETA = 0.005  # 0.5% expressed as a decimal (identical to the backtest)
# Exit rules (identical to the backtest)
SL_BPS = 300.0
TP_BPS = 800.0
TRAIL_BPS = 300.0
TIME_STOP_BARS = 20
THETA_EXIT = 0.0  # weakness threshold
WEAK_DAYS_EXIT = None  # IMMEDIATE exit on weakness (as in the backtest)
# Ranking and Top-N selection for OPENS
MAX_OPEN = 15  # cap on instruments open today (as in the backtest)
# Alignment with backtest v3.1.5 for the Risk Parity cap
TOP_N_MAX = MAX_OPEN
RP_MAX_WEIGHT = 2 / TOP_N_MAX  # ≈ 0.1333 = 13.33% per single asset
# Sizing
BASE_CAPITAL_PER_STRATEGY = 100.0
MIN_TRADE_NOTIONAL = 0.01
RISK_PARITY_LOOKBACK = 60
# Calendar
BUSINESS_DAYS_ONLY = True
SEED = 42
warnings.filterwarnings("ignore")
np.random.seed(SEED)
# =========================
# UTILS
# =========================
def ensure_dir(p: Path):
p.mkdir(parents=True, exist_ok=True)
def next_business_day(d: dt.date) -> dt.date:
nd = d + dt.timedelta(days=1)
if not BUSINESS_DAYS_ONLY:
return nd
while nd.weekday() >= 5: # 5=Sat, 6=Sun
nd += dt.timedelta(days=1)
return nd
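# Illustrative check of the weekend roll (not executed; dates chosen arbitrarily):
#
#     next_business_day(dt.date(2025, 11, 14))  # Friday -> dt.date(2025, 11, 17) (Monday)
#     next_business_day(dt.date(2025, 11, 17))  # Monday -> dt.date(2025, 11, 18) (Tuesday)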
def _safe_to_float(x) -> Optional[float]:
try:
return float(x)
except Exception:
return None
# =========================
# DB CONNECTION
# =========================
def read_connection_txt(path: Path) -> str:
if not path.exists():
raise FileNotFoundError(f"Missing connection.txt at {path}")
params: Dict[str, str] = {}
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
params[k.strip().lower()] = v.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
if not all([username, password, host, database]):
raise ValueError("connection.txt incompleto: servono username/password/host/database.")
    installed = list(pyodbc.drivers())
driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
def _db_fetch_returns(conn_str: str,
isins: List[str],
sp_name: Optional[str] = None,
n_bars: Optional[int] = None,
ptf_curr: Optional[str] = None) -> pd.DataFrame:
engine = sa.create_engine(conn_str, fast_executemany=True)
sp = sp_name or os.environ.get("SP_NAME", SP_NAME_DEFAULT)
n_val = n_bars if n_bars is not None else int(os.environ.get("SP_N", SP_N_DEFAULT))
ptf = (ptf_curr or os.environ.get("PTF_CURR", PTF_CURR_DEFAULT) or "").strip() or PTF_CURR_DEFAULT
sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
frames: List[pd.DataFrame] = []
def _pick(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
low = {c.lower(): c for c in df.columns}
for c in candidates:
if c.lower() in low:
return low[c.lower()]
for c in df.columns:
cl = c.lower()
if any(tok in cl for tok in [x.lower() for x in candidates]):
return c
return None
with engine.begin() as conn:
for i, isin in enumerate(isins, start=1):
print(f"[DB] ({i}/{len(isins)}) scarico serie storica per {isin} ...", flush=True)
try:
df = pd.read_sql_query(sql_sp, conn, params={"isin": str(isin), "n": int(n_val), "ptf": ptf})
except Exception as e:
print(f"[ERROR] SP {sp} fallita per {isin}: {e}")
continue
if df.empty:
print(f"[WARN] Nessun dato per {isin}")
continue
col_date = _pick(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
col_ret = _pick(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
if not col_date or not col_ret:
print(f"[WARN] Colonne mancanti per {isin}")
continue
out = df[[col_date, col_ret]].copy()
out.columns = ["Date", "Ret"]
out["Date"] = pd.to_datetime(out["Date"], errors="coerce").dt.tz_localize(None)
out["ISIN"] = str(isin)
            med = pd.to_numeric(out["Ret"], errors="coerce").abs().median()
            # Heuristic: returns that look like percentages (median |Ret| > 1.5) are converted to decimals
            if pd.notnull(med) and med > 1.5:
                out["Ret"] = out["Ret"].astype(float) / 100.0
            print(f" ↳ rows downloaded: {len(out)}")
frames.append(out[["Date", "ISIN", "Ret"]])
if not frames:
return pd.DataFrame(columns=["Date", "ISIN", "Ret"])
out_all = pd.concat(frames, ignore_index=True)
out_all = out_all.dropna(subset=["Date"]).sort_values(["ISIN", "Date"]).reset_index(drop=True)
return out_all[["Date", "ISIN", "Ret"]]
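# Sketch of the long frame returned above (ISIN and values are invented):
#
#     returns_long = _db_fetch_returns(conn_str, ["XX0000000001"])
#     #         Date          ISIN     Ret
#     # 0 2025-11-13  XX0000000001  0.0012   <- daily returns as decimals
#     # 1 2025-11-14  XX0000000001 -0.0047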
# =========================
# UNIVERSE + OPEN PRICE API (checker-style)
# =========================
OPEN_MAX_RETRY = 3
OPEN_SLEEP_SEC = 0.1
OPEN_TIMEOUT = 10
def load_universe(path: Path) -> pd.DataFrame:
df = pd.read_excel(path)
if "ISIN" not in df.columns:
        raise KeyError("The Universe file is missing the 'ISIN' column")
df["ISIN"] = df["ISIN"].astype(str).str.strip()
for col in ["Asset Class", "Mercato", "TickerOpen"]:
if col not in df.columns:
df[col] = ""
df[col] = df[col].astype(str).str.strip()
return df
def _build_symbol_euronext(row: pd.Series) -> Tuple[str, str]:
isin = str(row.get("ISIN", "")).strip()
venue = str(row.get("Mercato", "")).strip()
tok = str(row.get("TickerOpen", "") or "").strip()
base = "https://fin.scorer.app/finance/euronext/price"
if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper():
return base, tok
if isin and venue:
return base, f"{isin}-{venue}"
return base, isin
def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]:
"""
    Euronext endpoint price/ISIN-Mercato with retry/backoff.
    Checker-style detailed logging: [TRY]/[RETRY]/[OK]/[FAIL].
"""
try:
row = universe.loc[universe["ISIN"] == str(isin)].iloc[0]
except Exception:
print(f"[WARN] ISIN {isin} non trovato nelluniverso.")
return None
base, symbol = _build_symbol_euronext(row)
url = f"{base}/{symbol}"
print(f"[DOWNLOAD] {symbol:<30s} -> [TRY 1/{OPEN_MAX_RETRY}]", flush=True)
for attempt in range(1, OPEN_MAX_RETRY + 1):
if attempt > 1:
print(f"[DOWNLOAD] {symbol:<30s} -> [TRY {attempt}/{OPEN_MAX_RETRY}]", flush=True)
try:
with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp:
data = json.loads(resp.read().decode("utf-8"))
if not isinstance(data, list) or not data:
print(" ↳ NO DATA")
continue
d = (data[0] or {}).get("data") or {}
px = d.get("open") if d.get("open") is not None else d.get("prevClose")
if px is None:
print(" ↳ WARN: 'open' e 'prevClose' assenti")
continue
px = float(px)
print(f" ↳ OK open={d.get('open')} close={d.get('close')} (ritorno prezzo={px})")
return px
        except (HTTPError, URLError, ssl.SSLError, ValueError) as e:  # ValueError also covers malformed JSON
            if attempt < OPEN_MAX_RETRY:
                print(f" ↳ ERR {e}, retrying in {OPEN_SLEEP_SEC}s")
time.sleep(OPEN_SLEEP_SEC)
else:
print(f" ↳ ERR {e}")
print(f"[ERROR] nessun prezzo per {symbol} dopo {OPEN_MAX_RETRY} tentativi")
return None
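# Shape of the JSON payload the parser above expects (values invented; only the
# keys actually read are shown). 'open' is preferred, 'prevClose' is the fallback:
#
#     [{"data": {"open": 101.25, "close": 101.90, "prevClose": 100.80}}]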
# =========================
# HURST ESTIMATOR & MAP
# =========================
def _hurst_rs(series: pd.Series) -> Optional[float]:
"""
    Simple estimate of the Hurst exponent via rescaled range (R/S) over a single window.
    Returns None if the series is too short or degenerate.
"""
x = pd.to_numeric(series.dropna(), errors="coerce").astype(float).values
n = len(x)
if n < 100:
return None
x = x - x.mean()
z = np.cumsum(x)
R = z.max() - z.min()
S = x.std(ddof=1)
if S <= 0 or R <= 0:
return None
H = np.log(R / S) / np.log(n)
if not np.isfinite(H):
return None
return float(H)
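# Sanity check on synthetic data (not executed; results are approximate, and this
# single-window R/S estimator is known to be slightly biased upward):
#
#     rng = np.random.default_rng(SEED)
#     white = pd.Series(rng.normal(0.0, 0.01, 500))
#     _hurst_rs(white)           # ~0.5: i.i.d. noise
#     _hurst_rs(white.cumsum())  # closer to 1: strongly trending path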
def build_hurst_map(returns_long: pd.DataFrame,
lookback: int = 252) -> Dict[str, float]:
"""
    Builds an ISIN -> Hurst map using the last `lookback` returns.
"""
if returns_long.empty:
return {}
ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
hurst_map: Dict[str, float] = {}
for isin in ret_wide.columns:
s = ret_wide[isin].dropna().astype(float)
if len(s) < max(lookback, 100):
continue
h = _hurst_rs(s.iloc[-lookback:])
if h is None or not np.isfinite(h):
continue
hurst_map[str(isin)] = float(h)
return hurst_map
# =========================
# PATTERN RECOGNITION (WP/HA)
# =========================
def z_norm(arr: np.ndarray) -> Optional[np.ndarray]:
arr = np.asarray(arr, dtype=float)
mu = arr.mean()
sd = arr.std()
if sd < 1e-12:
return None
return (arr - mu) / (sd + 1e-12)
def build_pattern_library(ret_series: pd.Series, Wp: int, Ha: int) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
x = ret_series.dropna().values
N = len(x)
if N < Wp + Ha + 10:
return None, None
wins, outs = [], []
for t in range(0, N - Wp - Ha):
win = x[t:t+Wp]
winzn = z_norm(win)
if winzn is None:
continue
        outcome = np.sum(x[t+Wp : t+Wp+Ha])  # sum of future returns over Ha bars (decimals)
        wins.append(winzn)
        outs.append(outcome)
if not wins:
return None, None
return np.array(wins), np.array(outs)
def predict_from_library(curr_win: np.ndarray,
lib_wins: np.ndarray,
lib_out: np.ndarray,
k: int = 25) -> Tuple[float, float, np.ndarray]:
dists = np.linalg.norm(lib_wins - curr_win, axis=1)
idx = np.argsort(dists)[:min(k, len(dists))]
return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
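# End-to-end sketch of the kNN step on a synthetic series (not executed; the
# random series is purely illustrative):
#
#     rng = np.random.default_rng(SEED)
#     r = pd.Series(rng.normal(0.0, 0.01, 400))   # >= WP + HA + 10 bars
#     lib_wins, lib_out = build_pattern_library(r, WP, HA)
#     curr_zn = z_norm(r.values[-WP:])            # may be None on a flat window
#     est, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
#     signal = 1 if est > THETA else 0            # same rule used below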
def characterize_window(ret_series: pd.Series, Wp: int) -> Tuple[Optional[str], float]:
x = ret_series.dropna().values
    if len(x) < max(Wp, 30):
return None, 0.0
win = x[-Wp:]
mu, sd = win.mean(), win.std()
if sd < 1e-12:
return "compression", 0.5
last3 = win[-3:] if len(win) >= 3 else win
if np.sign(last3).sum() in (3, -3):
return "momentum_burst", min(1.0, abs(last3.sum())/(sd+1e-12))
return None, 0.0
# =========================
# SIGNAL GENERATION (EOD on D)
# =========================
def generate_signals_today(universe: pd.DataFrame,
returns_long: pd.DataFrame,
today: dt.date,
hurst_map: Optional[Dict[str, float]] = None) -> pd.DataFrame:
ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
decision_date = ret_wide.index[-1].date()
rows = []
for isin in ret_wide.columns:
r = ret_wide[isin].dropna().astype(float)
        # THETA = HURST AS A PERCENTAGE (H = 0.50 -> theta_entry = 0.005 = 0.5%)
        theta_entry = THETA  # global fallback
H_val = None
if hurst_map is not None:
H_val = hurst_map.get(str(isin))
if H_val is not None and not pd.isna(H_val):
theta_entry = float(H_val) / 100.0
if len(r) < max(200, WP + HA + 10):
rows.append({"Date": decision_date, "ISIN": isin,
"Signal": 0, "EstOutcome": np.nan, "AvgDist": np.nan,
"PatternType": None, "Confidence": np.nan})
continue
lib_wins, lib_out = build_pattern_library(r, WP, HA)
if lib_wins is None or len(r) < WP + HA:
est_out, avg_dist, sig = np.nan, np.nan, 0
ptype, pconf = characterize_window(r, WP)
else:
curr = r.values[-WP:]
curr_zn = z_norm(curr)
if curr_zn is None:
est_out, avg_dist, sig = np.nan, np.nan, 0
ptype, pconf = characterize_window(r, WP)
else:
est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
sig = 1 if (pd.notna(est_out) and float(est_out) > float(theta_entry)) else 0
ptype, pconf = characterize_window(r, WP)
rows.append({
"Date": decision_date, "ISIN": isin,
"Signal": int(sig),
"EstOutcome": (None if pd.isna(est_out) else float(est_out)),
"AvgDist": (None if pd.isna(avg_dist) else float(avg_dist)),
"PatternType": ptype,
"Confidence": (None if pconf is None else float(max(0.0, min(1.0, pconf)))),
"Hurst": (None if H_val is None else float(H_val)),
"Theta": float(theta_entry),
})
sig_df = pd.DataFrame(rows).set_index(["Date","ISIN"]).sort_index()
return sig_df
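# Worked example of the per-ISIN threshold (numbers invented): with Hurst
# H = 0.55, theta_entry = 0.55 / 100 = 0.0055, so the ISIN gets Signal=1 only
# if the median kNN outcome over the next HA=10 bars exceeds +0.55%. Without a
# Hurst estimate, the global fallback THETA = 0.005 (0.5%) applies.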
# =========================
# TOP-N SELECTION & PRICE CACHE
# =========================
def _rank_buy(signals_today: pd.DataFrame, decision_date: dt.date) -> pd.DataFrame:
"""
    Returns a DataFrame with only today's buys, sorted by:
    EstOutcome desc, Confidence desc, AvgDist asc.
    Columns: ['ISIN','EstOutcome','Confidence','AvgDist']
"""
if signals_today.empty:
return pd.DataFrame(columns=["ISIN","EstOutcome","Confidence","AvgDist"])
day_df = signals_today.reset_index().query("Date == @decision_date")
if day_df.empty:
return pd.DataFrame(columns=["ISIN","EstOutcome","Confidence","AvgDist"])
    buy = day_df[day_df["Signal"] == 1].copy()
    for col in ["EstOutcome", "Confidence", "AvgDist"]:
        buy[col] = pd.to_numeric(buy[col], errors="coerce")
    buy = buy.sort_values(by=["EstOutcome", "Confidence", "AvgDist"],
                          ascending=[False, False, True], na_position="last")
return buy[["ISIN","EstOutcome","Confidence","AvgDist"]].reset_index(drop=True)
def _select_top_signals(buy_rank_df: pd.DataFrame, max_open: int) -> List[str]:
if buy_rank_df.empty:
return []
return buy_rank_df["ISIN"].head(max_open).tolist()
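# Tiny ranking example (values invented). Given today's buys
#
#     ISIN  EstOutcome  Confidence  AvgDist
#     AAA        0.012         0.8      3.1
#     BBB        0.012         0.6      2.9
#     CCC        0.009         NaN      4.0
#
# _rank_buy orders them AAA, BBB, CCC (EstOutcome desc, ties broken by
# Confidence desc, then AvgDist asc) and _select_top_signals(rank_df, 2)
# returns ["AAA", "BBB"].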
def _fetch_open_prices_once(isins: Iterable[str], universe: pd.DataFrame) -> Dict[str, Optional[float]]:
cache: Dict[str, Optional[float]] = {}
for isin in sorted(set(isins)):
cache[isin] = get_open_price(isin, universe)
return cache
# =========================
# POSITIONS / ORDERS / LOG
# =========================
@dataclass
class OpenPos:
Strategy: str
ISIN: str
EntryDate: dt.date
EntryIndex: int
EntryAmount: float
SizeWeight: float
PeakPnL: float = 0.0
Notes: str = ""
def open_trades_path(strategy: str) -> Path:
ensure_dir(OPEN_TRADES_DIR)
return OPEN_TRADES_DIR / f"open_{strategy}.csv"
def load_open_trades(strategy: str) -> pd.DataFrame:
p = open_trades_path(strategy)
if not p.exists():
return pd.DataFrame(columns=[
"Strategy","ISIN","EntryDate","EntryIndex","EntryAmount","SizeWeight","PeakPnL","WeakDays","Notes"
])
df = pd.read_csv(p)
if "EntryDate" in df.columns:
df["EntryDate"] = pd.to_datetime(df["EntryDate"], errors="coerce").dt.date
if "WeakDays" not in df.columns:
df["WeakDays"] = 0
df["Strategy"] = strategy
return df
def save_open_trades(strategy: str, df: pd.DataFrame):
p = open_trades_path(strategy)
df.to_csv(p, index=False)
def append_audit_rows(rows: List[Dict]):
if not rows:
return
log = pd.DataFrame(rows)
if AUDIT_LOG_CSV.exists():
old = pd.read_csv(AUDIT_LOG_CSV)
log = pd.concat([old, log], ignore_index=True)
log.to_csv(AUDIT_LOG_CSV, index=False)
# sizing
def compute_current_capital_from_log(strategy: str,
returns_wide: pd.DataFrame,
asof_date: dt.date) -> float:
    # Stub: every strategy currently runs on a fixed notional base
    return BASE_CAPITAL_PER_STRATEGY
def size_equal_weight(candidates: List[str]) -> Dict[str, float]:
if not candidates:
return {}
w = 1.0 / max(1, len(candidates))
return {isin: w for isin in candidates}
def size_risk_parity(candidates: List[str],
returns_wide: pd.DataFrame,
asof_idx: int) -> Dict[str, float]:
"""
    Risk Parity on the candidates only, with a per-asset cap (RP_MAX_WEIGHT),
    aligned with the logic used in backtest v3.1.5.
    Any unallocated remainder stays in cash (no renormalization after the cap).
"""
if not candidates:
return {}
L = RISK_PARITY_LOOKBACK
start = max(0, asof_idx - L + 1)
sub = returns_wide.iloc[start:asof_idx+1][candidates]
vol = sub.std().replace(0, np.nan)
inv_vol = 1.0 / vol
inv_vol = inv_vol.replace([np.inf, -np.inf], np.nan).fillna(0.0)
if inv_vol.sum() == 0:
return size_equal_weight(candidates)
w = (inv_vol / inv_vol.sum()).to_dict()
    # Cap each individual weight, as with RP_MAX_WEIGHT in the pattern-recognition backtest
if RP_MAX_WEIGHT is not None:
w = {k: min(RP_MAX_WEIGHT, float(v)) for k, v in w.items()}
return w
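# Worked sizing example (volatilities invented): with three candidates whose
# lookback vols are 1%, 2% and 4%, inverse vols are 100, 50, 25 and the raw
# weights 4/7 ≈ 0.571, 2/7 ≈ 0.286, 1/7 ≈ 0.143. All three exceed
# RP_MAX_WEIGHT = 2/15 ≈ 0.1333, so each is capped at 0.1333 and the remaining
# ~60% of capital stays in cash (no renormalization, by design).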
def _risk_exit_flags(isin: str,
entry_date: Optional[dt.date],
ret_wide: pd.DataFrame,
decision_date: dt.date,
est_map_today: pd.Series) -> List[str]:
"""Calcola motivi di uscita (SL/TP/Trailing/Time/Weak) per una posizione."""
reasons: List[str] = []
pnl_if_stay = 0.0
peak_dd = 0.0
if entry_date is not None and isin in ret_wide.columns:
sub = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)]
if not sub.empty and sub.index[0].date() == entry_date:
sub = sub.iloc[1:]
if not sub.empty:
eq_path = (1.0 + sub.fillna(0.0)).cumprod()
pnl_if_stay = float(eq_path.iloc[-1] - 1.0)
peak_curve = eq_path.cummax()
dd_series = eq_path / peak_curve - 1.0
peak_dd = float(-dd_series.min())
if SL_BPS is not None and pnl_if_stay <= -SL_BPS/10000.0:
reasons.append("SL")
if TP_BPS is not None and pnl_if_stay >= TP_BPS/10000.0:
reasons.append("TP")
if TRAIL_BPS is not None and peak_dd >= TRAIL_BPS/10000.0:
reasons.append("TRAIL")
if TIME_STOP_BARS is not None and entry_date is not None and isin in ret_wide.columns:
sub_all = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)]
bars = len(sub_all) - (1 if (not sub_all.empty and sub_all.index[0].date() == entry_date) else 0)
if bars >= int(TIME_STOP_BARS):
reasons.append("TIME")
if THETA_EXIT is not None and est_map_today is not None and isin in est_map_today.index:
est_out_today = est_map_today.loc[isin]
if pd.notna(est_out_today) and float(est_out_today) <= float(THETA_EXIT):
reasons.append("WEAK")
return reasons
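# Worked exit arithmetic (price path invented): with SL_BPS=300, TP_BPS=800 and
# TRAIL_BPS=300, a position exits on SL at a cumulative PnL of -3.00%, on TP at
# +8.00%, and on TRAIL once equity falls 3.00% from its peak, e.g. a path that
# reaches +5% and drifts back to +1.85% (1.05 * 0.97 - 1) triggers TRAIL even
# though SL never fires. TIME fires after TIME_STOP_BARS=20 bars; WEAK fires as
# soon as today's EstOutcome <= THETA_EXIT.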
def update_positions_and_build_orders(universe: pd.DataFrame,
returns_long: pd.DataFrame,
signals_today: pd.DataFrame,
today: dt.date,
buy_rank_df: Optional[pd.DataFrame],
allowed_open_isins: Optional[List[str]] = None) -> Tuple[pd.DataFrame, List[Dict]]:
"""
    - decision_date = latest available date (EOD)
    - daily target = top MAX_OPEN of the buy ranking (identical for all strategies)
    - for each strategy: diff-based revision vs the positions already open
    - CLOSE also on risk exits (SL/TP/Trailing/Time/Weak)
    - Execution at t+1 open; prices fetched ONCE for all ISINs with orders
    NB: the Aggressive/Crypto strategy has been removed. ONLY these remain:
    - Equal_Weight
    - Risk_Parity (with per-asset cap)
"""
strategies = ["Equal_Weight", "Risk_Parity"]
ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
decision_date: dt.date = ret_wide.index[-1].date()
asof_idx = len(ret_wide.index) - 1
next_open_date = next_business_day(decision_date)
    # Today's ranking and maps
est_map_all = None
if buy_rank_df is not None and not buy_rank_df.empty:
est_map_all = buy_rank_df.set_index("ISIN")["EstOutcome"]
else:
        # fallback: use all of today's signals
day_df = signals_today.reset_index().query("Date == @decision_date")
if not day_df.empty:
est_map_all = day_df.set_index("ISIN")["EstOutcome"]
target_isins: List[str] = allowed_open_isins or []
target_set: Set[str] = set(target_isins)
    # Per-strategy sizing based on the target
size_eq = size_equal_weight(target_isins)
size_rp = size_risk_parity(target_isins, ret_wide, asof_idx)
sizing_by_strategy = {"Equal_Weight": size_eq, "Risk_Parity": size_rp}
open_concat: List[pd.DataFrame] = []
audit_rows_all: List[Dict] = []
isins_for_open_fetch: Set[str] = set()
isins_for_close_fetch: Set[str] = set()
for strat in strategies:
df_open = load_open_trades(strat)
current_set: Set[str] = set(df_open["ISIN"].astype(str).tolist())
        # --- risk exits on current positions ---
closers: List[Tuple[str, str]] = []
if not df_open.empty:
if "WeakDays" not in df_open.columns:
df_open["WeakDays"] = 0
for _, row_open in df_open.iterrows():
isin = str(row_open["ISIN"])
entry_date = pd.to_datetime(row_open["EntryDate"]).date() if pd.notna(row_open["EntryDate"]) else None
reasons = _risk_exit_flags(isin, entry_date, ret_wide, decision_date, est_map_all if est_map_all is not None else pd.Series(dtype=float))
if reasons:
closers.append((isin, "+".join(sorted(set(reasons)))))
        # --- ranking closes (slipped out of the top-N) ---
drop_by_rank = list(current_set - target_set)
for isin in drop_by_rank:
closers.append((isin, "RANK"))
        # --- apply closes (accumulate orders) ---
for isin, reason in closers:
row_open = df_open.loc[df_open["ISIN"] == isin]
linked_date = row_open["EntryDate"].iloc[0] if not row_open.empty else None
isins_for_close_fetch.add(isin)
audit_rows_all.append({
"Strategy": strat,
"ISIN": isin,
"Action": "CLOSE",
"TradeDate": next_open_date,
"EntryIndex": np.nan,
"EntryAmount": np.nan,
"SizeWeight": np.nan,
"Price": None, # riempito dopo fetch
"PnL_%": np.nan,
"ExitReason": reason,
"LinkedOpenDate": linked_date,
"Duration_bars": np.nan,
"Notes": f"DecisionDate={decision_date}"
})
df_open = df_open[df_open["ISIN"] != isin]
current_set.discard(isin)
        # --- diff-based opens toward the target ---
        add_list = [isin for isin in target_isins if isin not in current_set]  # preserves the ranking order
w_dict = sizing_by_strategy[strat]
if add_list and w_dict:
cap = compute_current_capital_from_log(strat, ret_wide, decision_date)
for isin in add_list:
w = float(w_dict.get(isin, 0.0))
if w <= 0:
continue
notional = max(MIN_TRADE_NOTIONAL, cap * w)
entry_idx = asof_idx + 1
isins_for_open_fetch.add(isin)
audit_rows_all.append({
"Strategy": strat,
"ISIN": isin,
"Action": "OPEN",
"TradeDate": next_open_date,
"EntryIndex": entry_idx,
"EntryAmount": float(notional),
"SizeWeight": float(w),
"Price": None, # riempito dopo fetch
"PnL_%": np.nan,
"ExitReason": "",
"LinkedOpenDate": "",
"Duration_bars": 0,
"Notes": f"DecisionDate={decision_date}"
})
df_open = pd.concat([df_open, pd.DataFrame([{
"Strategy": strat,
"ISIN": isin,
"EntryDate": decision_date,
"EntryIndex": entry_idx,
"EntryAmount": float(notional),
"SizeWeight": float(w),
"PeakPnL": 0.0,
"WeakDays": 0,
"Notes": ""
}])], ignore_index=True)
current_set.add(isin)
save_open_trades(strat, df_open)
df_open["Strategy"] = strat
open_concat.append(df_open)
    # ---- FETCH ONCE (OPEN + CLOSE) ----
fetch_isins = sorted(isins_for_open_fetch.union(isins_for_close_fetch))
if fetch_isins:
print(f"[PRICE] fetch open per {len(fetch_isins)} ISIN (open={len(isins_for_open_fetch)}, close={len(isins_for_close_fetch)})", flush=True)
px_cache = _fetch_open_prices_once(fetch_isins, universe)
for r in audit_rows_all:
if r.get("Price") is None:
r["Price"] = _safe_to_float(px_cache.get(r["ISIN"]))
# ---- SUMMARY ----
n_open = sum(1 for r in audit_rows_all if r.get("Action") == "OPEN")
n_close = sum(1 for r in audit_rows_all if r.get("Action") == "CLOSE")
print(f"[SUMMARY] decision_date={decision_date} opens={n_open} closes={n_close} target={len(target_isins)} (cap={MAX_OPEN})", flush=True)
open_all = pd.concat(open_concat, ignore_index=True) if open_concat else pd.DataFrame()
return open_all, audit_rows_all
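# Diff-based revision in miniature (ISIN letters are placeholders): if a
# strategy currently holds {A, B, C} and today's Top-N target is {B, C, D}, the
# function emits CLOSE A (reason RANK) and OPEN D, while B and C stay untouched
# unless one of the risk exits above fires for them first.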
# =========================
# MAIN RUN
# =========================
def main_run(run_date: Optional[dt.date] = None):
today = run_date or dt.date.today()
    # 1) Universe
    universe = load_universe(UNIVERSO_XLSX)
    # 2) Returns (DB)
conn_str = read_connection_txt(CONNECTION_TXT)
isins = universe["ISIN"].tolist()
returns_long = _db_fetch_returns(
conn_str,
isins,
sp_name=SP_NAME_DEFAULT,
n_bars=SP_N_DEFAULT,
ptf_curr=PTF_CURR_DEFAULT,
)
if returns_long.empty:
        raise RuntimeError("No returns available from the DB (empty SP?).")
    # 2b) Per-ISIN Hurst map (same conceptual logic as the backtest)
    hurst_map = build_hurst_map(returns_long, lookback=252)
    # 3) EOD signals on D with per-ISIN THETA = Hurst/100
    sig_df = generate_signals_today(universe, returns_long, today, hurst_map=hurst_map)
    # 3b) Single ranking and Top-N selection
    decision_date = returns_long["Date"].max().date()
    buy_rank_df = _rank_buy(sig_df, decision_date)             # all buys, ranked
    allowed_open = _select_top_signals(buy_rank_df, MAX_OPEN)  # top-N ISINs
    # 4) Positions + audit (OPEN/CLOSE) with Top-N target
open_df, audit_rows = update_positions_and_build_orders(
universe, returns_long, sig_df, today,
buy_rank_df=buy_rank_df,
allowed_open_isins=allowed_open
)
    # 5) Append audit log (ALL operational strategies)
if audit_rows:
append_audit_rows(audit_rows)
    # 6) Dated Excel snapshot, sheets with full names
ensure_dir(OPEN_TRADES_DIR)
signals_path = _dated_signals_filename()
with pd.ExcelWriter(signals_path) as xw:
sig_df.reset_index().to_excel(xw, sheet_name="Signals", index=False)
if not open_df.empty:
for strat, g in open_df.groupby("Strategy"):
sheet_name_map = {
"Equal_Weight": "Open_Equal_Weight",
"Risk_Parity": "Open_Risk_Parity",
}
sheet_name = sheet_name_map.get(strat, f"Open_{strat}")[:31]
g.to_excel(xw, sheet_name=sheet_name, index=False)
print(f"✅ Signals generated for {today}. Saved to {signals_path}")
print(f"Open trades saved in {OPEN_TRADES_DIR}")
print(f"Audit log updated at {AUDIT_LOG_CSV}")
# =========================
# ENTRY POINT
# =========================
if __name__ == "__main__":
main_run()