# %pip install mapie
import os
"CUDA_VISIBLE_DEVICES"] = "3"
os.environ[
import numpy as np
import torch
import torch.nn as nn
import torch.distributions as dist
from tqdm import tqdm
from sklearn.calibration import calibration_curve
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import seaborn as sns
from mapie.metrics import regression_coverage_score
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(0)

N = 100
x = dist.Uniform(-1, 1).sample((N, 1)).sort(dim=0).values
x_test = torch.linspace(-1, 1, 2 * N).view(-1, 1).sort(dim=0).values
y = 3 * x**3 - 2 * x + 1
y_noisy = y + dist.Gamma(0.1, 0.3).sample((N, 1))

plt.plot(x, y, label="true", color="C0")
plt.scatter(x, y_noisy, label="noisy data", color="C1")
plt.legend()
print("x.shape:", x.shape, "y.shape:", y.shape)
x.shape: torch.Size([100, 1]) y.shape: torch.Size([100, 1])
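A quick aside (my addition, not in the original notebook): the additive Gamma(0.1, 0.3) noise is strictly positive and heavily right-skewed, with mean 0.1/0.3 ≈ 0.33 and standard deviation √0.1/0.3 ≈ 1.05, which is why the scatter sits above the true curve on average:

# Sanity check on the noise moments (assumption: this matches the
# Gamma(concentration=0.1, rate=0.3) sampler used above).
noise = dist.Gamma(0.1, 0.3)
print(noise.mean, noise.stddev)  # tensor(0.3333) tensor(1.0541)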
Define a Gaussian/Gamma MLP
class ProbabilisticMLP(nn.Module):
    """MLP that outputs the two parameters of a Gaussian or Gamma likelihood."""

    def __init__(self, input_dim, feature_dims, dist_type):
        super().__init__()
        self.input_dim = input_dim
        self.feature_dims = feature_dims
        self.dist_type = dist_type  # "gaussian" or "gamma"
        self.layers = nn.ModuleList()
        self.layers.append(nn.Linear(input_dim, feature_dims[0]))
        for i in range(len(feature_dims) - 1):
            self.layers.append(nn.Linear(feature_dims[i], feature_dims[i + 1]))
        # final layer emits two likelihood parameters per input
        self.layers.append(nn.Linear(feature_dims[-1], 2))

    def forward(self, x):
        for layer in self.layers[:-1]:
            x = torch.relu(layer(x))
        y_out = self.layers[-1](x)
        if self.dist_type == "gaussian":
            mean = y_out[:, 0]
            log_std = y_out[:, 1]  # log-parameterized so std stays positive
            std = torch.exp(log_std)
            return mean.ravel(), std.ravel()
        elif self.dist_type == "gamma":
            log_concentration = y_out[:, 0]
            log_rate = y_out[:, 1]
            concentration = torch.exp(log_concentration)
            rate = torch.exp(log_rate)
            return concentration, rate

    def loss_fn(self, y, param1, param2):
        # negative log-likelihood under the chosen distribution;
        # small offsets keep the parameters strictly positive
        if self.dist_type == "gaussian":
            mean, std = param1, param2
            likelihood = torch.distributions.Normal(mean, std + 1e-3)
            return -likelihood.log_prob(y.ravel()).mean()
        elif self.dist_type == "gamma":
            concentration, rate = param1, param2
            likelihood = torch.distributions.Gamma(concentration + 1e-3, rate + 1e-3)
            return -likelihood.log_prob(y.ravel()).mean()
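As a sanity check on the Gaussian branch (my addition), PyTorch's built-in nn.GaussianNLLLoss computes the same negative log-likelihood; it takes the variance rather than the std, and full=True includes the ½·log 2π constant that Normal.log_prob keeps:

# Assumed-equivalent built-in loss; a minimal check, not the notebook's code.
_mean = torch.zeros(5)
_std = 0.5 * torch.ones(5)
_target = torch.randn(5)
nll_builtin = nn.GaussianNLLLoss(full=True)(_mean, _target, _std**2)
nll_manual = -torch.distributions.Normal(_mean, _std).log_prob(_target).mean()
print(torch.allclose(nll_builtin, nll_manual))  # expected: True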
Fit Gaussian MLP
torch.manual_seed(0)
model = ProbabilisticMLP(1, [32, 32], "gaussian").to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
n_epochs = 500

pbar = tqdm(range(n_epochs))
losses = []
for epoch in pbar:
    optimizer.zero_grad()
    param1, param2 = model(x.to(device))
    loss = model.loss_fn(y_noisy.to(device), param1, param2)
    loss.backward()
    optimizer.step()
    losses.append(loss.item())
    pbar.set_description(f"loss: {loss.item():.4f}")
plt.plot(losses)
loss: 0.4503: 100%|██████████| 500/500 [00:01<00:00, 291.18it/s]
with torch.no_grad():
    y_mean, y_std = model(x_test.to(device))
y_mean = y_mean.cpu().numpy().ravel()
y_std = y_std.cpu().numpy().ravel()
="true", color="C0")
plt.plot(x, y, label="noisy data", color="C1")
plt.scatter(x, y_noisy, label="y_mean", color="C2")
plt.plot(x_test, y_mean, label
plt.fill_between(
x_test.squeeze(),- 2 * y_std,
y_mean + 2 * y_std,
y_mean =0.3,
alpha="C2",
color="95% CI",
label
)
plt.legend()
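One note on the band (my addition): ±2·std is the usual shorthand for a 95% interval; the exact central 95% interval of a Gaussian has half-width Φ⁻¹(0.975) ≈ 1.96 standard deviations, so the plotted band is marginally conservative:

# The exact 95% central interval of a normal is mean +/- 1.96 * std;
# +/- 2 * std is the common, slightly wider approximation used above.
from scipy.stats import norm

print(norm.ppf(0.975))  # ~1.96, vs. the 2.0 used above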
with torch.no_grad():
    y_mean, y_std = model(x.to(device))
y_mean = y_mean.cpu().numpy().ravel()
y_std = y_std.cpu().numpy().ravel()

upper = y_mean + 2 * y_std
lower = y_mean - 2 * y_std

regression_coverage_score(y_noisy.numpy(), lower, upper)
0.91
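For intuition (my addition), regression_coverage_score is just the empirical fraction of targets falling inside their intervals, which is easy to reproduce by hand:

# Hand-rolled coverage, assumed equivalent to MAPIE's score above:
# the fraction of targets landing inside [lower, upper].
y_np = y_noisy.numpy().ravel()
print(np.mean((y_np >= lower) & (y_np <= upper)))  # should match the score above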
Fit Gamma MLP
model = ProbabilisticMLP(1, [32, 32, 32], "gamma").to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
n_epochs = 1000

pbar = tqdm(range(n_epochs))
losses = []
for epoch in pbar:
    optimizer.zero_grad()
    param1, param2 = model(x.to(device))
    loss = model.loss_fn(y_noisy.to(device), param1, param2)
    loss.backward()
    optimizer.step()
    losses.append(loss.item())
    pbar.set_description(f"loss: {loss.item():.4f}")
plt.plot(losses)
loss: 0.0775: 100%|██████████| 1000/1000 [00:03<00:00, 266.98it/s]
from scipy.special import gammaincinv

with torch.no_grad():
    concentration, rate = model(x_test.to(device))
concentration = concentration.cpu().ravel().numpy()
rate = rate.cpu().ravel().numpy()

# mode of Gamma(concentration, rate); defined for concentration > 1
y_mode = (concentration - 1) / rate

# gammaincinv inverts the *regularized* lower incomplete gamma function,
# so the quantile of Gamma(concentration, rate) is gammaincinv(concentration, p) / rate
quantile_fn = lambda p: gammaincinv(concentration, p) / rate

upper = quantile_fn(0.975)
lower = quantile_fn(0.025)
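As a cross-check on quantile_fn (my addition, assuming SciPy's shape/scale convention with scale = 1/rate), scipy.stats.gamma.ppf gives the same quantiles:

# Assumed-equivalent quantiles via scipy.stats; SciPy's gamma is
# parameterized by shape and scale, with scale = 1 / rate.
from scipy.stats import gamma as gamma_rv

upper_ref = gamma_rv.ppf(0.975, concentration, scale=1.0 / rate)
print(np.allclose(upper, upper_ref))  # expected: True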
="true", color="C0")
plt.plot(x, y, label="noisy data", color="C1")
plt.scatter(x, y_noisy, label="mean", color="C2")
plt.plot(x_test, y_mode, label
plt.fill_between(
x_test.squeeze(),
lower,
upper,=0.3,
alpha="C2",
color="95% CI",
label
)
plt.legend()
with torch.no_grad():
    concentration, rate = model(x.to(device))
concentration = concentration.cpu().numpy().ravel()
rate = rate.cpu().numpy().ravel()

# quantile_fn reads the re-assigned global concentration and rate
upper = quantile_fn(0.975)
lower = quantile_fn(0.025)

regression_coverage_score(y_noisy.numpy(), lower, upper)
0.07