# Files
# Pricer/pricer/calc.py
# fredmaloggia f707b9c95b primo commit
# 2026-01-05 19:15:03 +01:00
#
# 1446 lines
# 53 KiB
# Python

import math
from dataclasses import dataclass
from typing import List, Optional, Tuple
import os
from concurrent.futures import ProcessPoolExecutor
import numpy as np
try:
from .db import default_sql_query
from .models import PrezziSottostanti, UnderlyingStats
from .utils import cumulative_sums, standard_deviation
except ImportError: # Allow running as a script without a package context.
import sys
from pathlib import Path
pkg_root = Path(__file__).resolve().parents[1]
if str(pkg_root) not in sys.path:
sys.path.insert(0, str(pkg_root))
from pricer.db import default_sql_query
from pricer.models import PrezziSottostanti, UnderlyingStats
from pricer.utils import cumulative_sums, standard_deviation
try:
    from numba import njit

    _USE_NUMBA = True
except Exception:
    # NOTE(review): the broad catch is kept on purpose -- presumably any
    # failure while importing numba should fall back to pure Python.
    _USE_NUMBA = False

    def njit(*args, **kwargs):
        """No-op stand-in for ``numba.njit`` used when numba cannot be imported.

        Works with both decorator spellings: bare ``@njit`` and
        parameterised ``@njit(cache=True)``.  All options are ignored and
        the decorated function is returned unchanged.
        """
        # Bare usage: the decorated function is the only argument.
        if len(args) == 1 and not kwargs and callable(args[0]):
            return args[0]

        def wrap(fn):
            return fn

        return wrap
_cached_eod_underlyings: List[PrezziSottostanti] = []
_cached_fair_value_array = None
@dataclass
class FairValueResult:
    """Summary of a pricing run, as built by ``fair_value``."""

    fair_value: float  # mean of the simulated payoffs, multiplied by 100
    fair_value_array: Optional[List[float]]  # per-simulation payoffs, each multiplied by 100
    stdev: float  # ``standard_deviation`` of the scaled per-simulation payoffs
    simulations: int  # the ``num_sims`` value passed by the caller
    caso_fair_value: str  # payoff-variant label such as "Standard"; "" when none matched
@dataclass
class FairValueResultArray:
    """Per-simulation payoffs plus the payoff-variant label."""

    fair_value_array: List[float]  # one value per simulation (``fair_value_array`` stores them multiplied by 100)
    caso_fair_value: Optional[str]  # payoff-variant label such as "Standard"; may be "" or None
def get_cached_eod_underlyings() -> List[PrezziSottostanti]:
    """Return the module-level ``_cached_eod_underlyings`` list itself (not a copy)."""
    return _cached_eod_underlyings
def compute_underlying_stats(prezzi_ul: List[PrezziSottostanti]) -> List[UnderlyingStats]:
    """Build one ``UnderlyingStats`` per underlying from its closing prices.

    For every pair of consecutive prices the log return is
    ``log(prezzi[i] / prezzi[i + 1])``; a pair containing a non-positive
    price contributes ``0.0`` instead.  The volatility is the sample
    standard deviation (``n - 1`` denominator) of those returns multiplied
    by ``sqrt(252)``, or ``0.0`` when fewer than two returns exist.

    Args:
        prezzi_ul: Objects exposing ``sottostante`` (name) and
            ``prezzi_close`` (indexable sequence of prices).

    Returns:
        A list of ``UnderlyingStats`` in the same order as ``prezzi_ul``.
    """
    annualization = math.sqrt(252)
    stats = []
    for underlying in prezzi_ul:
        closes = underlying.prezzi_close
        returns = []
        for idx in range(1, len(closes)):
            first, second = closes[idx - 1], closes[idx]
            if first > 0 and second > 0:
                returns.append(math.log(first / second))
            else:
                # Keep the series aligned even when a price is unusable.
                returns.append(0.0)

        count = len(returns)
        if count > 1:
            mean_return = sum(returns) / count
            squared_deviation = sum((value - mean_return) ** 2 for value in returns)
            sample_std = math.sqrt(squared_deviation / (count - 1))
        else:
            sample_std = 0.0

        stats.append(
            UnderlyingStats(
                nome=underlying.sottostante,
                prezzi=closes,
                log_returns=returns,
                volatility=sample_std * annualization,
            )
        )
    return stats
def fair_value(
    prices_ul: List[float],
    cor_mat: List[List[float]],
    num_sottostanti: int,
    num_sims: int,
    tasso_interesse: float,
    days_to_maturity: int,
    dividends: List[float],
    volatility: List[float],
    days_to_observation: List[int],
    days_to_observation_y_fract: List[float],
    coupon_values: List[float],
    coupon_triggers: List[float],
    autocall_values: List[float],
    autocall_triggers: List[float],
    memory_flags: List[int],
    coupon_in_memory: float,
    pdi_style: Optional[str],
    pdi_strike: float,
    pdi_barrier: float,
    capital_value: float,
    prot_min_val: float,
    airbag: int,
    sigma: int,
    twinwin: int,
    relief: int,
    fattore_airbag: float,
    one_star: int,
    trigger_one_star: float,
    cap: float,
    leva: float,
) -> FairValueResult:
    """Return the fair value (mean simulated payoff x 100) and its statistics.

    When the module-level ``_cached_fair_value_array`` is set, its payoffs
    and label are reused and no simulation is run.  Otherwise the pricing
    inputs are forwarded unchanged to ``simulate_payoffs`` and the
    payoff-variant label is derived from the feature flags.

    Args:
        num_sims: Number of simulations requested; also reported verbatim
            in ``FairValueResult.simulations``.
        airbag, sigma, twinwin, relief, one_star: 0/1 feature flags that
            select the payoff variant and its label.
        num_sottostanti, leva: Accepted for interface compatibility; not
            used by this function.
        (all remaining arguments): Passed positionally to
            ``simulate_payoffs``.

    Returns:
        ``FairValueResult`` with the mean, the per-simulation payoffs and
        their standard deviation, all scaled by 100, plus the label (``""``
        when the flag combination is not one of the known variants).
    """
    if _cached_fair_value_array is not None:
        # Reuse the cached run; its label is taken as stored.
        pv_digits = _cached_fair_value_array.fair_value_array
        caso_fair_value = _cached_fair_value_array.caso_fair_value or ""
    else:
        pv_digits = simulate_payoffs(
            num_sims,
            prices_ul,
            cor_mat,
            days_to_maturity,
            tasso_interesse,
            dividends,
            volatility,
            days_to_observation,
            days_to_observation_y_fract,
            coupon_values,
            coupon_triggers,
            autocall_values,
            autocall_triggers,
            memory_flags,
            coupon_in_memory,
            pdi_style,
            pdi_strike,
            pdi_barrier,
            capital_value,
            prot_min_val,
            airbag,
            sigma,
            twinwin,
            relief,
            fattore_airbag,
            one_star,
            trigger_one_star,
            cap,
        )
        # Keyed by (airbag, sigma, relief, twinwin, one_star).
        caso_labels = {
            (0, 0, 0, 0, 0): "Standard",
            (1, 0, 0, 0, 0): "Airbag",
            (0, 1, 0, 0, 0): "Sigma",
            (0, 0, 1, 0, 0): "Relief",
            (0, 0, 0, 1, 0): "TwinWin",
            (0, 0, 0, 0, 1): "OneStar",
            (1, 0, 0, 0, 1): "Airbag + OneStar",
            (0, 1, 0, 0, 1): "Sigma + OneStar",
            (0, 0, 1, 0, 1): "Relief + OneStar",
            (0, 0, 0, 1, 1): "TwinWin + OneStar",
            (1, 0, 0, 1, 0): "Airbag + TwinWin",
        }
        caso_fair_value = caso_labels.get((airbag, sigma, relief, twinwin, one_star), "")

    # Scale once and share the list between the stdev and the result.
    scaled_payoffs = [x * 100.0 for x in pv_digits]
    return FairValueResult(
        fair_value=float(np.mean(pv_digits)) * 100.0,
        fair_value_array=scaled_payoffs,
        stdev=standard_deviation(scaled_payoffs),
        simulations=num_sims,
        caso_fair_value=caso_fair_value,
    )
