Wednesday, July 12, 2023

Novel econometric method: Short-Time Prony Transform with Python


 

The following data analysis is estimating the daily return (natural log difference) of the foreign exchange rate (FX) of Euro (EUR) based on US-Dollar (USD) from 1997 to 2023.

 


The data analysis introduced here is the brand-new approach to the financial economics which often involves non-linear variables with an exponentially expanding/decaying signal which the conventional econometric methods tend to struggle dealing with.

In terms of the non-linear signal processing like approach, many financial data analysts have adapted Fourier transfer. However, Fourier transfer has a critical disadvantage of dealing with the non-stable signal as shown in these graphs and failing to detect more than one signal frequencies close to each other.

This brand-new method adapts the signal processing method called Prony method invented by Baron Gaspard Clair François Marie Riche de Prony (22 July 1755 – 29 July 1839)

 


De Prony was the mathematician famous at the contemporary time period of when Fourier was alive. Fourier method has been popular for a long time period mainly because of the convenience with a limited computational capability. Nevertheless, the current advancement of computational mathematics has enabled to apply Prony method to the advanced mathematical data analyses.

Prony method is advantageous to detect various signal frequencies with their amplitude (coefficient) even for an unstable signal processing. Through the time length, it can describe various waving formations of a time series. For example, the seasonality and the event risk can be detected by observing these varying trends.

The conventional financial economic models are limited to finding one coefficient of the autocorrelation of either a variable or its error-terms/residuals. Furthermore, they are biased while dealing with a variable where no convergence can be proven. In contrast, this estimate based on Prony method figures out the entire dynamic movement of a variable. 

I have just started studying about this novel econometric method so that I admit that both my understanding and my implementation are still incomplete, and its improvement is quite likely to be required. At the same time, I am confident in having understood the core concept and the fundamentally required mathematical methods. Therefore, I have produced my Python codes implementing this data analysis method. 

Newly add function: Version with Short-Time Windows

The following Python codes decompose the time series of EUR/USD to transform into multiple time domain windows which break down data into years.

It allows to analyse the change in the signal frequencies and their amplitudes. It uses the Matrix Pencil Method (MPM) for finding the eigenvalues deriving the frequencies. 

It is more or less similar to Short-Time Fourier Transform (STFT). However, Fourier method can only depict stable signalling cycles. In contrast, this alternative model allows to describe the aperiodic cycle (expanding or contracting) of signals.

The linear algebra tool of solving the equation to find the amplitudes (alpha s) working in Jupyter Notebook somehow does not work in Google Colaboratory so that there are two sets of codes for each. 

 

# Execute this cell to have access to the API of Frankfurter App
import requests
import json

# importing necessary tools
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy.optimize import newton
from scipy import stats
import statsmodels.formula.api as sm

import math
import cmath

from scipy.linalg import hankel

import scipy.odr
from scipy import odr

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (20,10)

# defining imaginary number
i=complex(0,1)

# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)
print("Type:", type(resp))

# Preparing for the right encoding
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))

EURUSD=(mydata["rates"])
#EURUSD

# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]

for key, value in EURUSD.items():
    Dates_EURUSD.append(key)
    for k,v in value.items():
        Rates_EURUSD.append(v)

EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD

#Rates_EURUSD

# Converting the rate to natural logarithm
ln_Rates_EURUSD=np.log(Rates_EURUSD)
ln_Rates_EURUSD
#plt.plot(ln_Rates_EURUSD)



### Defining the function for plotting a signal processing model ###

# Assuming the sampling rate used for reconstructing the signal processing model
M=100

# f_syn: samples used by Prony's methhod
def OMEGA(n):
    _2n_1=(2*n-1)+1
    omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)]
    return omega
#omega=OMEGA(n)
#omega

# Defining the function generating a time series variable
# based on the given alpha s, phi s, and the time denoted as the small omega
def syn_exp(amp,freq,x):
    t=len(amp)
    y=[]
    for omg in range(len(x)):
        yi=0
        for ap in range(len(amp)):
            yi+=amp[ap]*cmath.exp(freq[ap]*x[omg])
        y.append(yi)
    return y

