Files
Trading/signals daily kNN prod v.2.py
2026-05-24 20:46:43 +02:00

1188 lines
43 KiB
Python

# -*- coding: utf-8 -*-
"""
Daily Signals Generator (kNN) - PRODUCTION
============================================
Allineato al backtest v3.1.5 con estensioni Config B (grid search out-of-sample).
Modifiche v2.1 (questa versione):
- HURST RIMOSSO dalla logica decisionale: theta_entry usa il valore globale del
config (PATTERN_CONFIG.theta). Le funzioni hurst_rs/build_hurst_map restano
disponibili in shared_utils per usi qualitativi (regime classification, report).
- Aggiunto supporto MULTI-STRATEGIA configurabile da pattern_knn_config.json
sezione "strategies". Ogni strategia pu\u00f2 sovrascrivere singoli parametri di
signals (tp_bps, sl_bps, trail_bps, time_stop_bars, decision_every,
min_holding_bars, ecc.).
- Nuovi parametri di exit: decision_every (N giorni tra refresh ranking) e
min_holding_bars (blocca SL/RANK per le prime N barre, TP/TRAIL sempre attivi).
- Fix bug "RANK duplicato": un ISIN che esce per TP/SL non viene ri-aggiunto alla
lista chiusure per RANK nello stesso ciclo.
- Formato TradeDate uniforme ISO (YYYY-MM-DD) nel trades_audit_log.csv.
- Rimossa duplicazione del codice di lettura config.
Pipeline (giorno D, EOD -> t+1 OPEN):
1) Carica universo e serie rendimenti dal DB (stored procedure, una volta sola)
2) Pattern kNN (WP, HA, K, theta globale), Signal=1 se EstOutcome > theta
3) Ranking unico dei buy e selezione Top-N (MAX_OPEN) come target giornaliero
4) Per ogni strategia: applica eventuale override parametri, esegui revisione per
differenza vs posizioni aperte, gestisci risk exits con regole specifiche
5) Sizing (Equal Weight oppure Risk Parity con cap)
6) Fetch OPEN prices UNA VOLTA per ISIN interessati e popolamento ordini
7) Log ordini e snapshot Excel
"""
from __future__ import annotations
import datetime as dt
import json
import os
import shutil
import ssl
import time
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import text as sql_text
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from shared_utils import (
build_pattern_library,
characterize_window,
detect_column,
load_config,
predict_from_library,
read_connection_txt,
require_section,
require_value,
z_norm,
)
# =============================================================================
# CONFIG LOADING - done once at import; values are treated as module constants
# =============================================================================
CONFIG = load_config()
DB_CONFIG = require_section(CONFIG, "db")
PATTERN_CONFIG = require_section(CONFIG, "pattern")
TAGGING_CONFIG = require_section(CONFIG, "tagging")
RANKING_CONFIG = require_section(CONFIG, "ranking")
SIGNALS_CONFIG = require_section(CONFIG, "signals")
# Optional section: when missing or empty, build_strategies_from_config falls
# back to the two legacy strategies.
STRATEGIES_CONFIG: Dict[str, Any] = CONFIG.get("strategies", {})
# DB / stored procedure defaults (overridable per call or through the
# SP_NAME / SP_N / PTF_CURR environment variables in _db_fetch_returns)
SP_NAME_DEFAULT = str(require_value(DB_CONFIG, "stored_proc", "db"))
SP_N_DEFAULT = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR_DEFAULT = str(require_value(DB_CONFIG, "ptf_curr", "db"))
# Pattern recognition (global kNN parameters)
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))
# Global entry threshold: Signal=1 when EstOutcome > THETA
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern"))
# Pattern tagging (informational only; forwarded to characterize_window)
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))
# Global ranking parameters
MAX_OPEN = int(require_value(SIGNALS_CONFIG, "max_open", "signals"))
TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))
RP_MAX_WEIGHT_RAW = RANKING_CONFIG.get("rp_max_weight")
# When the cap is not configured it defaults to 2 / TOP_N_MAX (TOP_N_MAX
# floored at 1 to avoid a division by zero).
RP_MAX_WEIGHT_DEFAULT = (
    float(RP_MAX_WEIGHT_RAW) if RP_MAX_WEIGHT_RAW is not None else 2.0 / max(TOP_N_MAX, 1)
)
# Global sizing parameters
BASE_CAPITAL_PER_STRATEGY = float(
    require_value(SIGNALS_CONFIG, "base_capital_per_strategy", "signals")
)
MIN_TRADE_NOTIONAL = float(
    require_value(SIGNALS_CONFIG, "min_trade_notional", "signals")
)
RISK_PARITY_LOOKBACK = int(
    require_value(SIGNALS_CONFIG, "risk_parity_lookback", "signals")
)
# Calendar: next_business_day skips Saturdays/Sundays when this is True
BUSINESS_DAYS_ONLY = True
SEED = 42
# NOTE(review): this silences every warning process-wide (pandas/numpy
# deprecations included); consider narrowing the filter to specific categories.
warnings.filterwarnings("ignore")
np.random.seed(SEED)
# =============================================================================
# PATHS
# =============================================================================
# Every path below is resolved relative to BASE_DIR; each key of the optional
# "paths" config section has a hard-coded fallback.
PATHS_CONFIG = CONFIG.get("paths", {})
BASE_DIR = Path(PATHS_CONFIG.get("base_dir", "."))
OUTPUT_DIR = BASE_DIR / PATHS_CONFIG.get("output_dir", "output")
UNIVERSO_XLSX = BASE_DIR / PATHS_CONFIG.get(
    "input_universe", "Input/Universo per Trading System.xlsx"
)
CONNECTION_TXT = BASE_DIR / PATHS_CONFIG.get("connection_txt", "connection.txt")
AUDIT_LOG_CSV = BASE_DIR / PATHS_CONFIG.get(
    "audit_log_csv", "output/trades_audit_log.csv"
)
OPEN_TRADES_DIR = BASE_DIR / PATHS_CONFIG.get("open_trades_dir", "open_trades")
# NOTE(review): machine-specific absolute path, not configurable through the
# "paths" section; copy_to_dropbox only warns when the copy fails.
DROPBOX_EXPORT_DIR = Path(
    r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF"
)
# Column order of trades_audit_log.csv. _read_audit_log_mixed maps fields to
# these names by position, so the order must not change.
AUDIT_COLUMNS = [
    "Strategy",
    "ISIN",
    "Action",
    "TradeDate",
    "EntryIndex",
    "EntryAmount",
    "SizeWeight",
    "Price",
    "PnL_%",
    "ExitReason",
    "LinkedOpenDate",
    "Duration_bars",
    "Notes",
]
# Open-price download settings used by get_open_price: number of attempts,
# pause between failed attempts, and the timeout passed to urlopen.
OPEN_MAX_RETRY = 3
OPEN_SLEEP_SEC = 0.1
OPEN_TIMEOUT = 10
# =============================================================================
# STRATEGY DEFINITIONS
# =============================================================================
@dataclass
class StrategyConfig:
    """
    Configuration of a single operating strategy.

    Override values are read from the 'strategies.<name>.params' section of
    pattern_knn_config.json; parameters that are not overridden inherit the
    value found in the 'signals' section.
    """
    name: str
    # "equal_weight" | "risk_parity"; compute_sizing treats any value other
    # than "risk_parity" as equal weight.
    sizing: str
    # build_strategies_from_config only instantiates enabled strategies, so
    # it always passes True here.
    enabled: bool
    # Exit thresholds in basis points (divided by 10000 in _risk_exit_flags);
    # None disables the corresponding rule.
    sl_bps: Optional[float]
    tp_bps: Optional[float]
    trail_bps: Optional[float]
    # TIME exit once the bars held reach this value; None disables it.
    time_stop_bars: Optional[int]
    # WEAK exit when today's EstOutcome is <= this value; None disables it.
    theta_exit: Optional[float]
    # Loaded from config, but not read by any code visible in this part of
    # the file.
    weak_days_exit: Optional[int]
    # Ranking refresh period used by _is_decision_day (<= 1 means every day).
    decision_every: int
    # Number of initial bars during which SL/TIME/WEAK/RANK exits are blocked.
    min_holding_bars: int
    # Per-asset weight cap applied by size_risk_parity.
    rp_max_weight: float