def fair_value_array(
    prices_ul: List[float],
    cor_mat: List[List[float]],
    num_sottostanti: int,
    num_sims: int,
    tasso_interesse: float,
    days_to_maturity: int,
    dividends: List[float],
    volatility: List[float],
    days_to_observation: List[int],
    days_to_observation_y_fract: List[float],
    coupon_values: List[float],
    coupon_triggers: List[float],
    autocall_values: List[float],
    autocall_triggers: List[float],
    memory_flags: List[int],
    coupon_in_memory: float,
    pdi_style: Optional[str],
    pdi_strike: float,
    pdi_barrier: float,
    capital_value: float,
    prot_min_val: float,
    airbag: int,
    sigma: int,
    twinwin: int,
    relief: int,
    fattore_airbag: float,
    one_star: int,
    trigger_one_star: float,
    cap: float,
    leva: float,
) -> FairValueResultArray:
    """Simulate the payoffs and return them scaled by 100 with a variant label.

    Always runs ``simulate_payoffs``; the module-level cache is not
    consulted here.

    Args:
        airbag, sigma, twinwin, relief, one_star: 0/1 feature flags that
            select the payoff variant and its label.
        num_sottostanti, leva: Accepted for interface compatibility; not
            used by this function.
        (all remaining arguments): Passed positionally to
            ``simulate_payoffs``.

    Returns:
        ``FairValueResultArray`` holding one scaled payoff per simulation
        and the label (``""`` when the flag combination is not one of the
        known variants).
    """
    pv_digits = simulate_payoffs(
        num_sims,
        prices_ul,
        cor_mat,
        days_to_maturity,
        tasso_interesse,
        dividends,
        volatility,
        days_to_observation,
        days_to_observation_y_fract,
        coupon_values,
        coupon_triggers,
        autocall_values,
        autocall_triggers,
        memory_flags,
        coupon_in_memory,
        pdi_style,
        pdi_strike,
        pdi_barrier,
        capital_value,
        prot_min_val,
        airbag,
        sigma,
        twinwin,
        relief,
        fattore_airbag,
        one_star,
        trigger_one_star,
        cap,
    )
    # Keyed by (airbag, sigma, relief, twinwin, one_star).
    caso_labels = {
        (0, 0, 0, 0, 0): "Standard",
        (1, 0, 0, 0, 0): "Airbag",
        (0, 1, 0, 0, 0): "Sigma",
        (0, 0, 1, 0, 0): "Relief",
        (0, 0, 0, 1, 0): "TwinWin",
        (0, 0, 0, 0, 1): "OneStar",
        (1, 0, 0, 0, 1): "Airbag + OneStar",
        (0, 1, 0, 0, 1): "Sigma + OneStar",
        (0, 0, 1, 0, 1): "Relief + OneStar",
        (0, 0, 0, 1, 1): "TwinWin + OneStar",
        (1, 0, 0, 1, 0): "Airbag + TwinWin",
    }
    caso_fair_value = caso_labels.get((airbag, sigma, relief, twinwin, one_star), "")
    return FairValueResultArray(
        fair_value_array=[x * 100.0 for x in pv_digits],
        caso_fair_value=caso_fair_value,
    )
