"""Shared helpers for trading pattern scripts."""

from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import quote

import numpy as np
import pandas as pd

try:
    import pyodbc
except ImportError:
    # Only read_connection_txt() needs pyodbc; keep the numeric helpers
    # importable on machines without ODBC support.
    pyodbc = None

CONFIG_FILENAME = "pattern_knn_config.json"

# Environment variable that overrides the configuration file location.
_CONFIG_ENV_VAR = "PATTERN_KNN_CONFIG_PATH"

# Search order for the configuration file. The first existing one wins.
# The first two entries are relative to the CWD; the last two are anchored
# to the directory that contains this module.
_CONFIG_SEARCH_PATHS = [
    Path(CONFIG_FILENAME),                                   # CWD (project root)
    Path("config") / CONFIG_FILENAME,                        # config/ sub-folder of the CWD
    Path(__file__).resolve().parent / CONFIG_FILENAME,       # next to this module
    Path(__file__).resolve().parent / "config" / CONFIG_FILENAME,
]

# Backward-compat: some scripts import DEFAULT_CONFIG_PATH directly.
DEFAULT_CONFIG_PATH = _CONFIG_SEARCH_PATHS[0]


def _resolve_config_path(explicit: Optional[Path] = None) -> Path:
    """Return the path of the configuration file.

    Priority:
        1. The explicit argument passed to ``load_config()``.
        2. The ``PATTERN_KNN_CONFIG_PATH`` environment variable.
        3. The first existing entry of ``_CONFIG_SEARCH_PATHS``.

    The returned path is not guaranteed to exist: when nothing is found the
    first standard location is returned so the caller can report the error.
    """
    if explicit is not None:
        return Path(explicit)

    env_path = os.environ.get(_CONFIG_ENV_VAR)
    if env_path:
        return Path(env_path)

    for candidate in _CONFIG_SEARCH_PATHS:
        if candidate.exists():
            return candidate

    # No standard location exists: return the first one so that the caller
    # fails with an informative error message.
    return _CONFIG_SEARCH_PATHS[0]


def load_config(path: Optional[Path] = None) -> Dict:
    """Load the JSON configuration that holds operational parameters.

    Args:
        path: Optional explicit location of the configuration file. When
            omitted, the environment variable and the standard locations
            are consulted (see ``_resolve_config_path``).

    Returns:
        The parsed JSON document.

    Raises:
        FileNotFoundError: If the resolved configuration file does not exist.
    """
    cfg_path = _resolve_config_path(path)
    if not cfg_path.exists():
        env_override = os.environ.get(_CONFIG_ENV_VAR)
        if path is not None or env_override:
            # An override was given, so the standard locations were never
            # searched: report the path that was actually tried.
            origin = (
                "explicit path argument"
                if path is not None
                else f"{_CONFIG_ENV_VAR} env var"
            )
            raise FileNotFoundError(
                f"Missing configuration file '{cfg_path}' (from {origin})."
            )
        searched = "\n - ".join(str(p.resolve()) for p in _CONFIG_SEARCH_PATHS)
        raise FileNotFoundError(
            f"Missing configuration file '{CONFIG_FILENAME}'.\n"
            f"Searched in:\n - {searched}\n"
            f"Set PATTERN_KNN_CONFIG_PATH env var to override."
        )
    with cfg_path.open("r", encoding="utf-8") as fh:
        return json.load(fh)


def require_section(config: Dict, section: str) -> Dict:
    """Return ``config[section]``, raising ``KeyError`` unless it is a dict."""
    sect = config.get(section)
    if not isinstance(sect, dict):
        raise KeyError(f"Missing '{section}' section in configuration file")
    return sect


def require_value(section: Dict, key: str, section_name: str) -> Any:
    """Return ``section[key]``, raising ``KeyError`` that names the section."""
    if key not in section:
        raise KeyError(
            f"Missing key '{key}' inside '{section_name}' section of configuration file"
        )
    return section[key]


def detect_column(df: pd.DataFrame, candidates: Sequence[str]) -> Optional[str]:
    """Return the first column whose name matches one of the candidates.

    Matching is case insensitive. Exact matches are tried for every
    candidate first; only then substring matches are tried, again in
    candidate order. Column labels are compared through ``str()`` so that
    non-string labels (e.g. integers) do not raise.

    Returns:
        The original column label, or ``None`` when nothing matches.
    """
    lowered = [(str(col).lower(), col) for col in df.columns]
    exact = {low: col for low, col in lowered}

    for cand in candidates:
        hit = cand.lower()
        if hit in exact:
            return exact[hit]

    for cand in candidates:
        needle = cand.lower()
        for low, col in lowered:
            if needle in low:
                return col
    return None


