Add asset names to open trades and copy exports

This commit is contained in:
fredmaloggia
2025-11-17 17:06:46 +01:00
parent ecf03b7775
commit ad66cc5fe0
4 changed files with 512 additions and 380 deletions

View File

@@ -19,8 +19,19 @@ import pandas as pd
import numpy as np
import sqlalchemy as sa
from sqlalchemy import text
import pyodbc
import matplotlib.pyplot as plt
from shared_utils import (
build_pattern_library,
characterize_window,
detect_column,
load_config,
predict_from_library,
read_connection_txt,
require_section,
require_value,
z_norm,
)
#from math import isfinite
import time
@@ -59,6 +70,12 @@ def savefig_safe(path, **kwargs):
# =========================================
# GLOBAL PARAMETERS
# =========================================
CONFIG = load_config()
DB_CONFIG = require_section(CONFIG, "db")
PATTERN_CONFIG = require_section(CONFIG, "pattern")
TAGGING_CONFIG = require_section(CONFIG, "tagging")
RANKING_CONFIG = require_section(CONFIG, "ranking")
UNIVERSO_XLSX = "Universo per Trading System.xlsx"
# Export
@@ -67,70 +84,38 @@ OUTPUT_PATTERN_XLSX = "pattern_signals.xlsx"
ERROR_LOG_CSV = "errori_isin.csv"
# Stored procedure & parameters
STORED_PROC = "opt_RendimentoGiornaliero1_ALL"
N_BARS = 1305
PTF_CURR = "EUR"
STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db"))
N_BARS = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db"))
# Pattern matching (hyperparameters)
WP = 60 # pattern window length (bars)
HA = 10 # outcome horizon (bars)
KNN_K = 25 # number of neighbours
THETA = 0.005 # outcome threshold for generating a signal
EMBARGO = WP + HA
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern")) # pattern window length (bars)
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern")) # outcome horizon (bars)
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern")) # number of neighbours
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern")) # outcome threshold for generating a signal
EMBARGO = require_value(PATTERN_CONFIG, "embargo", "pattern")
if EMBARGO is None:
EMBARGO = WP + HA
else:
EMBARGO = int(EMBARGO)
# Rule-based tagging (thresholds)
Z_REV = 2.0
Z_VOL = 2.0
STD_COMP_PCT = 0.15
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))
DAYS_PER_YEAR = 252
TOP_N_MAX = 15 # maximum number of assets allowed
RP_MAX_WEIGHT = 2 / TOP_N_MAX # 2 x 1/15 ≈ 0.1333 = 13.33%
TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking")) # maximum number of assets allowed
RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking") # 2 x 1/15 ≈ 0.1333 = 13.33%
if RP_MAX_WEIGHT is None:
RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
else:
RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)
# =========================================
# GENERAL UTILITIES
# =========================================
def pick_first(df, candidates):
low = {c.lower(): c for c in df.columns}
for c in candidates:
if c.lower() in low:
return low[c.lower()]
for c in candidates:
matches = [low[k] for k in low if c.lower() in k]
if matches:
return matches[0]
return None
def read_connection_txt(path="connection.txt"):
"""
connection.txt with:
username=...
password=...
host=...
port=1433
database=...
"""
params = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
params[k.strip().lower()] = v.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
if not all([username, password, host, database]):
raise ValueError("connection.txt incompleto: username/password/host/database richiesti.")
installed = [d for d in pyodbc.drivers()]
driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
def clamp01(x):
if not np.isfinite(x):
return np.nan
@@ -229,81 +214,6 @@ def hurst_dfa_returns(r, win_grid=None):
slope, _ = np.polyfit(np.log(sizes), np.log(F_vals), 1)
return clamp01(slope)
# =========== k-NN PATTERN MATCHING (on returns) ===========
def z_norm(arr):
arr = np.asarray(arr, dtype=float)
mu = arr.mean(); sd = arr.std()
if sd < 1e-12:
return None
return (arr - mu)/(sd+1e-12)
def build_pattern_library(ret_series: pd.Series, Wp: int, Ha: int, embargo: int = None):
x = ret_series.dropna().values
N = len(x)
if N < Wp + Ha + 10:
return None, None
wins = []
outs = []
for t in range(0, N - Wp - Ha):
win = x[t:t+Wp]
winzn = z_norm(win)
if winzn is None:
continue
outcome = np.sum(x[t+Wp : t+Wp+Ha]) # future outcome over Ha bars
wins.append(winzn); outs.append(outcome)
if not wins:
return None, None
return np.array(wins), np.array(outs)
def predict_from_library(curr_win: np.ndarray, lib_wins: np.ndarray, lib_out: np.ndarray, k: int = 25):
dists = np.linalg.norm(lib_wins - curr_win, axis=1)
idx = np.argsort(dists)[:min(k, len(dists))]
return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
# =========== RULE-BASED TAGGING (4 categories) ===========
def characterize_window(ret_series: pd.Series, Wp: int,
z_rev=2.0, z_vol=2.0, std_comp_pct=0.15):
x = ret_series.dropna().values
if len(x) < max(WP, 30):
return None, 0.0
win = x[-Wp:]
mu, sd = win.mean(), win.std()
if sd < 1e-12:
return "compression", 0.5
last = win[-1]
z_last = (last - mu)/(sd+1e-12)
abs_z_last = abs(z_last)
last3 = win[-3:] if len(win) >= 3 else win
sum3 = np.sum(last3)
if len(x) > 3*Wp:
roll_std = pd.Series(x).rolling(Wp).std().dropna().values
if len(roll_std) > 20:
pct = (roll_std < np.std(win)).mean()
else:
pct = 0.5
else:
pct = 0.5
if pct < std_comp_pct:
return "compression", float(1.0 - pct)
if abs(sum3) > 2*sd/np.sqrt(3) and np.sign(last3).sum() in (3, -3):
conf = min(1.0, abs(sum3)/(sd+1e-12))
return "momentum_burst", float(conf)
mean_prev = np.mean(win[:-1]) if len(win) > 1 else 0.0
if abs_z_last >= z_rev and np.sign(last) != np.sign(mean_prev):
conf = min(1.0, abs_z_last/3.0)
return "reversal_candidate", float(conf)
if abs_z_last >= z_vol:
conf = min(1.0, abs_z_last/3.0)
return "vol_spike", float(conf)
return None, 0.0
# ---------------------------------
# R^2 on the equity line (log-equity vs time)
# ---------------------------------
@@ -455,13 +365,13 @@ def h_min_100(returns: pd.Series, month_len: int = 21):
# =========================================
universo = pd.read_excel(UNIVERSO_XLSX)
col_isin_uni = pick_first(universo, ["ISIN", "isin", "codice isin"])
col_isin_uni = detect_column(universo, ["ISIN", "isin", "codice isin"])
if col_isin_uni is None:
raise ValueError("Nel file universo non trovo una colonna ISIN.")
col_name_uni = pick_first(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"])
col_cat_uni = pick_first(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"])
col_ac_uni = pick_first(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"])
col_name_uni = detect_column(universo, ["Nome", "Name", "Descrizione", "Description", "Security Name", "Instrument Name"])
col_cat_uni = detect_column(universo, ["Categoria", "Category", "Classe", "Linea", "Tipo"])
col_ac_uni = detect_column(universo, ["Asset Class", "AssetClass", "Classe di Attivo", "Classe Attivo", "Class"])
isins = (
universo[col_isin_uni].astype(str).str.strip()
@@ -493,9 +403,9 @@ last_dates = []
sql_sp = text(f"EXEC {STORED_PROC} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
def detect_cols(df0):
col_date = pick_first(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"])
col_ret = pick_first(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"])
col_px = pick_first(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
col_date = detect_column(df0, ["Date", "Data", "Datetime", "Timestamp", "Time"])
col_ret = detect_column(df0, ["Ret", "Return", "Rendimento", "Rend", "LogRet", "r_log", "r", "pct_chg"])
col_px = detect_column(df0, ["Close", "AdjClose", "Price", "Px", "Last", "Prezzo", "Chiusura"])
return col_date, col_ret, col_px
ok_count = 0
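
For reference, detect_column (defined in shared_utils.py below) keeps the same two-pass lookup as the pick_first helper it replaces: an exact case-insensitive match on the candidate names, then a substring match against the column headers. A minimal sketch with an illustrative DataFrame (the headers below are hypothetical, not taken from the universe workbook):

    import pandas as pd

    from shared_utils import detect_column

    # Illustrative frame; the real headers come from "Universo per Trading System.xlsx".
    universo = pd.DataFrame({
        "Codice ISIN": ["IE00B4L5Y983"],
        "Security Name": ["iShares Core MSCI World"],
        "Asset Class": ["Equity"],
    })

    col_isin = detect_column(universo, ["ISIN", "isin", "codice isin"])  # exact case-insensitive match
    col_name = detect_column(universo, ["Nome", "Name"])                 # "Name" hits "Security Name" via the substring pass
    print(col_isin, col_name)  # -> Codice ISIN Security Name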

View File

@@ -0,0 +1,35 @@
{
"db": {
"stored_proc": "opt_RendimentoGiornaliero1_ALL",
"n_bars": 1305,
"ptf_curr": "EUR"
},
"pattern": {
"wp": 60,
"ha": 10,
"knn_k": 25,
"theta": 0.005,
"embargo": null
},
"tagging": {
"z_rev": 2.0,
"z_vol": 2.0,
"std_comp_pct": 0.15
},
"ranking": {
"top_n_max": 15,
"rp_max_weight": 0.1333333333
},
"signals": {
"sl_bps": 300.0,
"tp_bps": 800.0,
"trail_bps": 300.0,
"time_stop_bars": 20,
"theta_exit": 0.0,
"weak_days_exit": null,
"max_open": 15,
"base_capital_per_strategy": 100.0,
"min_trade_notional": 0.01,
"risk_parity_lookback": 60
}
}
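
A minimal sketch of how this file is consumed, mirroring the assignments in the scripts above; the None fallbacks shown are the same ones the diff applies to the null entries embargo and rp_max_weight:

    from shared_utils import load_config, require_section, require_value

    config = load_config()  # reads DEFAULT_CONFIG_PATH (config/pattern_knn_config.json)
    pattern_cfg = require_section(config, "pattern")
    ranking_cfg = require_section(config, "ranking")

    wp = int(require_value(pattern_cfg, "wp", "pattern"))
    ha = int(require_value(pattern_cfg, "ha", "pattern"))

    # "embargo": null becomes None, so the scripts fall back to wp + ha.
    embargo = require_value(pattern_cfg, "embargo", "pattern")
    embargo = wp + ha if embargo is None else int(embargo)

    top_n_max = int(require_value(ranking_cfg, "top_n_max", "ranking"))
    rp_max_weight = require_value(ranking_cfg, "rp_max_weight", "ranking")
    rp_max_weight = 2 / max(top_n_max, 1) if rp_max_weight is None else float(rp_max_weight)

    print(wp, ha, embargo, top_n_max, rp_max_weight)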

shared_utils.py Normal file (236 additions)
View File

@@ -0,0 +1,236 @@
"""Shared helpers for trading pattern scripts."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
import pyodbc
DEFAULT_CONFIG_PATH = Path("config/pattern_knn_config.json")
def load_config(path: Optional[Path] = None) -> Dict:
"""Load the JSON configuration that holds operational parameters."""
cfg_path = Path(path or DEFAULT_CONFIG_PATH)
if not cfg_path.exists():
raise FileNotFoundError(f"Missing configuration file: {cfg_path}")
with cfg_path.open("r", encoding="utf-8") as fh:
return json.load(fh)
def require_section(config: Dict, section: str) -> Dict:
sect = config.get(section)
if not isinstance(sect, dict):
raise KeyError(f"Missing '{section}' section in configuration file")
return sect
def require_value(section: Dict, key: str, section_name: str) -> Any:
if key not in section:
raise KeyError(f"Missing key '{key}' inside '{section_name}' section of configuration file")
return section[key]
def detect_column(df: pd.DataFrame, candidates: Sequence[str]) -> Optional[str]:
"""Return the first column whose name matches one of the candidates (case insensitive)."""
low = {c.lower(): c for c in df.columns}
for cand in candidates:
cl = cand.lower()
if cl in low:
return low[cl]
for cand in candidates:
cl = cand.lower()
for col in df.columns:
if cl in col.lower():
return col
return None
def read_connection_txt(path: Path | str = "connection.txt") -> str:
params: Dict[str, str] = {}
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Missing connection.txt at {path}")
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
params[k.strip().lower()] = v.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
if not all([username, password, host, database]):
raise ValueError("connection.txt incompleto: servono username/password/host/database.")
installed = [d for d in pyodbc.drivers()]
driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
def z_norm(arr: np.ndarray) -> Optional[np.ndarray]:
arr = np.asarray(arr, dtype=float)
if arr.size == 0:
return None
mu = arr.mean()
sd = arr.std()
if sd < 1e-12:
return None
return (arr - mu) / (sd + 1e-12)
def build_pattern_library(
ret_series: pd.Series,
wp: int,
ha: int,
embargo: Optional[int] = None,
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
"""Create the normalized pattern windows and their realized outcomes.
Args:
ret_series: Series of returns (ordered oldest→latest).
wp: Window length for the pattern.
ha: Holding horizon used to compute the outcome.
embargo: Optional number of most-recent observations to exclude when
building the library (useful to avoid leakage when reusing the
same series for inference).
"""
x = ret_series.dropna().values
n = len(x)
if n < wp + ha + 10:
return None, None
embargo = int(embargo or 0)
usable_n = n - max(0, embargo)
if usable_n <= wp + ha:
return None, None
wins: List[np.ndarray] = []
outs: List[float] = []
last_start = usable_n - wp - ha
if last_start <= 0:
return None, None
for t in range(0, last_start + 1):
win = x[t : t + wp]
winzn = z_norm(win)
if winzn is None:
continue
outcome = np.sum(x[t + wp : t + wp + ha])
wins.append(winzn)
outs.append(outcome)
if not wins:
return None, None
return np.array(wins), np.array(outs)
def predict_from_library(
curr_win: np.ndarray,
lib_wins: np.ndarray,
lib_out: np.ndarray,
k: int = 25,
) -> Tuple[float, float, np.ndarray]:
dists = np.linalg.norm(lib_wins - curr_win, axis=1)
idx = np.argsort(dists)[: min(k, len(dists))]
return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
def characterize_window(
ret_series: pd.Series,
wp: int,
z_rev: float = 2.0,
z_vol: float = 2.0,
std_comp_pct: float = 0.15,
) -> Tuple[Optional[str], float]:
x = ret_series.dropna().values
if len(x) < max(wp, 30):
return None, 0.0
win = x[-wp:]
mu, sd = win.mean(), win.std()
if sd < 1e-12:
return "compression", 0.5
last = win[-1]
z_last = (last - mu) / (sd + 1e-12)
abs_z_last = abs(z_last)
last3 = win[-3:] if len(win) >= 3 else win
sum3 = np.sum(last3)
if len(x) > 3 * wp:
roll_std = pd.Series(x).rolling(wp).std().dropna().values
if len(roll_std) > 20:
pct = (roll_std < np.std(win)).mean()
else:
pct = 0.5
else:
pct = 0.5
if pct < std_comp_pct:
return "compression", float(1.0 - pct)
if abs(sum3) > 2 * sd / np.sqrt(3) and np.sign(last3).sum() in (3, -3):
conf = min(1.0, abs(sum3) / (sd + 1e-12))
return "momentum_burst", float(conf)
mean_prev = np.mean(win[:-1]) if len(win) > 1 else 0.0
if abs_z_last >= z_rev and np.sign(last) != np.sign(mean_prev):
conf = min(1.0, abs_z_last / 3.0)
return "reversal_candidate", float(conf)
if abs_z_last >= z_vol:
conf = min(1.0, abs_z_last / 3.0)
return "vol_spike", float(conf)
return None, 0.0
def hurst_rs(series: pd.Series) -> Optional[float]:
x = pd.to_numeric(series.dropna(), errors="coerce").astype(float).values
n = len(x)
if n < 100:
return None
x = x - x.mean()
z = np.cumsum(x)
r = z.max() - z.min()
s = x.std(ddof=1)
if s <= 0 or r <= 0:
return None
h = np.log(r / s) / np.log(n)
if not np.isfinite(h):
return None
return float(h)
def build_hurst_map(returns_long: pd.DataFrame, lookback: int = 252) -> Dict[str, float]:
if returns_long.empty:
return {}
ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
hurst_map: Dict[str, float] = {}
for isin in ret_wide.columns:
series = ret_wide[isin].dropna().astype(float)
if len(series) < max(lookback, 100):
continue
h_val = hurst_rs(series.iloc[-lookback:])
if h_val is None or not np.isfinite(h_val):
continue
hurst_map[str(isin)] = float(h_val)
return hurst_map
__all__ = [
"build_hurst_map",
"build_pattern_library",
"characterize_window",
"detect_column",
"require_section",
"require_value",
"hurst_rs",
"load_config",
"predict_from_library",
"read_connection_txt",
"z_norm",
]
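
A minimal end-to-end sketch of the k-NN pattern flow these helpers implement, run here on synthetic returns (the series is random, so the printed numbers are purely illustrative):

    import numpy as np
    import pandas as pd

    from shared_utils import build_pattern_library, predict_from_library, z_norm

    WP, HA, KNN_K, THETA = 60, 10, 25, 0.005

    # Synthetic daily returns, ordered oldest -> latest (stand-in for the DB series).
    rng = np.random.default_rng(42)
    r = pd.Series(rng.normal(0.0003, 0.01, 600))

    lib_wins, lib_out = build_pattern_library(r, WP, HA)   # z-normalised windows + realised HA-bar outcomes
    curr = z_norm(r.values[-WP:])                          # normalise the latest window the same way
    if lib_wins is not None and curr is not None:
        est_out, avg_dist, _ = predict_from_library(curr, lib_wins, lib_out, k=KNN_K)
        signal = 1 if est_out > THETA else 0               # entry when the median neighbour outcome beats THETA
        print(f"est_out={est_out:.4f} avg_dist={avg_dist:.2f} signal={signal}")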

View File

@@ -24,6 +24,7 @@ import os
import ssl
import json
import time
import shutil
import warnings
import datetime as dt
from dataclasses import dataclass
@@ -38,51 +39,78 @@ from urllib.error import URLError, HTTPError
# DB
import sqlalchemy as sa
from sqlalchemy import text as sql_text
import pyodbc
from shared_utils import (
build_hurst_map,
build_pattern_library,
characterize_window,
detect_column,
load_config,
predict_from_library,
read_connection_txt,
require_section,
require_value,
z_norm,
)
# =========================
# CONFIG
# =========================
CONFIG = load_config()
DB_CONFIG = require_section(CONFIG, "db")
PATTERN_CONFIG = require_section(CONFIG, "pattern")
TAGGING_CONFIG = require_section(CONFIG, "tagging")
RANKING_CONFIG = require_section(CONFIG, "ranking")
SIGNALS_CONFIG = require_section(CONFIG, "signals")
BASE_DIR = Path(".")
UNIVERSO_XLSX = BASE_DIR / "Universo per Trading System.xlsx"
CONNECTION_TXT = BASE_DIR / "connection.txt"
AUDIT_LOG_CSV = BASE_DIR / "trades_audit_log.csv"
OPEN_TRADES_DIR = BASE_DIR / "open_trades"
DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF")
def _dated_signals_filename() -> Path:
date_prefix = pd.Timestamp.today().strftime("%Y%m%d")
return BASE_DIR / f"{date_prefix}_signals.xlsx"
# Stored procedure / DB parameters
SP_NAME_DEFAULT = "opt_RendimentoGiornaliero1_ALL"
SP_N_DEFAULT = 1305
PTF_CURR_DEFAULT = "EUR"
SP_NAME_DEFAULT = str(require_value(DB_CONFIG, "stored_proc", "db"))
SP_N_DEFAULT = int(require_value(DB_CONFIG, "n_bars", "db"))
PTF_CURR_DEFAULT = str(require_value(DB_CONFIG, "ptf_curr", "db"))
# Pattern recognition (same as the backtest)
WP = 60
HA = 10
KNN_K = 25
THETA = 0.005 # 0.005 expressed as a decimal (identical to the backtest)
WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))
HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))
KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))
THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern")) # 0.005 expressed as a decimal (identical to the backtest)
Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))
# Exit rules (identical to the backtest)
SL_BPS = 300.0
TP_BPS = 800.0
TRAIL_BPS = 300.0
TIME_STOP_BARS = 20
THETA_EXIT = 0.0 # weakness threshold
WEAK_DAYS_EXIT = None # IMMEDIATE exit on weakness (as in the backtest)
SL_BPS = float(require_value(SIGNALS_CONFIG, "sl_bps", "signals"))
TP_BPS = float(require_value(SIGNALS_CONFIG, "tp_bps", "signals"))
TRAIL_BPS = float(require_value(SIGNALS_CONFIG, "trail_bps", "signals"))
TIME_STOP_BARS = int(require_value(SIGNALS_CONFIG, "time_stop_bars", "signals"))
THETA_EXIT = float(require_value(SIGNALS_CONFIG, "theta_exit", "signals")) # weakness threshold
WEAK_DAYS_EXIT = require_value(SIGNALS_CONFIG, "weak_days_exit", "signals") # IMMEDIATE exit on weakness (as in the backtest)
# Ranking and Top-N selection for OPENINGS
MAX_OPEN = 15 # cap on instruments opened today (as in the backtest)
MAX_OPEN = int(require_value(SIGNALS_CONFIG, "max_open", "signals")) # cap on instruments opened today (as in the backtest)
# Alignment with backtest v3.1.5 for the Risk Parity cap
TOP_N_MAX = MAX_OPEN
RP_MAX_WEIGHT = 2 / TOP_N_MAX # ≈ 0.1333 = 13.33% per single asset
TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))
RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking") # ≈ 0.1333 = 13.33% per single asset
if RP_MAX_WEIGHT is None:
RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
else:
RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)
# Sizing
BASE_CAPITAL_PER_STRATEGY = 100.0
MIN_TRADE_NOTIONAL = 0.01
RISK_PARITY_LOOKBACK = 60
BASE_CAPITAL_PER_STRATEGY = float(require_value(SIGNALS_CONFIG, "base_capital_per_strategy", "signals"))
MIN_TRADE_NOTIONAL = float(require_value(SIGNALS_CONFIG, "min_trade_notional", "signals"))
RISK_PARITY_LOOKBACK = int(require_value(SIGNALS_CONFIG, "risk_parity_lookback", "signals"))
# Calendar
BUSINESS_DAYS_ONLY = True
@@ -97,6 +125,18 @@ np.random.seed(SEED)
def ensure_dir(p: Path):
p.mkdir(parents=True, exist_ok=True)
def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR):
if not src or not dst_dir:
return
if not src.exists():
return
try:
ensure_dir(dst_dir)
dst = dst_dir / src.name
shutil.copy2(src, dst)
except Exception as exc:
print(f"[WARN] impossibile copiare {src} su {dst_dir}: {exc}")
def next_business_day(d: dt.date) -> dt.date:
nd = d + dt.timedelta(days=1)
if not BUSINESS_DAYS_ONLY:
@@ -111,34 +151,6 @@ def _safe_to_float(x) -> Optional[float]:
except Exception:
return None
# =========================
# DB CONNECTION
# =========================
def read_connection_txt(path: Path) -> str:
if not path.exists():
raise FileNotFoundError(f"Missing connection.txt at {path}")
params: Dict[str, str] = {}
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
params[k.strip().lower()] = v.strip()
username = params.get("username")
password = params.get("password")
host = params.get("host")
port = params.get("port", "1433")
database = params.get("database")
if not all([username, password, host, database]):
raise ValueError("connection.txt incompleto: servono username/password/host/database.")
installed = [d for d in pyodbc.drivers()]
driver_q = "ODBC+Driver+18+for+SQL+Server" if "ODBC Driver 18 for SQL Server" in installed else "ODBC+Driver+17+for+SQL+Server"
return f"mssql+pyodbc://{username}:{password}@{host}:{port}/{database}?driver={driver_q}"
def _db_fetch_returns(conn_str: str,
isins: List[str],
sp_name: Optional[str] = None,
@@ -152,17 +164,6 @@ def _db_fetch_returns(conn_str: str,
sql_sp = sql_text(f"EXEC {sp} @ISIN = :isin, @n = :n, @PtfCurr = :ptf")
frames: List[pd.DataFrame] = []
def _pick(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
low = {c.lower(): c for c in df.columns}
for c in candidates:
if c.lower() in low:
return low[c.lower()]
for c in df.columns:
cl = c.lower()
if any(tok in cl for tok in [x.lower() for x in candidates]):
return c
return None
with engine.begin() as conn:
for i, isin in enumerate(isins, start=1):
print(f"[DB] ({i}/{len(isins)}) scarico serie storica per {isin} ...", flush=True)
@@ -176,8 +177,8 @@ def _db_fetch_returns(conn_str: str,
print(f"[WARN] Nessun dato per {isin}")
continue
col_date = _pick(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
col_ret = _pick(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
col_date = detect_column(df, ["Date", "Data", "Datetime", "Timestamp", "Time"])
col_ret = detect_column(df, ["Ret", "Return", "Rendimento", "Rend", "Ret_%", "RET"])
if not col_date or not col_ret:
print(f"[WARN] Colonne mancanti per {isin}")
continue
@@ -278,102 +279,6 @@ def get_open_price(isin: str, universe: pd.DataFrame) -> Optional[float]:
# =========================
# HURST ESTIMATOR & MAP
# =========================
from typing import Optional # Optional is already imported at the top of the file for type hints, so this is fine
def _hurst_rs(series: pd.Series) -> Optional[float]:
"""
Stima semplice del coefficiente di Hurst tramite Rescaled Range (R/S) su un'unica finestra.
Ritorna NaN se la serie è troppo corta o degenerata.
"""
x = pd.to_numeric(series.dropna(), errors="coerce").astype(float).values
n = len(x)
if n < 100:
return None
x = x - x.mean()
z = np.cumsum(x)
R = z.max() - z.min()
S = x.std(ddof=1)
if S <= 0 or R <= 0:
return None
H = np.log(R / S) / np.log(n)
if not np.isfinite(H):
return None
return float(H)
def build_hurst_map(returns_long: pd.DataFrame,
lookback: int = 252) -> Dict[str, float]:
"""
Builds an ISIN -> Hurst map using the last `lookback` returns.
"""
if returns_long.empty:
return {}
ret_wide = returns_long.pivot(index="Date", columns="ISIN", values="Ret").sort_index()
hurst_map: Dict[str, float] = {}
for isin in ret_wide.columns:
s = ret_wide[isin].dropna().astype(float)
if len(s) < max(lookback, 100):
continue
h = _hurst_rs(s.iloc[-lookback:])
if h is None or not np.isfinite(h):
continue
hurst_map[str(isin)] = float(h)
return hurst_map
# =========================
# PATTERN RECOGNITION (WP/HA)
# =========================
def z_norm(arr: np.ndarray) -> Optional[np.ndarray]:
arr = np.asarray(arr, dtype=float)
mu = arr.mean()
sd = arr.std()
if sd < 1e-12:
return None
return (arr - mu) / (sd + 1e-12)
def build_pattern_library(ret_series: pd.Series, Wp: int, Ha: int) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
x = ret_series.dropna().values
N = len(x)
if N < Wp + Ha + 10:
return None, None
wins, outs = [], []
for t in range(0, N - Wp - Ha):
win = x[t:t+Wp]
winzn = z_norm(win)
if winzn is None:
continue
outcome = np.sum(x[t+Wp : t+Wp+Ha]) # sum of future returns over Ha bars (in decimals)
wins.append(winzn); outs.append(outcome)
if not wins:
return None, None
return np.array(wins), np.array(outs)
def predict_from_library(curr_win: np.ndarray,
lib_wins: np.ndarray,
lib_out: np.ndarray,
k: int = 25) -> Tuple[float, float, np.ndarray]:
dists = np.linalg.norm(lib_wins - curr_win, axis=1)
idx = np.argsort(dists)[:min(k, len(dists))]
return float(np.median(lib_out[idx])), float(np.mean(dists[idx])), idx
def characterize_window(ret_series: pd.Series, Wp: int) -> Tuple[Optional[str], float]:
x = ret_series.dropna().values
if len(x) < max(WP, 30):
return None, 0.0
win = x[-Wp:]
mu, sd = win.mean(), win.std()
if sd < 1e-12:
return "compression", 0.5
last3 = win[-3:] if len(win) >= 3 else win
if np.sign(last3).sum() in (3, -3):
return "momentum_burst", min(1.0, abs(last3.sum())/(sd+1e-12))
return None, 0.0
# =========================
# SIGNAL GENERATION (EOD on D)
# =========================
@@ -404,17 +309,17 @@ def generate_signals_today(universe: pd.DataFrame,
lib_wins, lib_out = build_pattern_library(r, WP, HA)
if lib_wins is None or len(r) < WP + HA:
est_out, avg_dist, sig = np.nan, np.nan, 0
ptype, pconf = characterize_window(r, WP)
ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
else:
curr = r.values[-WP:]
curr_zn = z_norm(curr)
if curr_zn is None:
est_out, avg_dist, sig = np.nan, np.nan, 0
ptype, pconf = characterize_window(r, WP)
ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
else:
est_out, avg_dist, _ = predict_from_library(curr_zn, lib_wins, lib_out, k=KNN_K)
sig = 1 if (pd.notna(est_out) and float(est_out) > float(theta_entry)) else 0
ptype, pconf = characterize_window(r, WP)
ptype, pconf = characterize_window(r, WP, z_rev=Z_REV, z_vol=Z_VOL, std_comp_pct=STD_COMP_PCT)
rows.append({
"Date": decision_date, "ISIN": isin,
@@ -487,13 +392,15 @@ def load_open_trades(strategy: str) -> pd.DataFrame:
p = open_trades_path(strategy)
if not p.exists():
return pd.DataFrame(columns=[
"Strategy","ISIN","EntryDate","EntryIndex","EntryAmount","SizeWeight","PeakPnL","WeakDays","Notes"
"Strategy","ISIN","AssetName","EntryDate","EntryIndex","EntryAmount","SizeWeight","PeakPnL","WeakDays","Notes"
])
df = pd.read_csv(p)
if "EntryDate" in df.columns:
df["EntryDate"] = pd.to_datetime(df["EntryDate"], errors="coerce").dt.date
if "WeakDays" not in df.columns:
df["WeakDays"] = 0
if "AssetName" not in df.columns:
df["AssetName"] = ""
df["Strategy"] = strategy
return df
@@ -590,7 +497,8 @@ def update_positions_and_build_orders(universe: pd.DataFrame,
signals_today: pd.DataFrame,
today: dt.date,
buy_rank_df: Optional[pd.DataFrame],
allowed_open_isins: Optional[List[str]] = None) -> Tuple[pd.DataFrame, List[Dict]]:
allowed_open_isins: Optional[List[str]] = None,
asset_name_map: Optional[pd.Series] = None) -> Tuple[pd.DataFrame, List[Dict]]:
"""
- decision_date = last available date (EOD)
- daily target = top MAX_OPEN of the buy ranking (the same for all strategies)
@@ -719,6 +627,17 @@ def update_positions_and_build_orders(universe: pd.DataFrame,
}])], ignore_index=True)
current_set.add(isin)
if asset_name_map is not None:
df_open["AssetName"] = df_open["ISIN"].astype(str).map(asset_name_map).fillna("")
else:
if "AssetName" not in df_open.columns:
df_open["AssetName"] = ""
if "AssetName" in df_open.columns:
cols = list(df_open.columns)
if "ISIN" in cols and "AssetName" in cols:
cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
df_open = df_open[cols]
save_open_trades(strat, df_open)
df_open["Strategy"] = strat
open_concat.append(df_open)
@@ -748,6 +667,19 @@ def main_run(run_date: Optional[dt.date] = None):
# 1) Universe
universe = load_universe(UNIVERSO_XLSX)
asset_name_col = detect_column(universe, [
"Nome", "Name", "Asset", "Asset Name", "Descrizione", "Description"
])
if not asset_name_col:
print("[WARN] Colonna con il nome dell'asset non trovata nell'universo.")
asset_name_map: Optional[pd.Series] = None
if asset_name_col:
asset_name_map = (
universe[["ISIN", asset_name_col]]
.dropna(subset=["ISIN"])
.assign(ISIN=lambda df: df["ISIN"].astype(str).str.strip())
)
asset_name_map = asset_name_map.set_index("ISIN")[asset_name_col].astype(str).str.strip()
# 2) Returns (DB)
conn_str = read_connection_txt(CONNECTION_TXT)
@@ -777,7 +709,8 @@ def main_run(run_date: Optional[dt.date] = None):
open_df, audit_rows = update_positions_and_build_orders(
universe, returns_long, sig_df, today,
buy_rank_df=buy_rank_df,
allowed_open_isins=allowed_open
allowed_open_isins=allowed_open,
asset_name_map=asset_name_map,
)
# 5) Append audit log (ALL operational strategies)
@@ -787,8 +720,20 @@ def main_run(run_date: Optional[dt.date] = None):
# 6) Dated Excel snapshot: sheets with full names
ensure_dir(OPEN_TRADES_DIR)
signals_path = _dated_signals_filename()
signals_sheet = sig_df.reset_index()
if asset_name_map is not None:
signals_sheet["AssetName"] = signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("")
else:
signals_sheet["AssetName"] = ""
# insert the column right after ISIN
if "AssetName" in signals_sheet.columns:
cols = list(signals_sheet.columns)
cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
signals_sheet = signals_sheet[cols]
with pd.ExcelWriter(signals_path) as xw:
sig_df.reset_index().to_excel(xw, sheet_name="Signals", index=False)
signals_sheet.to_excel(xw, sheet_name="Signals", index=False)
if not open_df.empty:
for strat, g in open_df.groupby("Strategy"):
sheet_name_map = {
@@ -798,6 +743,12 @@ def main_run(run_date: Optional[dt.date] = None):
sheet_name = sheet_name_map.get(strat, f"Open_{strat}")[:31]
g.to_excel(xw, sheet_name=sheet_name, index=False)
copy_to_dropbox(signals_path)
for strat in ["Equal_Weight", "Risk_Parity"]:
csv_path = open_trades_path(strat)
if csv_path.exists():
copy_to_dropbox(csv_path)
print(f"✅ Signals generated for {today}. Saved to {signals_path}")
print(f"Open trades saved in {OPEN_TRADES_DIR}")
print(f"Audit log updated at {AUDIT_LOG_CSV}")