def _get_param(strategy_params: Dict, key: str, signals_default: Dict, cast=None):
"""Restituisce il valore della strategia se presente, altrimenti il default globale."""
if key in strategy_params:
val = strategy_params[key]
else:
val = signals_default.get(key)
if cast is not None and val is not None:
try:
return cast(val)
except (TypeError, ValueError):
return val
return val
def build_strategies_from_config() -> List[StrategyConfig]:
    """
    Build the list of active strategies from pattern_knn_config.json.

    Entries of the 'strategies' section whose key starts with "_" or whose
    value is not a dict are treated as metadata and skipped. If nothing is
    left, the two legacy strategies Equal_Weight and Risk_Parity are used with
    the global parameters. Strategies with ``enabled`` set to a falsy value
    are left out of the result.
    """
    declared = {
        key: spec
        for key, spec in (STRATEGIES_CONFIG or {}).items()
        if not key.startswith("_") and isinstance(spec, dict)
    }
    if not declared:
        # Legacy fallback: both sizing schemes, no per-strategy overrides.
        declared = {
            "Equal_Weight": {"sizing": "equal_weight", "enabled": True, "params": {}},
            "Risk_Parity": {"sizing": "risk_parity", "enabled": True, "params": {}},
        }

    result: List[StrategyConfig] = []
    for strategy_name, spec in declared.items():
        if not spec.get("enabled", True):
            continue
        overrides = spec.get("params", {}) or {}
        sizing_mode = str(spec.get("sizing", "equal_weight")).lower()
        weight_cap = overrides.get("rp_max_weight", RP_MAX_WEIGHT_DEFAULT)
        if weight_cap is None:
            weight_cap = RP_MAX_WEIGHT_DEFAULT
        else:
            weight_cap = float(weight_cap)
        config = StrategyConfig(
            name=str(strategy_name),
            sizing=sizing_mode,
            enabled=True,
            sl_bps=_get_param(overrides, "sl_bps", SIGNALS_CONFIG, float),
            tp_bps=_get_param(overrides, "tp_bps", SIGNALS_CONFIG, float),
            trail_bps=_get_param(overrides, "trail_bps", SIGNALS_CONFIG, float),
            time_stop_bars=_get_param(overrides, "time_stop_bars", SIGNALS_CONFIG, int),
            theta_exit=_get_param(overrides, "theta_exit", SIGNALS_CONFIG, float),
            weak_days_exit=_get_param(overrides, "weak_days_exit", SIGNALS_CONFIG, int),
            # Missing/zero values fall back to 1 (daily) and 0 (no lock-in).
            decision_every=int(_get_param(overrides, "decision_every", SIGNALS_CONFIG) or 1),
            min_holding_bars=int(_get_param(overrides, "min_holding_bars", SIGNALS_CONFIG) or 0),
            rp_max_weight=weight_cap,
        )
        result.append(config)
    return result
# Active strategies, resolved once at import time from the loaded config.
STRATEGIES: List[StrategyConfig] = build_strategies_from_config()
# =============================================================================
# UTILS
# =============================================================================
def ensure_dir(p: Path) -> None:
    """Create directory *p* with any missing parents; no-op if it already exists."""
    p.mkdir(exist_ok=True, parents=True)
def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR) -> None:
    """Best-effort copy of *src* into *dst_dir*, keeping the file name.

    Does nothing when either argument is falsy or the source does not exist.
    Any failure while creating the folder or copying is reported with a
    warning on stdout and otherwise ignored.
    """
    if not (src and dst_dir and src.exists()):
        return
    try:
        ensure_dir(dst_dir)
        destination = dst_dir / src.name
        shutil.copy2(src, destination)
    except Exception as exc:
        print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}")
def next_business_day(d: dt.date) -> dt.date:
    """Return the day after *d*, skipping Saturday/Sunday when BUSINESS_DAYS_ONLY is set.

    Only weekends are skipped; no holiday calendar is applied.
    """
    step = dt.timedelta(days=1)
    candidate = d + step
    if BUSINESS_DAYS_ONLY:
        # weekday(): Monday=0 ... Sunday=6
        while candidate.weekday() >= 5:
            candidate += step
    return candidate
def _safe_to_float(x: Any) -> Optional[float]:
try:
return float(x)
except (TypeError, ValueError):
return None
def _format_date_iso(d: Any) -> str:
"""Formato uniforme ISO YYYY-MM-DD per tutte le date nel trades_audit_log."""
if d is None or pd.isna(d):
return ""
if isinstance(d, str):
return d
if isinstance(d, (dt.date, dt.datetime, pd.Timestamp)):
return pd.Timestamp(d).strftime("%Y-%m-%d")
return str(d)
def _dated_signals_filename() -> Path:
    """Return OUTPUT_DIR/<YYYYMMDD>_signals.xlsx, stamped with today's local date."""
    stamp = pd.Timestamp.today().strftime("%Y%m%d")
    file_name = f"{stamp}_signals.xlsx"
    return OUTPUT_DIR / file_name
