Monday, August 21, 2023

An attempt to use Singular Spectrum Analysis (SSA) on Frankfurter App data with Python


Singular Spectrum Analysis (SSA) is nowadays a popular method for analysing financial time series.

SSA is similar to Prony's method (the two might be described as cousins): both can decompose a time series containing non-periodic cycles and a trend (i.e. not a purely oscillatory signal).

Autoregressive methods have difficulty distinguishing the physical (true-signal) terms from the noise terms of the volatility, and they are not well suited to forecasting a non-linear time series.

In contrast, SSA and Prony's method can decompose such a time series through a multi-dimensional decomposition process, offering more detailed estimates than other conventional methods.
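
Before the full implementation, the core SSA step can be sketched in a few lines: embed the series into a Hankel trajectory matrix and inspect its singular values. This is a minimal sketch on synthetic data; the window length L and the synthetic series are assumptions for illustration only.

import numpy as np
from scipy.linalg import hankel

t = np.arange(200)
series = 0.01*t + np.sin(2*np.pi*t/25) + 0.3*np.random.randn(200) # trend + cycle + noise

L = 50                                 # embedding window length (assumed)
X = hankel(series[:L], series[L-1:])   # L x (N-L+1) trajectory matrix
s = np.linalg.svd(X, compute_uv=False) # singular values, in descending order
print(np.log(s[:10]))                  # a sharp drop separates signal from noise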

The SSA implementation in the following Python code is adapted from the MATLAB code at https://uk.mathworks.com/matlabcentral/fileexchange/58967-singular-spectrum-analysis-beginners-guide.

This post is an attempt to implement SSA for Frankfurter App data with Python.

The following Python code is transferable to analysing other financial data.

# Execute this cell to have access to the API of Frankfurter App
import requests
import json

# importing necessary tools
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy import stats
import statsmodels.formula.api as sm
import math
from scipy.linalg import hankel

# For plotting with multiple axis with different scales
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (18,9)

# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
# Write your code here
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)

print("Type:", type(resp))

# Decoding the byte response into a Python dictionary
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))

#mydata #show the accessed data
EURUSD=(mydata["rates"])
#EURUSD

# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]

for key, value in EURUSD.items():
    Dates_EURUSD.append(key)
    for k,v in value.items():
        Rates_EURUSD.append(v)

EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD

#Rates_EURUSD
print("Length of data:",len(Rates_EURUSD))

# Converting the rate to natural logarithm
ln_Rates_EURUSD=np.log(Rates_EURUSD)
ln_Rates_EURUSD
#plt.plot(ln_Rates_EURUSD)   # Uncomment to show plot!

# Creating the log-difference (Daily return) of EUR/USD
d_ln_Rates_EURUSD=np.diff(np.log(Rates_EURUSD))
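# np.diff returns x[k+1]-x[k], so each element is ln(P_t)-ln(P_{t-1}),
# the continuously compounded (log) return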
len(d_ln_Rates_EURUSD)
#plt.plot(d_ln_Rates_EURUSD)   # Uncomment to show plot!

# Choosing the series to analyse. Half of its length defines
# the size of the n/2 x n/2 trajectory matrix below
f=ln_Rates_EURUSD.copy()
#f=d_ln_Rates_EURUSD.copy()

# Creating the Hankel (trajectory) matrix needed for the SSA decomposition
n = len(f)
d=(int(n/2-n%2*0.5)) # Half the length, rounded down when n is odd
d
H=hankel(f[0:-1])[0:d,0:d]
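# For intuition, hankel(c)[0:d,0:d] has constant anti-diagonals, e.g.
# hankel([1,2,3,4])[0:2,0:2] -> [[1, 2],
#                                [2, 3]]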

#print(f'\n\n \033[1m Hankel Matrix H \033[0m')
#panda_df = pd.DataFrame(data = H)
#display(panda_df)

# Finding the eigenvalues and eigenvectors of the Hankel matrix H
print('\n\n Finding the eigenvalues and eigenvectors of the Hankel matrix H ')
Evalue, Evectors = scipy.linalg.eig(H)
Evalue_real=Evalue.real
print('\n\n \033[1m Eigenvalues \033[0m')
panda_df = pd.DataFrame(data = Evalue)
display(panda_df)
print('\n\n \033[1m Eigenvectors \033[0m')
display(Evectors)

# Defining the function plotting the Singular Value Decomposition (SVD)
# This approximates the number of the physical/true signals i.e. the number of true alpha s and phi s
def plot_SVD(A):
    SV=LA.svd(A, full_matrices=True, compute_uv=True, hermitian=False)[1]
    Ln_SV=[math.log(SV[k]) for k in range(len(SV)) ]
    plt.plot(Ln_SV)
    plt.title('Singular Values')
    plt.xlabel('')
    plt.ylabel('Natural Log')
    plt.show()
print('\n\n Singular Value Decomposition (SVD) of the Hankel matrix H')
# Refers to the plot to determine the number of terms (NTERMS) to recover
#plot_SVD(H) # Uncomment to show the SVD plot

# Determining NTERMS
SV=LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False)[1]
Ln_SV=[math.log(SV[k]) for k in range(len(SV)) ]
NTERMS=0
SlopeSVD=1 # Adjust according to the data type and its log(SV) plot
for k in range(len(Ln_SV)):
    if Ln_SV[k]>SlopeSVD:
        NTERMS+=1
        #print(NTERMS)
        continue
    else:
        break
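# Equivalent one-liner (LA.svd returns singular values in descending order):
# NTERMS = int(np.sum(np.array(Ln_SV) > SlopeSVD))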


# Generating the reconstruction components for NTERMS
RCs = []
for k in range(NTERMS):
    RCs.append(Evectors[:,k])

# Showing each reconstruction components
print('\n\n \033[1m Reconstruction Components for NTERMS \033[0m')
panda_df = pd.DataFrame(data = RCs)
display(panda_df)
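# Note: textbook SSA would instead take the SVD of H, form the elementary
# matrices s_k * u_k * v_k^T, and diagonally average (Hankelise) each one to
# obtain full-length reconstruction components. A minimal sketch of that
# averaging step (an addition for reference, not part of the MATLAB guide):
def diagonal_average(M):
    # Average each anti-diagonal of M to recover a 1-D series
    L_, K_ = M.shape
    return np.array([np.mean(np.diag(M[:, ::-1], q)) for q in range(K_-1, -L_, -1)])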
# For plotting with multiple axis with different scales
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
plt.rcParams["figure.figsize"] = (7*NTERMS,30)
gs = gridspec.GridSpec(50,10)
for k in range(NTERMS):
    ax = plt.subplot(gs[k, 0]) # row k, col 0
    plt.plot(RCs[k])

# # Uncomment the entire following lines to show the graph of comparison

# # Summing all the reconstruction components
# Sum=0
# SRCs=[]
# for k in range(len(RCs[0])):
#     for l in range(len(RCs)):
#         Sum=Sum+RCs[l][k]
#     SRCs.append(Sum)
# len(SRCs)
# # Showing the sum of the reconstruction components
# plt.rcParams["figure.figsize"] = (20,10)
# plt.plot(SRCs)
# plt.xlabel("Time")
# plt.ylabel("Estimated price")
# plt.show

# # Comparing the original time series and the reconstructed time series

# # Cutting the length of f
# fcut=[(f[2*k-1]+f[2*k])/2 for k in range(math.floor(len(f)/2))]
# fcut=f

# # Stretching the length of RCs
# StrSRCs=[0]*(len(SRCs)*2)
# for k in range(math.floor(len(f)/2)):
#     StrSRCs[2*k]=SRCs[k]
#     StrSRCs[2*k+1]=SRCs[k]
# len(StrSRCs)
# #StrSRCs=SRCs


# ## Plotting a graph with multiple y-axes with different labels
# # https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
# host = host_subplot(111, axes_class=AA.Axes)
# plt.subplots_adjust(right=0.75)

# par1 = host.twinx()
# par2 = host.twinx()

# YLabelL="f"
# YLabelR="rcs"
# VariableL=fcut
# VariableR=StrSRCs

# host.set_xlabel("Year")
# host.set_ylabel(YLabelL)
# par1.set_ylabel(YLabelR)

# p1, = host.plot( VariableL, label=YLabelL)
# p2, = par1.plot( VariableR, label=YLabelR)

# host.legend()

# host.axis["left"].label.set_color(p1.get_color())
# par1.axis["right"].label.set_color(p2.get_color())

# plt.draw()
# plt.xticks(rotation='vertical')
# plt.show()



Thursday, August 03, 2023

A precautionary measure against financial crises with Prony's method (Matrix Pencil) --- Python _ _ _ Euphoria is way worse than uncertainty

I am now developing a precautionary measure against a potential financial crisis. This analysis of financial data adopts a signal-analysis method. There is a research finding claiming that the magnitude of low-frequency signals becomes higher than usual ahead of a market plunge.

This property relates to the Minsky moment (over-speculation in the credit market relative to the economy's structural capacity). During a market overheat, financial markets become less volatile; in contrast, they become highly volatile after the sharp plunge that follows the over-expansion.

When financial markets suffer from uncertainty, volatility rises, which means the magnitude of high-frequency signals becomes high. A volatile market looks worrying, but it often maintains long-term stability (neither overheating nor plunging).

Over-speculation in a barely volatile market induces a huge downward swing. The short-term fluctuation (a volatile market) somehow mitigates the long-term fluctuation. This seems to imply "Euphoria is way worse than uncertainty"!

The following is my Python code decomposing a financial time series with a Prony-like Matrix Pencil Method (MPM). For this experiment, the weekly foreign exchange rate of the Euro against the US dollar (EUR/USD) is used. The MPM is applied to each short-time window of the time domain (one window represents one year). Afterwards, |imag(phi)|, representing the frequency of a signal, is related to |real(alpha)|, representing the magnitude of that signal, through their correlation coefficient. Finally, these correlation coefficients are compared with the exchange-rate changes.
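
To make the decomposition concrete before the full script, here is a compact matrix-pencil sketch on a synthetic damped cosine. All parameter values are illustrative; note that this sketch forms eig(H1, H0), whose eigenvalues are exp(phi) directly, whereas the script below calls eig(H0, H1), which yields their reciprocals (the absolute values used later are unaffected).

import numpy as np
from scipy.linalg import hankel, eig

# Synthetic signal: one damped cosine = a conjugate pair of complex exponentials
k = np.arange(8)
f = np.exp(-0.02*k) * 2*np.cos(0.3*k)      # = z1**k + z2**k with z = exp(-0.02 +/- 0.3j)

# Model order 2: shifted Hankel matrices and the pencil H1 - z*H0
H0 = hankel(f)[0:2, 0:2]                   # [[f0, f1], [f1, f2]]
H1 = hankel(f[1:])[0:2, 0:2]               # [[f1, f2], [f2, f3]]
z = eig(H1, H0)[0]                         # generalised eigenvalues z_k
print(np.log(z))                           # phi_k ~ -0.02 +/- 0.3j

# Amplitudes alpha from the Vandermonde system V alpha = f[:2]
V = np.transpose(np.vander(z, increasing=True))
print(np.linalg.solve(V, f[:2]))           # alpha_k ~ [1, 1]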

# This code is developed further from
# https://art-blue-liberalism.blogspot.com/2023/07/novel-econometric-method-short-time.html
# It adds the correlation coefficients of |imag(phi)| and |real(alpha)|
# to show that the amplitude of the low frequency signal rises one year before the market plunge

# Execute this cell to have access to the API of Frankfurter App
import requests
import json

# importing necessary tools
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy.optimize import newton
from scipy import stats
import statsmodels.formula.api as sm

import math
import cmath

from scipy.linalg import hankel

import scipy.odr
from scipy import odr

# For plotting with multiple axis with different scales
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (18,9)

# defining imaginary number
i=complex(0,1)

# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)
print("Type:", type(resp))

# Decoding the byte response into a Python dictionary
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))

EURUSD=(mydata["rates"])
#EURUSD

# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]

for key, value in EURUSD.items():
    Dates_EURUSD.append(key)
    for k,v in value.items():
        Rates_EURUSD.append(v)

EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD

#Rates_EURUSD

# Converting the rate to natural logarithm
ln_Rates_EURUSD=np.log(Rates_EURUSD)
ln_Rates_EURUSD
#plt.plot(ln_Rates_EURUSD)

### Defining the function for plotting a signal processing model ###

# Assuming the sampling rate used for reconstructing the signal processing model
M=100

# OMEGA: the 2n sample points 2*pi*j/M, j = 0..2n-1, used by Prony's method
def OMEGA(n):
    omega = [2*math.pi/M*j for j in range(2*n)]
    return omega
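# Example: OMEGA(2) returns the 2*2=4 sample points [0, 2*pi/M, 4*pi/M, 6*pi/M]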
#omega=OMEGA(n)
#omega

# Defining the function generating a time series variable
# based on the given alpha s (amp), phi s (freq), and the sample points x (omega)
def syn_exp(amp,freq,x):
    y=[]
    for omg in range(len(x)):
        yi=0
        for ap in range(len(amp)):
            yi+=amp[ap]*cmath.exp(freq[ap]*x[omg])
        y.append(yi)
    return y
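# Example usage (hypothetical parameter values):
# y = syn_exp([1, 0.5], [0.3j, complex(-0.02, 0.8)], OMEGA(2))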

# Plotting the real part of the signal
def plot_signal(amp,freq,x,StartDate):
    Mrend = 2*math.pi
    t=np.linspace(0, Mrend, 10000)
    M=len(t)
    x= [2*math.pi/M*j for j in range(len(t))] # resampled finely; the x argument is overridden here
    F=syn_exp(amp,freq,x)
    F_real=[F[k].real for k in range(len(x))]
    plt.plot(F_real,label=StartDate)
    plt.title("Comparison of the signal frequencies of the time domain windows")
    plt.xlabel("'pi/2'_______________'pi'_______________'3pi/2'_______________'2pi'")
    plt.ylabel('Signal: real part')
    plt.legend()
    plt.show()
    return F

# Plotting with defined alpha, phi, and omega (steps)
# plot_signal(alpha,phi,omega,"Title")

###### Main Short-Time Prony Transform ######

# Defining the time series F
F=ln_Rates_EURUSD
#F=Rates_EURUSD
n=len(F)
N=len(F)
print("Length of data:",n)

# Number of the windows
Interval=52 # 52 weekly observations = one year. Don't change!!
Cut=0
Windows=math.floor(n/Interval)
print('Number of the windows:',Windows)

# Creating lists for parameters of Prony's methods
DateStart_List=[]
DateEnd_List=[]
alpha_list=[]
phi_list=[]

# Creating lists for the comparison between the correlation coefficients
# of |imag(phi)| and |real(alpha)| and the price level
Year_s=[]
f_anu_avrg_s=[]
correlcoeff_s=[]

for w in range(Windows-1):
#for w in range(1):
    #print('Date starts:',Dates_EURUSD[Cut],'  Date ends:',Dates_EURUSD[Cut+Interval])
    DateStart_List.append(Dates_EURUSD[Cut])
    DateEnd_List.append(Dates_EURUSD[Cut+Interval])
    Year_s.append(Dates_EURUSD[Cut][:4])
   
    # Extracting ln(price) for this year from the entire list
    f=F[Cut:Cut+Interval].copy()
   
    # Calculating the annual average of the natural log of the rate
    # for the later comparison
    f_anu_avrg=statistics.mean(f)
    # Recording the annual average rate in the list to use
    # for the comparison later
    f_anu_avrg_s.append(f_anu_avrg)
    #display(f_anu_avrg_s)
   
    Cut=Cut+Interval+(1-math.ceil(((w+1)%5)/1000))
    # Skip one extra week every fifth window: 52*5 = 260 weeks, but five
    # calendar years contain roughly 261 weeks, so without this the windows
    # would drift away from the start of the calendar year
    #plt.plot(f)
    n=len(f) # number of samples in this window
   
    d=(int(len(f)/2-len(f)%2*0.5)) # Half the window length, rounded down when odd
   
    # Creating the pair of shifted Hankel matrices needed for the matrix pencil method
    def mat_ge(samples):
        n_samples = len(samples)
        d = int(n_samples/2)
        H0=hankel(samples[0:-1])[0:d,0:d]
        H1=hankel(samples[1:])[0:d,0:d]
        return H0, H1
    H0, H1 =mat_ge(f)
    H=(H0,H1)
    #for h in range(len(H)):
    #    x=H[h]
    #    panda_df = pd.DataFrame(data = x)
    #    display(panda_df)
       
    # Finding the generalised eigenvalues of the matrix pencil (H0, H1)
    E= scipy.linalg.eig(H0, H1)[0]
    #panda_df_E = pd.DataFrame(data = E)
    #display(panda_df_E)
   
    V=np.transpose(np.vander(E, N=None, increasing=True))
    #panda_df_V = pd.DataFrame(data = V)
    #display(panda_df_V)
   
    # Calculating the coefficient alpha s with the matrix pencil method (MPM)
    # Compute the amplitudes (coefficients) by the mldivide V\f[d:]=A
    if len(f[d:])!=d:
        Adj=1
    else:
        Adj=0
   
    ######## Recovering alphas #######
   
    ###### For Jupyter Notebook ######
    #A=LA.solve(V.real, f[d+Adj:]) # Only pick the real part of V matrix! # Length?
    #alpha_rep=A
    #alpha=alpha_rep.tolist()
   
    ###### For google colaboratory ######
    A=LA.solve(V, f[d:])
    #alpha_rep=A.real  
    alpha_rep=A
    alpha=alpha_rep.tolist()
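    # Note: LA.solve requires a square V; when len(f[d:]) != d (odd window
    # length), np.linalg.lstsq(V, f[d:], rcond=None)[0] would be a
    # least-squares alternative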
       
   
    # Recovering phi s by transforming the eigenvalues to their natural logarithm
    # phi is the power of the exponential which is the complex number  
    # phi s represent the frequency of this signal
    # frequencies and damping factors
    phi_rep = [(cmath.log(E[t])) for t in range(len(E))] # .imag shows only the imaginary part
    phi=phi_rep
   
    # Removing relatively less significant alphas and phis
   
    # Adjust this number of terms to recover.
    NumTerms=6 # Number of terms to keep (illustrative value) # Adjust here!
   
    for l in range(len(alpha_rep)-NumTerms):
        abs_alpha=[abs(alpha[k]) for k in range(len(alpha))]
        idx=abs_alpha.index(min(abs_alpha)) # index of the weakest remaining term
        alpha.pop(idx)
        phi.pop(idx)
    #display(alpha)
    #display(phi)
   
    ### The process for generating the correlation coefficients
    ###  between |imag(phi)| and |real(alpha)|
   
    # Imaginary part of phis
    abs_Im_phi=[abs(phi[k].imag) for k in range(len(phi))]
    #display("|Imag(phi)|",abs_Im_phi)
    # Real part of alphas
    abs_Re_alpha=[abs(alpha[k].real) for k in range(len(alpha))]
    #display("|Real(alpha)|",abs_Re_alpha)    
   
    # Calculating the correlation coefficient between |imag(phi)| and |real(alpha)|
    correlcoeff=np.corrcoef(abs_Im_phi,abs_Re_alpha)[0][1]
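    # np.corrcoef returns the 2x2 correlation matrix; [0][1] is the off-diagonal entry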
    # Add the correlation coefficient of this year to the list
    correlcoeff_s.append(correlcoeff)
    #display(correlcoeff_s)
   
    ### For plotting signals
    ## Showing the plots bigger
    #plt.rcParams["figure.figsize"] = (20,10)
    ## Plotting with defined alpha, phi, and omega (steps)
    #omega=OMEGA(n)
    #plot_signal(alpha,phi,omega,Dates_EURUSD[Cut])
   
    # Add the list
    #alpha_list.append(alpha)
    #phi_list.append(phi)

## Plotting a graph comparing the correlation coefficients
## of |imag(phi)| and |real(alpha)| and the price level

#f_anu_avrg_s
d_f_anu_avrg_s=[0]
for k in range(1,len(f_anu_avrg_s)):
    d_f_anu_avrg_s.append((f_anu_avrg_s[k]-f_anu_avrg_s[k-1]))
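# Equivalently: d_f_anu_avrg_s = [0.0] + np.diff(f_anu_avrg_s).tolist()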

#display("ln(Rate_t)-ln(Rate_t-1)","length",len(d_f_anu_avrg_s),d_f_anu_avrg_s)
#display("Correl_phi_alpha","length",len(correlcoeff_s),correlcoeff_s)
#display("Year","length",len(Year_s),Year_s)

## Plotting a graph with multiple y-axes with different labels
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
host = host_subplot(111, axes_class=AA.Axes)
plt.subplots_adjust(right=0.75)

par1 = host.twinx()
par2 = host.twinx()

YLabelL="ln(EURUSD(Current Year))-ln(EURUSD(Previous Year))"
YLabelR="Correlation Coefficient of |imag(phi)| and |real(alpha)|"
VariableL=d_f_anu_avrg_s
VariableR=correlcoeff_s

host.set_xlabel("Year")
host.set_ylabel(YLabelL)
par1.set_ylabel(YLabelR)


p1, = host.plot(Year_s, VariableL, label=YLabelL)
p2, = par1.plot(Year_s, VariableR, label=YLabelR)


host.legend()

host.axis["left"].label.set_color(p1.get_color())
par1.axis["right"].label.set_color(p2.get_color())

plt.draw()
plt.xticks(rotation='vertical')
plt.show()