def read_connection_txt(path: Path | str = "connection.txt") -> str:
    """Build a ``mssql+pyodbc`` connection URL from a ``key=value`` text file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys are case insensitive. ``username``, ``password``, ``host``
    and ``database`` are required; ``port`` defaults to ``1433``.

    Username and password are percent-encoded so that characters such as
    ``@``, ``:`` or ``/`` cannot corrupt the URL. They are expected in raw
    (not pre-escaped) form in the file.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If a required key is missing or empty.
        ImportError: If pyodbc is not installed.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Missing connection.txt at {path}")

    params: Dict[str, str] = {}
    # utf-8-sig also accepts plain UTF-8 and strips a BOM, which would
    # otherwise become part of the first key.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        params[key.strip().lower()] = value.strip()

    username = params.get("username")
    password = params.get("password")
    host = params.get("host")
    port = params.get("port", "1433")
    database = params.get("database")
    if not all([username, password, host, database]):
        raise ValueError("connection.txt incompleto: servono username/password/host/database.")

    if pyodbc is None:
        raise ImportError("pyodbc is required to build the SQL Server connection URL")

    # Prefer the newer driver when it is installed, otherwise fall back to 17.
    if "ODBC Driver 18 for SQL Server" in pyodbc.drivers():
        driver_q = "ODBC+Driver+18+for+SQL+Server"
    else:
        driver_q = "ODBC+Driver+17+for+SQL+Server"

    user_q = quote(username, safe="")
    pass_q = quote(password, safe="")
    return f"mssql+pyodbc://{user_q}:{pass_q}@{host}:{port}/{database}?driver={driver_q}"


def z_norm(arr: np.ndarray) -> Optional[np.ndarray]:
    """Return the z-scored copy of ``arr``.

    Returns ``None`` for an empty input or when the standard deviation is
    below ``1e-12`` (a flat window carries no shape information).
    """
    arr = np.asarray(arr, dtype=float)
    if arr.size == 0:
        return None
    mu = arr.mean()
    sd = arr.std()
    if sd < 1e-12:
        return None
    return (arr - mu) / (sd + 1e-12)


def build_pattern_library(
    ret_series: pd.Series,
    wp: int,
    ha: int,
    embargo: Optional[int] = None,
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """Create the normalized pattern windows and their realized outcomes.

    Args:
        ret_series: Series of returns (ordered oldest->latest).
        wp: Window length for the pattern.
        ha: Holding horizon used to compute the outcome.
        embargo: Optional number of most-recent observations to exclude when
            building the library (useful to avoid leakage when reusing the
            same series for inference).

    Returns:
        ``(windows, outcomes)`` where ``windows`` holds one z-normalized
        window per row and ``outcomes[i]`` is the sum of the ``ha`` returns
        following window ``i``. ``(None, None)`` when there is not enough
        data or every window is flat.
    """
    x = ret_series.dropna().values
    n = len(x)
    if n < wp + ha + 10:
        return None, None

    embargo = int(embargo or 0)
    usable_n = n - max(0, embargo)
    if usable_n <= wp + ha:
        return None, None

    # The guard above guarantees last_start >= 1.
    last_start = usable_n - wp - ha

    wins: List[np.ndarray] = []
    outs: List[float] = []
    for t in range(last_start + 1):
        winzn = z_norm(x[t : t + wp])
        if winzn is None:
            # Flat window: cannot be normalized, skip it together with its outcome.
            continue
        wins.append(winzn)
        outs.append(np.sum(x[t + wp : t + wp + ha]))

    if not wins:
        return None, None
    return np.array(wins), np.array(outs)


def predict_from_library(
    curr_win: np.ndarray,
    lib_wins: np.ndarray,
    lib_out: np.ndarray,
    k: int = 25,
) -> Tuple[float, float, np.ndarray]:
    """Predict an outcome from the ``k`` nearest library windows.

    Distances are Euclidean, computed row-wise between ``lib_wins`` and
    ``curr_win``.

    Returns:
        ``(median outcome of the neighbours, mean distance to the
        neighbours, indices of the neighbours in the library)``.
    """
    dists = np.linalg.norm(lib_wins - curr_win, axis=1)
    idx = np.argsort(dists)[: min(k, len(dists))]
    return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx


def characterize_window(
    ret_series: pd.Series,
    wp: int,
    z_rev: float = 2.0,
    z_vol: float = 2.0,
    std_comp_pct: float = 0.15,
) -> Tuple[Optional[str], float]:
    """Label the most recent ``wp`` returns with a qualitative regime.

    Checks are applied in this order and the first match wins:
    ``compression``, ``momentum_burst``, ``reversal_candidate``,
    ``vol_spike``.

    Args:
        ret_series: Series of returns (NaNs are dropped).
        wp: Length of the window taken from the end of the series.
        z_rev: Minimum |z| of the last return for a reversal candidate.
        z_vol: Minimum |z| of the last return for a volatility spike.
        std_comp_pct: Percentile of the rolling-std history below which the
            window counts as compressed.

    Returns:
        ``(label, confidence)``; ``(None, 0.0)`` when the series is shorter
        than ``max(wp, 30)`` or no rule matches.
    """
    x = ret_series.dropna().values
    if len(x) < max(wp, 30):
        return None, 0.0

    win = x[-wp:]
    mu, sd = win.mean(), win.std()
    if sd < 1e-12:
        return "compression", 0.5

    last = win[-1]
    z_last = (last - mu) / (sd + 1e-12)
    abs_z_last = abs(z_last)
    last3 = win[-3:] if len(win) >= 3 else win
    sum3 = np.sum(last3)

    # Share of historical rolling windows that were calmer than the current
    # one. Without enough history the neutral value 0.5 is used.
    pct = 0.5
    if len(x) > 3 * wp:
        # ddof=0 so the history uses the same estimator as `sd` (numpy's
        # default); pandas defaults to ddof=1, which would bias the share.
        roll_std = pd.Series(x).rolling(wp).std(ddof=0).dropna().values
        if len(roll_std) > 20:
            pct = (roll_std < sd).mean()

    if pct < std_comp_pct:
        return "compression", float(1.0 - pct)

    # NOTE(review): the threshold 2*sd/sqrt(3) is the 2-sigma band of a
    # 3-period mean, but it is compared against a 3-period sum - confirm
    # this is intended.
    if abs(sum3) > 2 * sd / np.sqrt(3) and np.sign(last3).sum() in (3, -3):
        conf = min(1.0, abs(sum3) / (sd + 1e-12))
        return "momentum_burst", float(conf)

    mean_prev = np.mean(win[:-1]) if len(win) > 1 else 0.0
    if abs_z_last >= z_rev and np.sign(last) != np.sign(mean_prev):
        conf = min(1.0, abs_z_last / 3.0)
        return "reversal_candidate", float(conf)

    if abs_z_last >= z_vol:
        conf = min(1.0, abs_z_last / 3.0)
        return "vol_spike", float(conf)

    return None, 0.0


def hurst_rs(series: pd.Series) -> Optional[float]:
    """Estimate the Hurst exponent as ``log(R/S) / log(n)`` on one window.

    ``R`` is the range of the cumulative demeaned series and ``S`` its
    sample standard deviation. Non-numeric entries are coerced to NaN and
    dropped together with the missing values.

    Returns:
        The estimate, or ``None`` when fewer than 100 usable observations
        remain, when ``R`` or ``S`` is not positive, or when the result is
        not finite.
    """
    # Coerce first, then drop: the opposite order leaves the NaNs produced
    # by coercion in the data and poisons every statistic below.
    x = pd.to_numeric(series, errors="coerce").dropna().astype(float).values
    n = len(x)
    if n < 100:
        return None
    x = x - x.mean()
    z = np.cumsum(x)
    r = z.max() - z.min()
    s = x.std(ddof=1)
    if s <= 0 or r <= 0:
        return None
    h = np.log(r / s) / np.log(n)
    if not np.isfinite(h):
        return None
    return float(h)


def build_hurst_map(returns_long: pd.DataFrame, lookback: int = 252) -> Dict[str, float]:
    """Compute the Hurst estimate of every ISIN in a long-format frame.

    Args:
        returns_long: Frame with ``Date``, ``ISIN`` and ``Ret`` columns.
        lookback: Number of most recent observations passed to ``hurst_rs``.

    Returns:
        Mapping ``ISIN -> Hurst estimate``. Instruments with fewer than
        ``max(lookback, 100)`` observations, or without a valid estimate,
        are omitted.
    """
    if returns_long.empty:
        return {}
    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
    hurst_map: Dict[str, float] = {}
    for isin in ret_wide.columns:
        series = ret_wide[isin].dropna().astype(float)
        if len(series) < max(lookback, 100):
            continue
        h_val = hurst_rs(series.iloc[-lookback:])
        if h_val is None or not np.isfinite(h_val):
            continue
        hurst_map[str(isin)] = float(h_val)
    return hurst_map


__all__ = [
    "build_hurst_map",
    "build_pattern_library",
    "characterize_window",
    "detect_column",
    "require_section",
    "require_value",
    "hurst_rs",
    "load_config",
    "predict_from_library",
    "read_connection_txt",
    "z_norm",
]