# =============================================================================
# DB FETCH
# =============================================================================
def _db_fetch_returns(
    conn_str: str,
    isins: List[str],
    sp_name: Optional[str] = None,
    n_bars: Optional[int] = None,
    ptf_curr: Optional[str] = None,
) -> pd.DataFrame:
    """Download the daily return history of each ISIN through a stored procedure.

    Parameters
    ----------
    conn_str : SQLAlchemy connection URL.
    isins : ISIN codes to download, one stored-procedure call each.
    sp_name : procedure name; defaults to env ``SP_NAME`` then SP_NAME_DEFAULT.
    n_bars : value passed as ``@n``; defaults to env ``SP_N`` then SP_N_DEFAULT.
    ptf_curr : value passed as ``@PtfCurr``; defaults to env ``PTF_CURR`` then
        PTF_CURR_DEFAULT (blank values fall back to the default as well).

    Returns
    -------
    Long DataFrame with columns ``Date`` (tz-naive), ``ISIN`` and ``Ret``
    (float), sorted by ISIN and Date. ISINs whose call fails, returns no rows
    or lacks a recognisable date/return column are skipped with a message.
    An empty frame with the same columns is returned when nothing was loaded.
    """
    sp = sp_name or os.environ.get("SP_NAME", SP_NAME_DEFAULT)
    n_val = n_bars if n_bars is not None else int(os.environ.get("SP_N", SP_N_DEFAULT))
    ptf = (ptf_curr or os.environ.get("PTF_CURR", PTF_CURR_DEFAULT) or "").strip() or PTF_CURR_DEFAULT
    # NOTE(review): the procedure name is interpolated into the SQL text (it
    # cannot be a bind parameter); it must only ever come from trusted
    # config/environment values.
    sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
    frames: List[pd.DataFrame] = []
    engine = sa.create_engine(conn_str, fast_executemany=True)
    try:
        with engine.begin() as conn:
            for i, isin in enumerate(isins, start=1):
                print(f"[DB] ({i}/{len(isins)}) scarico serie storica per {isin} ...", flush=True)
                try:
                    df = pd.read_sql_query(
                        sql_sp,
                        conn,
                        params={"isin": str(isin), "n": int(n_val), "ptf": ptf},
                    )
                except Exception as exc:
                    # Best effort: one failing ISIN must not abort the whole download.
                    print(f"[ERROR] SP {sp} fallita per {isin}: {exc}")
                    continue
                if df.empty:
                    print(f"[WARN] nessun dato per {isin}")
                    continue
                col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
                col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
                if not col_date or not col_ret:
                    print(f"[WARN] colonne mancanti per {isin}")
                    continue
                out = df[[col_date, col_ret]].copy()
                out.columns = ["Date", "Ret"]
                out["Date"] = pd.to_datetime(out["Date"], errors="coerce").dt.tz_localize(None)
                out["ISIN"] = str(isin)
                # Always hand back float returns: drivers may deliver Decimal or
                # text, which would break downstream arithmetic if left as-is.
                ret_num = pd.to_numeric(out["Ret"], errors="coerce").astype(float)
                # Auto-detect scale (percent vs decimal) from the median |return|.
                # NOTE(review): percent-scaled daily returns usually have a median
                # absolute value well below 1.5, so this threshold may miss them —
                # confirm against the stored procedure's actual output scale.
                med = ret_num.abs().median()
                if pd.notnull(med) and med > 1.5:
                    ret_num = ret_num / 100.0
                out["Ret"] = ret_num
                print(f" \u21b3 righe scaricate: {len(out)}")
                frames.append(out[["Date", "ISIN", "Ret"]])
    finally:
        # Release pooled connections; the engine is local to this call.
        engine.dispose()
    if not frames:
        return pd.DataFrame(columns=["Date", "ISIN", "Ret"])
    out_all = pd.concat(frames, ignore_index=True)
    out_all = out_all.dropna(subset=["Date"]).sort_values(["ISIN", "Date"]).reset_index(drop=True)
    return out_all[["Date", "ISIN", "Ret"]]
# =============================================================================
# UNIVERSE + OPEN PRICE
# =============================================================================
def load_universe(path: Path) -> pd.DataFrame:
    """Load the trading universe from an Excel workbook.

    The ``ISIN`` column is mandatory. ``Asset Class``, ``Mercato`` and
    ``TickerOpen`` are created as empty strings when absent. All four columns
    are returned as stripped strings, with empty cells normalised to "".

    Raises
    ------
    KeyError
        If the workbook has no ``ISIN`` column.
    """
    df = pd.read_excel(path)
    if "ISIN" not in df.columns:
        raise KeyError("Nel file Universo manca la colonna 'ISIN'")
    for col in ["ISIN", "Asset Class", "Mercato", "TickerOpen"]:
        if col not in df.columns:
            df[col] = ""
        # Blank cells are read as NaN; a bare astype(str) would turn them into
        # the literal text "nan", which later leaks into price symbols such as
        # "<ISIN>-nan". Replace them with "" before stringifying.
        values = df[col].astype(object)
        df[col] = values.where(values.notna(), "").astype(str).str.strip()
    return df
def _build_symbol_euronext(row: pd.Series) -> Tuple[str, str]:
isin = str(row.get("ISIN", "")).strip()
venue = str(row.get("Mercato", "")).strip()
tok = str(row.get("TickerOpen", "") or "").strip()
base = "https://fin.scorer.app/finance/euronext/price"
if tok and "-" in tok and tok.split("-")[0].upper() == isin.upper():
return base, tok
if isin and venue:
return base, f"{isin}-{venue}"
return base, isin
def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]:
    """Fetch the open price of *isin* from the price API, with retries.

    The symbol is built from the ISIN's universe row. The response is expected
    to be a non-empty JSON list whose first item has a ``data`` object; the
    ``open`` field is used, falling back to ``prevClose`` when ``open`` is
    null/absent.

    Returns the price as float, or None when the ISIN is not in the universe
    or no usable price was obtained after OPEN_MAX_RETRY attempts. Network
    errors, timeouts and malformed payloads are logged and retried; they never
    propagate to the caller.
    """
    try:
        row = universe.loc[universe["ISIN"] == str(isin)].iloc[0]
    except (KeyError, IndexError):
        print(f"[WARN] ISIN {isin} non trovato nell'universo.")
        return None
    base, symbol = _build_symbol_euronext(row)
    url = f"{base}/{symbol}"
    # One TLS context for all attempts instead of rebuilding it per retry.
    ssl_ctx = ssl.create_default_context()
    for attempt in range(1, OPEN_MAX_RETRY + 1):
        print(f"[DOWNLOAD] {symbol:<30s} -> [TRY {attempt}/{OPEN_MAX_RETRY}]", flush=True)
        try:
            with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl_ctx) as resp:
                data = json.loads(resp.read().decode("utf-8"))
            if not isinstance(data, list) or not data:
                print(" \u21b3 NO DATA")
                continue
            first = data[0] if isinstance(data[0], dict) else {}
            d = first.get("data")
            if not isinstance(d, dict):
                d = {}
            px = d.get("open") if d.get("open") is not None else d.get("prevClose")
            if px is None:
                print(" \u21b3 WARN: 'open' e 'prevClose' assenti")
                continue
            px = float(px)
            print(f" \u21b3 OK open={d.get('open')} close={d.get('close')} (ritorno prezzo={px})")
            return px
        except (OSError, ValueError, TypeError) as exc:
            # OSError covers HTTPError/URLError/ssl.SSLError and socket timeouts;
            # ValueError covers invalid JSON/encoding and non-numeric prices;
            # TypeError covers prices of an unexpected type. A single bad
            # response must not abort the whole daily run.
            if attempt < OPEN_MAX_RETRY:
                print(f" \u21b3 ERR {exc}\nritento tra {OPEN_SLEEP_SEC}s")
                time.sleep(OPEN_SLEEP_SEC)
            else:
                print(f" \u21b3 ERR {exc}")
    print(f"[ERROR] nessun prezzo per {symbol} dopo {OPEN_MAX_RETRY} tentativi")
    return None
