The following data analysis is estimating the daily return (natural log difference) of the foreign exchange rate (FX) of Euro (EUR) based on US-Dollar (USD) from 1997 to 2023.
The signal processing of the entire time period from January 1997 to 8th May 2023:
The signal processing representing the last one year (to 17th of May, 2023) (IntervalRange=10 ### adjust! )
The data analysis introduced here is the brand-new approach to the financial economics which often involves non-linear variables with an exponentially expanding/decaying signal which the conventional econometric methods tend to struggle dealing with.
In terms of the non-linear signal processing like approach, many financial data analysts have adapted Fourier transfer. However, Fourier transfer has a critical disadvantage of dealing with the non-stable signal as shown in these graphs and failing to detect more than one signal frequencies close to each other.
De Prony was the mathematician famous at the contemporary time period of when Fourier was alive. Fourier method has been popular for a long time period mainly because of the convenience with a limited computational capability. Nevertheless, the current advancement of computational mathematics has enabled to apply Prony method to the advanced mathematical data analyses.
Prony method is advantageous to detect various signal frequencies with their amplitude (coefficient) even for an unstable signal processing. Through the time length, it can describe various waving formations of a time series. For example, the seasonality and the event risk can be detected by observing these varying trends.
The conventional financial economic models are limited to finding one coefficient of the autocorrelation of either a variable or its error-terms/residuals. Furthermore, they are biased while dealing with a variable where no convergence can be proven. In contrast, this estimate based on Prony method figures out the entire dynamic movement of a variable.
I have just started studying about this novel econometric method so that I admit that both my understanding and my implementation are still incomplete, and its improvement is quite likely to be required. At the same time, I am confident in having understood the core concept and the fundamentally required mathematical methods. Therefore, I have produced my Python codes implementing this data analysis method.
The following Python codes implement Prony method (Subspace method) to generate the signal processing model of the daily return (log-difference) of EUR/USD. This includes all the necessary functions such as Hankel matrix, finding Eigenvalues with Verndermonde matrices, Singular Value Decomposition (SVD), the matrix pencil method (MPM) to find the coefficients and the signal frequencies, and the total least square (TLS) to specify the amplifier coefficients and the signal frequencies.
# Execute this cell to have access to the API of Frankfurter App
import requests
import json
# importing necessary tools
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy.optimize import newton
from scipy import stats
import statsmodels.formula.api as sm
import math
import cmath
from scipy.linalg import hankel
import scipy.odr
from scipy import odr
# Showing the plots bigger
plt.rcParams["figure.figsize"] = (12,6)
# defining complex number
i=complex(0,1)
i
# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
# Write your code here
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)
print("Type:", type(resp))
# Preparing for the right encoding
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))
#mydata #show the accessed data
EURUSD=(mydata["rates"])
#EURUSD
# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]
for key, value in EURUSD.items():
Dates_EURUSD.append(key)
for k,v in value.items():
Rates_EURUSD.append(v)
EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD
#Rates_EURUSD
print("Length of data:",len(Rates_EURUSD))
# Converting the rate to natural logarithm
ln_Rates_EURUSD=np.log(Rates_EURUSD)
ln_Rates_EURUSD
#plt.plot(ln_Rates_EURUSD) # Uncomment to show plot!
# Creating the log-difference (Daily return) of EUR/USD
d_ln_Rates_EURUSD=np.diff(np.log(Rates_EURUSD))
len(d_ln_Rates_EURUSD)
#plt.plot(d_ln_Rates_EURUSD) # Uncomment to show plot!
# Obtaining the length which is half of the total length of this variable.
# This is needed for creating the matrix of this variable i.e. n/2 * n/2 matrix
X=d_ln_Rates_EURUSD.copy()
d=(int(len(X)/2-len(X)%2*0.5)) # Subtracting the remainder if it is an odd number
d
f=d_ln_Rates_EURUSD.copy()
n=len(f)
M=100 # sampling rate.
# M is supposed to be twice of the actual signal frequency.
# But, we have never known the true frequency of the financial data.
# So, it is assumed to be 100 because it seems to provide an appropriate answer
print(n) # Checking the total length of this variable.
# Creating Hankel matrix which is needed to
def mat_ge(samples):
n_samples = len(samples)
d = int(n_samples/2)
H0=hankel(samples[0:-1])[0:d,0:d]
H1=hankel(samples[1:])[0:d,0:d]
return H0, H1
H0, H1 =mat_ge(f)
H=(H0,H1)
for h in range(len(H)):
x=H[h]
panda_df = pd.DataFrame(data = x)
display(panda_df)
# Defining the function for plotting a line graph
#Threshold=100
def PLOT(x,Tittle,Start,End):
x=x[Start:End]
plt.plot(x)
plt.title(Tittle)
plt.xlabel('')
plt.ylabel('- Days ->')
plt.show
# Solving for B # But not used for this MPM method
# compute the amplitudes (coefficients)
B=LA.solve(H0, -1*f[d:]) # Only pick the real part of V matrix!
beta_rep=B.real
x=beta_rep
panda_df = pd.DataFrame(data = x)
panda_df
#PLOT(x,"The coefficient Beta for root-finding",0,-1) # Uncomment to show plot!
# # Please remember that these functions are used for creating Xs
# # for the TLS for alpha and phi
# # Defining the function listing the real part of each row of a matrix A
# # which is supposed to be the hankel matrix
# # showing lagged time series variables t_0, t_1, t_2, ..., t_n/2
# def X_list_gen(A): # A must be a matrix
# d_c=len(A)
# X_list=[]
# for k in range(d_c):
# X_list.append((A.real)[k])
# return X_list
# # Generating the lagged time series variables t_0, t_1, t_2, ..., t_n/2
# # from Hankel matrix
# y=-1*f[len(H0):] # Length?
# x_list=X_list_gen(H0)
# len(y)-len(x_list)
# # Trying to find Beta s with the Total Least Square (TLS) estiamtes
# # The code is quoted from
# # Linear: https://docs.scipy.org/doc/scipy/reference/generated/scipy.odr.multilinear.html
# # Multiple: https://stackoverflow.com/questions/35041266/scipy-odr-multiple-variable-regression
# # The following is the fullset codes of the TLS estimate
# def linfit(beta, x):
# Y= beta[len(x)]
# for k in range(len(x)):
# Y+=beta[k]*x[k]
# return Y #notice changed indices for x # Here!
# x = np.row_stack( x_list ) #odr doesn't seem to work with column_stack # Here!
# linmod = scipy.odr.Model(linfit)
# data = scipy.odr.Data(x, y)
# beta0=[1.]*(len(x)+1)
# odrfit = scipy.odr.ODR(data, linmod, beta0) # Here!
# odrres = odrfit.run()
# odrres.pprint()
# Defnining the function plotting Singular Value Decomposition (SDV)
# This apprximate the number of the physical/true signals i.e. the number of true alpha s and phi s
def plot_SVD(A):
SDV=LA.svd(A, full_matrices=True, compute_uv=True, hermitian=False)[1]
Ln_SDV=[math.log(SDV[k]) for k in range(len(SDV)) ]
plt.plot(Ln_SDV)
plt.title('Singular Values')
plt.xlabel('')
plt.ylabel('Natural Log')
plt.show
#plot_SVD(H0) # Uncomment to show plot!
# Finding the Eigenvalue
### But the order is different from MATLAB version!
E= scipy.linalg.eig(H0, H1)[0]
#E=np.sort(E)
#E
# Showing Verndermonde Matrix
V=np.transpose(np.vander(E, N=None, increasing=True))
panda_df = pd.DataFrame(data = V)
panda_df.head()
len(f[:d])
# Calculating the coefficient alpha s with the matrix pencil method (MPM)
# Compute the amplitudes (coefficients) by the mldivide V\f[d:]=A
if len(f[d:])!=d:
Adj=1
else:
Adj=0
A=LA.solve(V.real, f[d+Adj:]) # Only pick the real part of V matrix! # Length?
alpha_rep=A.real
x=alpha_rep
panda_df = pd.DataFrame(data = x)
panda_df
#PLOT(x,Threshold,0,-1)
#PLOT(x,"Coeff. alpha (The Amplifier) calculated by MPM",0,-1) # Uncomment to show plot!
(math.ceil(4.2))
round(d/20,0)
# The Total Least Square (TLS) code is quoted from
# Linear: https://docs.scipy.org/doc/scipy/reference/generated/scipy.odr.multilinear.html
# Multiple: https://stackoverflow.com/questions/35041266/scipy-odr-multiple-variable-regression
# The following is the fullset codes of the TLS estimate
# Trying the TLS to find alpha for each different interval
X=E.real
Y=f[d:]
X_List=[]
Y_List=[]
alpha_List_bin=[]
alphaTest_List=[]
alpha_List=[]
alpha_Itvl_Loc=[]
#Num_Interval=30
#IntervalRange=int(d/Num_Interval)
IntervalRange=10 ### adjust!
for k in range(int(math.ceil(d/IntervalRange))):
Start=k*IntervalRange
End=min((k+1)*IntervalRange,d)
X_List.append(X[Start:End])
Y_List.append(Y[Start:End])
#print('X length',len(X_List[k]))
#print('Y length',len(Y_List[k]))
x=X_List[k].copy()
y=Y_List[k].copy()
data = odr.Data(x, y)
odr_obj = odr.ODR(data, odr.multilinear)
output = odr_obj.run()
alpha_List_bin.append(output.beta[1]) # Saving all the alpha s calculated
alphaTest_List.append(output.sd_beta[1]*1.96) # 95% Confidence T-interval based on the standard error of each alpha
T=abs(alpha_List_bin[k])-abs(alphaTest_List[k]) # T-test to compare the coeff. alpha with the T-interval
#print(T)
if T>0:
alpha_List.append(output.beta[0]) # Picking up only the coeff. alpha s significant with the 95% confidence level
alpha_Itvl_Loc.append(End) # Which time period this alpha is
else:
continue
print(alpha_List)
print(alpha_Itvl_Loc)
#plt.plot(alpha_List) # Uncomment to show plot!
### Around 1.5 is the biggest spike of alpha also shown in the mldivide V\f[d:]=A so that these candidates of alpha found by the TLS seem to be suitable to be selected
# Recovering phi s by transforming the eigenvalues to their natural logarithm
# phi is the power of the exponential which is the complex number
# phi s represent the frequency of this signal
# frequencies and damping factors
X = [(cmath.log(E[k])*M/(2*math.pi)) for k in range(len(E))]
#phi_rep = [(cmath.log(E[k])*M/(2*math.pi)).imag for k in range(len(E))] # .imag shows only the imaginary part
x=X
panda_df = pd.DataFrame(data = x)
panda_df.head()
#display(panda_df)
# PLOT(x,"The real part of phi s")
# The real part of phi s
# frequencies and damping factors
# phi_rep = [(cmath.log(E[k])*M/(2*math.pi)) for k in range(len(E))]
X_real = [(cmath.log(E[k])*M/(2*math.pi)).real for k in range(len(E))] # .imag shows only the imaginary part
x=X_real
panda_df = pd.DataFrame(data = x)
panda_df.head()
#display(panda_df)
#PLOT(x,"The real part of phi s",0,-1) # Uncomment to show plot!
# The imaginary part of phi s
# frequencies and damping factors
#phi_rep = [(cmath.log(E[k])*M/(2*math.pi)) for k in range(len(E))]
X_img = [(cmath.log(E[k])*M/(2*math.pi)).imag for k in range(len(E))] # .imag shows only the imaginary part
x=X_img
panda_df = pd.DataFrame(data = x)
panda_df.head()
#display(panda_df)
#PLOT(x,"The imaginary part of phi s",0,-1) # Uncomment to show plot!
# definining number of exponentials in the signal
n=len(alpha_List)
n
# determining sampling rate
M = 100 # M is supposed to be twice of the actual signal frequency.
# But, we have never known the true frequency of the financial data.
# So, it is assumed to be 100 because it seems to provide an appropriate answer
print(n) # Checking the total length of this variable.
#omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)]
sample_step = []
for k in range(d):
a=2*math.pi/M*(k)
b=2*math.pi/M*(k)*i
sample_step.append((a, b))
#sample_step
# Creating the time trend variable for the complex analysis
sample_step= [2*math.pi/M*k for k in range(d)]
#sample_step
X_img=[X[k].imag for k in range(len(X))]
# The Total Least Square (TLS) code is quoted from
# Linear: https://docs.scipy.org/doc/scipy/reference/generated/scipy.odr.multilinear.html
# Multiple: https://stackoverflow.com/questions/35041266/scipy-odr-multiple-variable-regression
# The following is the fullset codes of the TLS estimate
# Trying the TLS to find phi for each different interval
X=sample_step.copy()
Y=X_img.copy()
X_List=[]
Y_List=[]
phi_List_bin=[]
phiTest_List=[]
phi_List=[]
phi_Itvl_Loc=[]
IntervalRange=6 ### adjust!
for k in range(int(math.ceil(d/IntervalRange))):
Start=k*IntervalRange
X_List.append(X[:d-Start])
Y_List.append(Y[Start:d])
x=X_List[k].copy()
y=Y_List[k].copy()
data = odr.Data(x, y)
odr_obj = odr.ODR(data, odr.multilinear)
output = odr_obj.run()
phi_List_bin.append(output.beta[1]) # Saving all the phi s calculated
phiTest_List.append(output.sd_beta[1]*1.960) # 95% Confidence T-interval based on the standard error of each alpha
T=abs(phi_List_bin[k])-abs(phiTest_List[k]) # T-test to compare the coeff. phi with the T-interval
#print(T)
if T>0:
phi_List.append(output.beta[0]) # Picking up only the coeff. phi s significant with the 95% confidence level
phi_Itvl_Loc.append(Start) # Which time period this phi is
else:
continue
#print(phi_List_bin)
#print(phiTest_List)
print(phi_List)
print(phi_Itvl_Loc)
#plt.plot(phi_List) # Uncomment to show plot!
phi_List
# copying these alpha s and the imaginary part of phi s
# as assumed to be the true alpha s and the true phi s
PhysicalParts=min(len(alpha_List),len(phi_List))
alpha = alpha_List[:PhysicalParts].copy()
PHI = phi_List[:PhysicalParts].copy()
phi = [PHI[k]*i for k in range(len(PHI))]
### Simulation part
### Simulation part ###
# The following section is simulating the signal processing based on
# alpha s and phi s which are assumed to be the physical/true parts
# sampling rate
M = 100 # M is supposed to be twice of the actual signal frequency.
# But, we have never known the true frequency of the financial data.
# So, it is assumed to be 100 because it seems to provide an appropriate answer
print(n) # Checking the total length of this variable.
# number of exponentials in the signal
PhysicalParts=min(len(alpha_List),len(phi_List))
NoiseParts=0 # Adjust
n = PhysicalParts+NoiseParts
# f_syn: samples used by Prony's methhod
_2n_1=(2*n-1)+1
omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)]
#omega
# Defining the function generating a time series variable
# based on the given alpha s, phi s, and the time denoted as the small omega
def syn_exp(amp,freq,x):
t=len(amp)
y=[]
for omg in range(len(x)):
yi=0
for ap in range(len(amp)):
yi+=amp[ap]*cmath.exp(freq[ap]*x[omg])
y.append(yi)
return y
# Defininig Synch
f_syn = syn_exp(alpha, phi, omega)
#f_syn
f = f_syn
# plot the real part of the signal
def plot_signal(amp,freq,x,Start,End,Title):
Mrend = 2*math.pi;
Mint = math.pi/2;
t=np.linspace(0, Mrend, 10000)
M=len(t)
x= [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(len(t))]
F=syn_exp(amp,freq,x)
F_real=[F[k].real for k in range(len(x))]
plt.plot(F_real[Start:End])
plt.title(Title)
plt.xlabel("'pi/2'_______________'pi'_______________'3pi/2'_______________'2pi'")
plt.ylabel('Signal: real part')
plt.show
# Plotting with defined alpha, phi, and omega (steps)
# plot_signal(alpha,phi,omega,0,-1,"Title") # Uncomment to show plot!
# Creating Hankel Matrix
def mat_ge(samples):
n_samples = len(samples)
d = int(n_samples/2)
H0=hankel(samples[0:-1])[0:d,0:d]
H1=hankel(samples[1:])[0:d,0:d]
return H0, H1
H0, H1 =mat_ge(f)
H=(H0,H1)
for h in range(len(H)):
x=H[h]
panda_df = pd.DataFrame(data = x)
display(panda_df)
# Defnining the function plotting Singular Value Decomposition (SDV)
# This apprximate the number of the physical/true signals i.e. the number of true alpha s and phi s
def plot_SVD(A):
SDV=LA.svd(A, full_matrices=True, compute_uv=True, hermitian=False)[1]
Ln_SDV=[math.log(SDV[k]) for k in range(len(SDV)) ]
plt.plot(Ln_SDV)
plt.title('Singular Values')
plt.xlabel('')
plt.ylabel('Natural Log')
plt.show
# plot_SVD(H0) # Uncomment to show plot!
# Finding the Eigenvalue
### But the order is different from MATLAB version!
E= scipy.linalg.eig(H0, H1)[0]
#E=np.sort(E)
E
# Showing Verndermonde Matrix
V=np.transpose(np.vander(E, N=None, increasing=True))
panda_df = pd.DataFrame(data = V)
display(panda_df)
# Now, recovering alpha and check these alpha s generated by the MPM
# are the same value the ones generated by the last TLS
# compute the amplitudes (coefficients)
A=LA.solve(V.real, f[:n]) # Only pick the real part of V matrix!
alpha_rec=A.real
alpha_rec
alpha
# Now, recovering phi. Their value are different from those generated by the TLS.
# But, the values of their exponent are supposed to be almost identical to
# the exponents of those generated by the last TLS.
# Remember that the complex exponents are the cycle
# e.g., exp((2*pi/M)*(M+3) = exp(2*pi/M)*3
# Therefore phi = (M+3)*i and phi = 3*i produce the same answer for exp(phi)
# frequencies and damping factors
phi_rec = [cmath.log(E[k])*M/(2*math.pi) for k in range(len(E))]
phi_rec
# The complex part exactly matches with the MATLAB result!
phi=phi_rec.copy()
phi
# The trend model of EUR/USD for the whole time series
plot_signal(alpha,phi,omega,0,-1, "The trend model of EUR/USD for the whole 2π time series") # Uncomment to show plot!
# The trend model of EUR/USD for the last one year
# plot_signal(alpha,phi,omega,int(10000/d*365),-1, "The trend model of EUR/USD for the last one year") # Uncomment to show plot!