Wednesday, August 14, 2024

Matrix Pencil (MP) methods for financial dataset (Prony method for finance) - example EURO-DOLLAR from ECB

Update record:
1. The originally posted on 31st May 2024
2. Sorting added on the function of MP on 3rd June 2024
3. Added line" f=f[0:1326] # Dataset from the first week of January 1990 to the last week of May 2024 " on 6th June 2024
4. Deleted " alpha_rec, phi_rec = MP(f,NTERMS) "and"f_hat=Exp(alpha_rec,phi_rec,f)" inside "plot2(f,f_hat,Title,Xticks):" function on 6th June 2024
5. The image of formulae modified on 10th June 2024
6. The image of formulae corrected on 25th June 2024
7. The explanation of selecting the time series range and its relation to the condition number added on 27th June 2024
8. "omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)] " replaced  with " omega = np.arange(_2n_1) " on 17th July 2024
9. The image of formulae revised on 17th July 2024
10. "if nterms>math.floor(n_samples*ncols_value):" in the function "MP(samples,nterms)" adjust the column of the rectangular Hankel matrix to be 1/3 of the time series length if the number of the term is smaller than it on 23rd July 2024
11. The analysis continued on https://art-blue-liberalism.blogspot.com/2024/07/estimates-based-on-matrix-pencil-mp.html (added on 23rd July 2024)
12. Amended "N=len(f)" to "N=len(samples)" and "y = f" to "y = samples" in the function "MP(samples,nterms)"

I have reproduced one of the State-Of-The-Art Prony-like methods called Matrix Pencil Method abbreviated as MP with Python.

As this is for analysing and reconstructing a financial dataset not based on the signal processing model, the exponential function is a little bit more simplified e.g. the sampling rate, denoted as M, is set as 1 in this algorithm.

In process of finding the generalised eigenvalues, Singular Value Decomposition (SVD) is implemented: The generalised eigenvalues of the unitary matrix from SVD of the rectangular Hankel matrix is used instead of the eigenvalue of the square Hankel matrix introduced last time.

This enables the method to mitigate the noise disrupting the parameter decomposition and the time series reconstruction. 

This also enables the method to set and adjust the number of the reconstruction terms of the parameters to adjust the shape of the model.


The following display my Python codes implementing MP: You may simply copy and paste these codes to try MP for financial data!

### !! Please find NTERMS (number of the reconstruction terns) to setup !! ##

## Importing libraries and setting up graphs and complex number
# Execute this cell to have access to the API of Frankfurter App
import requests
import json
import ast

# importing necessary tools
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy.optimize import newton
from scipy import stats
import statsmodels.formula.api as sm

import math
import cmath

from scipy.linalg import hankel

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (20,10)

# defining imaginary number
i=complex(0,1)


## Downloading the dataset: Euro-Dollar weekly data from European Central Bank (ECB)

# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)

print("Type:", type(resp))

# Preparing for the right encoding
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))

#mydata #show the accessed data
EURUSD=(mydata["rates"])
#EURUSD

# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]

for key, value in EURUSD.items():
    Dates_EURUSD.append(key)
    for k,v in value.items():
        Rates_EURUSD.append(v)

EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD

# # Show the dataset plot
# Xticks=list(EURUSD.keys())
# f=list(EURUSD.values()) # Define the time series f
# xi = list(range(len(Xticks)))
# # plot the index for the x-values
# plt.plot(xi, f)
# plt.xlabel('Weeks')
# plt.ylabel('EUR/USD')
# plt.xticks(xi, Xticks, rotation=90)
# plt.title('Weekly EUR/USD FX rate')
# plt.legend()
# plt.show()



## Prony-like method analysis starts!

# Define basic parameters
f=list(EURUSD.values()) # Define the time series f
f_org=f.copy() # Saving the original data
f=f[0:1326] # Dataset from the first week of January 1990 to the last week of May 2024
n=len(f)