def _fetch_open_prices_once(
isins: Iterable[str], universe: pd.DataFrame
) -> Dict[str, Optional[float]]:
return {isin: get_open_price(isin, universe) for isin in sorted(set(isins))}
# =============================================================================
# SIGNAL GENERATION (EOD on day D)
# =============================================================================
def generate_signals_today(
    universe: pd.DataFrame,
    returns_long: pd.DataFrame,
    today: dt.date,
) -> pd.DataFrame:
    """
    Generate the kNN signal of every ISIN found in *returns_long*.

    The decision date is the last date available in the returns. For each
    ISIN with enough history (at least ``max(200, WP + HA + 10)`` non-null
    returns) the current WP-bar window is z-normalised and matched against the
    pattern library; ``Signal`` is 1 when the estimated outcome is strictly
    greater than the global threshold THETA. ISINs with too little history, no
    library or a window that cannot be normalised get ``Signal`` = 0.

    Note v2.1: theta_entry is no longer customised per asset through Hurst;
    all ISINs share the same global threshold (PATTERN_CONFIG.theta).

    ``universe`` and ``today`` are accepted for interface compatibility but
    are not used by this function.

    Returns a DataFrame indexed by (Date, ISIN) with columns Signal,
    EstOutcome, AvgDist, PatternType, Confidence (clamped to [0, 1]) and Theta.
    Raises IndexError when *returns_long* contains no dates.
    """
    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
    decision_date = ret_wide.index[-1].date()
    theta_entry = float(THETA)  # global threshold (Hurst removed)
    min_history = max(200, WP + HA + 10)
    rows = []
    for isin in ret_wide.columns:
        r = ret_wide[isin].dropna().astype(float)
        if len(r) < min_history:
            rows.append({
                "Date": decision_date, "ISIN": isin,
                "Signal": 0, "EstOutcome": np.nan, "AvgDist": np.nan,
                "PatternType": None, "Confidence": np.nan,
                "Theta": theta_entry,
            })
            continue
        # Defaults apply whenever the library or the current window is unusable.
        est_out, avg_dist, sig = np.nan, np.nan, 0
        lib_wins, lib_out = build_pattern_library(r, WP, HA)
        if lib_wins is not None:
            curr_zn = z_norm(r.values[-WP:])
            if curr_zn is not None:
                est_out, avg_dist, _ = predict_from_library(
                    curr_zn, lib_wins, lib_out, k=KNN_K
                )
                if pd.notna(est_out) and float(est_out) > theta_entry:
                    sig = 1
        # The qualitative tag is computed for every ISIN with enough history,
        # whether or not a prediction was possible.
        ptype, pconf = characterize_window(
            r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT
        )
        confidence = None if pconf is None else float(pconf)
        if confidence is not None:
            # max(0.0, min(1.0, nan)) evaluates to 1.0, which would silently
            # promote an undefined confidence to the maximum; treat NaN as missing.
            confidence = None if np.isnan(confidence) else max(0.0, min(1.0, confidence))
        rows.append({
            "Date": decision_date, "ISIN": isin,
            "Signal": int(sig),
            "EstOutcome": (None if pd.isna(est_out) else float(est_out)),
            "AvgDist": (None if pd.isna(avg_dist) else float(avg_dist)),
            "PatternType": ptype,
            "Confidence": confidence,
            "Theta": theta_entry,
        })
    return pd.DataFrame(rows).set_index(["Date", "ISIN"]).sort_index()
# =============================================================================
# RANKING & SELECTION
# =============================================================================
def _rank_buy(signals_today: pd.DataFrame, decision_date: dt.date) -> pd.DataFrame:
"""Ritorna i buy del giorno ordinati per EstOutcome desc, Confidence desc, AvgDist asc."""
if signals_today.empty:
return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"])
day_df = signals_today.reset_index().query("Date == @decision_date")
if day_df.empty:
return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"])
buy_mask = day_df["Signal"] == 1
buy = day_df[buy_mask].copy()
buy["EstOutcome"] = pd.to_numeric(buy["EstOutcome"], errors="coerce")
buy["Confidence"] = pd.to_numeric(buy["Confidence"], errors="coerce")
buy["AvgDist"] = pd.to_numeric(buy["AvgDist"], errors="coerce")
buy = buy.sort_values(
by=["EstOutcome", "Confidence", "AvgDist"],
ascending=[False, False, True],
na_position="last",
)
return buy[["ISIN", "EstOutcome", "Confidence", "AvgDist"]].reset_index(drop=True)
def _select_top_signals(buy_rank_df: pd.DataFrame, max_open: int) -> List[str]:
if buy_rank_df.empty:
return []
return buy_rank_df["ISIN"].head(max_open).tolist()
# =============================================================================
# POSITIONS / ORDERS / LOG
# =============================================================================
@dataclass
class OpenPos:
    # Record describing one open position. The field names match columns of
    # the per-strategy open-trades CSV (see load_open_trades).
    # NOTE(review): not referenced by any code visible in this part of the
    # file — confirm whether it is still used before relying on it.
    Strategy: str
    ISIN: str
    EntryDate: dt.date
    EntryIndex: int
    EntryAmount: float
    SizeWeight: float
    PeakPnL: float = 0.0
    Notes: str = ""
def open_trades_path(strategy: str) -> Path:
    """Return the CSV path holding *strategy*'s open trades, creating OPEN_TRADES_DIR if needed."""
    folder = OPEN_TRADES_DIR
    ensure_dir(folder)
    return folder / f"open_{strategy}.csv"
def load_open_trades(strategy: str) -> pd.DataFrame:
    """Load the open positions of *strategy* from its CSV file.

    Returns an empty frame with the standard columns when the file does not
    exist. Otherwise ``EntryDate`` is parsed to ``datetime.date`` (unparseable
    values become missing), the ``WeakDays`` and ``AssetName`` columns are
    added when absent, and ``Strategy`` is overwritten with *strategy*.
    """
    csv_path = open_trades_path(strategy)
    if not csv_path.exists():
        return pd.DataFrame(columns=[
            "Strategy", "ISIN", "AssetName", "EntryDate", "EntryIndex",
            "EntryAmount", "SizeWeight", "PeakPnL", "WeakDays", "Notes",
        ])
    trades = pd.read_csv(csv_path)
    if "EntryDate" in trades.columns:
        parsed = pd.to_datetime(trades["EntryDate"], errors="coerce")
        trades["EntryDate"] = parsed.dt.date
    # Columns introduced after the first file format: backfill with defaults.
    for column, default in (("WeakDays", 0), ("AssetName", "")):
        if column not in trades.columns:
            trades[column] = default
    trades["Strategy"] = strategy
    return trades
def save_open_trades(strategy: str, df: pd.DataFrame) -> None:
    """Overwrite *strategy*'s open-trades CSV with *df* (row index not written)."""
    df.to_csv(open_trades_path(strategy), index=False)
