Add shared utilities and config
This commit is contained in:
199
shared_utils.py
Normal file
199
shared_utils.py
Normal file
@@ -0,0 +1,199 @@
|
||||
"""Shared helpers for trading pattern scripts."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyodbc
|
||||
|
||||
DEFAULT_CONFIG_PATH = Path("config/pattern_knn_config.json")
|
||||
|
||||
|
||||
def load_config(path: Optional[Path] = None) -> Dict:
|
||||
"""Load the JSON configuration that holds operational parameters."""
|
||||
cfg_path = Path(path or DEFAULT_CONFIG_PATH)
|
||||
if not cfg_path.exists():
|
||||
raise FileNotFoundError(f"Missing configuration file: {cfg_path}")
|
||||
with cfg_path.open("r", encoding="utf-8") as fh:
|
||||
return json.load(fh)
|
||||
|
||||
|
||||
def detect_column(df: pd.DataFrame, candidates: Sequence[str]) -> Optional[str]:
|
||||
"""Return the first column whose name matches one of the candidates (case insensitive)."""
|
||||
low = {c.lower(): c for c in df.columns}
|
||||
for cand in candidates:
|
||||
cl = cand.lower()
|
||||
if cl in low:
|
||||
return low[cl]
|
||||
for cand in candidates:
|
||||
cl = cand.lower()
|
||||
for col in df.columns:
|
||||
if cl in col.lower():
|
||||
return col
|
||||
return None
|
||||
|
||||
|
||||
def read_connection_txt(path: Path | str = "connection.txt") -> str:
|
||||
params: Dict[str, str] = {}
|
||||
path = Path(path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Missing connection.txt at {path}")
|
||||
for line in path.read_text(encoding="utf-8").splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#") or "=" not in line:
|
||||
continue
|
||||
k, v = line.split("=", 1)
|
||||
params[k.strip().lower()] = v.strip()
|
||||
|
||||
username = params.get("username")
|
||||
password = params.get("password")
|
||||
host = params.get("host")
|
||||
port = params.get("port", "1433")
|
||||
database = params.get("database")
|
||||
|
||||
if not all([username, password, host, database]):
|
||||
raise ValueError("connection.txt incompleto: servono username/password/host/database.")
|
||||
|
||||
installed = [d for d in pyodbc.drivers()]
|
||||
driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
|
||||
return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
|
||||
|
||||
|
||||
def z_norm(arr: np.ndarray) -> Optional[np.ndarray]:
|
||||
arr = np.asarray(arr, dtype=float)
|
||||
if arr.size == 0:
|
||||
return None
|
||||
mu = arr.mean()
|
||||
sd = arr.std()
|
||||
if sd < 1e-12:
|
||||
return None
|
||||
return (arr - mu) / (sd + 1e-12)
|
||||
|
||||
|
||||
def build_pattern_library(ret_series: pd.Series, wp: int, ha: int) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
|
||||
x = ret_series.dropna().values
|
||||
n = len(x)
|
||||
if n < wp + ha + 10:
|
||||
return None, None
|
||||
wins: List[np.ndarray] = []
|
||||
outs: List[float] = []
|
||||
for t in range(0, n - wp - ha):
|
||||
win = x[t : t + wp]
|
||||
winzn = z_norm(win)
|
||||
if winzn is None:
|
||||
continue
|
||||
outcome = np.sum(x[t + wp : t + wp + ha])
|
||||
wins.append(winzn)
|
||||
outs.append(outcome)
|
||||
if not wins:
|
||||
return None, None
|
||||
return np.array(wins), np.array(outs)
|
||||
|
||||
|
||||
def predict_from_library(
|
||||
curr_win: np.ndarray,
|
||||
lib_wins: np.ndarray,
|
||||
lib_out: np.ndarray,
|
||||
k: int = 25,
|
||||
) -> Tuple[float, float, np.ndarray]:
|
||||
dists = np.linalg.norm(lib_wins - curr_win, axis=1)
|
||||
idx = np.argsort(dists)[: min(k, len(dists))]
|
||||
return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
|
||||
|
||||
|
||||
def characterize_window(
|
||||
ret_series: pd.Series,
|
||||
wp: int,
|
||||
z_rev: float = 2.0,
|
||||
z_vol: float = 2.0,
|
||||
std_comp_pct: float = 0.15,
|
||||
) -> Tuple[Optional[str], float]:
|
||||
x = ret_series.dropna().values
|
||||
if len(x) < max(wp, 30):
|
||||
return None, 0.0
|
||||
win = x[-wp:]
|
||||
mu, sd = win.mean(), win.std()
|
||||
if sd < 1e-12:
|
||||
return "compression", 0.5
|
||||
|
||||
last = win[-1]
|
||||
z_last = (last - mu) / (sd + 1e-12)
|
||||
abs_z_last = abs(z_last)
|
||||
last3 = win[-3:] if len(win) >= 3 else win
|
||||
sum3 = np.sum(last3)
|
||||
|
||||
if len(x) > 3 * wp:
|
||||
roll_std = pd.Series(x).rolling(wp).std().dropna().values
|
||||
if len(roll_std) > 20:
|
||||
pct = (roll_std < np.std(win)).mean()
|
||||
else:
|
||||
pct = 0.5
|
||||
else:
|
||||
pct = 0.5
|
||||
|
||||
if pct < std_comp_pct:
|
||||
return "compression", float(1.0 - pct)
|
||||
|
||||
if abs(sum3) > 2 * sd / np.sqrt(3) and np.sign(last3).sum() in (3, -3):
|
||||
conf = min(1.0, abs(sum3) / (sd + 1e-12))
|
||||
return "momentum_burst", float(conf)
|
||||
|
||||
mean_prev = np.mean(win[:-1]) if len(win) > 1 else 0.0
|
||||
if abs_z_last >= z_rev and np.sign(last) != np.sign(mean_prev):
|
||||
conf = min(1.0, abs_z_last / 3.0)
|
||||
return "reversal_candidate", float(conf)
|
||||
|
||||
if abs_z_last >= z_vol:
|
||||
conf = min(1.0, abs_z_last / 3.0)
|
||||
return "vol_spike", float(conf)
|
||||
|
||||
return None, 0.0
|
||||
|
||||
|
||||
def hurst_rs(series: pd.Series) -> Optional[float]:
|
||||
x = pd.to_numeric(series.dropna(), errors="coerce").astype(float).values
|
||||
n = len(x)
|
||||
if n < 100:
|
||||
return None
|
||||
x = x - x.mean()
|
||||
z = np.cumsum(x)
|
||||
r = z.max() - z.min()
|
||||
s = x.std(ddof=1)
|
||||
if s <= 0 or r <= 0:
|
||||
return None
|
||||
h = np.log(r / s) / np.log(n)
|
||||
if not np.isfinite(h):
|
||||
return None
|
||||
return float(h)
|
||||
|
||||
|
||||
def build_hurst_map(returns_long: pd.DataFrame, lookback: int = 252) -> Dict[str, float]:
|
||||
if returns_long.empty:
|
||||
return {}
|
||||
ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
|
||||
hurst_map: Dict[str, float] = {}
|
||||
for isin in ret_wide.columns:
|
||||
series = ret_wide[isin].dropna().astype(float)
|
||||
if len(series) < max(lookback, 100):
|
||||
continue
|
||||
h_val = hurst_rs(series.iloc[-lookback:])
|
||||
if h_val is None or not np.isfinite(h_val):
|
||||
continue
|
||||
hurst_map[str(isin)] = float(h_val)
|
||||
return hurst_map
|
||||
|
||||
|
||||
__all__ = [
|
||||
"build_hurst_map",
|
||||
"build_pattern_library",
|
||||
"characterize_window",
|
||||
"detect_column",
|
||||
"hurst_rs",
|
||||
"load_config",
|
||||
"predict_from_library",
|
||||
"read_connection_txt",
|
||||
"z_norm",
|
||||
]
|
||||
Reference in New Issue
Block a user