# # plot Singular Value Decomposition (SVD)
# # for a reference to check how many exponential components can be grouped together.
# # ref. https://docs.scipy.org/doc/scipy/reference/generated/scipy.linalg.hankel.html
# def plot_SVD(samples):
#     d=math.floor(len(samples)/2)
#     H=hankel(samples)[0:d,0:d]
#     SVD=LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False)[1]
#     Ln_SVD=[math.log(SVD[k]) for k in range(len(SVD)) ]
#     plt.plot(Ln_SVD)
#     plt.title('Singular Values')
#     plt.xlabel('')
#     plt.ylabel('Natural Log')
#     plt.show
# plot_SVD(f)


## Defininig the functions for the exponential analysis and reconstruction

# Definining the function of Matrix Pencil (MP) method,
# one of Prony-like methods.
def MP(samples,nterms):
    # Firstly creating a rectangular hankel matrix of the direct imput of time series
    n_samples=len(samples)
    ncols_value=1/3
    ncols=int(n_samples*ncols_value)
    nrows=int(n_samples+1-ncols)
    if nterms>math.floor(n_samples*ncols_value):
        if min(ncols,nrows)==ncols: #Increase the number of columns
            d=nterms-ncols
            ncols=ncols+d
            nrows=nrows-d
        elif min(ncols,nrows)==nrows: #Increase the number of rows
            d=nterms-nrows
            ncols=ncols-d
            nrows=nrows+d
    H=hankel(samples)[0:nrows,0:ncols]

    # Secondly, extracting the unitary matrix (left eigenvectors) of the hankel matrix
    SVD_H=LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False)
    U=SVD_H[0]
    U0=U[0:len(U)-1,0:nterms]
    U1=U[1:len(U),0:nterms]

    # Generating (snall) omega, the periodic time-steps.
    n=math.floor(len(samples)/2)
    _2n_1=(2*n-1)+1
    M=1 # Sampling rate:
    #According to Shannon-Nyquist sampling theorem, the sampling rate must be twice of the maximum signal length.
    #However, M=1 the financial data not based on a particular signalling cycle model.
    #omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)] # Signal processing time steps
    omega = np.arange(_2n_1) # using a simple discrete time steps   Updated 17 07 2024

    # Obtaining (big) Omega containing the frequencies and the time step
    # Generalised engenvalues are decomposed by eig(U0\U1)
    # https://fr.mathworks.com/help/matlab/ref/eig.html
    OMEGA= scipy.linalg.eig(np.dot(np.linalg.pinv(U0),(U1)))[0]
    # frequencies and damping factors

    # phi_rec = [cmath.log(OMEGA[k])*M/(2*math.pi) for k in range(len(OMEGA))] # if using the signal process time step
    phi_rec = [cmath.log(OMEGA[k]) for k in range(len(OMEGA))]   # if using a simple discrete time steps   Updated 17 07 2024

    # Retrieving alpha
    V=np.transpose(np.vander(OMEGA, N=len(samples), increasing=True))
    # https://stackoverflow.com/questions/11479064/multiple-linear-regression-in-python
    y = samples
    X = V
    #X = X.T  # transpose so input vectors are along the rows
    #X = np.c_[X, np.ones(X.shape[0])]  # add bias term
    coeffs = np.linalg.lstsq(X, y, rcond=None)[0]
    alpha_rec=coeffs
    alpha_rec=alpha_rec.tolist()

    #shorting alphas and phs according to absolute values of alphas
    abs_alpha=[abs(alpha_rec[k]) for k in range(len(alpha_rec))]
    alpha_rec = [abs_alpha for _,abs_alpha in sorted(zip(abs_alpha,alpha_rec), reverse=True)]
    phi_rec = [abs_alpha for _,abs_alpha in sorted(zip(abs_alpha,phi_rec), reverse=True)]

    return alpha_rec,phi_rec