def _read_audit_log_mixed(path: Path) -> pd.DataFrame:
    """
    Tolerant reader for the existing audit log.

    The file is parsed line by line so that rows written with different
    separators can coexist: a line with at least five ";" is split on ";",
    any other line on ",". The header line is skipped and fields are mapped to
    AUDIT_COLUMNS by position. A leading empty field is dropped, short rows
    are padded with empty strings, and surplus fields are folded back into the
    last column (``Notes``) using the line's own separator. Quoted fields are
    not interpreted.

    All values are returned as strings; date columns are left untouched, so
    both ISO and legacy European dates are preserved. The text columns are
    stripped and ``Action`` is upper-cased. A missing or empty file yields an
    empty frame with the AUDIT_COLUMNS columns.
    """
    if not path.exists():
        return pd.DataFrame(columns=AUDIT_COLUMNS)
    lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
    if not lines:
        return pd.DataFrame(columns=AUDIT_COLUMNS)
    n_cols = len(AUDIT_COLUMNS)
    rows: List[Dict[str, str]] = []
    for line in lines[1:]:
        if not line.strip():
            continue
        sep = ";" if line.count(";") >= 5 else ","
        parts = line.split(sep)
        if parts and parts[0] == "":
            parts = parts[1:]
        if len(parts) > n_cols:
            # Re-join the overflow with the separator the line was split on,
            # so the text of the last column is restored exactly.
            parts = parts[: n_cols - 1] + [sep.join(parts[n_cols - 1 :])]
        elif len(parts) < n_cols:
            parts = parts + [""] * (n_cols - len(parts))
        rows.append(dict(zip(AUDIT_COLUMNS, parts)))
    df = pd.DataFrame(rows, columns=AUDIT_COLUMNS)
    if df.empty:
        return df
    for col in ["Strategy", "ISIN", "Action", "ExitReason", "Notes"]:
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
    df["Action"] = df["Action"].str.upper()
    return df
def append_audit_rows(rows: List[Dict]) -> None:
    """Append new rows to trades_audit_log, with TradeDate always in ISO format.

    The existing log (if any) is re-read with the tolerant parser, the new
    rows are appended, and the whole file is rewritten with ";" as separator
    and the AUDIT_COLUMNS layout. Columns missing from *rows* are left empty;
    unknown keys are dropped. Nothing happens when *rows* is empty.

    The file is written to a temporary sibling and then swapped in with
    ``os.replace`` so that an interruption while writing cannot truncate the
    existing audit history.
    """
    if not rows:
        return
    ensure_dir(AUDIT_LOG_CSV.parent)
    new_rows = pd.DataFrame(rows)
    # Normalise TradeDate/LinkedOpenDate to ISO
    for col in ("TradeDate", "LinkedOpenDate"):
        if col in new_rows.columns:
            new_rows[col] = new_rows[col].apply(_format_date_iso)
    # Enforce the audit layout: add missing columns as NaN, fix the order.
    new_rows = new_rows.reindex(columns=AUDIT_COLUMNS)
    if AUDIT_LOG_CSV.exists():
        old = _read_audit_log_mixed(AUDIT_LOG_CSV)
        log = pd.concat([old, new_rows], ignore_index=True)
    else:
        log = new_rows
    tmp_path = AUDIT_LOG_CSV.with_name(AUDIT_LOG_CSV.name + ".tmp")
    try:
        log.to_csv(tmp_path, index=False, sep=";", encoding="utf-8")
        os.replace(tmp_path, AUDIT_LOG_CSV)
    finally:
        # Only present here if writing or swapping failed.
        if tmp_path.exists():
            tmp_path.unlink()
# =============================================================================
# SIZING
# =============================================================================
def compute_current_capital_from_log(
    strategy: str, returns_wide: pd.DataFrame, asof_date: dt.date
) -> float:
    """Return the capital to size new positions with.

    Currently a constant: BASE_CAPITAL_PER_STRATEGY is returned for every
    strategy and date; the arguments are not used.
    """
    capital = BASE_CAPITAL_PER_STRATEGY
    return capital
def size_equal_weight(candidates: List[str]) -> Dict[str, float]:
    """
    Equal-weight sizing.

    FIX v2.1.1: each candidate receives weight 1/MAX_OPEN, NOT
    1/len(candidates). The weight per asset is therefore the same regardless
    of how many candidates were actually selected; with fewer than MAX_OPEN
    candidates the weights add up to less than 1 and the remainder is
    implicitly left in cash.

    Returns an empty dict when there are no candidates.
    """
    if not candidates:
        return {}
    slots = max(1, MAX_OPEN)
    return dict.fromkeys(candidates, 1.0 / slots)
def size_risk_parity(
    candidates: List[str],
    returns_wide: pd.DataFrame,
    asof_idx: int,
    rp_max_weight: float,
) -> Dict[str, float]:
    """
    Inverse-volatility sizing over the candidates, with a per-asset cap.

    Volatility is the standard deviation of each candidate's returns over the
    last RISK_PARITY_LOOKBACK rows ending at ``asof_idx`` (inclusive).
    Candidates with zero or undefined volatility receive weight 0; if that is
    true for all of them the function falls back to equal weighting.

    FIX v2.1.1: weights are scaled to a total of len(candidates)/MAX_OPEN
    rather than 1. The cap ``rp_max_weight`` is applied afterwards without
    re-normalising, so whatever is cut off stays in cash.

    Returns an empty dict when there are no candidates.
    """
    if not candidates:
        return {}
    first_row = max(0, asof_idx - RISK_PARITY_LOOKBACK + 1)
    window = returns_wide.iloc[first_row : asof_idx + 1][candidates]
    sigma = window.std().replace(0, np.nan)
    inverse = (1.0 / sigma).replace([np.inf, -np.inf], np.nan).fillna(0.0)
    if inverse.sum() == 0:
        return size_equal_weight(candidates)
    # Weights normalised to sum=exposure instead of sum=1
    exposure = len(candidates) / max(1, MAX_OPEN)
    weights = ((inverse / inverse.sum()) * exposure).to_dict()
    if rp_max_weight is None:
        return weights
    # Per-asset cap (no re-normalisation: the shortfall stays in cash)
    return {isin: min(rp_max_weight, float(raw)) for isin, raw in weights.items()}
def compute_sizing(
    strategy: StrategyConfig,
    candidates: List[str],
    returns_wide: pd.DataFrame,
    asof_idx: int,
) -> Dict[str, float]:
    """Return the target weight of each candidate according to the strategy's sizing mode.

    ``"risk_parity"`` uses inverse-volatility weights capped at the strategy's
    ``rp_max_weight``; every other value is treated as equal weight.
    """
    if strategy.sizing != "risk_parity":
        return size_equal_weight(candidates)
    return size_risk_parity(candidates, returns_wide, asof_idx, strategy.rp_max_weight)
