Monday, March 04, 2024

Python: Combination of Singular Spectrum Analysis (SSA) and Fast Fourier Transform (FFT)

 

 

This is my non-linear time series analysis combining Singular Spectrum Analysis (SSA) and Fast Fourier Transform (FFT). 

The dataset is the foreign exchange (FX) rate of Euro (EUR) based on US-Dollar (USD) obtained from Frankfurter App provided by the European Central Bank (ECB).

Implementing FFT for each reconstruction component (RC) of SSA gives us much more precise insights of the periodicity of the time series fluctuation. 

# Execute this cell to have access to the API of Frankfurter App
import requests
import json

# importing necessary tools
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy import stats
import statsmodels.formula.api as sm
import math
import cmath
from scipy.linalg import hankel

import scipy.odr
from scipy import odr

# For using Frankfurter app
import ast

# For graphing
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import seaborn as sns

# Fast Fourier Transform (FFT) https://docs.scipy.org/doc/scipy/tutorial/fft.html
from scipy.fft import fft, ifft, fftfreq

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (20,10)
# defining imaginary number
i=complex(0,1)
i


## Downloading the time series data from Frankfurter App
# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)

print("Type:", type(resp))

# Preparing for the right encoding
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))

#mydata #show the accessed data
EURUSD=(mydata["rates"])
#EURUSD

#mydata #show the accessed data
EURUSD=(mydata["rates"])
#EURUSD

# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]

for key, value in EURUSD.items():
    Dates_EURUSD.append(key)
    for ky,vl in value.items():
        Rates_EURUSD.append(vl)

EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
print('First week:',list(EURUSD.keys())[0],',  Last week:',list(EURUSD.keys())[-1])

#Rates_EURUSD

### Graph output (1) ###

NW=6 # Number of windows to slice this time series dataset
LI=math.floor(len(EURUSD)/NW) # The length of interval

fig, axs = plt.subplots(NW)
for k in range(NW):
    axs[k].plot(Dates_EURUSD[k*LI:(k+1)*LI], Rates_EURUSD[k*LI:(k+1)*LI])

print("Length of data:",len(Rates_EURUSD))


## Singular Spectrum Analysis (SSA)
print("The following refers to https://stats.stackexchange.com/questions/544105/singular-spectrum-analysis-and-their-eigentriplets")
print('This SSA method is almost identical to the one based on MATLAB "Singular Spectrum Analysis - Beginners guide" (MathWorks)')
print(' https://uk.mathworks.com/matlabcentral/fileexchange/58967-singular-spectrum-analysis-beginners-guide which I attempted to duplicate with Python. ')

# Re-defining the time series and its length
s=Rates_EURUSD;
N=len(Rates_EURUSD)

# Getting window length L and lagged length K
#L = 500-1
L = 100
K = N - L + 1

# Constructing the time lagged Hankel matrix
X=np.zeros((K,L))
for m in range (0, L):
    X[:,m] = s[m:K+m]
   

# Trajectory matrix
Cemb = np.dot(X.T,X)/K

# Eigen decomposition
eigenValues, eigenVectors = np.linalg.eig(Cemb)
idx = eigenValues.argsort()[::-1]  
eigenValues = eigenValues[idx]
eigenVectors = eigenVectors[:,idx]

# Vectors of Principal Components
PC = np.dot(X,eigenVectors)

# Pre-allocating Reconstructed Component Matrix
RC = np.zeros((N, L))
# Reconstruct the elementary matrices without storing them
for k in range(L):
    myBuf = np.outer(PC[:,k], eigenVectors[:,k].T)
    myBuf = myBuf[::-1]
    RC[:,k] = [myBuf.diagonal(j).mean()\
               for j in range(-myBuf.shape[0]+1, myBuf.shape[1])]
       
### Graph output (2) ###

# First 6 RC
fig, ax = plt.subplots(3,2)
ax = ax.flatten()
for k in range (0, 6):
    ax[k].plot(RC[:,k])
    ax[k].set_title(str(i))
plt.tight_layout()

### Graph output (3) ###

# Plotting a graph comparison
RawSSA=RC[:,0]
for k in range(L-1):
    RawSSA=np.add(RawSSA,RC[:,k+1])
SCs=10 # Smoothed cycle components
SmoothedSSA=RC[:,0]
for k in range(SCs-1):
    SmoothedSSA=np.add(SmoothedSSA,RC[:,k+1])
plt.subplot(2,1,1)
plt.title("Original vs. Reconstructed time series")
plt.plot(s[:])
plt.plot(RawSSA)
plt.subplot(2,1,2)
plt.title("Reconstructed raw vs. smoothed time series")
plt.plot(RawSSA)
plt.plot(SmoothedSSA)


# sample spacing
Days=N*7 # Number of days
Months=N*7//30 # Number of months
Years=N*7//360 # Number of years
T=1/Months # Defining the sample spacing step-size

yf = fft(s)
xf = fftfreq(N, T) #[:N//2]


# FFT for each RC
RC_fft=[]
RC_fft_power=[]
for l in range(L):
    RC_fft.append(fft(RC[:,l])) #[0:N//2])
    RC_fft_power.append(np.abs(RC_fft[l]))

# Denoise FFT of RC
dn_RC_fft=RC_fft.copy()
dn_RC_fft_power=RC_fft_power.copy()
for j in range(len(RC_fft)):
    for k in range(len(RC_fft[0])):
        DenoisingThreshhold=(sum(RC_fft_power[j])/len(RC_fft_power[j]))
        #DenoisingThreshhold=(max(RC_fft_power[j]))*0.90
        if RC_fft_power[j][k]<DenoisingThreshhold:
            dn_RC_fft_power[j][k]=0
            dn_RC_fft[j][k]=0
        else:
            continue

# Defining the cycle length
CycLen=(xf[0:N//2]) # Cycle length
#CycLen=CycLen[::-1]
CycLen

### Graph output (4) ###

# Showing some examples of the cycle lengths of the frequencies

fig, axs = plt.subplots(7)
for k in range(1,8):
    axs[k-1].stem(CycLen,dn_RC_fft_power[-k*10][N//2-1:-1]) #[0:N//2])


# Summing all the frequencies of RCs for the inverse FFT
tp_dn_RC_fft=list(map(list, zip(*dn_RC_fft)))
tp_dn_RC_fft_power=list(map(list, zip(*dn_RC_fft_power)))

sum_dn_RC_fft=[]
sum_dn_RC_fft_power=[]

for k in range(len(tp_dn_RC_fft)):
    sum_dn_RC_fft.append(sum(tp_dn_RC_fft[k]))
    sum_dn_RC_fft_power.append(sum(tp_dn_RC_fft_power[k]))


# ### Graph output (5) ### Uncomment the following to display

#plt.stem of the denoised power stemplot
plt.stem(CycLen, sum_dn_RC_fft_power[N//2-1:-1])
plt.grid()
plt.show()

### Graph output (6) ### Uncomment the following to display
# Inverse FFT to reconstruct the time series
plt.plot(s)
plt.plot(ifft(sum_dn_RC_fft).real)






No comments:

Post a Comment

Note: only a member of this blog may post a comment.