Monday, August 21, 2023

Attempt to use Singular Spectrum Analysis (SSA) for Frankfurter App with Python


Singular Spectrum Analysis (SSA) is nowadays a popular analysis method for financial time series. 

SSA is similar to Prony's method (the analogy would describe them as cousins) which can decompose a time series with non-periodic signalling cycles and a trend (Not a waving signal). 

Autoregressive method has a limitation of distinguishing the physical terms and the noise terms of the volatility, and it is not well suited for the future forecast for a non-linear time series.

In contrast,  SSA and Prony method can decompose such time series with a multi-dimensional decomposition process to offer more detailed estimates than the other conventional methods.

The part implementing SSA in the following Python codes refers to MATLAB codes displayed in https://uk.mathworks.com/matlabcentral/fileexchange/58967-singular-spectrum-analysis-beginners-guide.

This is an attempt to implement SSA for Frankfurter App with Python.

The following Python codes will be transferable to analysing the other financial data.

# Execute this cell to have access to the API of Frankfurter App
import requests
import json

# importing necessary tools
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy import stats
import statsmodels.formula.api as sm
import math
from scipy.linalg import hankel

# For plotting with multiple axis with different scales
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (18,9)

# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
# Write your code here
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)

print("Type:", type(resp))

# Preparing for the right encoding
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))

#mydata #show the accessed data
EURUSD=(mydata["rates"])
#EURUSD

# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]

for key, value in EURUSD.items():
    Dates_EURUSD.append(key)
    for k,v in value.items():
        Rates_EURUSD.append(v)

EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD

#Rates_EURUSD
print("Length of data:",len(Rates_EURUSD))

# Converting the rate to natural logarithm
ln_Rates_EURUSD=np.log(Rates_EURUSD)
ln_Rates_EURUSD
#plt.plot(ln_Rates_EURUSD)   # Uncomment to show plot!

# Creating the log-difference (Daily return) of EUR/USD
d_ln_Rates_EURUSD=np.diff(np.log(Rates_EURUSD))
len(d_ln_Rates_EURUSD)
#plt.plot(d_ln_Rates_EURUSD)   # Uncomment to show plot!

# Obtaining the length which is half of the total length of this variable.
# This is needed for creating the matrix of this variable i.e. n/2 * n/2 matrix
f=ln_Rates_EURUSD.copy()
#f=d_ln_Rates_EURUSD.copy()

# Creating Hankel matrix which is needed to
n = len(f)
d=(int(n/2-n%2*0.5)) # Subtracting the remainder if it is an odd number
d
H=hankel(f[0:-1])[0:d,0:d]

#print(f'\n\n \033[1m Hankel Matrix H \033[0m')
#panda_df = pd.DataFrame(data = H[h])
#display(panda_df)

# Finding the Generalised Eigenvalue of these Hankel matrices H0 and H1
print('\n\n Finding the Generalised Eigenvalue and Eigenvectors of these Hankel matrices H0 and H1 ')
Eig = scipy.linalg.eig(H)
Evalue= Eig[0]
Evectors = Eig[1:]
Evalue_real=Evalue.real
print('\n\n \033[1m Eigenvalue \033[0m')
panda_df = pd.DataFrame(data = Evalue)
display(panda_df)
print('\n\n \033[1m Eigenvectors \033[0m')
display(Eig[1:])

# Defnining the function plotting Singular Value Decomposition (SDV)
# This apprximate the number of the physical/true signals i.e. the number of true alpha s and phi s
def plot_SVD(A):
    SDV=LA.svd(A, full_matrices=True, compute_uv=True, hermitian=False)[1]
    Ln_SDV=[math.log(SDV[k]) for k in range(len(SDV)) ]
    plt.plot(Ln_SDV)
    plt.title('Singular Values')
    plt.xlabel('')
    plt.ylabel('Natural Log')
    plt.show
print('\n\n Singular Value decomposition (SVD) of the Hankel matrix H0')
# Refers to the plot to determine the number of terms (NTERMS) to recover
#plot_SVD(H) # Uncomment to show the SVD plot

# Determining NTERMS
SDV=LA.svd(H, full_matrices=True, compute_uv=True, hermitian=False)[1]
Ln_SDV=[math.log(SDV[k]) for k in range(len(SDV)) ]
NTERMS=0
SlopeSVD=1 # Adjust accoording to the data type and its log(SV) plot
for k in range(len(Ln_SDV)):
    if Ln_SDV[k]>SlopeSVD:
        NTERMS+=1
        #print(NTERMS)
        continue
    else:
        break


# Generating the reconstruction components for NTERMS
RCs = []
for k in range(NTERMS):
    RCtemp=Evectors[0][:,k]
    RCs.append(RCtemp)

# Showing each reconstruction components
print('\n\n \033[1m Reconstruction Components for NTERMS \033[0m')
panda_df = pd.DataFrame(data = RCs)
display(panda_df)
# For plotting with multiple axis with different scales
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
plt.rcParams["figure.figsize"] = (7*NTERMS,30)
gs = gridspec.GridSpec(50,10)
for k in range(NTERMS):
    ax = plt.subplot(gs[k, 0]) # row k, col 0
    plt.plot(RCs[k])

# # Uncomment the entire following lines to show the graph of comparison

# # Summing all the reconstruction components
# Sum=0
# SRCs=[]
# for k in range(len(RCs[0])):
#     for l in range(len(RCs)):
#         Sum=Sum+RCs[l][k]
#     SRCs.append(Sum)
# len(SRCs)
# # Showing the sum of the reconstruction components
# plt.rcParams["figure.figsize"] = (20,10)
# plt.plot(SRCs)
# plt.xlabel("Time")
# plt.ylabel("Estimated price")
# plt.show

# # Comparing the original time series and the reconstructed time series

# # Cutting the length of f
# fcut=[(f[2*k-1]+f[2*k])/2 for k in range(math.floor(len(f)/2))]
# fcut=f

# # Streching the length of RCs
# StrSRCs=[0]*(len(SRCs)*2)
# for k in range(math.floor(len(f)/2)):
#     StrSRCs[2*k]=SRCs[k]
#     StrSRCs[2*k+1]=SRCs[k]
# len(StrSRCs)
# #StrSRCs=SRCs


# ## Plottting graph with multiple y-axis with different labels
# # https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
# host = host_subplot(111, axes_class=AA.Axes)
# plt.subplots_adjust(right=0.75)

# par1 = host.twinx()
# par2 = host.twinx()

# YLabelL="f"
# YLabelR="rcs"
# VariableL=fcut
# VariableR=StrSRCs

# host.set_xlabel("Year")
# host.set_ylabel(YLabelL)
# par1.set_ylabel(YLabelR)

# p1, = host.plot( VariableL, label=YLabelL)
# p2, = par1.plot( VariableR, label=YLabelR)

# host.legend()

# host.axis["left"].label.set_color(p1.get_color())
# par1.axis["right"].label.set_color(p2.get_color())

# plt.draw()
# plt.xticks(rotation='vertical')
# plt.show()


a