From 9543d63591da177351deb340e23039415652ace1 Mon Sep 17 00:00:00 2001
From: fredmaloggia
Date: Fri, 5 Dec 2025 18:07:55 +0100
Subject: [PATCH] Refined the stable version and renamed the files

---
 ...ly_kNN_prod_v.2.py => Signals Daily kNN.py | 1418 ++++++++---------
 ...1.6.py => Trading Pattern Recon w Hurst.py |    0
 2 files changed, 709 insertions(+), 709 deletions(-)
 rename signals_daily_kNN_prod_v.2.py => Signals Daily kNN.py (97%)
 rename Trading Pattern Recon w Hurst v.3.1.6.py => Trading Pattern Recon w Hurst.py (100%)

diff --git a/signals_daily_kNN_prod_v.2.py b/Signals Daily kNN.py
similarity index 97%
rename from signals_daily_kNN_prod_v.2.py
rename to Signals Daily kNN.py
index 1fbaaf8..fb8d548 100644
--- a/signals_daily_kNN_prod_v.2.py
+++ b/Signals Daily kNN.py
@@ -1,59 +1,59 @@
# -*- coding: utf-8 -*-
"""
Daily Signals Generator (kNN) – PRODUCTION (consistent with backtest v3.1.5)

Main changes:
- Daily cap MAX_OPEN=15: a single buy ranking and revision by difference for every strategy
- Risk Parity with a per-asset cap (RP_MAX_WEIGHT), aligned with backtest v3.1.5
- OPEN prices fetched only once for the ISINs involved in OPEN/CLOSE (cache shared across strategies)
- Audit log for ALL operating strategies (Equal_Weight, Risk_Parity)

Pipeline (day D, EOD -> t+1 OPEN):
1) Load the universe and the return series from the DB (stored procedure)
2) kNN pattern matching (WP=60, HA=10, K=25), Signal=1 if EstOutcome > THETA (decimals)
3) Build a single ranking of the buys and select the Top-N (MAX_OPEN) as the daily target
4) Revision by difference: close holdings that fell out of the top, open new entries into the top (+ risk exits)
5) Sizing (Equal Weight / Risk Parity with cap)
6) Fetch OPEN prices ONCE for the ISINs involved (OPEN/CLOSE) and fill in the orders
7) Log the orders and write an Excel snapshot with sheets: Open_Equal_Weight / Open_Risk_Parity
"""

from __future__ import annotations

import os
import ssl
import json
import time
import shutil
import warnings
import datetime as dt
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Iterable, Set

import numpy as np
import pandas as pd
from urllib.request import urlopen
from urllib.error import URLError, HTTPError

# DB
import sqlalchemy as sa
from sqlalchemy import text as sql_text

from shared_utils import (
    build_hurst_map,
    build_pattern_library,
    characterize_window,
    detect_column,
    load_config,
    predict_from_library,
    read_connection_txt,
    require_section,
    require_value,
    z_norm,
)

# =========================
# CONFIG
# =========================
CONFIG = load_config()
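
# --- Editor's illustrative sketch (not part of the patch): the entry rule from
# step 2 of the pipeline, with made-up numbers. In the script the estimate
# comes from predict_from_library() and the threshold from the config or the
# per-ISIN Hurst value.
est_outcome = 0.0082   # hypothetical kNN estimate of the forward return (decimal)
theta_entry = 0.005    # hypothetical entry threshold (0.5% in decimals)
signal = 1 if est_outcome > theta_entry else 0
assert signal == 1     # a 0.82% expected outcome clears the 0.5% bar
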
@@ -80,41 +80,41 @@ DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di
def _dated_signals_filename() -> Path:
    date_prefix = pd.Timestamp.today().strftime("%Y%m%d")
    return OUTPUT_DIR / f"{date_prefix}_signals.xlsx"

# Stored procedure / DB parameters
SP_NAME_DEFAULT = str(require_value(DB_CONFIG, "stored_proc", "db"))
SP_N_DEFAULT = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR_DEFAULT = str(require_value(DB_CONFIG, "ptf_curr", "db"))

# Pattern recognition (same as the backtest)
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern"))  # 0.005 in decimals (0.5%), identical to the backtest
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))

# Exit rules (identical to the backtest)
SL_BPS = float(require_value(SIGNALS_CONFIG, "sl_bps", "signals"))
TP_BPS = float(require_value(SIGNALS_CONFIG, "tp_bps", "signals"))
TRAIL_BPS = float(require_value(SIGNALS_CONFIG, "trail_bps", "signals"))
TIME_STOP_BARS = int(require_value(SIGNALS_CONFIG, "time_stop_bars", "signals"))
THETA_EXIT = float(require_value(SIGNALS_CONFIG, "theta_exit", "signals"))  # weakness threshold
WEAK_DAYS_EXIT = require_value(SIGNALS_CONFIG, "weak_days_exit", "signals")  # IMMEDIATE exit on weakness (as in the backtest)

# Ranking and Top-N selection for OPENS
MAX_OPEN = int(require_value(SIGNALS_CONFIG, "max_open", "signals"))  # cap on instruments open today (as in the backtest)

# Alignment with backtest v3.1.5 for the Risk Parity cap
TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))
RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking")  # ≈ 0.1333 = 13.33% per single asset
if RP_MAX_WEIGHT is None:
    RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
else:
    RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)

# Sizing
BASE_CAPITAL_PER_STRATEGY = float(require_value(SIGNALS_CONFIG, "base_capital_per_strategy", "signals"))
MIN_TRADE_NOTIONAL = float(require_value(SIGNALS_CONFIG, "min_trade_notional", "signals"))
RISK_PARITY_LOOKBACK = int(require_value(SIGNALS_CONFIG, "risk_parity_lookback", "signals"))
@@ -126,93 +126,93 @@ OPEN_SLEEP_SEC = float(PRICES_CONFIG.get("sleep_sec", 0.1))
OPEN_TIMEOUT = float(PRICES_CONFIG.get("timeout", 10))
BUSINESS_DAYS_ONLY = bool(RUN_CONFIG.get("business_days_only", True))
SEED = int(RUN_CONFIG.get("seed", 42))
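
# --- Editor's illustrative sketch: the RP_MAX_WEIGHT fallback above, worked
# through with an assumed top_n_max of 15 (the actual value comes from the
# ranking config).
top_n_max = 15
rp_cap = 2 / max(top_n_max, 1)        # same fallback formula as above
assert abs(rp_cap - 0.1333) < 1e-3    # ≈ 13.33% per single asset
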
warnings.filterwarnings("ignore")
np.random.seed(SEED)

# =========================
# UTILS
# =========================
def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR):
    if not src or not dst_dir:
        return
    if not src.exists():
        return
    try:
        ensure_dir(dst_dir)
        dst = dst_dir / src.name
        shutil.copy2(src, dst)
    except Exception as exc:
        print(f"[WARN] could not copy {src} to {dst_dir}: {exc}")

def next_business_day(d: dt.date) -> dt.date:
    nd = d + dt.timedelta(days=1)
    if not BUSINESS_DAYS_ONLY:
        return nd
    while nd.weekday() >= 5:  # 5=Sat, 6=Sun
        nd += dt.timedelta(days=1)
    return nd
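
# --- Editor's illustrative sketch: next_business_day() rolling a Friday
# decision date to Monday execution when BUSINESS_DAYS_ONLY is true.
# The dates are hypothetical.
import datetime as dt

_friday = dt.date(2025, 12, 5)
_nd = _friday + dt.timedelta(days=1)
while _nd.weekday() >= 5:             # same weekend loop as above
    _nd += dt.timedelta(days=1)
assert _nd == dt.date(2025, 12, 8)    # Monday
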
print(f"[ERROR] SP {sp} fallita per {isin}: {e}") - continue - - if df.empty: - print(f"[WARN] Nessun dato per {isin}") - continue - - col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) - col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) - if not col_date or not col_ret: - print(f"[WARN] Colonne mancanti per {isin}") - continue - - out = df[[col_date, col_ret]].copy() - out.columns = ["Date", "Ret"] - out["Date"] = pd.to_datetime(out["Date"], errors="coerce").dt.tz_localize(None) - out["ISIN"] = str(isin) - - med = pd.to_numeric(out["Ret"], errors="coerce").abs().median() - if pd.notnull(med) and med > 1.5: - out["Ret"] = out["Ret"].astype(float) / 100.0 - - print(f" ↳ righe scaricate: {len(out)}") - frames.append(out[["Date", "ISIN", "Ret"]]) - - if not frames: - return pd.DataFrame(columns=["Date", "ISIN", "Ret"]) - - out_all = pd.concat(frames, ignore_index=True) - out_all = out_all.dropna(subset=["Date"]).sort_values(["ISIN", "Date"]).reset_index(drop=True) - return out_all[["Date", "ISIN", "Ret"]] - + +warnings.filterwarnings("ignore") +np.random.seed(SEED) + +# ========================= +# UTILS +# ========================= +def ensure_dir(p: Path): + p.mkdir(parents=True, exist_ok=True) + +def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR): + if not src or not dst_dir: + return + if not src.exists(): + return + try: + ensure_dir(dst_dir) + dst = dst_dir / src.name + shutil.copy2(src, dst) + except Exception as exc: + print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}") + +def next_business_day(d: dt.date) -> dt.date: + nd = d + dt.timedelta(days=1) + if not BUSINESS_DAYS_ONLY: + return nd + while nd.weekday() >= 5: # 5=Sat, 6=Sun + nd += dt.timedelta(days=1) + return nd + +def _safe_to_float(x) -> Optional[float]: + try: + return float(x) + except Exception: + return None + +def _db_fetch_returns(conn_str: str, + isins: List[str], + sp_name: Optional[str] = None, + n_bars: Optional[int] = None, + ptf_curr: Optional[str] = None) -> pd.DataFrame: + engine = sa.create_engine(conn_str, fast_executemany=True) + sp = sp_name or os.environ.get("SP_NAME", SP_NAME_DEFAULT) + n_val = n_bars if n_bars is not None else int(os.environ.get("SP_N", SP_N_DEFAULT)) + ptf = (ptf_curr or os.environ.get("PTF_CURR", PTF_CURR_DEFAULT) or "").strip() or PTF_CURR_DEFAULT + + sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf") + frames: List[pd.DataFrame] = [] + + with engine.begin() as conn: + for i, isin in enumerate(isins, start=1): + print(f"[DB] ({i}/{len(isins)}) scarico serie storica per {isin} ...", flush=True) + try: + df = pd.read_sql_query(sql_sp, conn, params={"isin": str(isin), "n": int(n_val), "ptf": ptf}) + except Exception as e: + print(f"[ERROR] SP {sp} fallita per {isin}: {e}") + continue + + if df.empty: + print(f"[WARN] Nessun dato per {isin}") + continue + + col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) + col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) + if not col_date or not col_ret: + print(f"[WARN] Colonne mancanti per {isin}") + continue + + out = df[[col_date, col_ret]].copy() + out.columns = ["Date", "Ret"] + out["Date"] = pd.to_datetime(out["Date"], errors="coerce").dt.tz_localize(None) + out["ISIN"] = str(isin) + + med = pd.to_numeric(out["Ret"], errors="coerce").abs().median() + if pd.notnull(med) and med > 1.5: + out["Ret"] = out["Ret"].astype(float) / 100.0 + + print(f" ↳ righe scaricate: 
{len(out)}") + frames.append(out[["Date", "ISIN", "Ret"]]) + + if not frames: + return pd.DataFrame(columns=["Date", "ISIN", "Ret"]) + + out_all = pd.concat(frames, ignore_index=True) + out_all = out_all.dropna(subset=["Date"]).sort_values(["ISIN", "Date"]).reset_index(drop=True) + return out_all[["Date", "ISIN", "Ret"]] + # ========================= # UNIVERSO + OPEN PRICE API (schema checker) # ========================= @@ -221,12 +221,12 @@ def load_universe(path: Path) -> pd.DataFrame: if "ISIN" not in df.columns: raise KeyError("Nel file Universo manca la colonna 'ISIN'") df["ISIN"] = df["ISIN"].astype(str).str.strip() - for col in ["Asset Class", "Mercato", "TickerOpen"]: - if col not in df.columns: - df[col] = "" - df[col] = df[col].astype(str).str.strip() - return df - + for col in ["Asset Class", "Mercato", "TickerOpen"]: + if col not in df.columns: + df[col] = "" + df[col] = df[col].astype(str).str.strip() + return df + def _build_symbol_euronext(row: pd.Series) -> Tuple[str, str]: isin = str(row.get("ISIN", "")).strip() venue = str(row.get("Mercato", "")).strip() @@ -237,469 +237,469 @@ def _build_symbol_euronext(row: pd.Series) -> Tuple[str, str]: if isin and venue: return base, f"{isin}-{venue}" return base, isin - -def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]: - """ - Endpoint euronext/price/ISIN-Mercato con retry/backoff. - Log dettagliato stile checker: [TRY]/[RETRY]/[OK]/[FAIL]. - """ - try: - row = universe.loc[universe["ISIN"] == str(isin)].iloc[0] - except Exception: - print(f"[WARN] ISIN {isin} non trovato nell’universo.") - return None - - base, symbol = _build_symbol_euronext(row) - url = f"{base}/{symbol}" - print(f"[DOWNLOAD] {symbol:<30s} -> [TRY 1/{OPEN_MAX_RETRY}]", flush=True) - - for attempt in range(1, OPEN_MAX_RETRY + 1): - if attempt > 1: - print(f"[DOWNLOAD] {symbol:<30s} -> [TRY {attempt}/{OPEN_MAX_RETRY}]", flush=True) - try: - with urlopen(url, timeout=OPEN_TIMEOUT, context=ssl.create_default_context()) as resp: - data = json.loads(resp.read().decode("utf-8")) - if not isinstance(data, list) or not data: - print(" ↳ NO DATA") - continue - - d = (data[0] or {}).get("data") or {} - px = d.get("open") if d.get("open") is not None else d.get("prevClose") - if px is None: - print(" ↳ WARN: 'open' e 'prevClose' assenti") - continue - - px = float(px) - print(f" ↳ OK open={d.get('open')} close={d.get('close')} (ritorno prezzo={px})") - return px - - except (HTTPError, URLError, ssl.SSLError) as e: - if attempt < OPEN_MAX_RETRY: - print(f" ↳ ERR {e}\nritento tra {OPEN_SLEEP_SEC}s") - time.sleep(OPEN_SLEEP_SEC) - else: - print(f" ↳ ERR {e}") - - print(f"[ERROR] nessun prezzo per {symbol} dopo {OPEN_MAX_RETRY} tentativi") - return None - -# ========================= -# HURST ESTIMATOR & MAP -# ========================= -# ========================= -# GENERAZIONE SEGNALI (EOD su D) -# ========================= -def generate_signals_today(universe: pd.DataFrame, - returns_long: pd.DataFrame, - today: dt.date, - hurst_map: Optional[Dict[str, float]] = None) -> pd.DataFrame: - ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index() - decision_date = ret_wide.index[-1].date() - - rows = [] - for isin in ret_wide.columns: - r = ret_wide[isin].dropna().astype(float) - # THETA = HURST IN PERCENTUALE (H = 0.50 -> theta_entry = 0.005 = 0.5%) - theta_entry = THETA # fallback globale - H_val = None - if hurst_map is not None: - H_val = hurst_map.get(str(isin)) - if H_val is not None and not pd.isna(H_val): - 
# =========================
# HURST ESTIMATOR & MAP
# =========================
# =========================
# SIGNAL GENERATION (EOD on D)
# =========================
def generate_signals_today(universe: pd.DataFrame,
                           returns_long: pd.DataFrame,
                           today: dt.date,
                           hurst_map: Optional[Dict[str, float]] = None) -> pd.DataFrame:
    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
    decision_date = ret_wide.index[-1].date()

    rows = []
    for isin in ret_wide.columns:
        r = ret_wide[isin].dropna().astype(float)
        # THETA = HURST AS A PERCENTAGE (H = 0.50 -> theta_entry = 0.005 = 0.5%)
        theta_entry = THETA  # global fallback
        H_val = None
        if hurst_map is not None:
            H_val = hurst_map.get(str(isin))
            if H_val is not None and not pd.isna(H_val):
                theta_entry = float(H_val) / 100.0

        if len(r) < max(200, WP + HA + 10):
            rows.append({"Date": decision_date, "ISIN": isin,
                         "Signal": 0, "EstOutcome": np.nan, "AvgDist": np.nan,
                         "PatternType": None, "Confidence": np.nan})
            continue

        lib_wins, lib_out = build_pattern_library(r, WP, HA)
        if lib_wins is None or len(r) < WP + HA:
            est_out, avg_dist, sig = np.nan, np.nan, 0
            ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
        else:
            curr = r.values[-WP:]
            curr_zn = z_norm(curr)
            if curr_zn is None:
                est_out, avg_dist, sig = np.nan, np.nan, 0
                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
            else:
                est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
                sig = 1 if (pd.notna(est_out) and float(est_out) > float(theta_entry)) else 0
                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)

        rows.append({
            "Date": decision_date, "ISIN": isin,
            "Signal": int(sig),
            "EstOutcome": (None if pd.isna(est_out) else float(est_out)),
            "AvgDist": (None if pd.isna(avg_dist) else float(avg_dist)),
            "PatternType": ptype,
            "Confidence": (None if pconf is None else float(max(0.0, min(1.0, pconf)))),
            "Hurst": (None if H_val is None else float(H_val)),
            "Theta": float(theta_entry),
        })

    sig_df = pd.DataFrame(rows).set_index(["Date", "ISIN"]).sort_index()
    return sig_df
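
# --- Editor's illustrative sketch: the per-ISIN entry threshold derived from
# the Hurst value above. H is read as a percentage, so H = 0.50 maps to
# theta_entry = 0.005 = 0.5%. The value of H here is hypothetical.
_H = 0.50
_theta_entry = _H / 100.0      # same scaling as in generate_signals_today()
assert _theta_entry == 0.005
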
# =========================
# TOP-N SELECTION & PRICE CACHE
# =========================
def _rank_buy(signals_today: pd.DataFrame, decision_date: dt.date) -> pd.DataFrame:
    """
    Returns a DataFrame with only today's buys, sorted by:
    EstOutcome desc, Confidence desc, AvgDist asc.
    Columns: ['ISIN','EstOutcome','Confidence','AvgDist']
    """
    if signals_today.empty:
        return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"])
    day_df = signals_today.reset_index().query("Date == @decision_date")
    if day_df.empty:
        return pd.DataFrame(columns=["ISIN", "EstOutcome", "Confidence", "AvgDist"])

    buy = (day_df[day_df["Signal"] == 1]
           .assign(
               EstOutcome=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "EstOutcome"], errors="coerce"),
               Confidence=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "Confidence"], errors="coerce"),
               AvgDist=pd.to_numeric(day_df.loc[day_df["Signal"] == 1, "AvgDist"], errors="coerce"),
           )
           .sort_values(by=["EstOutcome", "Confidence", "AvgDist"], ascending=[False, False, True], na_position="last"))
    return buy[["ISIN", "EstOutcome", "Confidence", "AvgDist"]].reset_index(drop=True)

def _select_top_signals(buy_rank_df: pd.DataFrame, max_open: int) -> List[str]:
    if buy_rank_df.empty:
        return []
    return buy_rank_df["ISIN"].head(max_open).tolist()

def _fetch_open_prices_once(isins: Iterable[str], universe: pd.DataFrame) -> Dict[str, Optional[float]]:
    cache: Dict[str, Optional[float]] = {}
    for isin in sorted(set(isins)):
        cache[isin] = get_open_price(isin, universe)
    return cache
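
# --- Editor's illustrative sketch: the sort used by _rank_buy(), on made-up
# rows. Higher EstOutcome wins; ties fall back to Confidence (desc), then
# AvgDist (asc).
import pandas as pd

_buy = pd.DataFrame({
    "ISIN":       ["AAA", "BBB", "CCC"],
    "EstOutcome": [0.010, 0.010, 0.020],
    "Confidence": [0.6,   0.9,   0.5],
    "AvgDist":    [1.2,   0.8,   2.0],
})
_ranked = _buy.sort_values(by=["EstOutcome", "Confidence", "AvgDist"],
                           ascending=[False, False, True], na_position="last")
assert _ranked["ISIN"].tolist() == ["CCC", "BBB", "AAA"]
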
# =========================
# POSITIONS / ORDERS / LOG
# =========================
@dataclass
class OpenPos:
    Strategy: str
    ISIN: str
    EntryDate: dt.date
    EntryIndex: int
    EntryAmount: float
    SizeWeight: float
    PeakPnL: float = 0.0
    Notes: str = ""

def open_trades_path(strategy: str) -> Path:
    ensure_dir(OPEN_TRADES_DIR)
    return OPEN_TRADES_DIR / f"open_{strategy}.csv"

def load_open_trades(strategy: str) -> pd.DataFrame:
    p = open_trades_path(strategy)
    if not p.exists():
        return pd.DataFrame(columns=[
            "Strategy", "ISIN", "AssetName", "EntryDate", "EntryIndex", "EntryAmount", "SizeWeight", "PeakPnL", "WeakDays", "Notes"
        ])
    df = pd.read_csv(p)
    if "EntryDate" in df.columns:
        df["EntryDate"] = pd.to_datetime(df["EntryDate"], errors="coerce").dt.date
    if "WeakDays" not in df.columns:
        df["WeakDays"] = 0
    if "AssetName" not in df.columns:
        df["AssetName"] = ""
    df["Strategy"] = strategy
    return df

def save_open_trades(strategy: str, df: pd.DataFrame):
    p = open_trades_path(strategy)
    df.to_csv(p, index=False)

def append_audit_rows(rows: List[Dict]):
    if not rows:
        return
    ensure_dir(AUDIT_LOG_CSV.parent)
    log = pd.DataFrame(rows)
    if AUDIT_LOG_CSV.exists():
        old = pd.read_csv(AUDIT_LOG_CSV)
        log = pd.concat([old, log], ignore_index=True)
    log.to_csv(AUDIT_LOG_CSV, index=False)

# sizing
def compute_current_capital_from_log(strategy: str,
                                     returns_wide: pd.DataFrame,
                                     asof_date: dt.date) -> float:
    return BASE_CAPITAL_PER_STRATEGY

def size_equal_weight(candidates: List[str]) -> Dict[str, float]:
    if not candidates:
        return {}
    w = 1.0 / max(1, len(candidates))
    return {isin: w for isin in candidates}

def size_risk_parity(candidates: List[str],
                     returns_wide: pd.DataFrame,
                     asof_idx: int) -> Dict[str, float]:
    """
    Risk Parity on the candidates only, with a per-asset cap (RP_MAX_WEIGHT),
    aligned with the logic used in backtest v3.1.5.
    Any unallocated share stays in cash (we do not renormalize after the cap).
    """
    if not candidates:
        return {}
    L = RISK_PARITY_LOOKBACK
    start = max(0, asof_idx - L + 1)
    sub = returns_wide.iloc[start:asof_idx+1][candidates]
    vol = sub.std().replace(0, np.nan)
    inv_vol = 1.0 / vol
    inv_vol = inv_vol.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    if inv_vol.sum() == 0:
        return size_equal_weight(candidates)
    w = (inv_vol / inv_vol.sum()).to_dict()
    # Cap each single weight, like RP_MAX_WEIGHT in Pattern Recon
    if RP_MAX_WEIGHT is not None:
        w = {k: min(RP_MAX_WEIGHT, float(v)) for k, v in w.items()}
    return w
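
# --- Editor's illustrative sketch: inverse-volatility weights with the
# per-asset cap, mirroring size_risk_parity(). The vols and the 0.40 cap are
# hypothetical (the script uses RP_MAX_WEIGHT); note that the capped excess is
# not redistributed, so the shortfall stays in cash.
import pandas as pd

_vol = pd.Series({"AAA": 0.01, "BBB": 0.02, "CCC": 0.04})
_inv = 1.0 / _vol
_w = _inv / _inv.sum()          # AAA ≈ 0.571, BBB ≈ 0.286, CCC ≈ 0.143
_w = _w.clip(upper=0.40)        # only AAA is capped, at 0.40
assert _w.sum() < 1.0           # the residual ≈ 0.17 is left unallocated
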
def _risk_exit_flags(isin: str,
                     entry_date: Optional[dt.date],
                     ret_wide: pd.DataFrame,
                     decision_date: dt.date,
                     est_map_today: pd.Series) -> List[str]:
    """Computes the exit reasons (SL/TP/Trailing/Time/Weak) for a position."""
    reasons: List[str] = []
    pnl_if_stay = 0.0
    peak_dd = 0.0
    if entry_date is not None and isin in ret_wide.columns:
        sub = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)]
        if not sub.empty and sub.index[0].date() == entry_date:
            sub = sub.iloc[1:]
        if not sub.empty:
            eq_path = (1.0 + sub.fillna(0.0)).cumprod()
            pnl_if_stay = float(eq_path.iloc[-1] - 1.0)
            peak_curve = eq_path.cummax()
            dd_series = eq_path / peak_curve - 1.0
            peak_dd = float(-dd_series.min())

    if SL_BPS is not None and pnl_if_stay <= -SL_BPS/10000.0:
        reasons.append("SL")
    if TP_BPS is not None and pnl_if_stay >= TP_BPS/10000.0:
        reasons.append("TP")
    if TRAIL_BPS is not None and peak_dd >= TRAIL_BPS/10000.0:
        reasons.append("TRAIL")

    if TIME_STOP_BARS is not None and entry_date is not None and isin in ret_wide.columns:
        sub_all = ret_wide[isin].loc[pd.Timestamp(entry_date):pd.Timestamp(decision_date)]
        bars = len(sub_all) - (1 if (not sub_all.empty and sub_all.index[0].date() == entry_date) else 0)
        if bars >= int(TIME_STOP_BARS):
            reasons.append("TIME")

    if THETA_EXIT is not None and est_map_today is not None and isin in est_map_today.index:
        est_out_today = est_map_today.loc[isin]
        if pd.notna(est_out_today) and float(est_out_today) <= float(THETA_EXIT):
            reasons.append("WEAK")
    return reasons
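
# --- Editor's illustrative sketch: the PnL-if-stay and peak drawdown that
# _risk_exit_flags() derives from the post-entry return path. The returns and
# the 150 bps stop are made up.
import pandas as pd

_sub = pd.Series([0.01, -0.02, 0.005, -0.015])       # hypothetical daily returns
_eq = (1.0 + _sub.fillna(0.0)).cumprod()
_pnl_if_stay = float(_eq.iloc[-1] - 1.0)             # ≈ -0.0202
_peak_dd = float(-(_eq / _eq.cummax() - 1.0).min())  # ≈ 0.0299
_sl_bps = 150.0
assert _pnl_if_stay <= -_sl_bps / 10000.0            # would trigger the "SL" reason
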
def update_positions_and_build_orders(universe: pd.DataFrame,
                                      returns_long: pd.DataFrame,
                                      signals_today: pd.DataFrame,
                                      today: dt.date,
                                      buy_rank_df: Optional[pd.DataFrame],
                                      allowed_open_isins: Optional[List[str]] = None,
                                      asset_name_map: Optional[pd.Series] = None) -> Tuple[pd.DataFrame, List[Dict]]:
    """
    - decision_date = the latest available date (EOD)
    - daily target = the first MAX_OPEN of the buy ranking (identical for all strategies)
    - for each strategy: revision by difference vs the positions already open
    - CLOSE also for risk exits (SL/TP/Trailing/Time/Weak)
    - Execution at t+1 open; prices fetched ONCE for all ISINs with orders

    NB: the Aggressive/Crypto strategy has been removed. Only these remain:
    - Equal_Weight
    - Risk_Parity (with a per-asset cap)
    """
    strategies = ["Equal_Weight", "Risk_Parity"]

    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
    decision_date: dt.date = ret_wide.index[-1].date()
    asof_idx = len(ret_wide.index) - 1
    next_open_date = next_business_day(decision_date)

    # Today's ranking and maps
    est_map_all = None
    if buy_rank_df is not None and not buy_rank_df.empty:
        est_map_all = buy_rank_df.set_index("ISIN")["EstOutcome"]
    else:
        # fallback: use all of today's signals
        day_df = signals_today.reset_index().query("Date == @decision_date")
        if not day_df.empty:
            est_map_all = day_df.set_index("ISIN")["EstOutcome"]

    target_isins: List[str] = allowed_open_isins or []
    target_set: Set[str] = set(target_isins)

    # Per-strategy sizing based on the target
    size_eq = size_equal_weight(target_isins)
    size_rp = size_risk_parity(target_isins, ret_wide, asof_idx)
    sizing_by_strategy = {"Equal_Weight": size_eq, "Risk_Parity": size_rp}

    open_concat: List[pd.DataFrame] = []
    audit_rows_all: List[Dict] = []
    isins_for_open_fetch: Set[str] = set()
    isins_for_close_fetch: Set[str] = set()

    for strat in strategies:
        df_open = load_open_trades(strat)
        current_set: Set[str] = set(df_open["ISIN"].astype(str).tolist())

        # --- risk exits on current positions ---
        closers: List[Tuple[str, str]] = []
        if not df_open.empty:
            if "WeakDays" not in df_open.columns:
                df_open["WeakDays"] = 0

            for _, row_open in df_open.iterrows():
                isin = str(row_open["ISIN"])
                entry_date = pd.to_datetime(row_open["EntryDate"]).date() if pd.notna(row_open["EntryDate"]) else None
                reasons = _risk_exit_flags(isin, entry_date, ret_wide, decision_date, est_map_all if est_map_all is not None else pd.Series(dtype=float))
                if reasons:
                    closers.append((isin, "+".join(sorted(set(reasons)))))

        # --- closes driven by the ranking (slipped out of the top-N) ---
        drop_by_rank = list(current_set - target_set)
        for isin in drop_by_rank:
            closers.append((isin, "RANK"))

        # --- apply the closes (accumulate orders) ---
        for isin, reason in closers:
            row_open = df_open.loc[df_open["ISIN"] == isin]
            linked_date = row_open["EntryDate"].iloc[0] if not row_open.empty else None

            isins_for_close_fetch.add(isin)
            audit_rows_all.append({
                "Strategy": strat,
                "ISIN": isin,
                "Action": "CLOSE",
                "TradeDate": next_open_date,
                "EntryIndex": np.nan,
                "EntryAmount": np.nan,
                "SizeWeight": np.nan,
                "Price": None,  # filled after the fetch
                "PnL_%": np.nan,
                "ExitReason": reason,
                "LinkedOpenDate": linked_date,
                "Duration_bars": np.nan,
                "Notes": f"DecisionDate={decision_date}"
            })
            df_open = df_open[df_open["ISIN"] != isin]
            current_set.discard(isin)

        # --- opens by difference toward the target ---
        add_list = [isin for isin in target_isins if isin not in current_set]  # preserves the ranking order
        w_dict = sizing_by_strategy[strat]
        if add_list and w_dict:
            cap = compute_current_capital_from_log(strat, ret_wide, decision_date)
            for isin in add_list:
                w = float(w_dict.get(isin, 0.0))
                if w <= 0:
                    continue
                notional = max(MIN_TRADE_NOTIONAL, cap * w)
                entry_idx = asof_idx + 1

                isins_for_open_fetch.add(isin)
                audit_rows_all.append({
                    "Strategy": strat,
                    "ISIN": isin,
                    "Action": "OPEN",
                    "TradeDate": next_open_date,
                    "EntryIndex": entry_idx,
                    "EntryAmount": float(notional),
                    "SizeWeight": float(w),
                    "Price": None,  # filled after the fetch
                    "PnL_%": np.nan,
                    "ExitReason": "",
                    "LinkedOpenDate": "",
                    "Duration_bars": 0,
                    "Notes": f"DecisionDate={decision_date}"
                })
                df_open = pd.concat([df_open, pd.DataFrame([{
                    "Strategy": strat,
                    "ISIN": isin,
                    "EntryDate": decision_date,
                    "EntryIndex": entry_idx,
                    "EntryAmount": float(notional),
                    "SizeWeight": float(w),
                    "PeakPnL": 0.0,
                    "WeakDays": 0,
                    "Notes": ""
                }])], ignore_index=True)
                current_set.add(isin)

        if asset_name_map is not None:
            df_open["AssetName"] = df_open["ISIN"].astype(str).map(asset_name_map).fillna("")
        else:
            if "AssetName" not in df_open.columns:
                df_open["AssetName"] = ""
        if "AssetName" in df_open.columns:
            cols = list(df_open.columns)
            if "ISIN" in cols and "AssetName" in cols:
                cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
            df_open = df_open[cols]

        save_open_trades(strat, df_open)
        df_open["Strategy"] = strat
        open_concat.append(df_open)

    # ---- FETCH ONCE (OPEN + CLOSE) ----
    fetch_isins = sorted(isins_for_open_fetch.union(isins_for_close_fetch))
    if fetch_isins:
        print(f"[PRICE] fetching opens for {len(fetch_isins)} ISINs (open={len(isins_for_open_fetch)}, close={len(isins_for_close_fetch)})", flush=True)
        px_cache = _fetch_open_prices_once(fetch_isins, universe)
        for r in audit_rows_all:
            if r.get("Price") is None:
                r["Price"] = _safe_to_float(px_cache.get(r["ISIN"]))

    # ---- SUMMARY ----
    n_open = sum(1 for r in audit_rows_all if r.get("Action") == "OPEN")
    n_close = sum(1 for r in audit_rows_all if r.get("Action") == "CLOSE")
    print(f"[SUMMARY] decision_date={decision_date} opens={n_open} closes={n_close} target={len(target_isins)} (cap={MAX_OPEN})", flush=True)

    open_all = pd.concat(open_concat, ignore_index=True) if open_concat else pd.DataFrame()
    return open_all, audit_rows_all
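
# --- Editor's illustrative sketch: the revision-by-difference at the core of
# update_positions_and_build_orders(), with made-up ISINs. Holdings outside
# the Top-N target are closed with reason "RANK"; target names not yet held
# are opened, preserving the ranking order.
_current = {"AAA", "BBB", "CCC"}                     # positions already open
_target = ["BBB", "DDD", "EEE"]                      # today's Top-N, in ranking order
_closes = _current - set(_target)                    # {"AAA", "CCC"}
_opens = [i for i in _target if i not in _current]   # ["DDD", "EEE"]
assert sorted(_closes) == ["AAA", "CCC"] and _opens == ["DDD", "EEE"]
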
"", + "Duration_bars": 0, + "Notes": f"DecisionDate={decision_date}" + }) + df_open = pd.concat([df_open, pd.DataFrame([{ + "Strategy": strat, + "ISIN": isin, + "EntryDate": decision_date, + "EntryIndex": entry_idx, + "EntryAmount": float(notional), + "SizeWeight": float(w), + "PeakPnL": 0.0, + "WeakDays": 0, + "Notes": "" + }])], ignore_index=True) + current_set.add(isin) + + if asset_name_map is not None: + df_open["AssetName"] = df_open["ISIN"].astype(str).map(asset_name_map).fillna("") + else: + if "AssetName" not in df_open.columns: + df_open["AssetName"] = "" + if "AssetName" in df_open.columns: + cols = list(df_open.columns) + if "ISIN" in cols and "AssetName" in cols: + cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName"))) + df_open = df_open[cols] + + save_open_trades(strat, df_open) + df_open["Strategy"] = strat + open_concat.append(df_open) + + # ---- FETCH UNA VOLTA (OPEN + CLOSE) ---- + fetch_isins = sorted(isins_for_open_fetch.union(isins_for_close_fetch)) + if fetch_isins: + print(f"[PRICE] fetch open per {len(fetch_isins)} ISIN (open={len(isins_for_open_fetch)}, close={len(isins_for_close_fetch)})", flush=True) + px_cache = _fetch_open_prices_once(fetch_isins, universe) + for r in audit_rows_all: + if r.get("Price") is None: + r["Price"] = _safe_to_float(px_cache.get(r["ISIN"])) + + # ---- SUMMARY ---- + n_open = sum(1 for r in audit_rows_all if r.get("Action") == "OPEN") + n_close = sum(1 for r in audit_rows_all if r.get("Action") == "CLOSE") + print(f"[SUMMARY] decision_date={decision_date} opens={n_open} closes={n_close} target={len(target_isins)} (cap={MAX_OPEN})", flush=True) + + open_all = pd.concat(open_concat, ignore_index=True) if open_concat else pd.DataFrame() + return open_all, audit_rows_all + +# ========================= +# MAIN RUN +# ========================= +def main_run(run_date: Optional[dt.date] = None): + today = run_date or dt.date.today() + ensure_dir(OUTPUT_DIR) + + # 1) Universo + universe = load_universe(UNIVERSO_XLSX) + asset_name_col = detect_column(universe, [ + "Nome", "Name", "Asset", "Asset Name", "Descrizione", "Description" + ]) + if not asset_name_col: + print("[WARN] Colonna con il nome dell'asset non trovata nell'universo.") + asset_name_map: Optional[pd.Series] = None + if asset_name_col: + asset_name_map = ( + universe[["ISIN", asset_name_col]] + .dropna(subset=["ISIN"]) + .assign(ISIN=lambda df: df["ISIN"].astype(str).str.strip()) + ) + asset_name_map = asset_name_map.set_index("ISIN")[asset_name_col].astype(str).str.strip() + + # 2) Ritorni (DB) + conn_str = read_connection_txt(CONNECTION_TXT) + isins = universe["ISIN"].tolist() + returns_long = _db_fetch_returns( + conn_str, + isins, + sp_name=SP_NAME_DEFAULT, + n_bars=SP_N_DEFAULT, + ptf_curr=PTF_CURR_DEFAULT, + ) if returns_long.empty: raise RuntimeError("Nessun rendimento disponibile dal DB (SP vuota?).") @@ -709,65 +709,65 @@ def main_run(run_date: Optional[dt.date] = None): lookback=HURST_LOOKBACK or SP_N_DEFAULT, min_length=HURST_MIN_LENGTH, ) - - # 3) Segnali EOD su D con THETA = Hurst/100 per ISIN - sig_df = generate_signals_today(universe, returns_long, today, hurst_map=hurst_map) - - # 3b) Ranking unico e selezione Top-N - decision_date = returns_long["Date"].max().date() - buy_rank_df = _rank_buy(sig_df, decision_date) # tutti i buy ordinati - allowed_open = _select_top_signals(buy_rank_df, MAX_OPEN) # top-N ISIN - - # 4) Posizioni + audit (OPEN/CLOSE) con target Top-N - open_df, audit_rows = update_positions_and_build_orders( - universe, returns_long, 
sig_df, today, - buy_rank_df=buy_rank_df, - allowed_open_isins=allowed_open, - asset_name_map=asset_name_map, - ) - - # 5) Append audit log (TUTTE le strategie operative) - if audit_rows: - append_audit_rows(audit_rows) - - # 6) Snapshot Excel datato — fogli con nomi completi - ensure_dir(OPEN_TRADES_DIR) - signals_path = _dated_signals_filename() - signals_sheet = sig_df.reset_index() - if asset_name_map is not None: - signals_sheet["AssetName"] = signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("") - else: - signals_sheet["AssetName"] = "" - - # inserisci la colonna subito dopo l'ISIN - if "AssetName" in signals_sheet.columns: - cols = list(signals_sheet.columns) - cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName"))) - signals_sheet = signals_sheet[cols] - - with pd.ExcelWriter(signals_path) as xw: - signals_sheet.to_excel(xw, sheet_name="Signals", index=False) - if not open_df.empty: - for strat, g in open_df.groupby("Strategy"): - sheet_name_map = { - "Equal_Weight": "Open_Equal_Weight", - "Risk_Parity": "Open_Risk_Parity", - } - sheet_name = sheet_name_map.get(strat, f"Open_{strat}")[:31] - g.to_excel(xw, sheet_name=sheet_name, index=False) - - copy_to_dropbox(signals_path) - for strat in ["Equal_Weight", "Risk_Parity"]: - csv_path = open_trades_path(strat) - if csv_path.exists(): - copy_to_dropbox(csv_path) - - print(f"✅ Signals generated for {today}. Saved to {signals_path}") - print(f"Open trades saved in {OPEN_TRADES_DIR}") - print(f"Audit log updated at {AUDIT_LOG_CSV}") - -# ========================= -# ENTRY POINT -# ========================= -if __name__ == "__main__": - main_run() + + # 3) Segnali EOD su D con THETA = Hurst/100 per ISIN + sig_df = generate_signals_today(universe, returns_long, today, hurst_map=hurst_map) + + # 3b) Ranking unico e selezione Top-N + decision_date = returns_long["Date"].max().date() + buy_rank_df = _rank_buy(sig_df, decision_date) # tutti i buy ordinati + allowed_open = _select_top_signals(buy_rank_df, MAX_OPEN) # top-N ISIN + + # 4) Posizioni + audit (OPEN/CLOSE) con target Top-N + open_df, audit_rows = update_positions_and_build_orders( + universe, returns_long, sig_df, today, + buy_rank_df=buy_rank_df, + allowed_open_isins=allowed_open, + asset_name_map=asset_name_map, + ) + + # 5) Append audit log (TUTTE le strategie operative) + if audit_rows: + append_audit_rows(audit_rows) + + # 6) Snapshot Excel datato — fogli con nomi completi + ensure_dir(OPEN_TRADES_DIR) + signals_path = _dated_signals_filename() + signals_sheet = sig_df.reset_index() + if asset_name_map is not None: + signals_sheet["AssetName"] = signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("") + else: + signals_sheet["AssetName"] = "" + + # inserisci la colonna subito dopo l'ISIN + if "AssetName" in signals_sheet.columns: + cols = list(signals_sheet.columns) + cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName"))) + signals_sheet = signals_sheet[cols] + + with pd.ExcelWriter(signals_path) as xw: + signals_sheet.to_excel(xw, sheet_name="Signals", index=False) + if not open_df.empty: + for strat, g in open_df.groupby("Strategy"): + sheet_name_map = { + "Equal_Weight": "Open_Equal_Weight", + "Risk_Parity": "Open_Risk_Parity", + } + sheet_name = sheet_name_map.get(strat, f"Open_{strat}")[:31] + g.to_excel(xw, sheet_name=sheet_name, index=False) + + copy_to_dropbox(signals_path) + for strat in ["Equal_Weight", "Risk_Parity"]: + csv_path = open_trades_path(strat) + if csv_path.exists(): + copy_to_dropbox(csv_path) + + print(f"✅ 
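
# --- Editor's illustrative sketch: main_run() defaults to today's date, but a
# past date can be passed for a rerun. The date below is hypothetical and the
# call is left commented out because it needs the DB and the universe file.
# import datetime as dt
# main_run(run_date=dt.date(2025, 12, 5))
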
# =========================
# ENTRY POINT
# =========================
if __name__ == "__main__":
    main_run()

diff --git a/Trading Pattern Recon w Hurst v.3.1.6.py b/Trading Pattern Recon w Hurst.py
similarity index 100%
rename from Trading Pattern Recon w Hurst v.3.1.6.py
rename to Trading Pattern Recon w Hurst.py