Model
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
class NBeatsNet(nn.Module):
    SEASONALITY_BLOCK = 'seasonality'
    TREND_BLOCK = 'trend'
    GENERIC_BLOCK = 'generic'

    def __init__(self,
                 device,
                 stack_types=(TREND_BLOCK, SEASONALITY_BLOCK),
                 nb_blocks_per_stack=3,
                 forecast_length=5,
                 backcast_length=10,
                 thetas_dims=(4, 8),
                 share_weights_in_stack=False,
                 hidden_layer_units=256,
                 nb_harmonics=None,
                 Architecture='DRESS'):
        super(NBeatsNet, self).__init__()
        self.forecast_length = forecast_length
        self.backcast_length = backcast_length
        self.hidden_layer_units = hidden_layer_units
        self.nb_blocks_per_stack = nb_blocks_per_stack
        self.Architecture = Architecture
        self.share_weights_in_stack = share_weights_in_stack
        self.nb_harmonics = nb_harmonics
        self.stack_types = stack_types
        self.stacks = []
        self.thetas_dim = thetas_dims
        self.parameters = []
        self.device = device
        print('| N-Beats')
        for stack_id in range(len(self.stack_types)):
            self.stacks.append(self.create_stack(stack_id))
        self.parameters = nn.ParameterList(self.parameters)
        self.to(self.device)
    def create_stack(self, stack_id):
        stack_type = self.stack_types[stack_id]
        print(f'| -- Stack {stack_type.title()} (#{stack_id}) (share_weights_in_stack={self.share_weights_in_stack})')
        blocks = []
        for block_id in range(self.nb_blocks_per_stack):
            block_init = NBeatsNet.select_block(stack_type)
            if self.share_weights_in_stack and block_id != 0:
                block = blocks[-1]  # pick up the last one when we share weights.
            else:
                block = block_init(self.hidden_layer_units, self.thetas_dim[stack_id],
                                   self.device, self.backcast_length, self.forecast_length, self.nb_harmonics)
                self.parameters.extend(block.parameters())
            print(f' | -- {block}')
            blocks.append(block)
        return blocks
    @staticmethod
    def select_block(block_type):
        if block_type == NBeatsNet.SEASONALITY_BLOCK:
            return SeasonalityBlock
        elif block_type == NBeatsNet.TREND_BLOCK:
            return TrendBlock
        else:
            return GenericBlock
    def forward(self, backcast):
        forecast = torch.zeros(size=(backcast.size()[0], self.forecast_length,))  # maybe batch size here.
        for stack_id in range(len(self.stacks)):
            for block_id in range(len(self.stacks[stack_id])):
                b, f = self.stacks[stack_id][block_id](backcast)
                if self.Architecture == 'DRESS':
                    # doubly residual stacking: subtract each block's backcast, accumulate its forecast.
                    backcast = backcast.to(self.device) - b
                    forecast = forecast.to(self.device) + f
                else:
                    # no backcast residual: only the forecasts are accumulated.
                    backcast = backcast.to(self.device)
                    forecast = forecast.to(self.device) + f
        return backcast, forecast
def seasonality_model(thetas, t, device):
    p = thetas.size()[-1]
    assert p <= thetas.shape[1], 'thetas_dim is too big.'
    p1, p2 = (p // 2, p // 2) if p % 2 == 0 else (p // 2, p // 2 + 1)
    s1 = torch.tensor([np.cos(2 * np.pi * i * t) for i in range(p1)]).float()  # H/2-1
    s2 = torch.tensor([np.sin(2 * np.pi * i * t) for i in range(p2)]).float()
    S = torch.cat([s1, s2])
    return thetas.mm(S.to(device))


def trend_model(thetas, t, device):
    p = thetas.size()[-1]
    assert p <= 4, 'thetas_dim is too big.'
    T = torch.tensor([t ** i for i in range(p)]).float()
    return thetas.mm(T.to(device))


def linspace(backcast_length, forecast_length):
    lin_space = np.linspace(-backcast_length, forecast_length, backcast_length + forecast_length)
    b_ls = lin_space[:backcast_length]
    f_ls = lin_space[backcast_length:]
    return b_ls, f_ls
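# Note (added for clarity, not in the original post): the helpers above build the fixed
# basis matrices that each block's theta vector is projected onto:
#   trend_model:       y_hat = sum_i theta_i * t**i                                      (polynomial, p <= 4)
#   seasonality_model: y_hat = sum_i theta_i * cos(2*pi*i*t) + sum_j theta_{p1+j} * sin(2*pi*j*t)
# linspace() supplies the time grids t for the backcast and forecast windows.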
class Block(nn.Module):

    def __init__(self, units, thetas_dim, device, backcast_length=10, forecast_length=5, share_thetas=False,
                 nb_harmonics=None):
        super(Block, self).__init__()
        self.units = units
        self.thetas_dim = thetas_dim
        self.backcast_length = backcast_length
        self.forecast_length = forecast_length
        self.share_thetas = share_thetas
        self.fc1 = nn.Linear(backcast_length, units)
        self.fc2 = nn.Linear(units, units)
        self.fc3 = nn.Linear(units, units)
        self.fc4 = nn.Linear(units, units)
        self.device = device
        self.backcast_linspace, self.forecast_linspace = linspace(backcast_length, forecast_length)
        if share_thetas:
            self.theta_f_fc = self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False)
        else:
            self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False)
            self.theta_f_fc = nn.Linear(units, thetas_dim, bias=False)

    def forward(self, x):
        x = F.relu(self.fc1(x.to(self.device)))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        return x

    def __str__(self):
        block_type = type(self).__name__
        return f'{block_type}(units={self.units}, thetas_dim={self.thetas_dim}, ' \
               f'backcast_length={self.backcast_length}, forecast_length={self.forecast_length}, ' \
               f'share_thetas={self.share_thetas}) at @{id(self)}'
class SeasonalityBlock(Block):

    def __init__(self, units, thetas_dim, device, backcast_length=10, forecast_length=5, nb_harmonics=None):
        if nb_harmonics:
            super(SeasonalityBlock, self).__init__(units, nb_harmonics, device, backcast_length,
                                                   forecast_length, share_thetas=True)
        else:
            super(SeasonalityBlock, self).__init__(units, forecast_length, device, backcast_length,
                                                   forecast_length, share_thetas=True)

    def forward(self, x):
        x = super(SeasonalityBlock, self).forward(x)
        backcast = seasonality_model(self.theta_b_fc(x), self.backcast_linspace, self.device)
        forecast = seasonality_model(self.theta_f_fc(x), self.forecast_linspace, self.device)
        return backcast, forecast


class TrendBlock(Block):

    def __init__(self, units, thetas_dim, device, backcast_length=10, forecast_length=5, nb_harmonics=None):
        super(TrendBlock, self).__init__(units, thetas_dim, device, backcast_length,
                                         forecast_length, share_thetas=True)

    def forward(self, x):
        x = super(TrendBlock, self).forward(x)
        backcast = trend_model(self.theta_b_fc(x), self.backcast_linspace, self.device)
        forecast = trend_model(self.theta_f_fc(x), self.forecast_linspace, self.device)
        return backcast, forecast


class GenericBlock(Block):

    def __init__(self, units, thetas_dim, device, backcast_length=10, forecast_length=5, nb_harmonics=None):
        super(GenericBlock, self).__init__(units, thetas_dim, device, backcast_length, forecast_length)
        self.backcast_fc = nn.Linear(thetas_dim, backcast_length)
        self.forecast_fc = nn.Linear(thetas_dim, forecast_length)

    def forward(self, x):
        # no constraint for generic arch.
        x = super(GenericBlock, self).forward(x)
        theta_b = F.relu(self.theta_b_fc(x))
        theta_f = F.relu(self.theta_f_fc(x))
        backcast = self.backcast_fc(theta_b)  # generic. 3.3.
        forecast = self.forecast_fc(theta_f)  # generic. 3.3.
        return backcast, forecast
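A quick way to check the wiring of the model above is to push a random batch through the default trend + seasonality configuration. This is a minimal sketch added for illustration (batch size and window lengths are arbitrary), not part of the original model file:

if __name__ == '__main__':
    device = torch.device('cpu')
    net = NBeatsNet(device=device, forecast_length=5, backcast_length=10)
    x = torch.rand(32, 10)                 # (batch, backcast_length)
    backcast, forecast = net(x)
    print(backcast.shape, forecast.shape)  # torch.Size([32, 10]) torch.Size([32, 5])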
Training code wrapper
import os
import matplotlib.pyplot as plt
import torch
from torch import optim
from torch.nn import functional as F
import numpy as np
from nb_self.models import NBeatsNet
class NeuralBeats:

    def __init__(self, forecast_length, data=None, backcast_length=None, save_model=False, path='',
                 checkpoint_name='NBEATS-checkpoint.th', mode='cpu', batch_size=None, thetas_dims=[4, 8],
                 nb_blocks_per_stack=3, share_weights_in_stack=False, train_percent=0.8, hidden_layer_units=128,
                 stack=None, Architecture='DRESS'):
        if data is None:
            print('No data passed: prediction-only mode')
            batch_size = np.nan
        else:
            self.data = data
            if len(self.data.shape) != 2:
                raise Exception('Array should be of nx1 shape')
            if self.data.shape[1] != 1:
                raise Exception('Array should be of nx1 shape')
        self.forecast_length = forecast_length
        if backcast_length is None:
            self.backcast_length = 3 * self.forecast_length
        else:
            self.backcast_length = backcast_length
        if batch_size is None:
            self.batch_size = int(self.data.shape[0] / 10)
        else:
            self.batch_size = batch_size
        self.hidden_layer_units = hidden_layer_units
        if stack is None:
            self.stacks = [NBeatsNet.GENERIC_BLOCK, NBeatsNet.GENERIC_BLOCK]
        else:
            # map integer codes to block types: 1 -> generic, 2 -> trend, 3 -> seasonality
            Dict = {1: NBeatsNet.GENERIC_BLOCK, 2: NBeatsNet.TREND_BLOCK, 3: NBeatsNet.SEASONALITY_BLOCK}
            self.stacks = [Dict.get(item, item) for item in stack]
        self.CHECKPOINT_NAME = path + checkpoint_name
        self.device = torch.device(mode)  # change to 'cuda' if a GPU is present for better performance
        self.thetas_dims = thetas_dims
        self.nb_blocks_per_stack = nb_blocks_per_stack
        self.train_size = train_percent
        self.share_weights_in_stack = share_weights_in_stack
        self.Architecture = Architecture
        self.net = NBeatsNet(stack_types=self.stacks,
                             forecast_length=self.forecast_length,
                             thetas_dims=self.thetas_dims,
                             nb_blocks_per_stack=self.nb_blocks_per_stack,
                             Architecture=self.Architecture,
                             backcast_length=self.backcast_length,
                             hidden_layer_units=self.hidden_layer_units,
                             share_weights_in_stack=self.share_weights_in_stack,
                             device=self.device)
        self.parameters = self.net.parameters()
        self.global_step_cl = 0
        self.check_save = save_model
        self.loaded = False
        self.saved = True
    def plot_scatter(self, *args, **kwargs):
        plt.plot(*args, **kwargs)
        plt.scatter(*args, **kwargs)

    def data_generator(self, x_full, y_full, bs):
        def split(arr, size):
            arrays = []
            while len(arr) > size:
                slice_ = arr[:size]
                arrays.append(slice_)
                arr = arr[size:]
            arrays.append(arr)
            return arrays

        # note: split() is applied to the (x_full, y_full) tuple, so for the usual case of
        # bs > 2 this yields the full training pair on every iteration.
        while True:
            for rr in split((x_full, y_full), bs):
                yield rr
    def loader(self, model, optimiser):
        if os.path.exists(self.CHECKPOINT_NAME):
            checkpoint = torch.load(self.CHECKPOINT_NAME)
            model.load_state_dict(checkpoint['model_state_dict'])
            optimiser.load_state_dict(checkpoint['optimizer_state_dict'])
            grad_step = checkpoint['grad_step']
            if self.loaded:
                self.norm_constant = checkpoint['norm_constant']
            return grad_step
        return 0

    def saver(self, model, optimiser, grad_step):
        if self.saved:
            torch.save({
                'norm_constant': self.norm_constant,
                'grad_step': grad_step,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimiser.state_dict(),
            }, self.CHECKPOINT_NAME)
        else:
            torch.save({
                'grad_step': grad_step,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimiser.state_dict(),
            }, self.CHECKPOINT_NAME)
    def load(self, file=None, optimiser=None):
        if file is None or file == '':
            raise Exception('Empty File Name')
        if optimiser is None:
            self.optimiser = optim.Adam(self.net.parameters())
        else:
            self.optimiser = optimiser
        self.CHECKPOINT_NAME = file
        self.loaded = True
        self.global_step_cl = self.loader(self.net, self.optimiser)
        self.CHECKPOINT_NAME = 'nbeats-training-checkpoint.th'

    def save(self, file=None):
        if file is None or file == '':
            raise Exception('Empty File Name')
        self.CHECKPOINT_NAME = file
        self.saver(self.net, self.optimiser, self.global_step_cl)
        self.saved = True
    def train_100_grad_steps(self, data, test_losses):
        if not self.loaded:
            global_step = self.loader(self.net, self.optimiser)
            self.loaded = False
            self.global_step_cl = global_step
        for x_train_batch, y_train_batch in data:
            self.global_step_cl += 1
            self.optimiser.zero_grad()
            self.net.train()
            _, forecast = self.net(torch.tensor(x_train_batch, dtype=torch.float).to(self.device))
            loss = F.mse_loss(forecast, torch.tensor(y_train_batch, dtype=torch.float).to(self.device))
            loss.backward()
            self.optimiser.step()
            if self.verbose:
                if self.global_step_cl % 30 == 0:
                    print(
                        f'grad_step = {str(self.global_step_cl).zfill(6)}, tr_loss = {loss.item():.6f}, te_loss = {test_losses[-1]:.6f}')
            if self.global_step_cl > 0 and self.global_step_cl % 100 == 0:
                with torch.no_grad():
                    self.saver(self.net, self.optimiser, self.global_step_cl)
                break
        return forecast
    def eval_test(self, test_losses, x_test, y_test):
        self.net.eval()
        _, forecast = self.net(torch.tensor(x_test, dtype=torch.float))
        test_losses.append(F.mse_loss(forecast, torch.tensor(y_test, dtype=torch.float)).item())
        p = forecast.detach().numpy()
        if self.plot:
            subplots = [221, 222, 223, 224]
            plt.figure(1)
            for plot_id, i in enumerate(np.random.choice(range(len(x_test)), size=4, replace=False)):
                ff, xx, yy = p[i] * self.norm_constant, x_test[i] * self.norm_constant, y_test[i] * self.norm_constant
                print(np.mean(np.abs(ff - yy) / ff))  # MAPE-style error, relative to the forecast
                plt.subplot(subplots[plot_id])
                plt.grid()
                self.plot_scatter(range(0, self.backcast_length), xx, color='b')
                self.plot_scatter(range(self.backcast_length, self.backcast_length + self.forecast_length), yy,
                                  color='g')
                self.plot_scatter(range(self.backcast_length, self.backcast_length + self.forecast_length), ff,
                                  color='r')
            plt.show()
    def fit(self, epoch=25, optimiser=None, plot=True, verbose=True):
        self.plot = plot
        self.verbose = verbose
        self.epoch = epoch
        # normalise the series by its maximum value
        self.norm_constant = np.max(self.data)
        self.data = self.data / self.norm_constant
        # build sliding windows of (backcast, forecast) pairs
        x_train_batch, y, self.x_forecast = [], [], []
        for i in range(self.backcast_length, len(self.data) - self.forecast_length):
            x_train_batch.append(self.data[i - self.backcast_length:i])
            y.append(self.data[i:i + self.forecast_length])
            self.x_forecast.append(self.data[i + 1 - self.backcast_length:i + 1])
            i = i + self.forecast_length  # note: reassigning the loop variable does not change the range() stride
        x_train_batch = np.array(x_train_batch)[..., 0]
        self.x_forecast = np.array(self.x_forecast)[..., 0]
        y = np.array(y)[..., 0]
        if optimiser is None:
            self.optimiser = optim.Adam(self.net.parameters())
        else:
            self.optimiser = optimiser
        c = int(len(x_train_batch) * self.train_size)
        x_train, y_train = x_train_batch[:c], y[:c]
        x_test, y_test = x_train_batch[c:], y[c:]
        train_data = self.data_generator(x_train, y_train, self.batch_size)
        test_losses = []
        for i in range(self.epoch):
            self.eval_test(test_losses, x_test, y_test)
            self.train_100_grad_steps(train_data, test_losses)
        if not self.check_save:
            if os.path.exists(self.CHECKPOINT_NAME):
                os.remove(self.CHECKPOINT_NAME)
    def predict(self, predict_data=None):
        if predict_data is None:
            _, forecasted_values = self.net(torch.tensor(self.x_forecast, dtype=torch.float))
            forecasted_values = forecasted_values.detach().numpy()
            forecasted_values = forecasted_values * self.norm_constant
        else:
            if predict_data.shape[0] != self.backcast_length:
                raise Exception('Numpy array for prediction input should be of backcast_length: {} x 1 shape'.format(
                    self.backcast_length))
            else:
                predict_data = predict_data / self.norm_constant
                predict_data = np.reshape(predict_data, (self.backcast_length, 1))
                _, forecasted_values = self.net(torch.tensor(predict_data.T, dtype=torch.float))
                forecasted_values = forecasted_values.detach().numpy()
                forecasted_values = forecasted_values * self.norm_constant
        return forecasted_values.T
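As a usage note: the stack argument accepts integer codes that the constructor maps to block types (1 = generic, 2 = trend, 3 = seasonality). The snippet below is an illustrative sketch on a synthetic series, not taken from the original post; the series, window lengths, and epoch count are arbitrary choices.

import numpy as np

# Synthetic n x 1 series (linear trend plus a sine wave), purely illustrative.
series = (np.arange(300) * 0.01 + np.sin(np.arange(300) * 0.2) + 2.0).reshape(-1, 1)
nb = NeuralBeats(data=series, forecast_length=5, backcast_length=15,
                 stack=[2, 3],  # trend stack followed by a seasonality stack
                 thetas_dims=[4, 8], nb_blocks_per_stack=2)
nb.fit(epoch=2, plot=False, verbose=False)
print(nb.predict().shape)  # (forecast_length, number_of_windows) after the final transpose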
Actual training code
import numpy as np
import torch
import pandas as pd
from nbeats import NeuralBeats
data_df = pd.read_csv("新的预测数据1.csv", sep="#")
# The input is an n x 1 continuous time series.
# NOTE: the original post leaves the train/test split from data_df unspecified;
# train_data and test_data are expected to hold a single "y" column here.
train_data = pd.DataFrame()
test_data = pd.DataFrame()
# Use the previous 80 points to forecast the next 60.
model = NeuralBeats(forecast_length=60, data=train_data["y"].values.reshape([-1, 1]), backcast_length=80,
                    save_model=False, path='', checkpoint_name='NBEATS-checkpoint.th', mode='cpu',
                    batch_size=30, thetas_dims=[4, 8], nb_blocks_per_stack=5, share_weights_in_stack=False,
                    train_percent=0.9, hidden_layer_units=128, stack=None, Architecture='DRESS')
# data: an n x 1 numpy array, e.g.
#   model = NeuralBeats(data=data, forecast_length=5, stack=[1, 3], nb_blocks_per_stack=3, thetas_dims=[3, 7])
# or load a previously trained model:
#   model.load(file='NBEATS.th')
# Train with a customised optimiser (or simply call model.fit()):
model.fit(epoch=50,
          optimiser=torch.optim.AdamW(model.parameters, lr=0.001, betas=(0.9, 0.999), eps=1e-07,
                                      weight_decay=0.01, amsgrad=False),
          plot=True)
# Rolling forecast: feed each forecast back in to predict further ahead.
pre_data = test_data["y"].values[:80].reshape([-1, 1])
forecast = model.predict(predict_data=pre_data)
print("*" * 100)
print("forecast")
print(forecast)
print("*" * 100)
for _ in range(3):
    # keep the last 20 points and append the 60 new forecasts to form the next
    # 80-point (backcast_length) input window
    pre_data = np.vstack([pre_data[-20:], forecast])
    forecast = model.predict(predict_data=pre_data)
    print("*" * 100)
    print("forecast")
    print(forecast)
    print("*" * 100)
# or: model.predict(predict_data=pred_data), where pred_data is a numpy array of shape backcast_length x 1
if __name__ == '__main__':
    pass
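If the trained network needs to be reused later, the wrapper's save() and load() methods can persist it to disk. The snippet below is a sketch added for illustration (the file name NBEATS.th and the second wrapper instance are assumptions); the new instance has to be built with the same architecture arguments as model so the checkpoint weights fit.

model.save(file='NBEATS.th')

# Prediction-only instance: no data passed, same architecture as `model` above.
model2 = NeuralBeats(forecast_length=60, backcast_length=80, mode='cpu',
                     nb_blocks_per_stack=5, hidden_layer_units=128,
                     stack=None, Architecture='DRESS')
model2.load(file='NBEATS.th')  # also restores norm_constant from the checkpoint
print(model2.predict(predict_data=pre_data))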