# plot the real part of the signal
def plot_signal(amp,freq,x,StartDate):
    Mrend = 2*math.pi;
    Mint = math.pi/2;
    t=np.linspace(0, Mrend, 10000)
    M=len(t)
    x= [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(len(t))]
    F=syn_exp(amp,freq,x)
    F_real=[F[k].real for k in range(len(x))]
    plt.plot(F_real,label=StartDate)
    plt.title("Comparison of the signal frequencies of the time domain windows")
    plt.xlabel("'pi/2'_______________'pi'_______________'3pi/2'_______________'2pi'")
    plt.ylabel('Signal: real part')
    plt.legend()
    plt.show
    return F

# Plotting with defined alpha, phi, and omega (steps)
# plot_signal(alpha,phi,omega,0,-1,"Title")


###### Main Short-Time Prony Transform ######

# Defining the time series F
#F=ln_Rates_EURUSD
F=Rates_EURUSD
n=len(F)
N=len(F)
print("Length of data:",n)

# Number of the windows
Interval=52 # Dont' change!!
Cut=0
Windows=math.floor(n/Interval)
print('Number of the windows:',Windows)

DateStart_List=[]
DateEnd_List=[]
alpha_list=[]
phi_list=[]

for w in range(Windows-1):
#for w in range(1):
    #print('Date starts:',Dates_EURUSD[Cut],'  Date ends:',Dates_EURUSD[Cut+Interval])
    DateStart_List.append(Dates_EURUSD[Cut])
    DateEnd_List.append(Dates_EURUSD[Cut+Interval])

    f=F[Cut:Cut+Interval].copy()
    Cut=Cut+Interval+(1-math.ceil(((w+1)%5)/1000))
    # subtract the last week data once 5 years to make sure it starts from a beginning of the calendar
    #plt.plot(f)
    n=len(f) # number of exponentials

    d=(int(len(f)/2-len(f)%2*0.5)) # Subtracting the remainder if it is an odd number

    # Creating Hankel matrix which is needed to
    def mat_ge(samples):
        n_samples = len(samples)
        d = int(n_samples/2)
        H0=hankel(samples[0:-1])[0:d,0:d]
        H1=hankel(samples[1:])[0:d,0:d]
        return H0, H1
    H0, H1 =mat_ge(f)
    H=(H0,H1)
    for h in range(len(H)):
        x=H[h]
        #panda_df = pd.DataFrame(data = x)
        #display(panda_df)

    # Finding the Eigenvalue
    E= scipy.linalg.eig(H0, H1)[0]
    #panda_df_E = pd.DataFrame(data = E)
    #display(panda_df_E)

    V=np.transpose(np.vander(E, N=None, increasing=True))
    #panda_df_V = pd.DataFrame(data = V)
    #display(panda_df_V)

    # Calculating the coefficient alpha s with the matrix pencil method (MPM)
    # Compute the amplitudes (coefficients) by the mldivide V\f[d:]=A
    if len(f[d:])!=d:
        Adj=1
    else:
        Adj=0

    ######## Recovering alphas #######

    ###### For Jupyter Notebook ######
    #A=LA.solve(V.real, f[d+Adj:]) # Only pick the real part of V matrix! # Length?
    #alpha_rep=A
    #alpha=alpha_rep.tolist()

    ###### For google colaboratory ######
    A=LA.solve(V, f[d:])
    alpha_rep=A.real
    alpha_rep=A
    alpha=alpha_rep.tolist()


    # Recovering phi s by transforming the eigenvalues to their natural logarithm
    # phi is the power of the exponential which is the complex number
    # phi s represent the frequency of this signal
    # frequencies and damping factors
    phi_rep = [(cmath.log(E[t])) for t in range(len(E))] # .imag shows only the imaginary part
    phi=phi_rep

    # Removing relatively less significant alphas and phis

    # Adjust this number of terms to recover.
    NumTerms=9 # Define number of terms to recover # Adjust here!

    for l in range(len(alpha_rep)-NumTerms):
        abs_alpha=[abs(alpha[k]) for k in range(len(alpha))]
        #print(abs_alpha.index(min(abs_alpha)))
        #print(alpha[abs_alpha.index(min(abs_alpha))])
        alpha.pop(alpha.index(alpha[abs_alpha.index(min(abs_alpha))]))
        phi.pop(phi.index(phi[abs_alpha.index(min(abs_alpha))]))
        abs_alpha.pop(abs_alpha.index(min(abs_alpha)))
    #display(alpha)
    #display(phi)

    ## Showing the plots bigger
    #plt.rcParams["figure.figsize"] = (20,10)
    ## Plotting with defined alpha, phi, and omega (steps)
    ##omega=OMEGA(n)
    ##print(n, len(omega))
    #plot_signal(alpha,phi,omega,Dates_EURUSD[Cut])

    # Add the list
    alpha_list.append(alpha)
    phi_list.append(phi)