# Function for generating time series based on alphas and phis
def Exp(amp,freq,samples):
    n=math.floor(len(samples)/2)
    y=[]
    # Generating x, the periodic time-steps.
    _2n_1=(2*n-1)+1
    M=1 # Sampling rate:
    #According to Shannon-Nyquist sampling theorem, the sampling rate must be twice of the maximum signal length.
    #However, M=1 the financial data not based on a particular signalling cycle model.
    #omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)]
    omega = np.arange(_2n_1) # using a simple discrete time steps  Updated 17 07 2024
    for omg in range(len(omega)):
        yi=0
        for ap in range(len(amp)):
            yi+=amp[ap]*cmath.exp(freq[ap]*omega[omg])
        y.append(yi)
    return y

# Displaying alphas and phis in a table
def PametersDisplay(AlphasPhis,nterms):
    Header=['alphas','phis']
    List=AlphasPhis
    Table=dict(zip(Header,List))
    panda_df = pd.DataFrame(data = Table)
    pd.set_option('display.max_rows',nterms)
    panda_df.head(nterms)
    display(panda_df)

# Plotting two lines
def plot2(f,f_hat,Title,Xticks):
    # generating the reconstructed data given alphas and phis
    #print(Title)
    xi = list(range(len(Xticks)))
    plt.plot(f, label="original")
    plt.plot(f_hat, label="reconstructed")
    plt.legend(loc="upper right")
    plt.title(Title)
    plt.xlabel('Weeks')
    plt.ylabel('EUR/USD')
    plt.xticks(xi, Xticks, rotation=90)
    plt.show


## !!! Set the number of terms (NTERMS)  to reconstruct!!!
NTERMS=math.floor(n/2)-4 # <-- Set up NTERMS

# the number of the reconstruction terms i.e. the amplitude (alpha) + the frequency (phi)
# Adjust it until actually working for the reconstruction
# The fully fitting reconstruction requires the integer slightly smaller than a half of the time series length
# You may set up any integer for NTERMS e.g. small enough to extract only the trend
# Set the lag parameter e.g.4 to make the number of the reconstruction

## Output
# Showing the MP analysis and reconstruction result
print(f'NTERMS={NTERMS}')
Parameters=MP(f,NTERMS)
# generating the reconstructed data given alphas and phis
alpha_rec, phi_rec = Parameters
f_hat=Exp(alpha_rec,phi_rec,f)
f_hat=np.array(f_hat).real # Extracting only the real number
f_hat=f_hat.tolist() # Converting back to list from numpy
EDkeys=list(EURUSD.keys())
Tittle=(f'Euro vs. US-Dollar FX rate from {EDkeys[0]} to {EDkeys[-1]}  Number of terms used {NTERMS}')
plot2(f,f_hat,Tittle,list(EURUSD.keys()))
PametersDisplay(Parameters,NTERMS)





At this attempt, this method reconstructs the time series almost perfectly fitting with the original time series.  The output of a table (showing the top) and a graph is as below:




The snapshot of the main function of MP



On the other hand, we must note that the condition of modelling this financial time series is selecting the appropriate range of the time series.

The cycle period has to be more or less evenly distributed so that it should not be cut off in the middle of one fiscal period. Otherwise, this signal processing analysis method are less likely to retrieve the appropriate parameters.

In case of failing to retrieve the parameters, the condition number of the Vandermonde matrix of the exponents becomes significantly higher. The following codes indicate how the selection of the time series range influences the condition number of the matrix.  The condition number is the linear algebraic reference of the reliability of the values returned from the algebraic calculus.

These additional codes also visualise the Vandermonde matrices of each time series range selected. The matrix becomes often ill-conditioned when some exponents out of the series become extremely higher than the others after multiply powered.

##############

## Checking Condition Number of the selected dataset range