def _caso_one_star(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
max_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
trigger_one_star: float,
coupon_triggers: List[float],
coupon_values: List[float],
) -> float:
if min_price_at_maturity < pdi_barrier:
if max_price_at_maturity < trigger_one_star:
payout_capitale = max(prot_min_val, min_price_at_maturity / 100.0)
else:
payout_capitale = capital_value
else:
payout_capitale = capital_value
payout_cedola = 0.0
if min_price_at_maturity >= coupon_triggers[k]:
payout_cedola = memory_coupon + coupon_values[k]
return (
payout_capitale * math.exp(-r * days_to_observation_y_fract[k])
+ payout_cedola * math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
def _caso_twinwin(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
pdi_strike: float,
trigger_autocall: float,
cap: float,
) -> float:
if pdi_style == "European" and min_price_at_maturity >= pdi_strike and trigger_autocall == 999:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k])
* (memory_coupon + min(min_price_at_maturity / 100.0, cap))
+ coupon_paid
)
if pdi_style == "European" and min_price_at_maturity >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k])
* ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0))
+ coupon_paid
)
if pdi_style == "European" and min_price_at_maturity < pdi_barrier:
pv_digit = (
max(prot_min_val, min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
if pdi_style == "American" and min_price_at_maturity >= pdi_strike and trigger_autocall == 999:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k])
* (memory_coupon + min(min_price_at_maturity / 100.0, cap))
+ coupon_paid
)
if pdi_style == "American" and min_price >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k])
* ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0))
+ coupon_paid
)
if pdi_style == "American" and min_price < pdi_barrier:
pv_digit = (
max(prot_min_val, min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
return pv_digit
def _caso_relief(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
second_min_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
) -> float:
if pdi_style == "European" and min_price_at_maturity >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "European" and min_price_at_maturity < pdi_barrier:
pv_digit = (
max(prot_min_val, second_min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
if pdi_style == "American" and min_price >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "American" and min_price < pdi_barrier:
pv_digit = (
max(prot_min_val, second_min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
return pv_digit
def _caso_sigma(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
pdi_strike: float,
) -> float:
if pdi_style == "European" and min_price_at_maturity >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "European" and min_price_at_maturity < pdi_barrier:
pv_digit = (
max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
if pdi_style == "American" and min_price >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "American" and min_price < pdi_barrier:
pv_digit = (
max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
return pv_digit
def _caso_airbag(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
fattore_airbag: float,
) -> float:
if pdi_style == "European" and min_price_at_maturity >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "European" and min_price_at_maturity < pdi_barrier:
pv_digit = (
max(prot_min_val, min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
* fattore_airbag
+ coupon_paid
)
if pdi_style == "American" and min_price >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "American" and min_price < pdi_barrier:
pv_digit = (
max(prot_min_val, min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
* fattore_airbag
+ coupon_paid
)
return pv_digit
def _caso_airbag_one_star(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
max_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
fattore_airbag: float,
trigger_one_star: float,
) -> float:
if max_price_at_maturity >= trigger_one_star:
return (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
return _caso_airbag(
r,
days_to_observation_y_fract,
pdi_style,
pdi_barrier,
capital_value,
prot_min_val,
min_price,
min_price_at_maturity,
pv_digit,
memory_coupon,
coupon_paid,
k,
fattore_airbag,
)
def _caso_sigma_one_star(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
max_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
pdi_strike: float,
trigger_one_star: float,
) -> float:
if max_price_at_maturity >= trigger_one_star:
return (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
return _caso_sigma(
r,
days_to_observation_y_fract,
pdi_style,
pdi_barrier,
capital_value,
prot_min_val,
min_price,
min_price_at_maturity,
pv_digit,
memory_coupon,
coupon_paid,
k,
pdi_strike,
)
def _caso_relief_one_star(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
second_min_price_at_maturity: float,
max_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
trigger_one_star: float,
) -> float:
if max_price_at_maturity >= trigger_one_star:
return (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
return _caso_relief(
r,
days_to_observation_y_fract,
pdi_style,
pdi_barrier,
capital_value,
prot_min_val,
min_price,
min_price_at_maturity,
second_min_price_at_maturity,
pv_digit,
memory_coupon,
coupon_paid,
k,
)
def _caso_twinwin_one_star(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
max_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
pdi_strike: float,
autocall_trigger: float,
cap: float,
trigger_one_star: float,
) -> float:
if max_price_at_maturity >= trigger_one_star:
return (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
return _caso_twinwin(
r,
days_to_observation_y_fract,
pdi_style,
pdi_barrier,
capital_value,
prot_min_val,
min_price,
min_price_at_maturity,
pv_digit,
memory_coupon,
coupon_paid,
k,
pdi_strike,
autocall_trigger,
cap,
)
def _caso_airbag_twinwin(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
pdi_strike: float,
autocall_trigger: float,
cap: float,
fattore_airbag: float,
) -> float:
if min_price_at_maturity >= pdi_barrier:
if autocall_trigger == 999 and min_price_at_maturity >= pdi_strike:
return (
math.exp(-r * days_to_observation_y_fract[k])
* (memory_coupon + min(min_price_at_maturity / 100.0, cap))
+ coupon_paid
)
return (
math.exp(-r * days_to_observation_y_fract[k])
* ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0))
+ coupon_paid
)
return (
max(prot_min_val, min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
* fattore_airbag
+ coupon_paid
)
def _caso_standard(
r: float,
days_to_observation_y_fract: List[float],
pdi_style: Optional[str],
pdi_barrier: float,
capital_value: float,
prot_min_val: float,
min_price: float,
min_price_at_maturity: float,
pv_digit: float,
memory_coupon: float,
coupon_paid: float,
k: int,
) -> float:
if pdi_style == "European" and min_price_at_maturity >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "European" and min_price_at_maturity < pdi_barrier:
pv_digit = (
max(prot_min_val, min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
if pdi_style == "American" and min_price >= pdi_barrier:
pv_digit = (
math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon)
+ coupon_paid
)
if pdi_style == "American" and min_price < pdi_barrier:
pv_digit = (
max(prot_min_val, min_price_at_maturity / 100.0)
* math.exp(-r * days_to_observation_y_fract[k])
+ coupon_paid
)
return pv_digit
def gbm_multi_equity(
    prices_ul: List[float],
    cor_mat: List[List[float]],
    num_sottostanti: int,
    days_to_maturity: int,
    tasso_interesse: float,
    q_assets: List[float],
    vol_assets: List[float],
) -> List[List[float]]:
    """Simulate one joint price path with constant volatilities.

    One row of zero-mean normals with covariance ``cor_mat`` is drawn per
    day from NumPy's global random state.  Each asset's daily log
    increment is ``(r - q - 0.5 * vol**2) * dt + vol * sqrt(dt) * shock``
    with ``dt = 1 / 252``.

    Returns:
        A NumPy array of shape ``(days_to_maturity + 1, num_sottostanti)``
        (despite the list annotation); row 0 holds ``prices_ul`` and row
        ``t`` the prices after ``t`` steps.
    """
    step = 1.0 / 252.0
    shocks = np.random.multivariate_normal(
        np.zeros(num_sottostanti),
        np.array(cor_mat, dtype=float),
        size=days_to_maturity,
    )
    spot = np.array(prices_ul, dtype=float)
    sigma_vec = np.array(vol_assets, dtype=float)
    yield_vec = np.array(q_assets, dtype=float)

    log_drift = (tasso_interesse - yield_vec) - (0.5 * (sigma_vec ** 2))
    log_steps = log_drift * step + sigma_vec * math.sqrt(step) * shocks

    paths = np.empty((days_to_maturity + 1, num_sottostanti), dtype=float)
    paths[0] = spot
    # Cumulative log increments give the price level after each step.
    paths[1:] = spot * np.exp(np.cumsum(log_steps, axis=0))
    return paths
def gbm_multi_equity_garch(
    prices_ul: List[float],
    correl_matrix: List[List[float]],
    garch_params: List,
    num_sottostanti: int,
    days_to_maturity: int,
    tasso_interesse: float,
    q_assets: List[float],
) -> List[List[float]]:
    """Simulate one joint price path with a per-asset variance recursion.

    One row of zero-mean normals with covariance ``correl_matrix`` is drawn
    per day from NumPy's global random state.  For each asset the variance
    starts at ``garch_params[i].sigma0 ** 2`` and is updated every step as
    ``omega + alpha * shock**2 + beta * previous_variance``, where ``shock``
    is ``sqrt(previous_variance) * sqrt(dt) * normal_draw`` and
    ``dt = 1 / 252``.

    Args:
        garch_params: One object per asset exposing ``sigma0``, ``omega``,
            ``alpha`` and ``beta``.

    Returns:
        A list of ``days_to_maturity + 1`` rows, each with
        ``num_sottostanti`` prices; row 0 holds ``prices_ul``.
    """
    step = 1.0 / 252.0
    root_step = math.sqrt(step)
    draws = np.random.multivariate_normal(
        np.zeros(num_sottostanti),
        np.array(correl_matrix, dtype=float),
        size=days_to_maturity,
    )

    assets = range(num_sottostanti)
    paths = [[prices_ul[i] for i in assets]]
    # Only the previous day's variances are needed for the recursion.
    variances = [garch_params[i].sigma0 ** 2 for i in assets]

    for day in range(days_to_maturity):
        previous_prices = paths[-1]
        today_prices = []
        next_variances = []
        for i in assets:
            variance = variances[i]
            innovation = math.sqrt(variance) * root_step * draws[day][i]
            growth = ((tasso_interesse - q_assets[i]) - 0.5 * variance) * step
            today_prices.append(previous_prices[i] * math.exp(growth + innovation))
            params = garch_params[i]
            next_variances.append(
                params.omega
                + params.alpha * (innovation ** 2)
                + params.beta * variance
            )
        paths.append(today_prices)
        variances = next_variances
    return paths
def max_array_at_row(jagged_array: List[List[float]], num_sottostanti: int, row_index: int) -> float:
    """Return the largest value in row ``row_index``.

    A NumPy array is reduced over the whole row.  For any other sequence
    only the first ``num_sottostanti`` entries are scanned, starting from a
    floor of ``-999999.0`` (which is returned when nothing exceeds it).
    """
    if isinstance(jagged_array, np.ndarray):
        return float(jagged_array[row_index].max())

    best = -999999.0
    candidates = (jagged_array[row_index][col] for col in range(num_sottostanti))
    for candidate in candidates:
        if candidate > best:
            best = candidate
    return best
def min_array_at_row(jagged_array: List[List[float]], num_sottostanti: int, row_index: int) -> float:
    """Return the smallest value in row ``row_index``.

    A NumPy array is reduced over the whole row.  For any other sequence
    only the first ``num_sottostanti`` entries are scanned, starting from a
    ceiling of ``999999.0`` (which is returned when nothing is below it).
    """
    if isinstance(jagged_array, np.ndarray):
        return float(jagged_array[row_index].min())

    lowest = 999999.0
    candidates = (jagged_array[row_index][col] for col in range(num_sottostanti))
    for candidate in candidates:
        if candidate < lowest:
            lowest = candidate
    return lowest
def second_min_array_at_row(jagged_array: List[List[float]], num_sottostanti: int, row_index: int) -> float:
    """Return the second-smallest value in row ``row_index``.

    Falls back to the only value when the row has a single entry.  A NumPy
    array is sorted over the whole row; for any other sequence only the
    first ``num_sottostanti`` entries are considered.
    """
    if isinstance(jagged_array, np.ndarray):
        ordered_row = np.sort(jagged_array[row_index])
        return float(ordered_row[1 if ordered_row.size >= 2 else 0])

    ordered = sorted(jagged_array[row_index][col] for col in range(num_sottostanti))
    return ordered[1] if len(ordered) >= 2 else ordered[0]
def _min_overall(jagged_array: List[List[float]]) -> float:
if isinstance(jagged_array, np.ndarray):
return float(jagged_array.min())
return min(min(row) for row in jagged_array)
@njit(cache=True)
def _nj_min_row(path, row_index):
    """Return the minimum of row ``row_index`` of ``path`` (via the row's ``.min()``)."""
    return path[row_index].min()
@njit(cache=True)
def _nj_max_row(path, row_index):
    """Return the maximum of row ``row_index`` of ``path`` (via the row's ``.max()``)."""
    return path[row_index].max()
@njit(cache=True)
def _nj_second_min_row(path, row_index):
    """Return the second-smallest value of row ``row_index`` of ``path``.

    Falls back to the only value when the row has a single entry.
    """
    ordered = np.sort(path[row_index])
    pick = 0
    if ordered.shape[0] >= 2:
        pick = 1
    return ordered[pick]
@njit(cache=True)
def _nj_min_overall(path):
    """Return the minimum over all entries of ``path`` (via ``path.min()``)."""
    return path.min()
@njit(cache=True)
def _nj_caso_standard(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                      min_price, min_price_at_maturity, memory_coupon, coupon_paid, k):
    """Discounted final payoff for the Standard variant (``njit`` version).

    ``style_is_eur == 1`` tests ``min_price_at_maturity`` against the
    barrier, ``style_is_eur == 0`` tests ``min_price``; any other value
    yields ``0.0``.  Barrier held pays ``capital_value + memory_coupon``,
    barrier breached pays ``max(prot_min_val, min_price_at_maturity / 100)``;
    both are discounted with ``exp(-r * days_to_obs_y_fract[k])`` and
    ``coupon_paid`` is added undiscounted.
    """
    if style_is_eur == 1:
        observed = min_price_at_maturity
    elif style_is_eur == 0:
        observed = min_price
    else:
        return 0.0

    if observed >= pdi_barrier:
        return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid
    if observed < pdi_barrier:
        return (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid
    # Neither comparison held (e.g. NaN).
    return 0.0
@njit(cache=True)
def _nj_caso_airbag(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, fattore_airbag):
    """Discounted final payoff for the Airbag variant (``njit`` version).

    ``style_is_eur == 1`` tests ``min_price_at_maturity`` against the
    barrier, ``style_is_eur == 0`` tests ``min_price``; any other value
    yields ``0.0``.  Barrier held pays ``capital_value + memory_coupon``,
    barrier breached pays
    ``max(prot_min_val, min_price_at_maturity / 100) * fattore_airbag``;
    both are discounted with ``exp(-r * days_to_obs_y_fract[k])`` and
    ``coupon_paid`` is added undiscounted.
    """
    if style_is_eur == 1:
        observed = min_price_at_maturity
    elif style_is_eur == 0:
        observed = min_price
    else:
        return 0.0

    if observed >= pdi_barrier:
        return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid
    if observed < pdi_barrier:
        return (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) * fattore_airbag + coupon_paid
    # Neither comparison held (e.g. NaN).
    return 0.0
@njit(cache=True)
def _nj_caso_sigma(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                   min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike):
    """Discounted final payoff for the Sigma variant (``njit`` version).

    ``style_is_eur == 1`` tests ``min_price_at_maturity`` against the
    barrier, ``style_is_eur == 0`` tests ``min_price``; any other value
    yields ``0.0``.  Barrier held pays ``capital_value + memory_coupon``,
    barrier breached pays
    ``max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100)``;
    both are discounted with ``exp(-r * days_to_obs_y_fract[k])`` and
    ``coupon_paid`` is added undiscounted.
    """
    if style_is_eur == 1:
        observed = min_price_at_maturity
    elif style_is_eur == 0:
        observed = min_price
    else:
        return 0.0

    if observed >= pdi_barrier:
        return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid
    if observed < pdi_barrier:
        return (max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid
    # Neither comparison held (e.g. NaN).
    return 0.0
@njit(cache=True)
def _nj_caso_relief(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, second_min_price_at_maturity, memory_coupon,
                    coupon_paid, k):
    """Discounted final payoff for the Relief variant (``njit`` version).

    ``style_is_eur == 1`` tests ``min_price_at_maturity`` against the
    barrier, ``style_is_eur == 0`` tests ``min_price``; any other value
    yields ``0.0``.  Barrier held pays ``capital_value + memory_coupon``,
    barrier breached pays
    ``max(prot_min_val, second_min_price_at_maturity / 100)``; both are
    discounted with ``exp(-r * days_to_obs_y_fract[k])`` and
    ``coupon_paid`` is added undiscounted.
    """
    if style_is_eur == 1:
        observed = min_price_at_maturity
    elif style_is_eur == 0:
        observed = min_price
    else:
        return 0.0

    if observed >= pdi_barrier:
        return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid
    if observed < pdi_barrier:
        return (max(prot_min_val, second_min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid
    # Neither comparison held (e.g. NaN).
    return 0.0
@njit(cache=True)
def _nj_caso_twinwin(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                     min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike,
                     trigger_autocall, cap):
    """Discounted final payoff for the TwinWin variant (``njit`` version).

    ``style_is_eur == 1`` tests ``min_price_at_maturity`` against the
    barrier, ``style_is_eur == 0`` tests ``min_price``; any other value
    yields ``0.0``.

    * Barrier breached: ``max(prot_min_val, min_price_at_maturity / 100)``.
    * Barrier held, ``trigger_autocall == 999`` and the minimum at maturity
      is at or above ``pdi_strike``: upside participation
      ``memory_coupon + min(min_price_at_maturity / 100, cap)``.
    * Barrier held otherwise: mirrored payoff
      ``2 * capital_value + memory_coupon - min_price_at_maturity / 100``.

    Every case is discounted with ``exp(-r * days_to_obs_y_fract[k])`` and
    ``coupon_paid`` is added undiscounted.
    """
    if style_is_eur == 1:
        barrier_level = min_price_at_maturity
    elif style_is_eur == 0:
        barrier_level = min_price
    else:
        return 0.0

    if barrier_level < pdi_barrier:
        return (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid

    if barrier_level >= pdi_barrier:
        discount = math.exp(-r * days_to_obs_y_fract[k])
        # The upside case must win over the mirrored payoff: the mirrored
        # condition also holds whenever the strike is at or above the
        # barrier, so testing it independently would overwrite the upside.
        if trigger_autocall == 999 and min_price_at_maturity >= pdi_strike:
            return (discount * (memory_coupon + (min(min_price_at_maturity / 100.0, cap)))) + coupon_paid
        return (discount * ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0))) + coupon_paid

    # Only reachable when the comparisons are undefined (e.g. NaN prices).
    return 0.0
@njit(cache=True)
def _nj_caso_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                      min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k,
                      trigger_one_star, coupon_triggers, coupon_values):
    """Discounted final payoff for the OneStar variant (``njit`` version).

    Capital is reduced to ``max(prot_min_val, min_price_at_maturity / 100)``
    only when the minimum at maturity is below the barrier AND the maximum
    at maturity is below ``trigger_one_star``; otherwise ``capital_value``
    is paid.  The final coupon (``memory_coupon + coupon_values[k]``) is
    added when the minimum at maturity reaches ``coupon_triggers[k]``.
    Both legs are discounted with ``exp(-r * days_to_obs_y_fract[k])`` and
    ``coupon_paid`` is added undiscounted.
    """
    capital_leg = capital_value
    if min_price_at_maturity < pdi_barrier and max_price_at_maturity < trigger_one_star:
        capital_leg = max(prot_min_val, min_price_at_maturity / 100.0)

    coupon_leg = 0.0
    if min_price_at_maturity >= coupon_triggers[k]:
        coupon_leg = memory_coupon + coupon_values[k]

    discount = math.exp(-r * days_to_obs_y_fract[k])
    return capital_leg * discount + coupon_leg * discount + coupon_paid
@njit(cache=True)
def _nj_caso_airbag_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                             min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon,
                             coupon_paid, k, fattore_airbag, trigger_one_star, style_is_eur):
    """Discounted final payoff for Airbag + OneStar (``njit`` version).

    When ``max_price_at_maturity`` reaches ``trigger_one_star`` the payoff
    is ``capital_value + memory_coupon`` discounted with
    ``exp(-r * days_to_obs_y_fract[k])`` plus ``coupon_paid``; otherwise
    the result of ``_nj_caso_airbag`` for the same inputs is returned.
    """
    one_star_hit = max_price_at_maturity >= trigger_one_star
    if not one_star_hit:
        return _nj_caso_airbag(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value,
                               prot_min_val, min_price, min_price_at_maturity, memory_coupon,
                               coupon_paid, k, fattore_airbag)
    discount = math.exp(-r * days_to_obs_y_fract[k])
    return (discount * (capital_value + memory_coupon)) + coupon_paid
@njit(cache=True)
def _nj_caso_sigma_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                            min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon,
                            coupon_paid, k, pdi_strike, trigger_one_star, style_is_eur):
    """Discounted final payoff for Sigma + OneStar (``njit`` version).

    When ``max_price_at_maturity`` reaches ``trigger_one_star`` the payoff
    is ``capital_value + memory_coupon`` discounted with
    ``exp(-r * days_to_obs_y_fract[k])`` plus ``coupon_paid``; otherwise
    the result of ``_nj_caso_sigma`` for the same inputs is returned.
    """
    one_star_hit = max_price_at_maturity >= trigger_one_star
    if not one_star_hit:
        return _nj_caso_sigma(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value,
                              prot_min_val, min_price, min_price_at_maturity, memory_coupon,
                              coupon_paid, k, pdi_strike)
    discount = math.exp(-r * days_to_obs_y_fract[k])
    return (discount * (capital_value + memory_coupon)) + coupon_paid
@njit(cache=True)
def _nj_caso_relief_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                             min_price, min_price_at_maturity, second_min_price_at_maturity,
                             max_price_at_maturity, memory_coupon, coupon_paid, k, trigger_one_star,
                             style_is_eur):
    """Discounted final payoff for Relief + OneStar (``njit`` version).

    When ``max_price_at_maturity`` reaches ``trigger_one_star`` the payoff
    is ``capital_value + memory_coupon`` discounted with
    ``exp(-r * days_to_obs_y_fract[k])`` plus ``coupon_paid``; otherwise
    the result of ``_nj_caso_relief`` for the same inputs is returned.
    """
    one_star_hit = max_price_at_maturity >= trigger_one_star
    if not one_star_hit:
        return _nj_caso_relief(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value,
                               prot_min_val, min_price, min_price_at_maturity,
                               second_min_price_at_maturity, memory_coupon, coupon_paid, k)
    discount = math.exp(-r * days_to_obs_y_fract[k])
    return (discount * (capital_value + memory_coupon)) + coupon_paid
@njit(cache=True)
def _nj_caso_twinwin_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                              min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon,
                              coupon_paid, k, pdi_strike, autocall_trigger, cap, trigger_one_star,
                              style_is_eur):
    """Discounted final payoff for TwinWin + OneStar (``njit`` version).

    When ``max_price_at_maturity`` reaches ``trigger_one_star`` the payoff
    is ``capital_value + memory_coupon`` discounted with
    ``exp(-r * days_to_obs_y_fract[k])`` plus ``coupon_paid``; otherwise
    the result of ``_nj_caso_twinwin`` for the same inputs is returned.
    """
    one_star_hit = max_price_at_maturity >= trigger_one_star
    if not one_star_hit:
        return _nj_caso_twinwin(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value,
                                prot_min_val, min_price, min_price_at_maturity, memory_coupon,
                                coupon_paid, k, pdi_strike, autocall_trigger, cap)
    discount = math.exp(-r * days_to_obs_y_fract[k])
    return (discount * (capital_value + memory_coupon)) + coupon_paid
@njit(cache=True)
def _nj_caso_airbag_twinwin(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                            min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike,
                            autocall_trigger, cap, fattore_airbag):
    """Discounted final payoff for Airbag + TwinWin (``njit`` version).

    The barrier is tested against ``min_price_at_maturity`` only.

    * Barrier breached:
      ``max(prot_min_val, min_price_at_maturity / 100) * fattore_airbag``.
    * Barrier held, ``autocall_trigger == 999`` and the minimum at maturity
      is at or above ``pdi_strike``:
      ``memory_coupon + min(min_price_at_maturity / 100, cap)``.
    * Barrier held otherwise:
      ``2 * capital_value + memory_coupon - min_price_at_maturity / 100``.

    Every case is discounted with ``exp(-r * days_to_obs_y_fract[k])`` and
    ``coupon_paid`` is added undiscounted.
    """
    discount = math.exp(-r * days_to_obs_y_fract[k])
    level = min_price_at_maturity / 100.0

    if not (min_price_at_maturity >= pdi_barrier):
        return (max(prot_min_val, level)) * discount * fattore_airbag + coupon_paid

    if autocall_trigger == 999 and min_price_at_maturity >= pdi_strike:
        return (discount * (memory_coupon + min(level, cap))) + coupon_paid

    return (discount * ((2 * capital_value) + memory_coupon - level)) + coupon_paid
@njit(cache=True)
def _nj_payoff_for_path(path, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers,
                        autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur,
                        pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief,
                        fattore_airbag, one_star, trigger_one_star, cap, days_to_maturity, r):
    """Discounted payoff of one simulated path.

    The observation dates are walked in order. At each date ``k`` the value
    ``_nj_min_row(path, days_to_obs[k])`` is compared with the triggers:

    * at or above ``autocall_triggers[k]``: return immediately with
      ``autocall_values[k]`` plus the coupons held in memory, discounted to
      today, plus the coupons already paid;
    * at or above ``coupon_triggers[k]``: the coupon and the memory are paid
      (discounted) and the memory is reset;
    * otherwise ``memory_flags[k] * coupon_values[k]`` is added to the memory.

    On the last observation the redemption is delegated to the ``_nj_caso_*``
    function matching the ``airbag`` / ``sigma`` / ``relief`` / ``twinwin`` /
    ``one_star`` flags. A flag combination with no matching function, or an
    empty observation schedule, yields ``0.0``.
    """
    min_price = _nj_min_overall(path)
    min_price_at_maturity = _nj_min_row(path, days_to_maturity)
    memory_coupon = coupon_in_memory
    coupon_paid = 0.0

    # The supported flag combinations are mutually exclusive and do not depend
    # on the observation index, so they are named once up front.
    plain = airbag == 0 and sigma == 0 and relief == 0 and twinwin == 0
    only_airbag = airbag == 1 and sigma == 0 and relief == 0 and twinwin == 0
    only_sigma = airbag == 0 and sigma == 1 and relief == 0 and twinwin == 0
    only_relief = airbag == 0 and sigma == 0 and relief == 1 and twinwin == 0
    only_twinwin = airbag == 0 and sigma == 0 and relief == 0 and twinwin == 1
    airbag_with_twinwin = airbag == 1 and sigma == 0 and relief == 0 and twinwin == 1

    n_obs = days_to_obs.shape[0]
    final_k = n_obs - 1
    for k in range(n_obs):
        worst_now = _nj_min_row(path, days_to_obs[k])

        # Early redemption ends the path.
        if worst_now >= autocall_triggers[k]:
            return (autocall_values[k] + memory_coupon) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid

        if worst_now >= coupon_triggers[k]:
            coupon_paid += (coupon_values[k] + memory_coupon) * math.exp(-r * days_to_obs_y_fract[k])
            memory_coupon = 0.0
        else:
            memory_coupon += memory_flags[k] * coupon_values[k]

        if k != final_k:
            continue

        # ---- last observation: redemption payoff ----
        if one_star == 0:
            if plain:
                return _nj_caso_standard(
                    r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, memory_coupon, coupon_paid, k)
            if only_airbag:
                return _nj_caso_airbag(
                    r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, fattore_airbag)
            if only_sigma:
                return _nj_caso_sigma(
                    r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike)
            if only_relief:
                runner_up = _nj_second_min_row(path, days_to_maturity)
                return _nj_caso_relief(
                    r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, runner_up, memory_coupon, coupon_paid, k)
            if only_twinwin:
                return _nj_caso_twinwin(
                    r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, memory_coupon, coupon_paid, k,
                    pdi_strike, autocall_triggers[k], cap)
            if airbag_with_twinwin:
                return _nj_caso_airbag_twinwin(
                    r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                    min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike,
                    autocall_triggers[k], cap, fattore_airbag)
        elif one_star == 1:
            # The one-star variants read the maximum at the last observation date.
            if plain:
                best = _nj_max_row(path, days_to_obs[k])
                return _nj_caso_one_star(
                    r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                    min_price_at_maturity, best, memory_coupon, coupon_paid, k,
                    trigger_one_star, coupon_triggers, coupon_values)
            if only_airbag:
                best = _nj_max_row(path, days_to_obs[k])
                return _nj_caso_airbag_one_star(
                    r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, best, memory_coupon, coupon_paid, k,
                    fattore_airbag, trigger_one_star, style_is_eur)
            if only_sigma:
                best = _nj_max_row(path, days_to_obs[k])
                return _nj_caso_sigma_one_star(
                    r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, best, memory_coupon, coupon_paid, k,
                    pdi_strike, trigger_one_star, style_is_eur)
            if only_relief:
                runner_up = _nj_second_min_row(path, days_to_maturity)
                best = _nj_max_row(path, days_to_obs[k])
                return _nj_caso_relief_one_star(
                    r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, runner_up, best, memory_coupon, coupon_paid, k,
                    trigger_one_star, style_is_eur)
            if only_twinwin:
                best = _nj_max_row(path, days_to_obs[k])
                return _nj_caso_twinwin_one_star(
                    r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val,
                    min_price, min_price_at_maturity, best, memory_coupon, coupon_paid, k,
                    pdi_strike, autocall_triggers[k], cap, trigger_one_star, style_is_eur)

    # No observation dates, or a flag combination none of the cases handles.
    return 0.0
@njit(cache=True)
def _nj_payoffs_for_paths(paths, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers,
                          autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur,
                          pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief,
                          fattore_airbag, one_star, trigger_one_star, cap, days_to_maturity, r):
    """Apply ``_nj_payoff_for_path`` to every path in ``paths``.

    ``paths`` is indexed along its first axis; the result is a float64 array
    with one payoff per entry, in the same order. All remaining arguments are
    forwarded unchanged to ``_nj_payoff_for_path``.
    """
    total = paths.shape[0]
    payoffs = np.empty(total, dtype=np.float64)
    for sim in range(total):
        single_path = paths[sim]
        payoffs[sim] = _nj_payoff_for_path(
            single_path, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers,
            autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur,
            pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief,
            fattore_airbag, one_star, trigger_one_star, cap, days_to_maturity, r,
        )
    return payoffs
def _generate_paths_batch(prices_ul, cor_mat, num_sims, days_to_maturity, tasso_interesse, dividends, volatility):
    """Simulate correlated log-normal price paths for all underlyings.

    Each step spans ``1/252`` of a year. Independent standard normal draws
    (taken from NumPy's global random state) are correlated through the
    Cholesky factor of ``cor_mat``; the per-step log increment is
    ``(rate - dividend - 0.5 * vol**2) * dt + vol * sqrt(dt) * shock``.

    Returns:
        float64 array of shape ``(num_sims, days_to_maturity + 1, n_assets)``
        whose index 0 along the second axis holds ``prices_ul``.
    """
    step = 1.0 / 252.0
    n_assets = prices_ul.shape[0]

    # Factorise before drawing: a bad correlation matrix fails without
    # consuming any random numbers.
    lower = np.linalg.cholesky(cor_mat)
    shocks = np.random.standard_normal((num_sims, days_to_maturity, n_assets))
    correlated = np.matmul(shocks, lower.T)

    mu = (tasso_interesse - dividends) - (0.5 * (volatility ** 2))
    log_moves = mu[None, None, :] * step + volatility[None, None, :] * math.sqrt(step) * correlated
    growth = np.exp(np.cumsum(log_moves, axis=1))

    simulated = np.empty((num_sims, days_to_maturity + 1, n_assets), dtype=np.float64)
    simulated[:, 0, :] = prices_ul
    simulated[:, 1:, :] = prices_ul[None, None, :] * growth
    return simulated
def _simulate_batch(args: Tuple):
    """Simulate one batch of paths and return the payoff of each path.

    ``args`` is a single 28-element tuple (so the function can be handed to
    ``executor.map``): the batch size, the inputs for
    ``_generate_paths_batch``, and the contract terms forwarded to
    ``_nj_payoffs_for_paths``. ``tasso_interesse`` is used both as the drift
    rate of the paths and as the discount rate of the payoffs.
    """
    (
        num_sims, prices_ul, cor_mat, days_to_maturity, tasso_interesse, dividends, volatility,
        days_to_obs, days_to_obs_y_fract,
        coupon_values, coupon_triggers, autocall_values, autocall_triggers,
        memory_flags, coupon_in_memory, style_is_eur,
        pdi_strike, pdi_barrier, capital_value, prot_min_val,
        airbag, sigma, twinwin, relief, fattore_airbag,
        one_star, trigger_one_star, cap,
    ) = args

    simulated = _generate_paths_batch(
        prices_ul, cor_mat, num_sims, days_to_maturity, tasso_interesse, dividends, volatility
    )
    payoffs = _nj_payoffs_for_paths(
        simulated, days_to_obs, days_to_obs_y_fract,
        coupon_values, coupon_triggers, autocall_values, autocall_triggers,
        memory_flags, coupon_in_memory, style_is_eur,
        pdi_strike, pdi_barrier, capital_value, prot_min_val,
        airbag, sigma, twinwin, relief, fattore_airbag,
        one_star, trigger_one_star, cap,
        days_to_maturity, tasso_interesse,
    )
    return payoffs
def _reseed_worker_rng():
    """Reseed NumPy's global random state inside a newly started pool worker."""
    # Called with no argument, np.random.seed pulls fresh entropy from the OS.
    np.random.seed()


def _batch_sizes(total, batch_size):
    """Split ``total`` simulations into consecutive chunks of at most ``batch_size``."""
    sizes = []
    remaining = total
    while remaining > 0:
        take = min(batch_size, remaining)
        sizes.append(take)
        remaining -= take
    return sizes


def simulate_payoffs(
    num_sims,
    prices_ul,
    cor_mat,
    days_to_maturity,
    tasso_interesse,
    dividends,
    volatility,
    days_to_obs,
    days_to_obs_y_fract,
    coupon_values,
    coupon_triggers,
    autocall_values,
    autocall_triggers,
    memory_flags,
    coupon_in_memory,
    pdi_style,
    pdi_strike,
    pdi_barrier,
    capital_value,
    prot_min_val,
    airbag,
    sigma,
    twinwin,
    relief,
    fattore_airbag,
    one_star,
    trigger_one_star,
    cap,
):
    """Run the Monte Carlo simulation and return one payoff per simulated path.

    Inputs are normalised first: scalar contract terms passed as ``None``
    become ``0.0``, sequences become NumPy arrays (float64, except
    ``days_to_obs`` and ``memory_flags`` which become int64), and
    ``pdi_style`` is reduced to a 0/1 flag that is 1 only for the exact
    string ``"European"``.

    The work is split into batches handled by ``_simulate_batch``. Batches
    run in a process pool when more than one CPU is available, numba is
    importable and ``num_sims >= 20000``; otherwise they run sequentially in
    chunks of 4096 paths.

    Returns:
        1-D float64 array with ``num_sims`` payoffs.

    Raises:
        ValueError: if ``num_sims`` is not positive (there is nothing to
            concatenate).
    """
    if cap is None:
        cap = 0.0
    if trigger_one_star is None:
        trigger_one_star = 0.0
    if pdi_strike is None:
        pdi_strike = 0.0
    if pdi_barrier is None:
        pdi_barrier = 0.0
    if capital_value is None:
        capital_value = 0.0
    if prot_min_val is None:
        prot_min_val = 0.0
    if fattore_airbag is None:
        fattore_airbag = 0.0
    if coupon_in_memory is None:
        coupon_in_memory = 0.0

    prices_ul = np.asarray(prices_ul, dtype=np.float64)
    cor_mat = np.asarray(cor_mat, dtype=np.float64)
    dividends = np.asarray(dividends, dtype=np.float64)
    volatility = np.asarray(volatility, dtype=np.float64)
    days_to_obs = np.asarray(days_to_obs, dtype=np.int64)
    days_to_obs_y_fract = np.asarray(days_to_obs_y_fract, dtype=np.float64)
    coupon_values = np.asarray(coupon_values, dtype=np.float64)
    coupon_triggers = np.asarray(coupon_triggers, dtype=np.float64)
    autocall_values = np.asarray(autocall_values, dtype=np.float64)
    autocall_triggers = np.asarray(autocall_triggers, dtype=np.float64)
    memory_flags = np.asarray(memory_flags, dtype=np.int64)
    style_is_eur = 1 if pdi_style == "European" else 0

    # Everything except the batch size is identical for every batch.
    shared_args = (
        prices_ul,
        cor_mat,
        days_to_maturity,
        tasso_interesse,
        dividends,
        volatility,
        days_to_obs,
        days_to_obs_y_fract,
        coupon_values,
        coupon_triggers,
        autocall_values,
        autocall_triggers,
        memory_flags,
        coupon_in_memory,
        style_is_eur,
        pdi_strike,
        pdi_barrier,
        capital_value,
        prot_min_val,
        airbag,
        sigma,
        twinwin,
        relief,
        fattore_airbag,
        one_star,
        trigger_one_star,
        cap,
    )

    cpu_count = os.cpu_count() or 1
    use_parallel = _USE_NUMBA and cpu_count > 1 and num_sims >= 20000

    if use_parallel:
        workers = max(1, cpu_count - 1)
        batch_size = min(4096, max(256, num_sims // (workers * 4)))
        jobs = [(take,) + shared_args for take in _batch_sizes(num_sims, batch_size)]
        # With the "fork" start method every worker inherits an identical copy
        # of NumPy's global random state, so without reseeding different
        # workers would simulate exactly the same paths and the sample would
        # contain duplicates. The initializer gives each worker its own state.
        with ProcessPoolExecutor(max_workers=workers, initializer=_reseed_worker_rng) as executor:
            results = list(executor.map(_simulate_batch, jobs))
        return np.concatenate(results)

    # Sequential path: fixed-size chunks keep the path array small.
    results = [
        _simulate_batch((take,) + shared_args)
        for take in _batch_sizes(num_sims, 4096)
    ]
    return np.concatenate(results)
def volatility_certificates(isin: str, num_prezzi_eod: int, enable_warning: bool) -> List[float]:
    """Return the ``volatility`` of every underlying of certificate ``isin``.

    The end-of-day prices come from the module-level cache when it is
    non-empty; otherwise they are loaded through ``default_sql_query()`` and
    stored in the cache. With ``enable_warning`` set, a message is printed
    when fewer prices than requested are available; on the database branch the
    function additionally waits for user input before continuing.

    NOTE(review): the cached branch never looks at ``isin``, so a cache filled
    for another certificate would be reused as is -- confirm callers always
    refresh it first.
    """
    global _cached_eod_underlyings

    if not _cached_eod_underlyings:
        fs_db = default_sql_query()
        hist_price = fs_db.get_hist_price_ul(isin, num_prezzi_eod)
        details_ul = fs_db.get_details_ul(isin)
        num_ul = len(details_ul)
        if enable_warning:
            if num_ul * num_prezzi_eod != len(hist_price):
                print("Attenzione! Rilevati meno di prezzi eod in uno dei sottostanti")
                print(f"Volatilita calcolata usando {len(hist_price) // max(num_ul, 1)} prezzi eod")
                input("Premere un tasto per continuare...")
        # One price series per underlying, keeping the order of hist_price.
        list_prezzi = [
            PrezziSottostanti(
                id_underlyings=dettaglio.id_underlyings,
                sottostante=dettaglio.sottostante,
                prezzi_close=[
                    row.px_close
                    for row in hist_price
                    if row.id_underlyings == dettaglio.id_underlyings
                ],
            )
            for dettaglio in details_ul
        ]
        _cached_eod_underlyings = list_prezzi
    else:
        list_prezzi = _cached_eod_underlyings
        if enable_warning:
            num_ul = len(list_prezzi)
            tot_prezzi = sum(len(serie.prezzi_close) for serie in list_prezzi)
            if num_ul * num_prezzi_eod > tot_prezzi:
                print(f"Volatilita calcolata usando {tot_prezzi // num_ul} prezzi eod")

    return [stat.volatility for stat in compute_underlying_stats(list_prezzi)]
def _price_correlation(a, b) -> float:
    """Correlation coefficient of two price series, or 0.0 when it is undefined."""
    # np.corrcoef needs equally long inputs; keep only the shared leading part.
    # NOTE(review): assumes both series are sorted the same way so that their
    # first entries refer to the same dates -- confirm against get_hist_price_ul.
    common = min(len(a), len(b))
    if common < 2:
        return 0.0
    with np.errstate(invalid="ignore", divide="ignore"):
        value = float(np.corrcoef(a[:common], b[:common])[0, 1])
    # A constant series has zero variance and yields NaN.
    return value if math.isfinite(value) else 0.0


def correl_certificates(isin: str, num_prezzi_eod: int, enable_warning: bool) -> List[List[float]]:
    """Build the correlation matrix of the underlyings of certificate ``isin``.

    Prices are always reloaded through ``default_sql_query()`` and the
    resulting per-underlying series replace the module-level cache. With
    ``enable_warning`` set, a mismatch between the expected and the returned
    number of prices prints a warning and waits for user input.

    The matrix is symmetric with ones on the diagonal. Each off-diagonal
    entry is the correlation of the two close-price series, computed on their
    common length; pairs with fewer than two shared observations or an
    undefined correlation are reported as 0.0.

    NOTE(review): the correlation is taken on price levels, not on returns --
    confirm this is the intended input for the path simulation.
    """
    global _cached_eod_underlyings
    fs_db = default_sql_query()
    hist_price = fs_db.get_hist_price_ul(isin, num_prezzi_eod)
    details_ul = fs_db.get_details_ul(isin)
    num_ul = len(details_ul)
    if enable_warning and num_ul * num_prezzi_eod != len(hist_price):
        print("Attenzione! Rilevati meno di prezzi eod in uno dei sottostanti")
        print(f"Matrice di correlazione calcolata usando {len(hist_price) // max(num_ul, 1)} prezzi eod")
        input("Premere un tasto per continuare...")

    list_prezzi = [
        PrezziSottostanti(
            id_underlyings=dettaglio.id_underlyings,
            sottostante=dettaglio.sottostante,
            prezzi_close=[
                r.px_close
                for r in hist_price
                if r.id_underlyings == dettaglio.id_underlyings
            ],
        )
        for dettaglio in details_ul
    ]
    _cached_eod_underlyings = list_prezzi

    correl_matrix = [[0.0 for _ in range(num_ul)] for _ in range(num_ul)]
    for i in range(num_ul):
        correl_matrix[i][i] = 1.0
        for j in range(i + 1, num_ul):
            corr_val = _price_correlation(
                list_prezzi[i].prezzi_close, list_prezzi[j].prezzi_close
            )
            # Fill both triangles at once to keep the matrix symmetric.
            correl_matrix[i][j] = corr_val
            correl_matrix[j][i] = corr_val
    return correl_matrix
def array_to_table(numbers: List[List[float]], column_names: List[str]) -> List[List[float]]:
rows = []
for i in range(len(numbers)):
row = [round(numbers[i][j], 4) for j in range(len(numbers[i]))]
rows.append(row)
return rows
def array_jagged_to_table(numbers: List[List[float]], column_names: List[str]) -> List[List[float]]:
    """Round to 4 decimals the first ``len(column_names)`` values of each row.

    Values beyond that width are dropped. A row shorter than
    ``column_names`` raises ``IndexError``.
    """
    return [
        [round(record[col], 4) for col in range(len(column_names))]
        for record in numbers
    ]