# =============================================================================
# RISK EXITS
# =============================================================================
def _bars_in_trade(
isin: str,
entry_date: Optional[dt.date],
ret_wide: pd.DataFrame,
decision_date: dt.date,
) -> int:
"""Numero di barre trascorse da entry_date a decision_date (esclusa la barra di entrata)."""
if entry_date is None or isin not in ret_wide.columns:
return 0
sub = ret_wide[isin].loc[pd.Timestamp(entry_date) : pd.Timestamp(decision_date)]
bars = len(sub)
if not sub.empty and sub.index[0].date() == entry_date:
bars -= 1
return max(0, bars)
def _risk_exit_flags(
    strategy: StrategyConfig,
    isin: str,
    entry_date: Optional[dt.date],
    ret_wide: pd.DataFrame,
    decision_date: dt.date,
    est_map_today: pd.Series,
) -> List[str]:
    """
    Return the risk-exit reasons that apply to one open position, using the
    strategy's own parameters (TP/SL/TRAIL/TIME/WEAK + min_holding_bars).

    The position's equity path is compounded from the returns after the entry
    bar up to ``decision_date`` (missing returns count as 0). From it:
    - "TP"    when the running PnL is >= tp_bps;
    - "TRAIL" when the largest drawdown from a running peak is >= trail_bps;
    - "SL"    when the running PnL is <= -sl_bps;
    - "TIME"  when the bars held are >= time_stop_bars;
    - "WEAK"  when today's estimated outcome for the ISIN is <= theta_exit.

    min_holding_bars rules:
    - TP and TRAIL are always active;
    - SL, TIME and WEAK are blocked until min_holding_bars bars have elapsed.

    A rule whose parameter is None is disabled. With no usable price history
    both PnL and drawdown are taken as 0.
    """
    running_pnl = 0.0
    worst_drawdown = 0.0
    if entry_date is not None and isin in ret_wide.columns:
        path = ret_wide[isin].loc[pd.Timestamp(entry_date) : pd.Timestamp(decision_date)]
        # Drop the entry bar itself: its return is not counted in the PnL.
        if not path.empty and path.index[0].date() == entry_date:
            path = path.iloc[1:]
        if not path.empty:
            equity = (1.0 + path.fillna(0.0)).cumprod()
            running_pnl = float(equity.iloc[-1] - 1.0)
            drawdowns = equity / equity.cummax() - 1.0
            worst_drawdown = float(-drawdowns.min())

    held_bars = _bars_in_trade(isin, entry_date, ret_wide, decision_date)
    past_min_hold = held_bars >= strategy.min_holding_bars

    flags: List[str] = []
    # Always-active exits.
    if strategy.tp_bps is not None and running_pnl >= strategy.tp_bps / 10000.0:
        flags.append("TP")
    if strategy.trail_bps is not None and worst_drawdown >= strategy.trail_bps / 10000.0:
        flags.append("TRAIL")
    if not past_min_hold:
        return flags
    # Exits that only apply once the minimum holding period is over.
    if strategy.sl_bps is not None and running_pnl <= -strategy.sl_bps / 10000.0:
        flags.append("SL")
    if strategy.time_stop_bars is not None and held_bars >= int(strategy.time_stop_bars):
        flags.append("TIME")
    if strategy.theta_exit is not None and est_map_today is not None:
        if isin in est_map_today.index:
            todays_estimate = est_map_today.loc[isin]
            if pd.notna(todays_estimate) and float(todays_estimate) <= float(strategy.theta_exit):
                flags.append("WEAK")
    return flags
# =============================================================================
# DECISION DAY (decision_every)
# =============================================================================
def _is_decision_day(
strategy: StrategyConfig, decision_date: dt.date, anchor_date: Optional[dt.date]
) -> bool:
"""
Determina se oggi e' un giorno di "ranking refresh" per la strategia.
- decision_every=1 -> sempre True (decisione giornaliera)
- decision_every>1 -> True solo ogni N giorni di calendario rispetto all'anchor
"""
if strategy.decision_every <= 1 or anchor_date is None:
return True
delta_days = (decision_date - anchor_date).days
return (delta_days % strategy.decision_every) == 0
def _get_strategy_anchor(strategy_name: str) -> Optional[dt.date]:
    """Return the earliest OPEN TradeDate logged for the strategy, used to anchor decision_every.

    Dates are parsed row by row: ISO ``YYYY-MM-DD`` first, and whatever does
    not match is re-parsed as a legacy day-first (European) date. This keeps a
    log that genuinely mixes both formats usable, instead of dropping or
    mis-reading the rows written in the minority format.

    Returns None when the log is missing or empty, the strategy has no OPEN
    rows, no date can be parsed, or the log cannot be read (a warning is
    printed in the last case).
    """
    if not AUDIT_LOG_CSV.exists():
        return None
    try:
        df = _read_audit_log_mixed(AUDIT_LOG_CSV)
        if df.empty:
            return None
        opens = df[(df["Strategy"] == strategy_name) & (df["Action"] == "OPEN")]
        if opens.empty:
            return None
        raw = opens["TradeDate"].astype(str).str.strip()
        dates = pd.to_datetime(raw, format="%Y-%m-%d", errors="coerce")
        unparsed = dates.isna()
        if unparsed.any():
            # Legacy rows: European day-first dates (fillna aligns on the index).
            legacy = pd.to_datetime(raw[unparsed], errors="coerce", dayfirst=True)
            dates = dates.fillna(legacy)
        valid = dates.dropna()
        if valid.empty:
            return None
        return valid.min().date()
    except Exception as exc:
        print(f"[WARN] impossibile leggere anchor per {strategy_name}: {exc}")
        return None