# Producing a Vandermonde Matrix of Omega based on the MP method
def CondNo_VanderM_MP(samples,nterms):
    # Firstly creating a rectangular hankel matrix of the direct imput of time series
    n_samples=len(samples)
    ncols_value=1/3
    ncols=int(n_samples*ncols_value)
    nrows=int(n_samples+1-ncols)
    if nterms>math.floor(n_samples*ncols_value):
        if min(ncols,nrows)==ncols: #Increase the number of columns
            d=nterms-ncols
            ncols=ncols+d
            nrows=nrows-d
        elif min(ncols,nrows)==nrows: #Increase the number of rows
            d=nterms-nrows
            ncols=ncols-d
            nrows=nrows+d
    H=hankel(samples)[0:nrows,0:ncols]
   
    # Secondly, extracting the unitary matrix (left eigenvectors) of the hankel matrix
    SVD_H=LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False)
    U=SVD_H[0]
    U0=U[0:len(U)-1,0:nterms]
    U1=U[1:len(U),0:nterms]
   
    # Generating (snall) omega, the periodic time-steps.
    n=math.floor(len(samples)/2)
    _2n_1=(2*n-1)+1
    M=1 # Sampling rate:
    #According to Shannon-Nyquist sampling theorem, the sampling rate must be twice of the maximum signal length.
    #However, M=1 the financial data not based on a particular signalling cycle model.
    #omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)]
    t = list(range(_2n_1)) # using a simple discrete time trend instead of omega

    # Obtaining (big) Omega containing the frequencies and the time step
    # Generalised engenvalues are decomposed by eig(U0\U1)
    # https://fr.mathworks.com/help/matlab/ref/eig.html
    OMEGA= scipy.linalg.eig(np.dot(np.linalg.pinv(U0),(U1)))[0]
    # frequencies and damping factors
   
    phi_rec = [cmath.log(OMEGA[k])*M/(2*math.pi) for k in range(len(OMEGA))]
   
    # Vandermonde Matrix
    VandermondeMatrix=np.transpose(np.vander(OMEGA, N=n_samples, increasing=True))
    # and then its condition number
    ConditionNumber=LA.cond(VandermondeMatrix)
    return VandermondeMatrix, ConditionNumber

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (20,20)
NoInt=3 # Set to determine the number of interval windows
ShiftWindow=0
List_absVM=[]
List_lnCN=[]
List_CD=[]
for intvl in range(NoInt):
    NoWeeks=[]
    CondNos=[]
    f=f_org.copy()
    EDkeys=list(EURUSD.keys())
    IntvlRg=8
    plt.figure(intvl)
    plt.subplot(NoInt,1,intvl+1)
    for tcut in range(IntvlRg):
        f=f_org.copy()
        tlen=len(f)-(tcut+ShiftWindow)*(1+IntvlRg*intvl)
        f=f[0:tlen]
        NTERMS=math.floor(len(f)/2)-4 # <-- Set up NTERMS
        # Generating the condition number of the Vandermonde matrix
        VM, CN = CondNo_VanderM_MP(f,NTERMS)
        NoWeeks.append(EDkeys[tlen-1])
        CondNos.append(CN)
       
        # Saving the elements of the Vandermonde Matrix
        List_absVM.append(abs(VM))
        List_lnCN.append(math.log(CN))
        List_CD.append(EDkeys[tlen-1])
    ShiftWindow+=tcut # Shifting the point of cutting time series for another window
   
    LnCondNos=[math.log(CondNos[k]) for k in range(len(CondNos))]
    xi = list(range(len(NoWeeks)))
    LabelName="Logged Condition No"
    plt.plot(LnCondNos, label=LabelName)
    #plt.legend(loc="upper right")
    plt.title(f'Condition Number of Vandermonde Matrix (MP) for different #weeks pg. {intvl+1}')
    plt.xlabel(f'Weeks')
    plt.ylabel(LabelName)
    plt.xticks(xi, NoWeeks) #, rotation=90)
    plt.show

plt.rcParams["figure.figsize"] = (7,7) # Resizing for showing the matrix elements' magnitudes
for lvm in range(len(List_absVM)):
    plt.figure(intvl+1+lvm)
    LCNtct='%.3f'%(List_lnCN[lvm])
    plt.title(f'The end week {List_CD[lvm]}, Logged Condition No. {LCNtct} ')
    img = plt.imshow(List_absVM[lvm], origin='upper', cmap='jet', interpolation='nearest', aspect='auto')
    plt.colorbar()
    plt.show()

plt.rcParams["figure.figsize"] = (20,10) # Making it back to the default setting size

 


No comments: