1. Originally posted on 14 August 2024
2. Cadzow iteration for determining the number of the physical terms added on 22nd August 2024 (The first plot graph replaced with the new one)
At this time, MP is experimented with the stock market dataset. Time series datasets are the market indices e.g. S&P 500, NASDAQ, and TOPIX, and company stocks (Amazon.com is used as an example). MP reconstructs the smoothed time series of the index values and the stock values by extracting the noises/errors. The advantage of MP is enabling to forecast the direction of how these values move on the top of the volatility. Furthermore, it offers the future forecast of the seasonal behaviour of this time series.
My Python codes are as follows. I designed to pick up which index and stock to use by changing their ticker symbol at the beginning of this set of the codes. It is coded to call the API of Yahoo Finance to automatically download the dataset. Both the index value and the stock value are individually analysed and estimated by MP. The last graph shows the movement of the percentage difference between the stock value and the index value by means of the MP analysis.
-- Cadzow iteration for determining the number of the physical terms added on 22nd August 2024 --
The codes for the noise filtering method Cadzow iteration are added to ease finding the number of the physical terms. The number of the terms founds by filtering a time series with Cadzow iteration offers a more smoothed consistent estimate with less fluctuation. It now also offers a clearer plot of the future estimate which helps investors to know whether a stock over-performs or under-performs compared to the market index.
# Parameter Adjustment
# Set Ticker Symbol
# e.g. Ticker="^GSPC" for S&P 500, Ticker="NDAQ" for NASDAQ, Ticker="TOPX" for TOPIX
Ticker_index="NDAQ" # Ticker sympol for the index
# Ticker="AMZN" for Amazon, Ticker="NTDOY" for NINTENDO
Ticker_stock="AMZN" # Ticker symbok for a stock
# Set the number of years to download data
No_Years_Data=10;
# Set the numbers of days for the future forecast
No_Days_FutureForecast=250*1
# importing necessary tools for math and plot
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy import stats
import statsmodels.formula.api as sm
import math
import cmath
from scipy.linalg import hankel
import scipy.odr
from scipy import odr
# defining imaginary number
i=complex(0,1)
# For plotting with multiple axis with different scales
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA
# Showing the plots bigger
plt.rcParams["figure.figsize"] = (18,9)
## Calling API from Yahoo Finance
# # https://www.kaggle.com/code/alessandrozanette/s-p500-analysis-using-yfinance-data
# !pip install yfinance # uncomment this part to run independently at the first time
# Calling stock values from Yahoo finance API
# Ref. https://www.kaggle.com/code/alessandrozanette/s-p500-analysis-using-yfinance-data
import yfinance as yf
%config InlineBackend.figure_format='retina'
import warnings
warnings.filterwarnings("ignore")
# Now, let's retrieve the S&P 500 data of the past x years using yfinance
Years="".join([str(No_Years_Data),'y']) # Set the number of years ! Adjust
Index = yf.Ticker(Ticker_index).history(period=Years) # Ticker Symbokl ! Adjust !
Stock = yf.Ticker(Ticker_stock).history(period=Years) # Ticker Symbokl ! Adjust !
# Let's take a look at the data
display(Index.tail())
display(Stock.tail())
# Change to list
# https://stackoverflow.com/questions/39597553/from-datetimeindex-to-list-of-times
Dates=Stock.index.tolist()
Dates=[Dates[t].strftime("%Y-%m-%d") for t in range(len(Dates))]
Values_Index=Index["Close"].tolist()
Values_Stock=Stock["Close"].tolist()
# Storing them as a dictionary
Stock_Info_dictionary=dict(zip(Dates,zip(Values_Index,Values_Stock)))
# Defining variables for time series analysis
t_k=Dates.copy() # Dates for time step k
Y_1t=Values_Index.copy() # Index values on date
Y_2t=Values_Stock.copy() # Stock values on date
y_1t=np.log(Y_1t).tolist() # natural logged index values
y_2t=np.log(Y_2t).tolist() # natural logged stock values
N=len(t_k) # Time series length
# Creating a new variable
# % difference between Ticker Symbol {Ticker_stock} and Ticker Symbol {Ticker_index} from {EDkeys[len(EDkeys)-len(f_t)]} to {EDkeys[-1]} Number of terms used {NTERMS}')
y_1t=np.diff(np.log(Y_1t))
y_2t=np.diff(np.log(Y_2t))
f_t=(np.log(Y_2t)-np.log(Y_1t))/np.log(Y_1t)
# Prony method
## Defininig the functions for the exponential analysis and reconstruction
# Definining the function of Matrix Pencil (MP) method,
# one of Prony-like methods.
def MP(samples,nterms):
# Firstly creating a rectangular hankel matrix of the direct imput of time series
n_samples=len(samples)
ncols_value=1/3
ncols=int(n_samples*ncols_value)
nrows=int(n_samples+1-ncols)
if nterms>math.floor(n_samples*ncols_value):
if min(ncols,nrows)==ncols: #Increase the number of columns
d=nterms-ncols
ncols=ncols+d
nrows=nrows-d
elif min(ncols,nrows)==nrows: #Increase the number of rows
d=nterms-nrows
ncols=ncols-d
nrows=nrows+d
H=hankel(samples)[0:nrows,0:ncols]
# Secondly, extracting the unitary matrix (left eigenvectors) of the hankel matrix
SVD_H=LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False)
U=SVD_H[0]
U0=U[0:len(U)-1,0:nterms]
U1=U[1:len(U),0:nterms]
# Generating (snall) omega, the periodic time-steps.
n=math.floor(len(samples)/2)
_2n_1=(2*n-1)+1
M=1 # Sampling rate:
#According to Shannon-Nyquist sampling theorem, the sampling rate must be twice of the maximum signal length.
#However, M=1 the financial data not based on a particular signalling cycle model.
#omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)] # Signal processing time steps
omega = np.arange(_2n_1) # using a simple discrete time steps Updated 17 07 2024
# Obtaining (big) Omega containing the frequencies and the time step
# Generalised engenvalues are decomposed by eig(U0\U1)
# https://fr.mathworks.com/help/matlab/ref/eig.html
OMEGA= scipy.linalg.eig(np.dot(np.linalg.pinv(U0),(U1)))[0]
# frequencies and damping factors
# phi_rec = [cmath.log(OMEGA[k])*M/(2*math.pi) for k in range(len(OMEGA))] # if using the signal process time step
phi_rec = [cmath.log(OMEGA[k]) for k in range(len(OMEGA))] # if using a simple discrete time steps Updated 17 07 2024
# Retrieving alpha
V=np.transpose(np.vander(OMEGA, N=len(samples), increasing=True))
# https://stackoverflow.com/questions/11479064/multiple-linear-regression-in-python
y = samples
X = V
coeffs = np.linalg.lstsq(X, y, rcond=None)[0]
alpha_rec=coeffs
alpha_rec=alpha_rec.tolist()
#shorting alphas and phs according to absolute values of alphas
abs_alpha=[abs(alpha_rec[k]) for k in range(len(alpha_rec))]
alpha_rec = [abs_alpha for _,abs_alpha in sorted(zip(abs_alpha,alpha_rec), reverse=True)]
phi_rec = [abs_alpha for _,abs_alpha in sorted(zip(abs_alpha,phi_rec), reverse=True)]
return alpha_rec,phi_rec
# Function for generating time series based on alphas and phis
def Exp(amp,freq,samples):
n=math.floor(len(samples)/2)
y=[]
# Generating x, the periodic time-steps.
_2n_1=(2*n-1)+1
omega = np.arange(_2n_1) # using a simple discrete time steps
for omg in range(len(omega)):
yi=0
for ap in range(len(amp)):
yi+=amp[ap]*cmath.exp(freq[ap]*omega[omg])
y.append(yi)
return y
# Displaying alphas and phis in a table
def PametersDisplay(AlphasPhis,nterms):
Header=['alphas','phis']
List=AlphasPhis
Table=dict(zip(Header,List))
panda_df = pd.DataFrame(data = Table)
pd.set_option('display.max_rows',nterms)
panda_df.head(nterms)
display(panda_df)
# Plotting two lines
def plot2(f,f_hat,Title,Xticks):
# generating the reconstructed data given alphas and phis
#print(Title)
xi = list(range(len(Xticks)))
plt.plot(f, label="original")
plt.plot(f_hat, label="reconstructed")
plt.legend(loc="upper right")
plt.title(Title)
plt.xlabel('Weeks')
plt.ylabel('EUR/USD')
plt.xticks(xi, Xticks, rotation=90)
plt.show
# Plotting two lines
def plot2_future(amp,freq,samples,NoWeeks,Title):
# Function for generating time series based on alphas and phis
N=len(samples)
N_new=N+NoWeeks
n=math.floor(N_new/2)
y=[]
# Generating x, the periodic time-steps.
_2n_1=(2*n-1)+1
omega = np.arange(_2n_1) # using a simple discrete time steps
for omg in range(len(omega)):
yi=0
for ap in range(len(amp)):
yi+=amp[ap]*cmath.exp(freq[ap]*omega[omg])
y.append(yi)
# generating the reconstructed data given alphas and phis
#print(Title)
plt.plot(samples, label="Original")
plt.plot(y, label="Reconstructed")
plt.legend(loc="upper right")
plt.title(Title)
plt.xlabel('Weeks')
plt.ylabel('EUR/USD')
plt.show
return y
# plot logged Singular Value Decomposition (SVD) and the log difference
# and returns the odd number indices of the terms representing the physical components
# for a reference to check how many exponential components can be grouped together.
# ref. https://docs.scipy.org/doc/scipy/reference/generated/scipy.linalg.hankel.html
def plot_lnSVD_logdiff(samples):
# calculating logged SVs and their log difference
N=math.floor(len(samples))
H=hankel(samples)[0:math.floor(N*2/3),0:math.floor(N*1/3)]
SVD=LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False)[1]
ln_SVD=[math.log(SVD[k]) for k in range(len(SVD)) ]
d_ln_SVD=[abs(ln_SVD[k+1]-ln_SVD[k]) for k in range(len(SVD)-1) ]
# Plotting two line plots with each different scale
fig, ax1 = plt.subplots()
color = 'tab:blue'
ax1.set_xlabel('time (s)')
ax1.set_ylabel('exp', color=color)
ax1.plot(ln_SVD, color=color)
ax1.tick_params(axis='y', labelcolor=color)
ax2 = ax1.twinx() # instantiate a second Axes that shares the same x-axis
color = 'tab:red'
ax2.set_ylabel('sin', color=color) # we already handled the x-label with ax1
ax2.plot(d_ln_SVD, color=color)
ax2.tick_params(axis='y', labelcolor=color)
fig.tight_layout() # otherwise the right y-label is slightly clipped
plt.show()
# Selecting the indices of the significant terms representing the physical components
# The number of indices has to be odd as the first term with its index no 0 is the real term constant
# and the rest complex terms are the pairs
EnumSVD = [list(enumerate(d_ln_SVD), start=1)[k] for k in range(len(d_ln_SVD))]
TermIndices=[]
for k in range(math.floor(len(samples)/3*2/3)):
if EnumSVD[k][1]>statistics.mean(d_ln_SVD)*3: # !! ** Select the threshold of selecting terms ** !!
if EnumSVD[k][0]%2==0:
TermIndices.append(EnumSVD[k][0]+1)
else:
TermIndices.append(EnumSVD[k][0])
return list(set(TermIndices))
## Cadzow iteration
# Cadzow iteration to filter the noise terms to ease picking up the phisical component terms
N=len(f_t)
m=math.floor(N*2/3) # rows
H=hankel(f_t)[0:m,0:(N-m)]
NTRY=plot_lnSVD_logdiff(f_t)[-1] # Obtaining the number of the columns to keep in Cadzof Iteration
tf=1; thresh=0.05;hmax=0.0005; CadzowILoopCount=0;
hmean=np.zeros(N)
while tf==1: # Loop while hmax>thresh i.e. until hmax<thresh
tf=0; # Set the boolen as false at the beginning of the loop
U,s,Wtrans = LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False) # Decomposing Hankel to the eigentriple
S=np.diag(s) # Transforming an array of SVs to a diagonal matrix
# Cutting out the columns of the eigentriple
U=U[:,0:NTRY]
S=S[0:NTRY,0:NTRY]
Wtrans=Wtrans[0:NTRY,:]
H=np.dot(np.dot(U,S),Wtrans) # multiplying these eigentriples to recreate a matrix
# The following operation for Hankelising this matrix
# First block iteration: Denominator increasing
for k in range(N-m):
hmean[k]=0;
for l in range(k+1):
hmean[k]=hmean[k]+H[k-l,l]; #print(1, k, l, H[k-l,l], hmean[k])
hmean[k]=hmean[k]/(k+1) ;
hmax=0.0005;
for l in range(k+1):
hmax=max(hmax,abs(H[k-l,l]-hmean[k])/abs(hmean[k]));
H[k-l,l]=hmean[k]
#print(tf)
tf = tf or int(hmax>thresh);
# Second block iterlation: Denominator fixed
for k in range(N-m,m):
hmean[k]=0;
for l in range(N-m):
hmean[k]=hmean[k]+H[k-l,l];
hmean[k]=hmean[k]/(N-m) ;
hmax=0.0005;
for l in range(N-m):
hmax=max(hmax,abs(H[k-l,l]-hmean[k])/abs(hmean[k]));
H[k-l,l]=hmean[k]
tf = tf or int(hmax>thresh);
# Third block iteration
for k in range(m,N):
hmean[k]=0;
for l in range((N-m),(k-m),-1):
hmean[k]=hmean[k]+H[k-l,l-1];
hmean[k]=hmean[k]/(N-k) ;
hmax=0.0005;
for l in range((N-m),(k-m),-1):
hmax=max(hmax,abs(H[k-l,l-1]-hmean[k])/abs(hmean[k]));
H[k-l,l-1]=hmean[k];
tf = tf or int(hmax>thresh)
CadzowILoopCount=CadzowILoopCount+1
print('Cadzow Loop Count',CadzowILoopCount)
# Obtaining a filtered time series data from Hankel matrix after Cadzow iteration
f_t_tilde=np.concatenate((H[0:m,1],H[m-1,0:(N-m)]))
EDkeys=list(Stock_Info_dictionary.keys())
Tittle=(f'Original VS Cadzow iterated {EDkeys[0]} to {EDkeys[-1]} Number of terms used {NTRY}')
plot2(f_t,f_t_tilde,Tittle,list(Stock_Info_dictionary.keys()))
# Finding #terms from Cadzow integrated series
NTERMS_list=plot_lnSVD_logdiff(f_t_tilde)
NTERMS=NTERMS_list[-1]
## MP reconstruction Output
# based on the original time series data (before filtered) and the terms obtained from Cadzow-iterated data
# Showing the MP analysis and reconstruction result
print(f'NTERMS={NTERMS}')
Parameters=MP(f_t,NTERMS)
# generating the reconstructed data given alphas and phis
alpha_rec, phi_rec = Parameters
f_t_hat=Exp(alpha_rec,phi_rec,f_t)
f_t_hat=np.array(f_t_hat).real # Extracting only the real number
f_t_hat=f_t_hat.tolist() # Converting back to list from numpy
EDkeys=list(Stock_Info_dictionary.keys())
Tittle=(f'Estimate of % difference between Ticker Symbol {Ticker_stock} and Ticker Symbol {Ticker_index} from {EDkeys[len(EDkeys)-len(f_t)]} to {EDkeys[-1]} Number of terms used {NTERMS}')
alpha_rec, phi_rec = Parameters
Future_Days=No_Days_FutureForecast # ! ** Adjust **
f_t_future=plot2_future(alpha_rec, phi_rec,f_t,Future_Days,Tittle)