# =============================================================================
# CORE LOGIC: orders for a single strategy
# =============================================================================
def _process_strategy(
    strategy: StrategyConfig,
    universe: pd.DataFrame,
    ret_wide: pd.DataFrame,
    target_isins: List[str],
    target_set: Set[str],
    est_map_all: Optional[pd.Series],
    decision_date: dt.date,
    next_open_date: dt.date,
    asof_idx: int,
    asset_name_map: Optional[pd.Series],
) -> Tuple[pd.DataFrame, List[Dict], Set[str], Set[str]]:
    """
    Run the decision cycle of a single strategy.

    Steps: load the strategy's open positions, flag closures (risk exits
    first, then ranking exits on refresh days), record CLOSE rows, open the
    target ISINs that are not already held (refresh days only), then persist
    the updated open-trades file.

    Side effect: the strategy's open-trades CSV is rewritten on every call.
    Audit rows are only returned, not written, and their ``Price`` is left as
    None for the caller to fill in.

    ``universe`` is not used inside this function.

    Returns:
    - the updated df_open
    - the audit_rows to write to the log
    - the ISINs needing an open-price fetch because they are being opened
    - the ISINs needing an open-price fetch because they are being closed
    """
    audit_rows: List[Dict] = []
    isins_open_fetch: Set[str] = set()
    isins_close_fetch: Set[str] = set()
    df_open = load_open_trades(strategy.name)
    current_set: Set[str] = set(df_open["ISIN"].astype(str).tolist())
    # decision_every: is today a ranking-refresh day?
    anchor_date = _get_strategy_anchor(strategy.name)
    refresh_today = _is_decision_day(strategy, decision_date, anchor_date)
    # ====== STEP 1: identify closures ======
    # ISINs already marked for closure in this cycle (prevents a duplicate
    # RANK closure for a position that already exits for a risk reason)
    closed_this_cycle: Set[str] = set()
    closers: List[Tuple[str, str]] = []
    if not df_open.empty:
        for _, row_open in df_open.iterrows():
            isin = str(row_open["ISIN"])
            entry_date = (
                pd.to_datetime(row_open["EntryDate"]).date()
                if pd.notna(row_open["EntryDate"])
                else None
            )
            reasons = _risk_exit_flags(
                strategy,
                isin,
                entry_date,
                ret_wide,
                decision_date,
                est_map_all if est_map_all is not None else pd.Series(dtype=float),
            )
            if reasons:
                # Several reasons may fire together; they are de-duplicated,
                # sorted and joined with "+".
                closers.append((isin, "+".join(sorted(set(reasons)))))
                closed_this_cycle.add(isin)
    # Ranking closures (refresh days only, and only if not already closed)
    if refresh_today:
        for isin in current_set - target_set:
            if isin not in closed_this_cycle:
                # min_holding_bars also blocks RANK closures
                row_open = df_open.loc[df_open["ISIN"] == isin]
                if not row_open.empty:
                    entry_date = (
                        pd.to_datetime(row_open["EntryDate"].iloc[0]).date()
                        if pd.notna(row_open["EntryDate"].iloc[0])
                        else None
                    )
                    bars = _bars_in_trade(isin, entry_date, ret_wide, decision_date)
                    if bars < strategy.min_holding_bars:
                        continue  # RANK is blocked during the minimum holding period
                closers.append((isin, "RANK"))
                closed_this_cycle.add(isin)
    # ====== STEP 2: apply closures ======
    for isin, reason in closers:
        row_open = df_open.loc[df_open["ISIN"] == isin]
        linked_date = row_open["EntryDate"].iloc[0] if not row_open.empty else None
        entry_idx_val = row_open["EntryIndex"].iloc[0] if not row_open.empty else np.nan
        # Duration_bars is filled in when the entry date is known
        duration_bars = np.nan
        if pd.notna(linked_date):
            duration_bars = _bars_in_trade(
                isin,
                pd.to_datetime(linked_date).date(),
                ret_wide,
                decision_date,
            )
        isins_close_fetch.add(isin)
        audit_rows.append({
            "Strategy": strategy.name,
            "ISIN": isin,
            "Action": "CLOSE",
            "TradeDate": next_open_date,
            "EntryIndex": entry_idx_val,
            "EntryAmount": np.nan,
            "SizeWeight": np.nan,
            "Price": None,
            "PnL_%": np.nan,
            "ExitReason": reason,
            "LinkedOpenDate": linked_date,
            "Duration_bars": duration_bars,
            "Notes": f"DecisionDate={decision_date}",
        })
        df_open = df_open[df_open["ISIN"] != isin]
        current_set.discard(isin)
    # ====== STEP 3: openings (refresh days only) ======
    # NOTE(review): nothing here limits the number of open positions, so
    # positions kept alive by min_holding_bars plus the new targets could
    # exceed MAX_OPEN — confirm this matches the backtest.
    if refresh_today:
        # Sizing is recomputed on the full target list (consistent with the backtest)
        w_dict = compute_sizing(strategy, target_isins, ret_wide, asof_idx)
        add_list = [isin for isin in target_isins if isin not in current_set]
        if add_list and w_dict:
            cap = compute_current_capital_from_log(strategy.name, ret_wide, decision_date)
            for isin in add_list:
                w = float(w_dict.get(isin, 0.0))
                if w <= 0:
                    continue
                # The notional is floored at MIN_TRADE_NOTIONAL
                notional = max(MIN_TRADE_NOTIONAL, cap * w)
                entry_idx = asof_idx + 1
                isins_open_fetch.add(isin)
                audit_rows.append({
                    "Strategy": strategy.name,
                    "ISIN": isin,
                    "Action": "OPEN",
                    "TradeDate": next_open_date,
                    "EntryIndex": entry_idx,
                    "EntryAmount": float(notional),
                    "SizeWeight": float(w),
                    "Price": None,
                    "PnL_%": np.nan,
                    "ExitReason": "",
                    "LinkedOpenDate": "",
                    "Duration_bars": 0,
                    "Notes": f"DecisionDate={decision_date}",
                })
                # The position is stored with EntryDate = decision_date, while
                # the audit row above carries next_open_date as TradeDate.
                df_open = pd.concat([
                    df_open,
                    pd.DataFrame([{
                        "Strategy": strategy.name,
                        "ISIN": isin,
                        "EntryDate": decision_date,
                        "EntryIndex": entry_idx,
                        "EntryAmount": float(notional),
                        "SizeWeight": float(w),
                        "PeakPnL": 0.0,
                        "WeakDays": 0,
                        "Notes": "",
                    }]),
                ], ignore_index=True)
                current_set.add(isin)
    # ====== STEP 4: enrich with AssetName and save ======
    if asset_name_map is not None:
        df_open["AssetName"] = (
            df_open["ISIN"].astype(str).map(asset_name_map).fillna("")
        )
    elif "AssetName" not in df_open.columns:
        df_open["AssetName"] = ""
    if "AssetName" in df_open.columns and "ISIN" in df_open.columns:
        # Move AssetName so that it sits immediately after ISIN
        cols = list(df_open.columns)
        cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
        df_open = df_open[cols]
    save_open_trades(strategy.name, df_open)
    df_open["Strategy"] = strategy.name
    return df_open, audit_rows, isins_open_fetch, isins_close_fetch
def update_positions_and_build_orders(
    universe: pd.DataFrame,
    returns_long: pd.DataFrame,
    signals_today: pd.DataFrame,
    today: dt.date,
    buy_rank_df: Optional[pd.DataFrame],
    allowed_open_isins: Optional[List[str]] = None,
    asset_name_map: Optional[pd.Series] = None,
) -> Tuple[pd.DataFrame, List[Dict]]:
    """
    Run the decision step for every strategy in STRATEGIES.

    The long returns table is pivoted to wide form (Date x ISIN); its last
    row defines the decision date. Each strategy is then handed to
    ``_process_strategy`` and the per-strategy open positions, audit rows and
    ISINs needing a price are collected. Prices are fetched once for the union
    of those ISINs and written into audit rows whose ``Price`` is still None.

    Args:
        universe: universe table, forwarded to the strategy processor and to
            the price fetcher.
        returns_long: long table with ``Date``, ``ISIN`` and ``Ret`` columns.
        signals_today: signals table; used as the EstOutcome source only when
            ``buy_rank_df`` is missing or empty.
        today: accepted for interface compatibility; not read by this function.
        buy_rank_df: ranked buy candidates with ``ISIN`` and ``EstOutcome``.
        allowed_open_isins: target ISIN list for the day (None means empty).
        asset_name_map: optional ISIN -> asset name mapping, forwarded as is.

    Returns:
        A tuple ``(open_all, audit_rows_all)``: the concatenated open-position
        frames of all strategies and the combined list of audit rows.
    """
    ret_wide = (
        returns_long
        .pivot(index="Date", columns="ISIN", values="Ret")
        .sort_index()
    )
    # NOTE: the name `decision_date` is referenced by the query string below.
    decision_date: dt.date = ret_wide.index[-1].date()
    asof_idx = len(ret_wide.index) - 1
    next_open_date = next_business_day(decision_date)

    # EstOutcome lookup: prefer the ranking table, otherwise fall back to the
    # signals rows dated on the decision date.
    has_ranking = buy_rank_df is not None and not buy_rank_df.empty
    if has_ranking:
        est_source = buy_rank_df
    else:
        est_source = signals_today.reset_index().query("Date == @decision_date")
    est_map_all: Optional[pd.Series] = None
    if has_ranking or not est_source.empty:
        est_map_all = est_source.set_index("ISIN")["EstOutcome"]

    target_isins: List[str] = allowed_open_isins if allowed_open_isins else []
    target_set: Set[str] = set(target_isins)

    open_concat: List[pd.DataFrame] = []
    audit_rows_all: List[Dict] = []
    isins_for_open_fetch: Set[str] = set()
    isins_for_close_fetch: Set[str] = set()

    for strategy in STRATEGIES:
        outcome = _process_strategy(
            strategy=strategy,
            universe=universe,
            ret_wide=ret_wide,
            target_isins=target_isins,
            target_set=target_set,
            est_map_all=est_map_all,
            decision_date=decision_date,
            next_open_date=next_open_date,
            asof_idx=asof_idx,
            asset_name_map=asset_name_map,
        )
        strat_open, strat_audit, strat_open_isins, strat_close_isins = outcome
        open_concat.append(strat_open)
        audit_rows_all.extend(strat_audit)
        isins_for_open_fetch.update(strat_open_isins)
        isins_for_close_fetch.update(strat_close_isins)

    # Single price fetch covering every ISIN touched by any strategy.
    fetch_isins = sorted(isins_for_open_fetch | isins_for_close_fetch)
    if fetch_isins:
        print(
            f"[PRICE] fetch open per {len(fetch_isins)} ISIN "
            f"(open={len(isins_for_open_fetch)}, close={len(isins_for_close_fetch)})",
            flush=True,
        )
        px_cache = _fetch_open_prices_once(fetch_isins, universe)
        unpriced = [row for row in audit_rows_all if row.get("Price") is None]
        for row in unpriced:
            row["Price"] = _safe_to_float(px_cache.get(row["ISIN"]))

    actions = [row.get("Action") for row in audit_rows_all]
    n_open = actions.count("OPEN")
    n_close = actions.count("CLOSE")
    print(
        f"[SUMMARY] decision_date={decision_date} opens={n_open} closes={n_close} "
        f"target={len(target_isins)} (cap={MAX_OPEN}, strategie={len(STRATEGIES)})",
        flush=True,
    )

    if open_concat:
        open_all = pd.concat(open_concat, ignore_index=True)
    else:
        open_all = pd.DataFrame()
    return open_all, audit_rows_all
