import math from dataclasses import dataclass from typing import List, Optional, Tuple import os from concurrent.futures import ProcessPoolExecutor import numpy as np try: from .db import default_sql_query from .models import PrezziSottostanti, UnderlyingStats from .utils import cumulative_sums, standard_deviation except ImportError: # Allow running as a script without a package context. import sys from pathlib import Path pkg_root = Path(__file__).resolve().parents[1] if str(pkg_root) not in sys.path: sys.path.insert(0, str(pkg_root)) from pricer.db import default_sql_query from pricer.models import PrezziSottostanti, UnderlyingStats from pricer.utils import cumulative_sums, standard_deviation try: from numba import njit _USE_NUMBA = True except Exception: _USE_NUMBA = False def njit(*args, **kwargs): def wrap(fn): return fn return wrap _cached_eod_underlyings: List[PrezziSottostanti] = [] _cached_fair_value_array = None @dataclass class FairValueResult: fair_value: float fair_value_array: Optional[List[float]] stdev: float simulations: int caso_fair_value: str @dataclass class FairValueResultArray: fair_value_array: List[float] caso_fair_value: Optional[str] def get_cached_eod_underlyings() -> List[PrezziSottostanti]: return _cached_eod_underlyings def compute_underlying_stats(prezzi_ul: List[PrezziSottostanti]) -> List[UnderlyingStats]: results = [] for ul in prezzi_ul: prezzi = ul.prezzi_close n = len(prezzi) log_returns = [] for i in range(n - 1): if prezzi[i] > 0 and prezzi[i + 1] > 0: log_returns.append(math.log(prezzi[i] / prezzi[i + 1])) else: log_returns.append(0.0) if len(log_returns) > 1: avg = sum(log_returns) / len(log_returns) sum_sq = sum((r - avg) ** 2 for r in log_returns) variance = sum_sq / (len(log_returns) - 1) std_dev = math.sqrt(variance) else: std_dev = 0.0 volatility_annualized = std_dev * math.sqrt(252) results.append( UnderlyingStats( nome=ul.sottostante, prezzi=prezzi, log_returns=log_returns, volatility=volatility_annualized, ) ) return results def fair_value( prices_ul: List[float], cor_mat: List[List[float]], num_sottostanti: int, num_sims: int, tasso_interesse: float, days_to_maturity: int, dividends: List[float], volatility: List[float], days_to_observation: List[int], days_to_observation_y_fract: List[float], coupon_values: List[float], coupon_triggers: List[float], autocall_values: List[float], autocall_triggers: List[float], memory_flags: List[int], coupon_in_memory: float, pdi_style: Optional[str], pdi_strike: float, pdi_barrier: float, capital_value: float, prot_min_val: float, airbag: int, sigma: int, twinwin: int, relief: int, fattore_airbag: float, one_star: int, trigger_one_star: float, cap: float, leva: float, ) -> FairValueResult: pv_digits = [0.0] * num_sims caso_fair_value = "" if _cached_fair_value_array is not None: pv_digits = _cached_fair_value_array.fair_value_array caso_fair_value = _cached_fair_value_array.caso_fair_value or "" else: pv_digits = simulate_payoffs( num_sims, prices_ul, cor_mat, days_to_maturity, tasso_interesse, dividends, volatility, days_to_observation, days_to_observation_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, pdi_style, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, ) if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 0: caso_fair_value = "Standard" if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 0: caso_fair_value = "Airbag" if airbag == 0 and sigma == 1 and relief == 0 and twinwin == 0 and one_star == 0: caso_fair_value = "Sigma" if airbag == 0 and sigma == 0 and relief == 1 and twinwin == 0 and one_star == 0: caso_fair_value = "Relief" if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 0: caso_fair_value = "TwinWin" if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 1: caso_fair_value = "OneStar" if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 1: caso_fair_value = "Airbag + OneStar" if airbag == 0 and sigma == 1 and relief == 0 and twinwin == 0 and one_star == 1: caso_fair_value = "Sigma + OneStar" if airbag == 0 and sigma == 0 and relief == 1 and twinwin == 0 and one_star == 1: caso_fair_value = "Relief + OneStar" if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 1: caso_fair_value = "TwinWin + OneStar" if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 0: caso_fair_value = "Airbag + TwinWin" fair_value_value = float(np.mean(pv_digits)) * 100.0 stdev = standard_deviation([x * 100.0 for x in pv_digits]) return FairValueResult( fair_value=fair_value_value, fair_value_array=[x * 100.0 for x in pv_digits], stdev=stdev, simulations=num_sims, caso_fair_value=caso_fair_value, ) def fair_value_array( prices_ul: List[float], cor_mat: List[List[float]], num_sottostanti: int, num_sims: int, tasso_interesse: float, days_to_maturity: int, dividends: List[float], volatility: List[float], days_to_observation: List[int], days_to_observation_y_fract: List[float], coupon_values: List[float], coupon_triggers: List[float], autocall_values: List[float], autocall_triggers: List[float], memory_flags: List[int], coupon_in_memory: float, pdi_style: Optional[str], pdi_strike: float, pdi_barrier: float, capital_value: float, prot_min_val: float, airbag: int, sigma: int, twinwin: int, relief: int, fattore_airbag: float, one_star: int, trigger_one_star: float, cap: float, leva: float, ) -> FairValueResultArray: caso_fair_value = "" pv_digits = [0.0] * num_sims pv_digits = simulate_payoffs( num_sims, prices_ul, cor_mat, days_to_maturity, tasso_interesse, dividends, volatility, days_to_observation, days_to_observation_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, pdi_style, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, ) if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 0: caso_fair_value = "Standard" if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 0: caso_fair_value = "Airbag" if airbag == 0 and sigma == 1 and relief == 0 and twinwin == 0 and one_star == 0: caso_fair_value = "Sigma" if airbag == 0 and sigma == 0 and relief == 1 and twinwin == 0 and one_star == 0: caso_fair_value = "Relief" if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 0: caso_fair_value = "TwinWin" if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 1: caso_fair_value = "OneStar" if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 1: caso_fair_value = "Airbag + OneStar" if airbag == 0 and sigma == 1 and relief == 0 and twinwin == 0 and one_star == 1: caso_fair_value = "Sigma + OneStar" if airbag == 0 and sigma == 0 and relief == 1 and twinwin == 0 and one_star == 1: caso_fair_value = "Relief + OneStar" if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 1: caso_fair_value = "TwinWin + OneStar" if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 0: caso_fair_value = "Airbag + TwinWin" pv_digits = [x * 100.0 for x in pv_digits] return FairValueResultArray(fair_value_array=pv_digits, caso_fair_value=caso_fair_value) def _caso_one_star( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, max_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, trigger_one_star: float, coupon_triggers: List[float], coupon_values: List[float], ) -> float: if min_price_at_maturity < pdi_barrier: if max_price_at_maturity < trigger_one_star: payout_capitale = max(prot_min_val, min_price_at_maturity / 100.0) else: payout_capitale = capital_value else: payout_capitale = capital_value payout_cedola = 0.0 if min_price_at_maturity >= coupon_triggers[k]: payout_cedola = memory_coupon + coupon_values[k] return ( payout_capitale * math.exp(-r * days_to_observation_y_fract[k]) + payout_cedola * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) def _caso_twinwin( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, pdi_strike: float, trigger_autocall: float, cap: float, ) -> float: if pdi_style == "European" and min_price_at_maturity >= pdi_strike and trigger_autocall == 999: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (memory_coupon + min(min_price_at_maturity / 100.0, cap)) + coupon_paid ) if pdi_style == "European" and min_price_at_maturity >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0)) + coupon_paid ) if pdi_style == "European" and min_price_at_maturity < pdi_barrier: pv_digit = ( max(prot_min_val, min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) if pdi_style == "American" and min_price_at_maturity >= pdi_strike and trigger_autocall == 999: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (memory_coupon + min(min_price_at_maturity / 100.0, cap)) + coupon_paid ) if pdi_style == "American" and min_price >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0)) + coupon_paid ) if pdi_style == "American" and min_price < pdi_barrier: pv_digit = ( max(prot_min_val, min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) return pv_digit def _caso_relief( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, second_min_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, ) -> float: if pdi_style == "European" and min_price_at_maturity >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "European" and min_price_at_maturity < pdi_barrier: pv_digit = ( max(prot_min_val, second_min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) if pdi_style == "American" and min_price >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "American" and min_price < pdi_barrier: pv_digit = ( max(prot_min_val, second_min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) return pv_digit def _caso_sigma( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, pdi_strike: float, ) -> float: if pdi_style == "European" and min_price_at_maturity >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "European" and min_price_at_maturity < pdi_barrier: pv_digit = ( max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) if pdi_style == "American" and min_price >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "American" and min_price < pdi_barrier: pv_digit = ( max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) return pv_digit def _caso_airbag( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, fattore_airbag: float, ) -> float: if pdi_style == "European" and min_price_at_maturity >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "European" and min_price_at_maturity < pdi_barrier: pv_digit = ( max(prot_min_val, min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) * fattore_airbag + coupon_paid ) if pdi_style == "American" and min_price >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "American" and min_price < pdi_barrier: pv_digit = ( max(prot_min_val, min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) * fattore_airbag + coupon_paid ) return pv_digit def _caso_airbag_one_star( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, max_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, fattore_airbag: float, trigger_one_star: float, ) -> float: if max_price_at_maturity >= trigger_one_star: return ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) return _caso_airbag( r, days_to_observation_y_fract, pdi_style, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, pv_digit, memory_coupon, coupon_paid, k, fattore_airbag, ) def _caso_sigma_one_star( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, max_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, pdi_strike: float, trigger_one_star: float, ) -> float: if max_price_at_maturity >= trigger_one_star: return ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) return _caso_sigma( r, days_to_observation_y_fract, pdi_style, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, pv_digit, memory_coupon, coupon_paid, k, pdi_strike, ) def _caso_relief_one_star( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, second_min_price_at_maturity: float, max_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, trigger_one_star: float, ) -> float: if max_price_at_maturity >= trigger_one_star: return ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) return _caso_relief( r, days_to_observation_y_fract, pdi_style, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, second_min_price_at_maturity, pv_digit, memory_coupon, coupon_paid, k, ) def _caso_twinwin_one_star( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, max_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, pdi_strike: float, autocall_trigger: float, cap: float, trigger_one_star: float, ) -> float: if max_price_at_maturity >= trigger_one_star: return ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) return _caso_twinwin( r, days_to_observation_y_fract, pdi_style, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, pv_digit, memory_coupon, coupon_paid, k, pdi_strike, autocall_trigger, cap, ) def _caso_airbag_twinwin( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, pdi_strike: float, autocall_trigger: float, cap: float, fattore_airbag: float, ) -> float: if min_price_at_maturity >= pdi_barrier: if autocall_trigger == 999 and min_price_at_maturity >= pdi_strike: return ( math.exp(-r * days_to_observation_y_fract[k]) * (memory_coupon + min(min_price_at_maturity / 100.0, cap)) + coupon_paid ) return ( math.exp(-r * days_to_observation_y_fract[k]) * ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0)) + coupon_paid ) return ( max(prot_min_val, min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) * fattore_airbag + coupon_paid ) def _caso_standard( r: float, days_to_observation_y_fract: List[float], pdi_style: Optional[str], pdi_barrier: float, capital_value: float, prot_min_val: float, min_price: float, min_price_at_maturity: float, pv_digit: float, memory_coupon: float, coupon_paid: float, k: int, ) -> float: if pdi_style == "European" and min_price_at_maturity >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "European" and min_price_at_maturity < pdi_barrier: pv_digit = ( max(prot_min_val, min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) if pdi_style == "American" and min_price >= pdi_barrier: pv_digit = ( math.exp(-r * days_to_observation_y_fract[k]) * (capital_value + memory_coupon) + coupon_paid ) if pdi_style == "American" and min_price < pdi_barrier: pv_digit = ( max(prot_min_val, min_price_at_maturity / 100.0) * math.exp(-r * days_to_observation_y_fract[k]) + coupon_paid ) return pv_digit def gbm_multi_equity( prices_ul: List[float], cor_mat: List[List[float]], num_sottostanti: int, days_to_maturity: int, tasso_interesse: float, q_assets: List[float], vol_assets: List[float], ) -> List[List[float]]: dt = 1.0 / 252.0 mean = np.zeros(num_sottostanti) cov = np.array(cor_mat, dtype=float) df_w = np.random.multivariate_normal(mean, cov, size=days_to_maturity) prices = np.array(prices_ul, dtype=float) vol = np.array(vol_assets, dtype=float) div = np.array(q_assets, dtype=float) drift = (tasso_interesse - div) - (0.5 * (vol ** 2)) increments = drift * dt + vol * math.sqrt(dt) * df_w cum_sum = np.cumsum(increments, axis=0) df_s = np.empty((days_to_maturity + 1, num_sottostanti), dtype=float) df_s[0] = prices df_s[1:] = prices * np.exp(cum_sum) return df_s def gbm_multi_equity_garch( prices_ul: List[float], correl_matrix: List[List[float]], garch_params: List, num_sottostanti: int, days_to_maturity: int, tasso_interesse: float, q_assets: List[float], ) -> List[List[float]]: dt = 1.0 / 252.0 mean = np.zeros(num_sottostanti) cov = np.array(correl_matrix, dtype=float) z = np.random.multivariate_normal(mean, cov, size=days_to_maturity) s = [[0.0 for _ in range(num_sottostanti)] for _ in range(days_to_maturity + 1)] sigma2 = [[0.0 for _ in range(num_sottostanti)] for _ in range(days_to_maturity + 1)] for i in range(num_sottostanti): s[0][i] = prices_ul[i] sigma2[0][i] = garch_params[i].sigma0 ** 2 for t in range(1, days_to_maturity + 1): for i in range(num_sottostanti): epsilon = z[t - 1][i] vol_t = math.sqrt(sigma2[t - 1][i]) drift = ((tasso_interesse - q_assets[i]) - 0.5 * sigma2[t - 1][i]) * dt shock = vol_t * math.sqrt(dt) * epsilon s[t][i] = s[t - 1][i] * math.exp(drift + shock) sigma2[t][i] = ( garch_params[i].omega + garch_params[i].alpha * (shock ** 2) + garch_params[i].beta * sigma2[t - 1][i] ) return s def max_array_at_row(jagged_array: List[List[float]], num_sottostanti: int, row_index: int) -> float: if isinstance(jagged_array, np.ndarray): return float(jagged_array[row_index].max()) value = -999999.0 for j in range(num_sottostanti): if jagged_array[row_index][j] > value: value = jagged_array[row_index][j] return value def min_array_at_row(jagged_array: List[List[float]], num_sottostanti: int, row_index: int) -> float: if isinstance(jagged_array, np.ndarray): return float(jagged_array[row_index].min()) value = 999999.0 for j in range(num_sottostanti): if jagged_array[row_index][j] < value: value = jagged_array[row_index][j] return value def second_min_array_at_row(jagged_array: List[List[float]], num_sottostanti: int, row_index: int) -> float: if isinstance(jagged_array, np.ndarray): row = np.sort(jagged_array[row_index]) return float(row[1] if row.size >= 2 else row[0]) temp_array = [jagged_array[row_index][j] for j in range(num_sottostanti)] temp_array.sort() if len(temp_array) >= 2: return temp_array[1] return temp_array[0] def _min_overall(jagged_array: List[List[float]]) -> float: if isinstance(jagged_array, np.ndarray): return float(jagged_array.min()) return min(min(row) for row in jagged_array) @njit(cache=True) def _nj_min_row(path, row_index): return path[row_index].min() @njit(cache=True) def _nj_max_row(path, row_index): return path[row_index].max() @njit(cache=True) def _nj_second_min_row(path, row_index): row = np.sort(path[row_index]) if row.shape[0] >= 2: return row[1] return row[0] @njit(cache=True) def _nj_min_overall(path): return path.min() @njit(cache=True) def _nj_caso_standard(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k): pv_digit = 0.0 if style_is_eur == 1 and min_price_at_maturity >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 1 and min_price_at_maturity < pdi_barrier: pv_digit = (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid if style_is_eur == 0 and min_price >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 0 and min_price < pdi_barrier: pv_digit = (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid return pv_digit @njit(cache=True) def _nj_caso_airbag(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, fattore_airbag): pv_digit = 0.0 if style_is_eur == 1 and min_price_at_maturity >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 1 and min_price_at_maturity < pdi_barrier: pv_digit = (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) * fattore_airbag + coupon_paid if style_is_eur == 0 and min_price >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 0 and min_price < pdi_barrier: pv_digit = (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) * fattore_airbag + coupon_paid return pv_digit @njit(cache=True) def _nj_caso_sigma(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike): pv_digit = 0.0 if style_is_eur == 1 and min_price_at_maturity >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 1 and min_price_at_maturity < pdi_barrier: pv_digit = (max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid if style_is_eur == 0 and min_price >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 0 and min_price < pdi_barrier: pv_digit = (max(prot_min_val, (min_price_at_maturity + pdi_strike - pdi_barrier) / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid return pv_digit @njit(cache=True) def _nj_caso_relief(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, second_min_price_at_maturity, memory_coupon, coupon_paid, k): pv_digit = 0.0 if style_is_eur == 1 and min_price_at_maturity >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 1 and min_price_at_maturity < pdi_barrier: pv_digit = (max(prot_min_val, second_min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid if style_is_eur == 0 and min_price >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid if style_is_eur == 0 and min_price < pdi_barrier: pv_digit = (max(prot_min_val, second_min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid return pv_digit @njit(cache=True) def _nj_caso_twinwin(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, trigger_autocall, cap): pv_digit = 0.0 if style_is_eur == 1 and min_price_at_maturity >= pdi_strike and trigger_autocall == 999: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (memory_coupon + (min(min_price_at_maturity / 100.0, cap)))) + coupon_paid if style_is_eur == 1 and min_price_at_maturity >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0))) + coupon_paid if style_is_eur == 1 and min_price_at_maturity < pdi_barrier: pv_digit = (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid if style_is_eur == 0 and min_price_at_maturity >= pdi_strike and trigger_autocall == 999: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * (memory_coupon + (min(min_price_at_maturity / 100.0, cap)))) + coupon_paid if style_is_eur == 0 and min_price >= pdi_barrier: pv_digit = (math.exp(-r * days_to_obs_y_fract[k]) * ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0))) + coupon_paid if style_is_eur == 0 and min_price < pdi_barrier: pv_digit = (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid return pv_digit @njit(cache=True) def _nj_caso_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, trigger_one_star, coupon_triggers, coupon_values): if min_price_at_maturity < pdi_barrier: if max_price_at_maturity < trigger_one_star: payout_capitale = max(prot_min_val, min_price_at_maturity / 100.0) else: payout_capitale = capital_value else: payout_capitale = capital_value payout_cedola = 0.0 if min_price_at_maturity >= coupon_triggers[k]: payout_cedola = memory_coupon + coupon_values[k] return payout_capitale * math.exp(-r * days_to_obs_y_fract[k]) + payout_cedola * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid @njit(cache=True) def _nj_caso_airbag_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, fattore_airbag, trigger_one_star, style_is_eur): if max_price_at_maturity >= trigger_one_star: return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid return _nj_caso_airbag(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, fattore_airbag) @njit(cache=True) def _nj_caso_sigma_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, trigger_one_star, style_is_eur): if max_price_at_maturity >= trigger_one_star: return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid return _nj_caso_sigma(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike) @njit(cache=True) def _nj_caso_relief_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, second_min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, trigger_one_star, style_is_eur): if max_price_at_maturity >= trigger_one_star: return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid return _nj_caso_relief(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, second_min_price_at_maturity, memory_coupon, coupon_paid, k) @njit(cache=True) def _nj_caso_twinwin_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, autocall_trigger, cap, trigger_one_star, style_is_eur): if max_price_at_maturity >= trigger_one_star: return (math.exp(-r * days_to_obs_y_fract[k]) * (capital_value + memory_coupon)) + coupon_paid return _nj_caso_twinwin(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, autocall_trigger, cap) @njit(cache=True) def _nj_caso_airbag_twinwin(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, autocall_trigger, cap, fattore_airbag): if min_price_at_maturity >= pdi_barrier: if autocall_trigger == 999 and min_price_at_maturity >= pdi_strike: return (math.exp(-r * days_to_obs_y_fract[k]) * (memory_coupon + min(min_price_at_maturity / 100.0, cap))) + coupon_paid return (math.exp(-r * days_to_obs_y_fract[k]) * ((2 * capital_value) + memory_coupon - (min_price_at_maturity / 100.0))) + coupon_paid return (max(prot_min_val, min_price_at_maturity / 100.0)) * math.exp(-r * days_to_obs_y_fract[k]) * fattore_airbag + coupon_paid @njit(cache=True) def _nj_payoff_for_path(path, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, days_to_maturity, r): min_price = _nj_min_overall(path) min_price_at_maturity = _nj_min_row(path, days_to_maturity) pv_digit = 0.0 memory_coupon = coupon_in_memory coupon_paid = 0.0 n_obs = days_to_obs.shape[0] for k in range(n_obs): min_by_row = _nj_min_row(path, days_to_obs[k]) if min_by_row >= autocall_triggers[k]: pv_digit = (autocall_values[k] + memory_coupon) * math.exp(-r * days_to_obs_y_fract[k]) + coupon_paid return pv_digit if min_by_row >= coupon_triggers[k]: coupon_paid += (coupon_values[k] + memory_coupon) * math.exp(-r * days_to_obs_y_fract[k]) memory_coupon = 0.0 else: memory_coupon += memory_flags[k] * coupon_values[k] if k == n_obs - 1: if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 0: return _nj_caso_standard(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k) if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 0: return _nj_caso_airbag(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, fattore_airbag) if airbag == 0 and sigma == 1 and relief == 0 and twinwin == 0 and one_star == 0: return _nj_caso_sigma(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike) if airbag == 0 and sigma == 0 and relief == 1 and twinwin == 0 and one_star == 0: second_min = _nj_second_min_row(path, days_to_maturity) return _nj_caso_relief(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, second_min, memory_coupon, coupon_paid, k) if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 0: return _nj_caso_twinwin(r, days_to_obs_y_fract, style_is_eur, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, autocall_triggers[k], cap) if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 1: max_price_at_maturity = _nj_max_row(path, days_to_obs[k]) return _nj_caso_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, trigger_one_star, coupon_triggers, coupon_values) if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 0 and one_star == 1: max_price_at_maturity = _nj_max_row(path, days_to_obs[k]) return _nj_caso_airbag_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, fattore_airbag, trigger_one_star, style_is_eur) if airbag == 0 and sigma == 1 and relief == 0 and twinwin == 0 and one_star == 1: max_price_at_maturity = _nj_max_row(path, days_to_obs[k]) return _nj_caso_sigma_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, trigger_one_star, style_is_eur) if airbag == 0 and sigma == 0 and relief == 1 and twinwin == 0 and one_star == 1: second_min = _nj_second_min_row(path, days_to_maturity) max_price_at_maturity = _nj_max_row(path, days_to_obs[k]) return _nj_caso_relief_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, second_min, max_price_at_maturity, memory_coupon, coupon_paid, k, trigger_one_star, style_is_eur) if airbag == 0 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 1: max_price_at_maturity = _nj_max_row(path, days_to_obs[k]) return _nj_caso_twinwin_one_star(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price, min_price_at_maturity, max_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, autocall_triggers[k], cap, trigger_one_star, style_is_eur) if airbag == 1 and sigma == 0 and relief == 0 and twinwin == 1 and one_star == 0: return _nj_caso_airbag_twinwin(r, days_to_obs_y_fract, pdi_barrier, capital_value, prot_min_val, min_price_at_maturity, memory_coupon, coupon_paid, k, pdi_strike, autocall_triggers[k], cap, fattore_airbag) return pv_digit @njit(cache=True) def _nj_payoffs_for_paths(paths, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, days_to_maturity, r): n_sims = paths.shape[0] out = np.empty(n_sims, dtype=np.float64) for i in range(n_sims): out[i] = _nj_payoff_for_path(paths[i], days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, days_to_maturity, r) return out def _generate_paths_batch(prices_ul, cor_mat, num_sims, days_to_maturity, tasso_interesse, dividends, volatility): num_assets = prices_ul.shape[0] dt = 1.0 / 252.0 sqrt_dt = math.sqrt(dt) chol = np.linalg.cholesky(cor_mat) z = np.random.standard_normal((num_sims, days_to_maturity, num_assets)) w = np.matmul(z, chol.T) drift = (tasso_interesse - dividends) - (0.5 * (volatility ** 2)) increments = drift[None, None, :] * dt + volatility[None, None, :] * sqrt_dt * w cum_sum = np.cumsum(increments, axis=1) paths = np.empty((num_sims, days_to_maturity + 1, num_assets), dtype=np.float64) paths[:, 0, :] = prices_ul paths[:, 1:, :] = prices_ul[None, None, :] * np.exp(cum_sum) return paths def _simulate_batch(args: Tuple): (num_sims, prices_ul, cor_mat, days_to_maturity, tasso_interesse, dividends, volatility, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap) = args paths = _generate_paths_batch(prices_ul, cor_mat, num_sims, days_to_maturity, tasso_interesse, dividends, volatility) return _nj_payoffs_for_paths( paths, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, days_to_maturity, tasso_interesse, ) def simulate_payoffs( num_sims, prices_ul, cor_mat, days_to_maturity, tasso_interesse, dividends, volatility, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, pdi_style, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, ): if cap is None: cap = 0.0 if trigger_one_star is None: trigger_one_star = 0.0 if pdi_strike is None: pdi_strike = 0.0 if pdi_barrier is None: pdi_barrier = 0.0 if capital_value is None: capital_value = 0.0 if prot_min_val is None: prot_min_val = 0.0 if fattore_airbag is None: fattore_airbag = 0.0 if coupon_in_memory is None: coupon_in_memory = 0.0 prices_ul = np.asarray(prices_ul, dtype=np.float64) cor_mat = np.asarray(cor_mat, dtype=np.float64) dividends = np.asarray(dividends, dtype=np.float64) volatility = np.asarray(volatility, dtype=np.float64) days_to_obs = np.asarray(days_to_obs, dtype=np.int64) days_to_obs_y_fract = np.asarray(days_to_obs_y_fract, dtype=np.float64) coupon_values = np.asarray(coupon_values, dtype=np.float64) coupon_triggers = np.asarray(coupon_triggers, dtype=np.float64) autocall_values = np.asarray(autocall_values, dtype=np.float64) autocall_triggers = np.asarray(autocall_triggers, dtype=np.float64) memory_flags = np.asarray(memory_flags, dtype=np.int64) style_is_eur = 1 if pdi_style == "European" else 0 cpu_count = os.cpu_count() or 1 use_parallel = cpu_count > 1 and num_sims >= 20000 if not _USE_NUMBA: use_parallel = False if use_parallel: workers = max(1, cpu_count - 1) base_batch = max(256, num_sims // (workers * 4)) batch_size = min(4096, base_batch) batches = [] remaining = num_sims while remaining > 0: take = batch_size if remaining >= batch_size else remaining remaining -= take batches.append(take) args = [ ( b, prices_ul, cor_mat, days_to_maturity, tasso_interesse, dividends, volatility, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, ) for b in batches ] results = [] with ProcessPoolExecutor(max_workers=workers) as executor: for res in executor.map(_simulate_batch, args): results.append(res) return np.concatenate(results) batch_size = 4096 results = [] remaining = num_sims while remaining > 0: take = batch_size if remaining >= batch_size else remaining remaining -= take res = _simulate_batch( ( take, prices_ul, cor_mat, days_to_maturity, tasso_interesse, dividends, volatility, days_to_obs, days_to_obs_y_fract, coupon_values, coupon_triggers, autocall_values, autocall_triggers, memory_flags, coupon_in_memory, style_is_eur, pdi_strike, pdi_barrier, capital_value, prot_min_val, airbag, sigma, twinwin, relief, fattore_airbag, one_star, trigger_one_star, cap, ) ) results.append(res) return np.concatenate(results) def volatility_certificates(isin: str, num_prezzi_eod: int, enable_warning: bool) -> List[float]: global _cached_eod_underlyings if _cached_eod_underlyings: list_prezzi = _cached_eod_underlyings if enable_warning: num_ul = len(list_prezzi) tot_prezzi = sum(len(p.prezzi_close) for p in list_prezzi) if num_ul * num_prezzi_eod > tot_prezzi: print(f"Volatilita calcolata usando {tot_prezzi // num_ul} prezzi eod") else: fs_db = default_sql_query() hist_price = fs_db.get_hist_price_ul(isin, num_prezzi_eod) details_ul = fs_db.get_details_ul(isin) num_ul = len(details_ul) if enable_warning and num_ul * num_prezzi_eod != len(hist_price): print("Attenzione! Rilevati meno di prezzi eod in uno dei sottostanti") print(f"Volatilita calcolata usando {len(hist_price) // max(num_ul, 1)} prezzi eod") input("Premere un tasto per continuare...") list_prezzi = [] for dettaglio in details_ul: prezzi = [r.px_close for r in hist_price if r.id_underlyings == dettaglio.id_underlyings] list_prezzi.append( PrezziSottostanti( id_underlyings=dettaglio.id_underlyings, sottostante=dettaglio.sottostante, prezzi_close=prezzi, ) ) _cached_eod_underlyings = list_prezzi stats = compute_underlying_stats(list_prezzi) return [s.volatility for s in stats] def correl_certificates(isin: str, num_prezzi_eod: int, enable_warning: bool) -> List[List[float]]: global _cached_eod_underlyings fs_db = default_sql_query() hist_price = fs_db.get_hist_price_ul(isin, num_prezzi_eod) details_ul = fs_db.get_details_ul(isin) num_ul = len(details_ul) if enable_warning and num_ul * num_prezzi_eod != len(hist_price): print("Attenzione! Rilevati meno di prezzi eod in uno dei sottostanti") print(f"Matrice di correlazione calcolata usando {len(hist_price) // max(num_ul, 1)} prezzi eod") input("Premere un tasto per continuare...") list_prezzi = [] for dettaglio in details_ul: arr_prezzi = [ r.px_close for r in hist_price if r.id_underlyings == dettaglio.id_underlyings ] list_prezzi.append( PrezziSottostanti( id_underlyings=dettaglio.id_underlyings, sottostante=dettaglio.sottostante, prezzi_close=arr_prezzi, ) ) _cached_eod_underlyings = list_prezzi correl_matrix = [[0.0 for _ in range(num_ul)] for _ in range(num_ul)] for i in range(num_ul): correl_matrix[i][i] = 1.0 for j in range(i + 1, num_ul): a = list_prezzi[i].prezzi_close b = list_prezzi[j].prezzi_close if not a or not b: corr_val = 0.0 else: corr_val = float(np.corrcoef(a, b)[0, 1]) correl_matrix[i][j] = corr_val for k in range(num_ul): if correl_matrix[i][k] == 0.0: correl_matrix[i][k] = correl_matrix[k][i] return correl_matrix def array_to_table(numbers: List[List[float]], column_names: List[str]) -> List[List[float]]: rows = [] for i in range(len(numbers)): row = [round(numbers[i][j], 4) for j in range(len(numbers[i]))] rows.append(row) return rows def array_jagged_to_table(numbers: List[List[float]], column_names: List[str]) -> List[List[float]]: rows = [] for i in range(len(numbers)): row = [round(numbers[i][j], 4) for j in range(len(column_names))] rows.append(row) return rows