203 lines
6.7 KiB
Python
203 lines
6.7 KiB
Python
from typing import Any, Dict, Iterable, List, Tuple
|
|
import pyodbc
|
|
from decimal import Decimal
|
|
|
|
from .config import get_connection_string
|
|
from . import models
|
|
|
|
|
|
def _rows_to_dicts(cursor, rows):
    """Pair every fetched row with the cursor's column names.

    Args:
        cursor: DB-API style cursor; the first element of each
            ``cursor.description`` entry is used as the column name.
        rows: Iterable of row sequences whose values are in column order.

    Returns:
        A list with one ``{column_name: value}`` dict per row.
    """
    names = [meta[0] for meta in cursor.description]
    records = []
    for record in rows:
        records.append(dict(zip(names, record)))
    return records
|
|
|
|
|
|
def _normalize_keys(row: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``row`` with lower-cased keys and float-converted decimals.

    Args:
        row: Mapping of column name to value.

    Returns:
        A new dict in which every key is lower-cased and every ``Decimal``
        value is converted to ``float``; all other values are kept as they are.
    """
    return {
        name.lower(): float(value) if isinstance(value, Decimal) else value
        for name, value in row.items()
    }
|
|
|
|
|
|
class SqlDataAccess:
    """Thin pyodbc wrapper that executes stored procedures and returns their rows."""

    def load_data(self, stored_proc: str, params: Dict[str, Any], connection_string: str):
        """Execute ``stored_proc`` and return its result set as a list of dicts.

        Args:
            stored_proc: Name of the stored procedure. It is interpolated into
                the SQL text as-is, so it must come from trusted code, never
                from user input.
            params: Procedure arguments keyed by parameter name (without the
                leading ``@``). Values are bound through ``?`` placeholders.
            connection_string: Connection string handed to ``pyodbc.connect``.

        Returns:
            One dict per fetched row, keyed by result-set column name.
        """
        placeholders = ", ".join(f"@{name}=?" for name in params)
        sql = f"EXEC {stored_proc} {placeholders}" if placeholders else f"EXEC {stored_proc}"
        values = list(params.values())

        conn = pyodbc.connect(connection_string)
        try:
            # The pyodbc connection context manager only ends the transaction
            # (commit on success, rollback on error); it does NOT close the
            # connection, hence the explicit close() in the finally clause.
            with conn:
                cur = conn.cursor()
                cur.execute(sql, values)
                rows = cur.fetchall()
                # Convert while the connection is still open: the column names
                # are read from cur.description.
                return _rows_to_dicts(cur, rows)
        finally:
            conn.close()
|
|
|
|
|
|
class SqlQuery:
    """Query facade that maps the pricer stored procedures onto model objects."""

    def __init__(self, connection_string: str):
        """Store the connection string and create the data-access helper.

        Args:
            connection_string: Connection string used for every database call.
        """
        self._connection_string = connection_string
        self._db = SqlDataAccess()

    @staticmethod
    def _unwrap_single_column(rows):
        """Replace every one-column dict row by its sole value.

        Rows that are not dicts, or dicts with a number of entries other than
        one, are returned unchanged.
        """
        return [
            next(iter(row.values())) if isinstance(row, dict) and len(row) == 1 else row
            for row in rows
        ]

    def get_hist_price_ul(self, isin: str, num_prezzi_eod: int):
        """Run ``pricer_ULPrices1`` and return ``HistPriceULModel`` objects.

        Args:
            isin: Value bound to the procedure's ``Isin`` parameter.
            num_prezzi_eod: Value bound to the procedure's ``numPrezzi``
                parameter (presumably the number of end-of-day prices to
                fetch -- confirm against the procedure).

        Returns:
            One model per returned row.
        """
        rows = self._db.load_data(
            "pricer_ULPrices1",
            {"Isin": isin, "numPrezzi": num_prezzi_eod},
            self._connection_string,
        )
        return [
            models.HistPriceULModel(
                id_underlyings=r["idunderlyings"],
                sottostante=r["sottostante"],
                px_close=r["px_close"],
                px_date=r["px_date"],
            )
            for r in map(_normalize_keys, rows)
        ]

    def get_details_ul(self, isin: str):
        """Run ``pricer_ULDetails`` and return ``DetailsULModel`` objects.

        Missing ``maturitydate``, ``daystomaturity`` and ``tassointeresse``
        columns fall back to ``None``, ``0`` and ``0.0`` respectively; every
        other column is required.

        Args:
            isin: Value bound to the procedure's ``Isin`` parameter.

        Returns:
            One model per returned row.
        """
        rows = self._db.load_data(
            "pricer_ULDetails",
            {"Isin": isin},
            self._connection_string,
        )
        return [
            models.DetailsULModel(
                id_underlyings=r["idunderlyings"],
                sottostante=r["sottostante"],
                maturity_date=r.get("maturitydate"),
                last_price=r["lastprice"],
                strike=r["strike"],
                spot_price_normalized=r["spotpricenormalized"],
                dividend=r["dividend"],
                volatility=r["volatility"],
                days_to_maturity=r.get("daystomaturity", 0),
                tasso_interesse=r.get("tassointeresse", 0.0),
            )
            for r in map(_normalize_keys, rows)
        ]

    def get_details_event(self, isin: str):
        """Run ``pricer_EventDetails`` and return ``DetailsEventModel`` objects.

        Only ``daystoexdate`` is optional here (default ``0``); every other
        column is required.

        Args:
            isin: Value bound to the procedure's ``Isin`` parameter.

        Returns:
            One model per returned row.
        """
        rows = self._db.load_data(
            "pricer_EventDetails",
            {"Isin": isin},
            self._connection_string,
        )
        return [
            models.DetailsEventModel(
                days_to_obs=r["daystoobs"],
                days_to_ex_date=r.get("daystoexdate", 0),
                days_to_obs_y_fract=r["daystoobsyfract"],
                coupon_value=r["couponvalue"],
                coupon_trigger=r["coupontrigger"],
                autocall_value=r["autocallvalue"],
                autocall_trigger=r["autocalltrigger"],
                memory=r["memory"],
            )
            for r in map(_normalize_keys, rows)
        ]

    def get_details_event_exdate(self, isin: str):
        """Run ``pricer_EventDetails_ExDate`` and return ``DetailsEventModel`` objects.

        Only ``daystoexdate`` is required here; every other column falls back
        to a zero default when it is absent from the result set.

        Args:
            isin: Value bound to the procedure's ``Isin`` parameter.

        Returns:
            One model per returned row.
        """
        rows = self._db.load_data(
            "pricer_EventDetails_ExDate",
            {"Isin": isin},
            self._connection_string,
        )
        return [
            models.DetailsEventModel(
                days_to_obs=r.get("daystoobs", 0),
                days_to_ex_date=r["daystoexdate"],
                days_to_obs_y_fract=r.get("daystoobsyfract", 0.0),
                coupon_value=r.get("couponvalue", 0.0),
                coupon_trigger=r.get("coupontrigger", 0.0),
                autocall_value=r.get("autocallvalue", 0.0),
                autocall_trigger=r.get("autocalltrigger", 0.0),
                memory=r.get("memory", 0),
            )
            for r in map(_normalize_keys, rows)
        ]

    def get_details_ctf(self, isin: str):
        """Run ``pricer_CTFDetails`` and return ``DetailsCTFModel`` objects.

        Every mapped column is required; a missing one raises ``KeyError``.

        Args:
            isin: Value bound to the procedure's ``Isin`` parameter.

        Returns:
            One model per returned row.
        """
        rows = self._db.load_data(
            "pricer_CTFDetails",
            {"Isin": isin},
            self._connection_string,
        )
        return [
            models.DetailsCTFModel(
                tasso_interesse=r["tassointeresse"],
                days_to_maturity=r["daystomaturity"],
                pdi_style=r["pdi_style"],
                pdi_strike=r["pdi_strike"],
                pdi_barrier=r["pdi_barrier"],
                capital_value=r["capitalvalue"],
                nominal_amount=r["nominalamount"],
                bid=r["bid"],
                ask=r["ask"],
                last_date_price=r["lastdateprice"],
                coupon_in_memory=r["couponinmemory"],
                prot_min_val=r["protminval"],
                air_bag=r["airbag"],
                fattore_airbag=r["fattoreairbag"],
                one_star=r["onestar"],
                trigger_onestar=r["triggeronestar"],
                twin_win=r["twinwin"],
                sigma=r["sigma"],
                relief=r["relief"],
                domino=r["domino"],
                category=r["category"],
                cap=r["cap"],
                leva=r["leva"],
            )
            for r in map(_normalize_keys, rows)
        ]

    def get_isin_universe(self):
        """Run ``pricer_LoadISINUniverse`` (no parameters) and return its rows.

        Returns:
            A list in which one-column rows are reduced to their single value
            and all other rows are kept as dicts.
        """
        rows = self._db.load_data(
            "pricer_LoadISINUniverse",
            {},
            self._connection_string,
        )
        return self._unwrap_single_column(rows)

    def check_isin(self, isin: str):
        """Run ``pricer_CheckISIN1`` for ``isin`` and return its rows.

        Args:
            isin: Value bound to the procedure's ``Isin`` parameter.

        Returns:
            A list in which one-column rows are reduced to their single value
            and all other rows are kept as dicts.
        """
        rows = self._db.load_data(
            "pricer_CheckISIN1",
            {"Isin": isin},
            self._connection_string,
        )
        return self._unwrap_single_column(rows)

    def bulk_update_certificates(self, table_name: str, list_fair_values: List[models.FairValues]) -> None:
        """Write the ``FairValue`` column of ``table_name`` for many ISINs at once.

        Does nothing when ``list_fair_values`` is empty.

        Args:
            table_name: Target table. It is interpolated into the SQL text
                (identifiers cannot be bound as parameters), so it must come
                from trusted code, never from user input.
            list_fair_values: Objects exposing ``fair_value`` and ``isin``.
        """
        if not list_fair_values:
            return

        # NOTE(review): table_name is formatted into the statement; keep it
        # restricted to internal constants.
        sql = f"UPDATE {table_name} SET FairValue = ? WHERE ISIN = ?"
        data = [(fv.fair_value, fv.isin) for fv in list_fair_values]

        conn = pyodbc.connect(self._connection_string)
        try:
            # The pyodbc connection context manager only ends the transaction
            # (commit on success, rollback on error); it does NOT close the
            # connection, hence the explicit close() in the finally clause.
            with conn:
                cur = conn.cursor()
                cur.executemany(sql, data)
                conn.commit()
        finally:
            conn.close()
|
|
|
|
|
|
def default_sql_query():
    """Build a ``SqlQuery`` bound to the ``FirstSolutionDB`` connection string.

    Returns:
        A new ``SqlQuery`` created from
        ``get_connection_string("FirstSolutionDB")``.
    """
    connection_string = get_connection_string("FirstSolutionDB")
    return SqlQuery(connection_string)
|