# =============================================================================
# MAIN
# =============================================================================
def _build_asset_name_map(universe: pd.DataFrame, name_col: str) -> pd.Series:
    """Return stripped asset names indexed by stripped, unique ISIN."""
    names = universe[["ISIN", name_col]].dropna(subset=["ISIN"])
    names = names.assign(ISIN=names["ISIN"].astype(str).str.strip())
    # Series.map() with a Series mapper needs a unique index: keep the first
    # row per ISIN (stripping whitespace can itself create duplicates).
    names = names.drop_duplicates(subset="ISIN", keep="first")
    raw = names.set_index("ISIN")[name_col]
    # astype(str) would turn missing names into the literal "nan"; blank them
    # so downstream .fillna("") semantics ("no name" -> "") actually hold.
    return raw.astype(str).str.strip().mask(raw.isna(), "")


def main_run(run_date: Optional[dt.date] = None) -> None:
    """
    Run the full daily pipeline and write its outputs.

    Steps, in order: load the universe and build the optional ISIN -> asset
    name map; fetch returns from the DB; generate today's signals; rank buys
    and select the Top-N targets; update positions for every strategy in
    STRATEGIES; append the audit rows; write the Excel snapshot (one
    ``Signals`` sheet plus one sheet per strategy with open positions) and
    copy the snapshot and the per-strategy open-trades CSVs to Dropbox.

    Args:
        run_date: date used as "today"; defaults to ``dt.date.today()``.

    Raises:
        RuntimeError: when the DB returns no rows.
    """
    today = run_date or dt.date.today()
    ensure_dir(OUTPUT_DIR)

    # 1) Universe
    universe = load_universe(UNIVERSO_XLSX)
    asset_name_col = detect_column(
        universe, ["Nome", "Name", "Asset", "Asset Name", "Descrizione", "Description"]
    )
    asset_name_map: Optional[pd.Series] = None
    if asset_name_col:
        asset_name_map = _build_asset_name_map(universe, asset_name_col)
    else:
        print("[WARN] colonna con il nome dell'asset non trovata nell'universo.")
    print(
        f"[CONFIG] strategie attive ({len(STRATEGIES)}): "
        + ", ".join(s.name for s in STRATEGIES)
    )

    # 2) Returns from the DB
    conn_str = read_connection_txt(CONNECTION_TXT)
    isins = universe["ISIN"].tolist()
    returns_long = _db_fetch_returns(
        conn_str, isins,
        sp_name=SP_NAME_DEFAULT, n_bars=SP_N_DEFAULT, ptf_curr=PTF_CURR_DEFAULT,
    )
    if returns_long.empty:
        raise RuntimeError("Nessun rendimento disponibile dal DB (SP vuota?).")

    # 3) EOD signals on D (global theta, no Hurst)
    sig_df = generate_signals_today(universe, returns_long, today)

    # 3b) Ranking + Top-N
    decision_date = returns_long["Date"].max().date()
    buy_rank_df = _rank_buy(sig_df, decision_date)
    allowed_open = _select_top_signals(buy_rank_df, MAX_OPEN)

    # 4) Positions + audit rows (all strategies)
    open_df, audit_rows = update_positions_and_build_orders(
        universe, returns_long, sig_df, today,
        buy_rank_df=buy_rank_df,
        allowed_open_isins=allowed_open,
        asset_name_map=asset_name_map,
    )

    # 5) Append to the audit log
    if audit_rows:
        append_audit_rows(audit_rows)

    # 6) Excel snapshot
    ensure_dir(OPEN_TRADES_DIR)
    signals_path = _dated_signals_filename()
    signals_sheet = sig_df.reset_index()
    if asset_name_map is not None:
        signals_sheet["AssetName"] = (
            signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("")
        )
    else:
        signals_sheet["AssetName"] = ""
    # Place AssetName right after ISIN. Removing it first keeps the target
    # position correct wherever AssetName currently sits.
    cols = [c for c in signals_sheet.columns if c != "AssetName"]
    cols.insert(cols.index("ISIN") + 1, "AssetName")
    signals_sheet = signals_sheet[cols]

    sheet_name_overrides = {
        "Equal_Weight": "Open_EW",
        "Risk_Parity": "Open_RP",
        "Equal_Weight_v2": "Open_EW_v2",
        "Risk_Parity_v2": "Open_RP_v2",
    }
    with pd.ExcelWriter(signals_path) as xw:
        signals_sheet.to_excel(xw, sheet_name="Signals", index=False)
        if not open_df.empty:
            for strat, g in open_df.groupby("Strategy"):
                # Excel limits sheet names to 31 characters.
                sheet_name = sheet_name_overrides.get(strat, f"Open_{strat}")[:31]
                g.to_excel(xw, sheet_name=sheet_name, index=False)

    copy_to_dropbox(signals_path)
    for strategy in STRATEGIES:
        csv_path = open_trades_path(strategy.name)
        if csv_path.exists():
            copy_to_dropbox(csv_path)

    print(f"\u2705 Signals generated for {today}. Saved to {signals_path}")
    print(f"Open trades saved in {OPEN_TRADES_DIR}")
    print(f"Audit log updated at {AUDIT_LOG_CSV}")
# =============================================================================
# ENTRY POINT
# =============================================================================
# Run the daily pipeline with default arguments when executed as a script.
if __name__ == "__main__":
    main_run()