###### Plotting each window of the Short-Time Prony Transform ######
YearInterval=4
NumInt=math.ceil(len(alpha_list)/YearInterval)

plt.rcParams["figure.figsize"] = (NumInt*20,NumInt*10)
gs = gridspec.GridSpec(10, 10)
Cnt=0
for k in range(NumInt):
    ax = plt.subplot(gs[k, 0]) # row k, col 0
    for l in range(min(YearInterval,len(alpha_list)-YearInterval*k)):
        # Plotting with defined alpha, phi, and omega (steps)
        #omega=OMEGA(n)
        omega=OMEGA(N)
        plot_signal(alpha_list[Cnt],phi_list[Cnt],omega,DateStart_List[Cnt])
        Cnt=Cnt+1



 


Friday, July 07, 2023

Comparing the clusters with the different sampling rate following Shannon-Nyquist theorem

 


This Python programming tests how important to set the sampling rate denoted by M.

Shannon-Nyquist requirements claims that the sampling rate M has to be exact: either too small or too big. 

# importing necessary tools
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import numpy as np
import pandas as pd
import random
import math
import cmath

# Defining the imaginary number
i=complex(0,1)

###### Drawing the full complex field circle with a different step size ######

# Definining the sampling rate
M=10

# Generating the list of the imaginary numbers
ImagList=[]
for j in range(1,M+1):
    ImagList.append((j/M)*i)

# Generating the exponentials of the imaginary numbers in ImagList
ExpList=[cmath.exp(2*math.pi*ImagList[j]) for j in range(len(ImagList))]

# Separating the exponentials into the real part and the imaginary part
ExpListR=[ExpList[j].real for j in range(len(ExpList))]
ExpListI=[ExpList[j].imag for j in range(len(ExpList))]

# # # Uncomment the following lines to show the circle graph
# # Plotting graph for a complex field circle
# plt.rcParams["figure.figsize"] = (9,9)
# plt.plot(ExpListR,ExpListI,'ro')
# plt.xlabel('Real field')
# plt.ylabel('Imaginary field')
# plt.show


###### ###### Comparison of the sampling rates ###### ######
###### ###### to test Shannon-Nyquist theorem  ###### ######

FreqSamples=[3*i,4*i,13*i]
SampleRateMs=[10,100,1000]
ColorList=['b','c','m']

Freq=[]
for k in range(len(SampleRateMs)):
    TempFreq=[FreqSamples[j]/SampleRateMs[k] for j in range(len(FreqSamples))]
    Freq.append(TempFreq)

ExpFreq=[]
for k in range(len(Freq)):
    TempExpFreq=[cmath.exp(2*math.pi*Freq[k][j]) for j in range(len(Freq[k]))]
    ExpFreq.append(TempExpFreq)

ExpFreqR=[]
ExpFreqI=[]
for k in range(len(ExpFreq)):
    BenchR=[1,0,-1,0]
    TempExpFreqR0=[ExpFreq[k][j].real for j in range(len(ExpFreq[k]))]
    TempExpFreqR1=BenchR+TempExpFreqR0
    ExpFreqR.append(TempExpFreqR1)
    BenchI=[0,1,0,-1]
    TempExpFreqI0=[ExpFreq[k][j].imag for j in range(len(ExpFreq[k]))]
    TempExpFreqI1=BenchI+TempExpFreqI0
    ExpFreqI.append(TempExpFreqI1)

panda_df = pd.DataFrame(data = ExpFreq)
display(panda_df)

# Compare plots with the different sampling rates
# Ref. https://stackoverflow.com/questions/37360568/python-organisation-of-3-subplots-with-matplotlib
plt.rcParams["figure.figsize"] = (20,20)
gs = gridspec.GridSpec(3, 3)
for k in range(len(ExpFreq)):
    ax = plt.subplot(gs[0, k]) # row 0, col 1
    plt.plot(ExpFreqR[k],ExpFreqI[k],'ro', color=ColorList[k])
    plt.title(f'Sampling Rate M= {SampleRateMs[k]}')
    plt.xlabel('Real field')
    plt.ylabel('Imaginary field')