Merge pull request #4 from fredmaloggia/codex/review-function-consolidation-across-files-rhuc38

Add asset names to open trade exports and copy outputs
fredmaloggia authored 2025-11-17 17:13:40 +01:00, committed by GitHub
3 changed files with 227 additions and 74 deletions

View File

@@ -28,6 +28,8 @@ from shared_utils import (
     load_config,
     predict_from_library,
     read_connection_txt,
+    require_section,
+    require_value,
     z_norm,
 )
 #from math import isfinite
@@ -69,6 +71,10 @@ def savefig_safe(path, **kwargs):
 # GLOBAL PARAMETERS
 # =========================================
 CONFIG = load_config()
-DB_CONFIG = CONFIG.get("db", {})
-PATTERN_CONFIG = CONFIG.get("pattern", {})
-TAGGING_CONFIG = CONFIG.get("tagging", {})
+DB_CONFIG = require_section(CONFIG, "db")
+PATTERN_CONFIG = require_section(CONFIG, "pattern")
+TAGGING_CONFIG = require_section(CONFIG, "tagging")
+RANKING_CONFIG = require_section(CONFIG, "ranking")
@@ -82,6 +88,34 @@ OUTPUT_PATTERN_XLSX = "pattern_signals.xlsx"
 ERROR_LOG_CSV = "errori_isin.csv"
 
 # Stored procedure & parameters
-STORED_PROC = DB_CONFIG.get("stored_proc", "opt_RendimentoGiornaliero1_ALL")
-N_BARS = DB_CONFIG.get("n_bars", 1305)
-PTF_CURR = DB_CONFIG.get("ptf_curr", "EUR")
+STORED_PROC = str(require_value(DB_CONFIG, "stored_proc", "db"))
+N_BARS = int(require_value(DB_CONFIG, "n_bars", "db"))
+PTF_CURR = str(require_value(DB_CONFIG, "ptf_curr", "db"))
+
+# Pattern matching (hyperparameters)
+WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))          # pattern window length (bars)
+HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))          # outcome horizon (bars)
+KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))    # number of neighbors
+THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern"))  # outcome threshold for generating a signal
+EMBARGO = require_value(PATTERN_CONFIG, "embargo", "pattern")
+if EMBARGO is None:
+    EMBARGO = WP + HA
+else:
+    EMBARGO = int(EMBARGO)
+
+# Rule-based tagging (thresholds)
+Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
+Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
+STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))
+
+DAYS_PER_YEAR = 252
+TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))     # maximum number of admitted assets
+RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking")  # 2 x 1/15 ≈ 0.1333 = 13.33%
+if RP_MAX_WEIGHT is None:
+    RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
+else:
+    RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)
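The two None-guards above make the previously implicit defaults explicit. A minimal arithmetic sketch of what they produce, using illustrative numbers taken from the inline comment rather than the real config.json:

WP, HA = 20, 5                         # hypothetical pattern window and outcome horizon (bars)
TOP_N_MAX = 15                         # hypothetical ranking cap
EMBARGO = WP + HA                      # fallback when "embargo" is null: 25 bars
RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)  # fallback cap: 2 x 1/15 = 0.1333..., about 13.33%

The max(TOP_N_MAX, 1) guard keeps the fallback well defined even if the configured cap were 0.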

View File: shared_utils.py

@@ -3,6 +3,7 @@ from __future__ import annotations
 import json
 from pathlib import Path
-from typing import Dict, List, Optional, Sequence, Tuple
+from typing import Any, Dict, List, Optional, Sequence, Tuple
 import numpy as np
@@ -21,6 +22,19 @@ def load_config(path: Optional[Path] = None) -> Dict:
         return json.load(fh)
 
 
+def require_section(config: Dict, section: str) -> Dict:
+    sect = config.get(section)
+    if not isinstance(sect, dict):
+        raise KeyError(f"Missing '{section}' section in configuration file")
+    return sect
+
+
+def require_value(section: Dict, key: str, section_name: str) -> Any:
+    if key not in section:
+        raise KeyError(f"Missing key '{key}' inside '{section_name}' section of configuration file")
+    return section[key]
+
+
 def detect_column(df: pd.DataFrame, candidates: Sequence[str]) -> Optional[str]:
     """Return the first column whose name matches one of the candidates (case insensitive)."""
     low = {c.lower(): c for c in df.columns}
@@ -213,6 +227,8 @@ __all__ = [
     "build_pattern_library",
     "characterize_window",
     "detect_column",
+    "require_section",
+    "require_value",
     "hurst_rs",
     "load_config",
     "predict_from_library",

View File

@@ -24,6 +24,7 @@ import os
 import ssl
 import json
 import time
+import shutil
 import warnings
 import datetime as dt
 from dataclasses import dataclass
@@ -47,6 +48,8 @@ from shared_utils import (
     load_config,
     predict_from_library,
     read_connection_txt,
+    require_section,
+    require_value,
     z_norm,
 )
@@ -54,6 +57,11 @@ from shared_utils import (
 # CONFIG
 # =========================
 CONFIG = load_config()
-DB_CONFIG = CONFIG.get("db", {})
-PATTERN_CONFIG = CONFIG.get("pattern", {})
-TAGGING_CONFIG = CONFIG.get("tagging", {})
+DB_CONFIG = require_section(CONFIG, "db")
+PATTERN_CONFIG = require_section(CONFIG, "pattern")
+TAGGING_CONFIG = require_section(CONFIG, "tagging")
+RANKING_CONFIG = require_section(CONFIG, "ranking")
+SIGNALS_CONFIG = require_section(CONFIG, "signals")
@@ -65,12 +73,49 @@ UNIVERSO_XLSX = BASE_DIR / "Universo per Trading System.xlsx"
 CONNECTION_TXT = BASE_DIR / "connection.txt"
 AUDIT_LOG_CSV = BASE_DIR / "trades_audit_log.csv"
 OPEN_TRADES_DIR = BASE_DIR / "open_trades"
+DROPBOX_EXPORT_DIR = Path(r"C:\Users\Admin\Dropbox\Condivisa Lavoro\Segnali di trading su ETF")
+
+def _dated_signals_filename() -> Path:
+    date_prefix = pd.Timestamp.today().strftime("%Y%m%d")
+    return BASE_DIR / f"{date_prefix}_signals.xlsx"
+
 # Stored procedure / DB parameters
-SP_NAME_DEFAULT = DB_CONFIG.get("stored_proc", "opt_RendimentoGiornaliero1_ALL")
-SP_N_DEFAULT = DB_CONFIG.get("n_bars", 1305)
-PTF_CURR_DEFAULT = DB_CONFIG.get("ptf_curr", "EUR")
+SP_NAME_DEFAULT = str(require_value(DB_CONFIG, "stored_proc", "db"))
+SP_N_DEFAULT = int(require_value(DB_CONFIG, "n_bars", "db"))
+PTF_CURR_DEFAULT = str(require_value(DB_CONFIG, "ptf_curr", "db"))
+
+# Pattern recognition (as in the backtest)
+WP = int(require_value(PATTERN_CONFIG, "wp", "pattern"))
+HA = int(require_value(PATTERN_CONFIG, "ha", "pattern"))
+KNN_K = int(require_value(PATTERN_CONFIG, "knn_k", "pattern"))
+THETA = float(require_value(PATTERN_CONFIG, "theta", "pattern"))  # 0.005% in decimals (identical to the backtest)
+Z_REV = float(require_value(TAGGING_CONFIG, "z_rev", "tagging"))
+Z_VOL = float(require_value(TAGGING_CONFIG, "z_vol", "tagging"))
+STD_COMP_PCT = float(require_value(TAGGING_CONFIG, "std_comp_pct", "tagging"))
+
+# Exit rules (identical to the backtest)
+SL_BPS = float(require_value(SIGNALS_CONFIG, "sl_bps", "signals"))
+TP_BPS = float(require_value(SIGNALS_CONFIG, "tp_bps", "signals"))
+TRAIL_BPS = float(require_value(SIGNALS_CONFIG, "trail_bps", "signals"))
+TIME_STOP_BARS = int(require_value(SIGNALS_CONFIG, "time_stop_bars", "signals"))
+THETA_EXIT = float(require_value(SIGNALS_CONFIG, "theta_exit", "signals"))  # weakness threshold
+WEAK_DAYS_EXIT = require_value(SIGNALS_CONFIG, "weak_days_exit", "signals")  # IMMEDIATE exit on weakness (as in the backtest)
+
+# Ranking and Top-N selection for OPENINGS
+MAX_OPEN = int(require_value(SIGNALS_CONFIG, "max_open", "signals"))  # cap on instruments opened today (as in the backtest)
+# Aligned with backtest v3.1.5 for the Risk Parity cap
+TOP_N_MAX = int(require_value(RANKING_CONFIG, "top_n_max", "ranking"))
+RP_MAX_WEIGHT = require_value(RANKING_CONFIG, "rp_max_weight", "ranking")  # ≈ 0.1333 = 13.33% per single asset
+if RP_MAX_WEIGHT is None:
+    RP_MAX_WEIGHT = 2 / max(TOP_N_MAX, 1)
+else:
+    RP_MAX_WEIGHT = float(RP_MAX_WEIGHT)
+
+# Sizing
+BASE_CAPITAL_PER_STRATEGY = float(require_value(SIGNALS_CONFIG, "base_capital_per_strategy", "signals"))
+MIN_TRADE_NOTIONAL = float(require_value(SIGNALS_CONFIG, "min_trade_notional", "signals"))
+RISK_PARITY_LOOKBACK = int(require_value(SIGNALS_CONFIG, "risk_parity_lookback", "signals"))
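A quick check of the dated-filename helper, pinning the date to the commit date above since pd.Timestamp.today() is not reproducible:

import pandas as pd
prefix = pd.Timestamp("2025-11-17").strftime("%Y%m%d")
print(f"{prefix}_signals.xlsx")  # -> 20251117_signals.xlsx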
@@ -117,6 +162,18 @@ np.random.seed(SEED)
 def ensure_dir(p: Path):
     p.mkdir(parents=True, exist_ok=True)
 
+def copy_to_dropbox(src: Path, dst_dir: Path = DROPBOX_EXPORT_DIR):
+    if not src or not dst_dir:
+        return
+    if not src.exists():
+        return
+    try:
+        ensure_dir(dst_dir)
+        dst = dst_dir / src.name
+        shutil.copy2(src, dst)
+    except Exception as exc:
+        print(f"[WARN] could not copy {src} to {dst_dir}: {exc}")
+
 def next_business_day(d: dt.date) -> dt.date:
     nd = d + dt.timedelta(days=1)
     if not BUSINESS_DAYS_ONLY:
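Design note on copy_to_dropbox: shutil.copy2 copies content plus file metadata (modification time), and the broad try/except means an unavailable Dropbox folder only prints a warning instead of aborting the signal run. The same pattern in a standalone sketch (paths are illustrative):

import shutil
from pathlib import Path

src = Path("20251117_signals.xlsx")
dst_dir = Path("dropbox_mirror")  # hypothetical destination folder
try:
    dst_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst_dir / src.name)  # copy2 preserves timestamps, unlike shutil.copy
except Exception as exc:
    print(f"[WARN] could not copy {src} to {dst_dir}: {exc}")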
@@ -372,13 +429,15 @@ def load_open_trades(strategy: str) -> pd.DataFrame:
     p = open_trades_path(strategy)
     if not p.exists():
         return pd.DataFrame(columns=[
-            "Strategy","ISIN","EntryDate","EntryIndex","EntryAmount","SizeWeight","PeakPnL","WeakDays","Notes"
+            "Strategy","ISIN","AssetName","EntryDate","EntryIndex","EntryAmount","SizeWeight","PeakPnL","WeakDays","Notes"
         ])
     df = pd.read_csv(p)
     if "EntryDate" in df.columns:
         df["EntryDate"] = pd.to_datetime(df["EntryDate"], errors="coerce").dt.date
     if "WeakDays" not in df.columns:
         df["WeakDays"] = 0
+    if "AssetName" not in df.columns:
+        df["AssetName"] = ""
     df["Strategy"] = strategy
     return df
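The column guards in load_open_trades act as a lightweight schema migration: open-trade CSVs written before this commit have no AssetName column (and older ones no WeakDays), so both are backfilled on read. A toy illustration (the ISIN is just an example value):

import pandas as pd

df = pd.DataFrame({"Strategy": ["Equal_Weight"], "ISIN": ["IE00B4L5Y983"]})
for col, default in [("WeakDays", 0), ("AssetName", "")]:
    if col not in df.columns:
        df[col] = default
print(list(df.columns))  # ['Strategy', 'ISIN', 'WeakDays', 'AssetName']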
@@ -475,7 +534,8 @@ def update_positions_and_build_orders(universe: pd.DataFrame,
                                       signals_today: pd.DataFrame,
                                       today: dt.date,
                                       buy_rank_df: Optional[pd.DataFrame],
-                                      allowed_open_isins: Optional[List[str]] = None) -> Tuple[pd.DataFrame, List[Dict]]:
+                                      allowed_open_isins: Optional[List[str]] = None,
+                                      asset_name_map: Optional[pd.Series] = None) -> Tuple[pd.DataFrame, List[Dict]]:
     """
     - decision_date = last available date (EOD)
     - daily target = top MAX_OPEN of the buy ranking (the same for all strategies)
@@ -604,6 +664,17 @@ def update_positions_and_build_orders(universe: pd.DataFrame,
             }])], ignore_index=True)
             current_set.add(isin)
 
+        if asset_name_map is not None:
+            df_open["AssetName"] = df_open["ISIN"].astype(str).map(asset_name_map).fillna("")
+        else:
+            if "AssetName" not in df_open.columns:
+                df_open["AssetName"] = ""
+
+        if "AssetName" in df_open.columns:
+            cols = list(df_open.columns)
+            if "ISIN" in cols and "AssetName" in cols:
+                cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
+            df_open = df_open[cols]
         save_open_trades(strat, df_open)
         df_open["Strategy"] = strat
         open_concat.append(df_open)
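The reordering above pops AssetName out of the column list and re-inserts it directly after ISIN. Note that insert() evaluates its arguments left to right, so the target index is computed before the pop; that is safe here because AssetName always sits after ISIN at this point. The trick in isolation, on a toy frame:

import pandas as pd

df = pd.DataFrame(columns=["Strategy", "ISIN", "EntryDate", "AssetName"])
cols = list(df.columns)
cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
df = df[cols]
print(cols)  # ['Strategy', 'ISIN', 'AssetName', 'EntryDate']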
@@ -633,6 +704,19 @@ def main_run(run_date: Optional[dt.date] = None):
     # 1) Universe
     universe = load_universe(UNIVERSO_XLSX)
+    asset_name_col = detect_column(universe, [
+        "Nome", "Name", "Asset", "Asset Name", "Descrizione", "Description"
+    ])
+    if not asset_name_col:
+        print("[WARN] Asset name column not found in the universe file.")
+    asset_name_map: Optional[pd.Series] = None
+    if asset_name_col:
+        asset_name_map = (
+            universe[["ISIN", asset_name_col]]
+            .dropna(subset=["ISIN"])
+            .assign(ISIN=lambda df: df["ISIN"].astype(str).str.strip())
+        )
+        asset_name_map = asset_name_map.set_index("ISIN")[asset_name_col].astype(str).str.strip()
 
     # 2) Returns (DB)
     conn_str = read_connection_txt(CONNECTION_TXT)
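The net effect of the new block in main_run is an ISIN-indexed Series of display names, so every later .map(asset_name_map) is a plain index lookup with whitespace already stripped. A self-contained sketch (the column label and rows are invented, mirroring the candidates passed to detect_column):

import pandas as pd

universe = pd.DataFrame({
    "ISIN": [" IE00B4L5Y983 ", "LU0908500753"],
    "Nome": ["World equity ETF", "Europe equity ETF"],
})
asset_name_map = (
    universe[["ISIN", "Nome"]]
    .dropna(subset=["ISIN"])
    .assign(ISIN=lambda df: df["ISIN"].astype(str).str.strip())
    .set_index("ISIN")["Nome"].astype(str).str.strip()
)
print(asset_name_map["IE00B4L5Y983"])  # World equity ETF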
@@ -662,7 +746,8 @@
     open_df, audit_rows = update_positions_and_build_orders(
         universe, returns_long, sig_df, today,
         buy_rank_df=buy_rank_df,
-        allowed_open_isins=allowed_open
+        allowed_open_isins=allowed_open,
+        asset_name_map=asset_name_map,
     )
     # 5) Append audit log (ALL operational strategies)
@@ -672,8 +757,20 @@ def main_run(run_date: Optional[dt.date] = None):
     # 6) Dated Excel snapshot (sheets with full names)
     ensure_dir(OPEN_TRADES_DIR)
     signals_path = _dated_signals_filename()
+    signals_sheet = sig_df.reset_index()
+    if asset_name_map is not None:
+        signals_sheet["AssetName"] = signals_sheet["ISIN"].astype(str).map(asset_name_map).fillna("")
+    else:
+        signals_sheet["AssetName"] = ""
+    # insert the column right after the ISIN
+    if "AssetName" in signals_sheet.columns:
+        cols = list(signals_sheet.columns)
+        cols.insert(cols.index("ISIN") + 1, cols.pop(cols.index("AssetName")))
+        signals_sheet = signals_sheet[cols]
+
     with pd.ExcelWriter(signals_path) as xw:
-        sig_df.reset_index().to_excel(xw, sheet_name="Signals", index=False)
+        signals_sheet.to_excel(xw, sheet_name="Signals", index=False)
         if not open_df.empty:
             for strat, g in open_df.groupby("Strategy"):
                 sheet_name_map = {
@@ -683,6 +780,12 @@ def main_run(run_date: Optional[dt.date] = None):
                 sheet_name = sheet_name_map.get(strat, f"Open_{strat}")[:31]
                 g.to_excel(xw, sheet_name=sheet_name, index=False)
+
+    copy_to_dropbox(signals_path)
+    for strat in ["Equal_Weight", "Risk_Parity"]:
+        csv_path = open_trades_path(strat)
+        if csv_path.exists():
+            copy_to_dropbox(csv_path)
 
     print(f"✅ Signals generated for {today}. Saved to {signals_path}")
     print(f"Open trades saved in {OPEN_TRADES_DIR}")
     print(f"Audit log updated at {AUDIT_LOG_CSV}")