Merge pull request #3 from fredmaloggia/codex/review-function-consolidation-across-files

Refactor shared logic and load runtime config
fredmaloggia
2025-11-17 15:37:52 +01:00
committed by GitHub
4 changed files with 362 additions and 321 deletions

View File

@@ -19,8 +19,17 @@ import pandas as pd
import numpy as np
import sqlalchemy as sa
from sqlalchemy import text
-import pyodbc
import matplotlib.pyplot as plt
+from shared_utils import (
+    build_pattern_library,
+    characterize_window,
+    detect_column,
+    load_config,
+    predict_from_library,
+    read_connection_txt,
+    z_norm,
+)
#from math import isfinite
import time
@@ -59,6 +68,12 @@ def savefig_safe(path, **kwargs):
# =========================================
# GLOBAL PARAMETERS
# =========================================
+CONFIG = load_config()
+DB_CONFIG = CONFIG.get("db", {})
+PATTERN_CONFIG = CONFIG.get("pattern", {})
+TAGGING_CONFIG = CONFIG.get("tagging", {})
+RANKING_CONFIG = CONFIG.get("ranking", {})
UNIVERSO_XLSX = "Universo per Trading System.xlsx"
# Export
@@ -67,70 +82,30 @@ OUTPUT_PATTERN_XLSX = "pattern_signals.xlsx"
ERROR_LOG_CSV = "errori_isin.csv"
# Stored procedure & parameters
-STORED_PROC = "opt_RendimentoGiornaliero1_ALL"
+STORED_PROC = DB_CONFIG.get("stored_proc", "opt_RendimentoGiornaliero1_ALL")
-N_BARS = 1305
+N_BARS = DB_CONFIG.get("n_bars", 1305)
-PTF_CURR = "EUR"
+PTF_CURR = DB_CONFIG.get("ptf_curr", "EUR")
# Pattern matching (hyper-parameters)
-WP = 60        # pattern window length (bars)
+WP = PATTERN_CONFIG.get("wp", 60)        # pattern window length (bars)
-HA = 10        # outcome horizon (bars)
+HA = PATTERN_CONFIG.get("ha", 10)        # outcome horizon (bars)
-KNN_K = 25     # number of neighbours
+KNN_K = PATTERN_CONFIG.get("knn_k", 25)  # number of neighbours
-THETA = 0.005  # outcome threshold for generating a signal
+THETA = PATTERN_CONFIG.get("theta", 0.005)  # outcome threshold for generating a signal
-EMBARGO = WP + HA
+EMBARGO = PATTERN_CONFIG.get("embargo", WP + HA)
# Rule-based tagging (thresholds)
-Z_REV = 2.0
+Z_REV = TAGGING_CONFIG.get("z_rev", 2.0)
-Z_VOL = 2.0
+Z_VOL = TAGGING_CONFIG.get("z_vol", 2.0)
-STD_COMP_PCT = 0.15
+STD_COMP_PCT = TAGGING_CONFIG.get("std_comp_pct", 0.15)
DAYS_PER_YEAR = 252
-TOP_N_MAX = 15                 # maximum number of assets allowed
+TOP_N_MAX = RANKING_CONFIG.get("top_n_max", 15)  # maximum number of assets allowed
-RP_MAX_WEIGHT = 2 / TOP_N_MAX  # 2 x 1/15 ≈ 0.1333 = 13.33%
+RP_MAX_WEIGHT = RANKING_CONFIG.get("rp_max_weight", 2 / max(TOP_N_MAX, 1))  # 2 x 1/15 ≈ 0.1333 = 13.33%
# =========================================
# GENERAL UTILS
# =========================================
-def pick_first(df, candidates):
-    low = {c.lower(): c for c in df.columns}
-    for c in candidates:
-        if c.lower() in low:
-            return low[c.lower()]
-    for c in candidates:
-        matches = [low[k] for k in low if c.lower() in k]
-        if matches:
-            return matches[0]
-    return None
-def read_connection_txt(path="connection.txt"):
-    """
-    connection.txt with:
-      username=...
-      password=...
-      host=...
-      port=1433
-      database=...
-    """
-    params = {}
-    with open(path, "r", encoding="utf-8") as f:
-        for line in f:
-            line = line.strip()
-            if line and not line.startswith("#") and "=" in line:
-                k, v = line.split("=", 1)
-                params[k.strip().lower()] = v.strip()
-    username = params.get("username")
-    password = params.get("password")
-    host = params.get("host")
-    port = params.get("port", "1433")
-    database = params.get("database")
-    if not all([username, password, host, database]):
-        raise ValueError("connection.txt incompleto: username/password/host/database richiesti.")
-    installed = [d for d in pyodbc.drivers()]
-    driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
-    return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
def clamp01(x):
    if not np.isfinite(x):
        return np.nan
@@ -229,81 +204,6 @@ def hurst_dfa_returns(r, win_grid=None):
    slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1)
    return clamp01(slope)
-# =========== PATTERN MATCHING k-NN (on returns) ===========
-def z_norm(arr):
-    arr = np.asarray(arr, dtype=float)
-    mu = arr.mean(); sd = arr.std()
-    if sd < 1e-12:
-        return None
-    return (arr - mu)/(sd+1e-12)
-def build_pattern_library(ret_series: pd.Series, Wp: int, Ha: int, embargo: int = None):
-    x = ret_series.dropna().values
-    N = len(x)
-    if N < Wp + Ha + 10:
-        return None, None
-    wins = []
-    outs = []
-    for t in range(0, N - Wp - Ha):
-        win = x[t:t+Wp]
-        winzn = z_norm(win)
-        if winzn is None:
-            continue
-        outcome = np.sum(x[t+Wp : t+Wp+Ha])  # future outcome over Ha bars
-        wins.append(winzn); outs.append(outcome)
-    if not wins:
-        return None, None
-    return np.array(wins), np.array(outs)
-def predict_from_library(curr_win: np.ndarray, lib_wins: np.ndarray, lib_out: np.ndarray, k: int = 25):
-    dists = np.linalg.norm(lib_wins - curr_win, axis=1)
-    idx = np.argsort(dists)[:min(k, len(dists))]
-    return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
-# =========== RULE-BASED TAGGING (4 categories) ===========
-def characterize_window(ret_series: pd.Series, Wp: int,
-                        z_rev=2.0, z_vol=2.0, std_comp_pct=0.15):
-    x = ret_series.dropna().values
-    if len(x) < max(WP, 30):
-        return None, 0.0
-    win = x[-Wp:]
-    mu, sd = win.mean(), win.std()
-    if sd < 1e-12:
-        return "compression", 0.5
-    last = win[-1]
-    z_last = (last - mu)/(sd+1e-12)
-    abs_z_last = abs(z_last)
-    last3 = win[-3:] if len(win) >= 3 else win
-    sum3 = np.sum(last3)
-    if len(x) > 3*Wp:
-        roll_std = pd.Series(x).rolling(Wp).std().dropna().values
-        if len(roll_std) > 20:
-            pct = (roll_std < np.std(win)).mean()
-        else:
-            pct = 0.5
-    else:
-        pct = 0.5
-    if pct < std_comp_pct:
-        return "compression", float(1.0 - pct)
-    if abs(sum3) > 2*sd/np.sqrt(3) and np.sign(last3).sum() in (3, -3):
-        conf = min(1.0, abs(sum3)/(sd+1e-12))
-        return "momentum_burst", float(conf)
-    mean_prev = np.mean(win[:-1]) if len(win) > 1 else 0.0
-    if abs_z_last >= z_rev and np.sign(last) != np.sign(mean_prev):
-        conf = min(1.0, abs_z_last/3.0)
-        return "reversal_candidate", float(conf)
-    if abs_z_last >= z_vol:
-        conf = min(1.0, abs_z_last/3.0)
-        return "vol_spike", float(conf)
-    return None, 0.0
# ---------------------------------
# R^2 on the equity line (log-equity vs time)
# ---------------------------------
@@ -455,13 +355,13 @@ def h_min_100(returns: pd.Series, month_len: int = 21):
# =========================================
universo = pd.read_excel(UNIVERSO_XLSX)
-col_isin_uni = pick_first(universo, ["ISIN", "isin", "codice isin"])
+col_isin_uni = detect_column(universo, ["ISIN", "isin", "codice isin"])
if col_isin_uni is None:
    raise ValueError("Nel file universo non trovo una colonna ISIN.")
-col_name_uni = pick_first(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"])
+col_name_uni = detect_column(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"])
-col_cat_uni = pick_first(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"])
+col_cat_uni = detect_column(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"])
-col_ac_uni = pick_first(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"])
+col_ac_uni = detect_column(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"])
isins = (
    universo[col_isin_uni].astype(str).str.strip()
@@ -493,9 +393,9 @@ last_dates = []
sql_sp = text(f"EXEC {STORED_PROC} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
def detect_cols(df0):
-    col_date = pick_first(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"])
+    col_date = detect_column(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"])
-    col_ret = pick_first(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"])
+    col_ret = detect_column(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"])
-    col_px = pick_first(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
+    col_px = detect_column(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
    return col_date, col_ret, col_px
ok_count = 0
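The consolidation replaces the per-script `pick_first`/`_pick` helpers with the shared `detect_column`, which first tries an exact case-insensitive match and then falls back to substring matching on the column names. A minimal usage sketch; the DataFrame and its column names below are illustrative, not taken from the repository:

import pandas as pd

from shared_utils import detect_column

# Hypothetical frame standing in for one returned by the stored procedure.
df = pd.DataFrame({"TradeDate": [], "Rendimento": [], "Close": []})

# Exact case-insensitive match wins first ("Close" -> "Close", "Rendimento" -> "Rendimento");
# otherwise the first candidate contained in a column name is used ("Date" inside "TradeDate").
col_px = detect_column(df, ["Close", "AdjClose", "Price"])    # -> "Close"
col_dt = detect_column(df, ["Date", "Data", "Timestamp"])     # -> "TradeDate"
col_ret = detect_column(df, ["Ret", "Return", "Rendimento"])  # -> "Rendimento"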

View File

@@ -0,0 +1,35 @@
{
  "db": {
    "stored_proc": "opt_RendimentoGiornaliero1_ALL",
    "n_bars": 1305,
    "ptf_curr": "EUR"
  },
  "pattern": {
    "wp": 60,
    "ha": 10,
    "knn_k": 25,
    "theta": 0.005,
    "embargo": null
  },
  "tagging": {
    "z_rev": 2.0,
    "z_vol": 2.0,
    "std_comp_pct": 0.15
  },
  "ranking": {
    "top_n_max": 15,
    "rp_max_weight": 0.1333333333
  },
  "signals": {
    "sl_bps": 300.0,
    "tp_bps": 800.0,
    "trail_bps": 300.0,
    "time_stop_bars": 20,
    "theta_exit": 0.0,
    "weak_days_exit": null,
    "max_open": 15,
    "base_capital_per_strategy": 100.0,
    "min_trade_notional": 0.01,
    "risk_parity_lookback": 60
  }
}
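For reference, `load_config` in shared_utils.py reads this JSON (by default from `config/pattern_knn_config.json`, its `DEFAULT_CONFIG_PATH`), and the scripts pull sections with `dict.get` so that missing keys fall back to the previous hard-coded values. Note that a key explicitly set to `null` loads as Python `None` and therefore bypasses the `.get` fallback, which is relevant for `embargo` above. A small sketch of both behaviours; the throwaway config path is only for illustration:

import json
from pathlib import Path

from shared_utils import load_config

# Write a minimal config to a temporary path just for this illustration.
tmp_cfg = Path("tmp_pattern_knn_config.json")
tmp_cfg.write_text(json.dumps({"pattern": {"wp": 60, "embargo": None}}), encoding="utf-8")

cfg = load_config(tmp_cfg)      # load_config() with no argument would use config/pattern_knn_config.json
pattern = cfg.get("pattern", {})

wp = pattern.get("wp", 60)                 # key present -> 60
ha = pattern.get("ha", 10)                 # key absent  -> fallback 10
embargo = pattern.get("embargo", wp + ha)  # key is null -> None, NOT wp + ha
print(wp, ha, embargo)                     # 60 10 None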

shared_utils.py (Normal file, 221 lines added)
View File

@@ -0,0 +1,221 @@
"""Shared helpers for trading pattern scripts."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
import pyodbc
DEFAULT_CONFIG_PATH = Path("config/pattern_knn_config.json")
def load_config(path: Optional[Path] = None) -> Dict:
"""Load the JSON configuration that holds operational parameters."""
cfg_path = Path(path or DEFAULT_CONFIG_PATH)
if not cfg_path.exists():
raise FileNotFoundError(f"Missing configuration file: {cfg_path}")
with cfg_path.open("r", encoding="utf-8") as fh:
return json.load(fh)
def detect_column(df: pd.DataFrame, candidates: Sequence[str]) -> Optional[str]:
"""Return the first column whose name matches one of the candidates (case insensitive)."""
low = {c.lower(): c for c in df.columns}
for cand in candidates:
cl = cand.lower()
if cl in low:
return low[cl]
for cand in candidates:
cl = cand.lower()
for col in df.columns:
if cl in col.lower():
return col
return None
def read_connection_txt(path: Path | str = "connection.txt") -> str:
params: Dict[str, str] = {}
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Missing connection.txt at {path}")
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
params[k.strip().lower()] = v.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
if not all([username, password, host, database]):
raise ValueError("connection.txt incompleto: servono username/password/host/database.")
installed = [d for d in pyodbc.drivers()]
driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
def z_norm(arr: np.ndarray) -> Optional[np.ndarray]:
arr = np.asarray(arr, dtype=float)
if arr.size == 0:
return None
mu = arr.mean()
sd = arr.std()
if sd < 1e-12:
return None
return (arr - mu) / (sd + 1e-12)
def build_pattern_library(
    ret_series: pd.Series,
    wp: int,
    ha: int,
    embargo: Optional[int] = None,
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """Create the normalized pattern windows and their realized outcomes.

    Args:
        ret_series: Series of returns (ordered oldest→latest).
        wp: Window length for the pattern.
        ha: Holding horizon used to compute the outcome.
        embargo: Optional number of most-recent observations to exclude when
            building the library (useful to avoid leakage when reusing the
            same series for inference).
    """
    x = ret_series.dropna().values
    n = len(x)
    if n < wp + ha + 10:
        return None, None
    embargo = int(embargo or 0)
    usable_n = n - max(0, embargo)
    if usable_n <= wp + ha:
        return None, None
    wins: List[np.ndarray] = []
    outs: List[float] = []
    last_start = usable_n - wp - ha
    if last_start <= 0:
        return None, None
    for t in range(0, last_start + 1):
        win = x[t : t + wp]
        winzn = z_norm(win)
        if winzn is None:
            continue
        outcome = np.sum(x[t + wp : t + wp + ha])
        wins.append(winzn)
        outs.append(outcome)
    if not wins:
        return None, None
    return np.array(wins), np.array(outs)


def predict_from_library(
    curr_win: np.ndarray,
    lib_wins: np.ndarray,
    lib_out: np.ndarray,
    k: int = 25,
) -> Tuple[float, float, np.ndarray]:
    dists = np.linalg.norm(lib_wins - curr_win, axis=1)
    idx = np.argsort(dists)[: min(k, len(dists))]
    return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
def characterize_window(
    ret_series: pd.Series,
    wp: int,
    z_rev: float = 2.0,
    z_vol: float = 2.0,
    std_comp_pct: float = 0.15,
) -> Tuple[Optional[str], float]:
    x = ret_series.dropna().values
    if len(x) < max(wp, 30):
        return None, 0.0
    win = x[-wp:]
    mu, sd = win.mean(), win.std()
    if sd < 1e-12:
        return "compression", 0.5
    last = win[-1]
    z_last = (last - mu) / (sd + 1e-12)
    abs_z_last = abs(z_last)
    last3 = win[-3:] if len(win) >= 3 else win
    sum3 = np.sum(last3)
    if len(x) > 3 * wp:
        roll_std = pd.Series(x).rolling(wp).std().dropna().values
        if len(roll_std) > 20:
            pct = (roll_std < np.std(win)).mean()
        else:
            pct = 0.5
    else:
        pct = 0.5
    if pct < std_comp_pct:
        return "compression", float(1.0 - pct)
    if abs(sum3) > 2 * sd / np.sqrt(3) and np.sign(last3).sum() in (3, -3):
        conf = min(1.0, abs(sum3) / (sd + 1e-12))
        return "momentum_burst", float(conf)
    mean_prev = np.mean(win[:-1]) if len(win) > 1 else 0.0
    if abs_z_last >= z_rev and np.sign(last) != np.sign(mean_prev):
        conf = min(1.0, abs_z_last / 3.0)
        return "reversal_candidate", float(conf)
    if abs_z_last >= z_vol:
        conf = min(1.0, abs_z_last / 3.0)
        return "vol_spike", float(conf)
    return None, 0.0


def hurst_rs(series: pd.Series) -> Optional[float]:
    x = pd.to_numeric(series.dropna(), errors="coerce").astype(float).values
    n = len(x)
    if n < 100:
        return None
    x = x - x.mean()
    z = np.cumsum(x)
    r = z.max() - z.min()
    s = x.std(ddof=1)
    if s <= 0 or r <= 0:
        return None
    h = np.log(r / s) / np.log(n)
    if not np.isfinite(h):
        return None
    return float(h)


def build_hurst_map(returns_long: pd.DataFrame, lookback: int = 252) -> Dict[str, float]:
    if returns_long.empty:
        return {}
    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
    hurst_map: Dict[str, float] = {}
    for isin in ret_wide.columns:
        series = ret_wide[isin].dropna().astype(float)
        if len(series) < max(lookback, 100):
            continue
        h_val = hurst_rs(series.iloc[-lookback:])
        if h_val is None or not np.isfinite(h_val):
            continue
        hurst_map[str(isin)] = float(h_val)
    return hurst_map


__all__ = [
    "build_hurst_map",
    "build_pattern_library",
    "characterize_window",
    "detect_column",
    "hurst_rs",
    "load_config",
    "predict_from_library",
    "read_connection_txt",
    "z_norm",
]
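To show how the consolidated helpers fit together, here is a minimal end-to-end sketch on synthetic returns; the random data is made up, while the window/horizon/threshold values mirror the defaults in the config JSON above:

import numpy as np
import pandas as pd

from shared_utils import build_pattern_library, predict_from_library, z_norm

WP, HA, KNN_K, THETA = 60, 10, 25, 0.005  # same defaults as the config JSON

# Synthetic daily returns standing in for one ISIN's history.
rng = np.random.default_rng(0)
r = pd.Series(rng.normal(0.0003, 0.01, 600))

# Library of z-normalised WP-bar windows and their realised HA-bar outcomes.
lib_wins, lib_out = build_pattern_library(r, WP, HA)

# Current window -> k-NN estimate of the forward HA-bar return.
curr_zn = z_norm(r.values[-WP:])
if lib_wins is not None and curr_zn is not None:
    est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
    signal = 1 if est_out > THETA else 0
    print(f"estimated outcome={est_out:.4f}  mean distance={avg_dist:.2f}  signal={signal}")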

View File

@@ -38,11 +38,28 @@ from urllib.error import URLError, HTTPError
# DB
import sqlalchemy as sa
from sqlalchemy import text as sql_text
-import pyodbc
+from shared_utils import (
+    build_hurst_map,
+    build_pattern_library,
+    characterize_window,
+    detect_column,
+    load_config,
+    predict_from_library,
+    read_connection_txt,
+    z_norm,
+)
# =========================
# CONFIG
# =========================
+CONFIG = load_config()
+DB_CONFIG = CONFIG.get("db", {})
+PATTERN_CONFIG = CONFIG.get("pattern", {})
+TAGGING_CONFIG = CONFIG.get("tagging", {})
+RANKING_CONFIG = CONFIG.get("ranking", {})
+SIGNALS_CONFIG = CONFIG.get("signals", {})
BASE_DIR = Path(".")
UNIVERSO_XLSX = BASE_DIR / "Universo per Trading System.xlsx"
CONNECTION_TXT = BASE_DIR / "connection.txt"
@@ -54,35 +71,38 @@ def _dated_signals_filename() -> Path:
    return BASE_DIR / f"{date_prefix}_signals.xlsx"
# Stored procedure / DB parameters
-SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL"
+SP_NAME_DEFAULT = DB_CONFIG.get("stored_proc", "opt_RendimentoGiornaliero1_ALL")
-SP_N_DEFAULT = 1305
+SP_N_DEFAULT = DB_CONFIG.get("n_bars", 1305)
-PTF_CURR_DEFAULT = "EUR"
+PTF_CURR_DEFAULT = DB_CONFIG.get("ptf_curr", "EUR")
# Pattern recognition (as in the backtest)
-WP = 60
+WP = PATTERN_CONFIG.get("wp", 60)
-HA = 10
+HA = PATTERN_CONFIG.get("ha", 10)
-KNN_K = 25
+KNN_K = PATTERN_CONFIG.get("knn_k", 25)
-THETA = 0.005  # 0.005 in decimals (identical to the backtest)
+THETA = PATTERN_CONFIG.get("theta", 0.005)  # 0.005 in decimals (identical to the backtest)
+Z_REV = TAGGING_CONFIG.get("z_rev", 2.0)
+Z_VOL = TAGGING_CONFIG.get("z_vol", 2.0)
+STD_COMP_PCT = TAGGING_CONFIG.get("std_comp_pct", 0.15)
# Exit rules (identical to the backtest)
-SL_BPS = 300.0
+SL_BPS = SIGNALS_CONFIG.get("sl_bps", 300.0)
-TP_BPS = 800.0
+TP_BPS = SIGNALS_CONFIG.get("tp_bps", 800.0)
-TRAIL_BPS = 300.0
+TRAIL_BPS = SIGNALS_CONFIG.get("trail_bps", 300.0)
-TIME_STOP_BARS = 20
+TIME_STOP_BARS = SIGNALS_CONFIG.get("time_stop_bars", 20)
-THETA_EXIT = 0.0  # weakness threshold
+THETA_EXIT = SIGNALS_CONFIG.get("theta_exit", 0.0)  # weakness threshold
-WEAK_DAYS_EXIT = None  # IMMEDIATE exit on weakness (as in the backtest)
+WEAK_DAYS_EXIT = SIGNALS_CONFIG.get("weak_days_exit")  # IMMEDIATE exit on weakness (as in the backtest)
# Ranking and Top-N selection for OPENINGS
-MAX_OPEN = 15  # cap on instruments opened today (as in the backtest)
+MAX_OPEN = SIGNALS_CONFIG.get("max_open", 15)  # cap on instruments opened today (as in the backtest)
# Aligned with backtest v3.1.5 for the Risk Parity cap
-TOP_N_MAX = MAX_OPEN
+TOP_N_MAX = RANKING_CONFIG.get("top_n_max", MAX_OPEN)
-RP_MAX_WEIGHT = 2 / TOP_N_MAX  # ≈ 0.1333 = 13.33% per single asset
+RP_MAX_WEIGHT = RANKING_CONFIG.get("rp_max_weight", 2 / max(TOP_N_MAX, 1))  # ≈ 0.1333 = 13.33% per single asset
# Sizing
-BASE_CAPITAL_PER_STRATEGY = 100.0
+BASE_CAPITAL_PER_STRATEGY = SIGNALS_CONFIG.get("base_capital_per_strategy", 100.0)
-MIN_TRADE_NOTIONAL = 0.01
+MIN_TRADE_NOTIONAL = SIGNALS_CONFIG.get("min_trade_notional", 0.01)
-RISK_PARITY_LOOKBACK = 60
+RISK_PARITY_LOOKBACK = SIGNALS_CONFIG.get("risk_parity_lookback", 60)
# Calendar
BUSINESS_DAYS_ONLY = True
@@ -111,34 +131,6 @@ def _safe_to_float(x) -> Optional[float]:
    except Exception:
        return None
-# =========================
-# DB CONNECTION
-# =========================
-def read_connection_txt(path: Path) -> str:
-    if not path.exists():
-        raise FileNotFoundError(f"Missing connection.txt at {path}")
-    params: Dict[str, str] = {}
-    for line in path.read_text(encoding="utf-8").splitlines():
-        line = line.strip()
-        if not line or line.startswith("#") or "=" not in line:
-            continue
-        k, v = line.split("=", 1)
-        params[k.strip().lower()] = v.strip()
-    username = params.get("username")
-    password = params.get("password")
-    host = params.get("host")
-    port = params.get("port", "1433")
-    database = params.get("database")
-    if not all([username, password, host, database]):
-        raise ValueError("connection.txt incompleto: servono username/password/host/database.")
-    installed = [d for d in pyodbc.drivers()]
-    driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
-    return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
def _db_fetch_returns(conn_str: str,
                      isins: List[str],
                      sp_name: Optional[str] = None,
@@ -152,17 +144,6 @@ def _db_fetch_returns(conn_str: str,
    sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
    frames: List[pd.DataFrame] = []
-    def _pick(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
-        low = {c.lower(): c for c in df.columns}
-        for c in candidates:
-            if c.lower() in low:
-                return low[c.lower()]
-        for c in df.columns:
-            cl = c.lower()
-            if any(tok in cl for tok in [x.lower() for x in candidates]):
-                return c
-        return None
    with engine.begin() as conn:
        for i, isin in enumerate(isins, start=1):
            print(f"[DB] ({i}/{len(isins)}) scarico serie storica per {isin} ...", flush=True)
@@ -176,8 +157,8 @@ def _db_fetch_returns(conn_str: str,
print(f"[WARN] Nessun dato per {isin}") print(f"[WARN] Nessun dato per {isin}")
continue continue
col_date = _pick(df, ["Date", "Data", "Datetime", "Timestamp", "Time"]) col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
col_ret = _pick(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"]) col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
if not col_date or not col_ret: if not col_date or not col_ret:
print(f"[WARN] Colonne mancanti per {isin}") print(f"[WARN] Colonne mancanti per {isin}")
continue continue
@@ -278,102 +259,6 @@ def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]:
# =========================
# HURST ESTIMATOR & MAP
# =========================
-from typing import Optional  # Optional is already imported at the top of the file for the type hints, so this is fine
-def _hurst_rs(series: pd.Series) -> Optional[float]:
-    """
-    Simple estimate of the Hurst exponent via Rescaled Range (R/S) on a single window.
-    Returns NaN if the series is too short or degenerate.
-    """
-    x = pd.to_numeric(series.dropna(), errors="coerce").astype(float).values
-    n = len(x)
-    if n < 100:
-        return None
-    x = x - x.mean()
-    z = np.cumsum(x)
-    R = z.max() - z.min()
-    S = x.std(ddof=1)
-    if S <= 0 or R <= 0:
-        return None
-    H = np.log(R / S) / np.log(n)
-    if not np.isfinite(H):
-        return None
-    return float(H)
-def build_hurst_map(returns_long: pd.DataFrame,
-                    lookback: int = 252) -> Dict[str, float]:
-    """
-    Build an ISIN -> Hurst map using the last `lookback` returns.
-    """
-    if returns_long.empty:
-        return {}
-    ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
-    hurst_map: Dict[str, float] = {}
-    for isin in ret_wide.columns:
-        s = ret_wide[isin].dropna().astype(float)
-        if len(s) < max(lookback, 100):
-            continue
-        h = _hurst_rs(s.iloc[-lookback:])
-        if h is None or not np.isfinite(h):
-            continue
-        hurst_map[str(isin)] = float(h)
-    return hurst_map
-# =========================
-# PATTERN RECOGNITION (WP/HA)
-# =========================
-def z_norm(arr: np.ndarray) -> Optional[np.ndarray]:
-    arr = np.asarray(arr, dtype=float)
-    mu = arr.mean()
-    sd = arr.std()
-    if sd < 1e-12:
-        return None
-    return (arr - mu) / (sd + 1e-12)
-def build_pattern_library(ret_series: pd.Series, Wp: int, Ha: int) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
-    x = ret_series.dropna().values
-    N = len(x)
-    if N < Wp + Ha + 10:
-        return None, None
-    wins, outs = [], []
-    for t in range(0, N - Wp - Ha):
-        win = x[t:t+Wp]
-        winzn = z_norm(win)
-        if winzn is None:
-            continue
-        outcome = np.sum(x[t+Wp : t+Wp+Ha])  # sum of future returns over Ha bars (decimals)
-        wins.append(winzn); outs.append(outcome)
-    if not wins:
-        return None, None
-    return np.array(wins), np.array(outs)
-def predict_from_library(curr_win: np.ndarray,
-                         lib_wins: np.ndarray,
-                         lib_out: np.ndarray,
-                         k: int = 25) -> Tuple[float, float, np.ndarray]:
-    dists = np.linalg.norm(lib_wins - curr_win, axis=1)
-    idx = np.argsort(dists)[:min(k, len(dists))]
-    return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
-def characterize_window(ret_series: pd.Series, Wp: int) -> Tuple[Optional[str], float]:
-    x = ret_series.dropna().values
-    if len(x) < max(WP, 30):
-        return None, 0.0
-    win = x[-Wp:]
-    mu, sd = win.mean(), win.std()
-    if sd < 1e-12:
-        return "compression", 0.5
-    last3 = win[-3:] if len(win) >= 3 else win
-    if np.sign(last3).sum() in (3, -3):
-        return "momentum_burst", min(1.0, abs(last3.sum())/(sd+1e-12))
-    return None, 0.0
# =========================
# SIGNAL GENERATION (EOD on D)
# =========================
@@ -404,17 +289,17 @@ def generate_signals_today(universe: pd.DataFrame,
        lib_wins, lib_out = build_pattern_library(r, WP, HA)
        if lib_wins is None or len(r) < WP + HA:
            est_out, avg_dist, sig = np.nan, np.nan, 0
-            ptype, pconf = characterize_window(r, WP)
+            ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
        else:
            curr = r.values[-WP:]
            curr_zn = z_norm(curr)
            if curr_zn is None:
                est_out, avg_dist, sig = np.nan, np.nan, 0
-                ptype, pconf = characterize_window(r, WP)
+                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
            else:
                est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
                sig = 1 if (pd.notna(est_out) and float(est_out) > float(theta_entry)) else 0
-                ptype, pconf = characterize_window(r, WP)
+                ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
        rows.append({
            "Date": decision_date, "ISIN": isin,