I am now developing a precautionary measure against a potential financial crisis. This analysis of financial data adopts a signal analysis method. There is a research outcome claiming that the magnitude of a low frequency signal becomes higher than usual.
This property is related to Minsky's moment (over-speculation of the credit market as compared to the economic structural capacity). During the market overheat, financial markets become less volatile. In contrast, these markets become highly volatile after a sharp plunge after the over-expansion.
When financial markets suffer from an uncertainty, the volatility becomes higher. This implies that both the magnitude of signals with a high frequency becomes high. The volatile market looks worrying but it often maintains the long term stability (neither overheat nor plunging ).
The over-speculation with a little volatile market induces a huge swing to the downfall. The short-term fluctuation (volatile market) somehow mitigates the long-term fluctuation. This seems to imply "Euphoria is way worse than uncertainty"!
The following is my Python code decomposing a financial time series following Prony-like Matrix Pencil Method (MPM) decomposition. For this experiment, the weekly foreign exchange rate of Euro based on US dollar (EUR/USD) is used. The MPM is implemented for each short-time window of the time domain (A window represent one year). Afterward, |imag(phi)|, representing the low-high frequency of a signal, is regressed on |real(alpha)|, representing the magnitude of this signal. At the end, the comparison between this correlation coefficients of |imag(phi)| and |real(alpha)| are compared with the exchange rate changes.
# These codes are furthermore developed from
# https://art-blue-liberalism.blogspot.com/2023/07/novel-econometric-method-short-time.html
# It has added the correlation coefficients of |imag(phi)| and |real(alpha)|
# to show that the amplitude of the low frequency signal rises one year before the market plunge
# Execute this cell to have access to the API of Frankfurter App
import requests
import json
# importing necessary tools
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy.optimize import newton
from scipy import stats
import statsmodels.formula.api as sm
import math
import cmath
from scipy.linalg import hankel
import scipy.odr
from scipy import odr
# For plotting with multiple axis with different scales
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA
# Showing the plots bigger
plt.rcParams["figure.figsize"] = (18,9)
# defining imaginary number
i=complex(0,1)
# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)
print("Type:", type(resp))
# Preparing for the right encoding
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))
EURUSD=(mydata["rates"])
#EURUSD
# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]
for key, value in EURUSD.items():
Dates_EURUSD.append(key)
for k,v in value.items():
Rates_EURUSD.append(v)
EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD
#Rates_EURUSD
# Converting the rate to natural logarithm
ln_Rates_EURUSD=np.log(Rates_EURUSD)
ln_Rates_EURUSD
#plt.plot(ln_Rates_EURUSD)
### Defining the function for plotting a signal processing model ###
# Assuming the sampling rate used for reconstructing the signal processing model
M=100
# f_syn: samples used by Prony's methhod
def OMEGA(n):
_2n_1=(2*n-1)+1
omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)]
return omega
#omega=OMEGA(n)
#omega
# Defining the function generating a time series variable
# based on the given alpha s, phi s, and the time denoted as the small omega
def syn_exp(amp,freq,x):
t=len(amp)
y=[]
for omg in range(len(x)):
yi=0
for ap in range(len(amp)):
yi+=amp[ap]*cmath.exp(freq[ap]*x[omg])
y.append(yi)
return y
# plot the real part of the signal
def plot_signal(amp,freq,x,StartDate):
Mrend = 2*math.pi;
Mint = math.pi/2;
t=np.linspace(0, Mrend, 10000)
M=len(t)
x= [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(len(t))]
F=syn_exp(amp,freq,x)
F_real=[F[k].real for k in range(len(x))]
plt.plot(F_real,label=StartDate)
plt.title("Comparison of the signal frequencies of the time domain windows")
plt.xlabel("'pi/2'_______________'pi'_______________'3pi/2'_______________'2pi'")
plt.ylabel('Signal: real part')
plt.legend()
plt.show
return F
# Plotting with defined alpha, phi, and omega (steps)
# plot_signal(alpha,phi,omega,0,-1,"Title")
###### Main Short-Time Prony Transform ######
# Defining the time series F
F=ln_Rates_EURUSD
#F=Rates_EURUSD
n=len(F)
N=len(F)
print("Length of data:",n)
# Number of the windows
Interval=52 # Dont' change!!
Cut=0
Windows=math.floor(n/Interval)
print('Number of the windows:',Windows)
# Creating lists for parameters of Prony's methods
DateStart_List=[]
DateEnd_List=[]
alpha_list=[]
phi_list=[]
# Creating lists for the comparison between the correlation coefficients
# of |imag(phi)| and |real(alpha)| and the price level
Year_s=[]
f_anu_avrg_s=[]
correlcoeff_s=[]
for w in range(Windows-1):
#for w in range(1):
#print('Date starts:',Dates_EURUSD[Cut],' Date ends:',Dates_EURUSD[Cut+Interval])
DateStart_List.append(Dates_EURUSD[Cut])
DateEnd_List.append(Dates_EURUSD[Cut+Interval])
Year_s.append(Dates_EURUSD[Cut][:4])
# Extracting ln(price) for this year from the entire list
f=F[Cut:Cut+Interval].copy()
# Calculating anual average of the natural-log the rate
# for the later comparison
f_anu_avrg=statistics.mean(f)
# Recording the annual average rate in the list to use
# for the comparison later
f_anu_avrg_s.append(f_anu_avrg)
#display(f_anu_avrg_s)
Cut=Cut+Interval+(1-math.ceil(((w+1)%5)/1000))
# subtract the last week data once 5 years to make sure it starts from a beginning of the calendar
#plt.plot(f)
n=len(f) # number of exponentials
d=(int(len(f)/2-len(f)%2*0.5)) # Subtracting the remainder if it is an odd number
# Creating Hankel matrix which is needed to
def mat_ge(samples):
n_samples = len(samples)
d = int(n_samples/2)
H0=hankel(samples[0:-1])[0:d,0:d]
H1=hankel(samples[1:])[0:d,0:d]
return H0, H1
H0, H1 =mat_ge(f)
H=(H0,H1)
for h in range(len(H)):
x=H[h]
#panda_df = pd.DataFrame(data = x)
#display(panda_df)
# Finding the Eigenvalue
E= scipy.linalg.eig(H0, H1)[0]
#panda_df_E = pd.DataFrame(data = E)
#display(panda_df_E)
V=np.transpose(np.vander(E, N=None, increasing=True))
#panda_df_V = pd.DataFrame(data = V)
#display(panda_df_V)
# Calculating the coefficient alpha s with the matrix pencil method (MPM)
# Compute the amplitudes (coefficients) by the mldivide V\f[d:]=A
if len(f[d:])!=d:
Adj=1
else:
Adj=0
######## Recovering alphas #######
###### For Jupyter Notebook ######
#A=LA.solve(V.real, f[d+Adj:]) # Only pick the real part of V matrix! # Length?
#alpha_rep=A
#alpha=alpha_rep.tolist()
###### For google colaboratory ######
A=LA.solve(V, f[d:])
#alpha_rep=A.real
alpha_rep=A
alpha=alpha_rep.tolist()
# Recovering phi s by transforming the eigenvalues to their natural logarithm
# phi is the power of the exponential which is the complex number
# phi s represent the frequency of this signal
# frequencies and damping factors
phi_rep = [(cmath.log(E[t])) for t in range(len(E))] # .imag shows only the imaginary part
phi=phi_rep
# Removing relatively less significant alphas and phis
# Adjust this number of terms to recover.
NumTerms=NumTerms # Define number of terms to recover # Adjust here!
for l in range(len(alpha_rep)-NumTerms):
abs_alpha=[abs(alpha[k]) for k in range(len(alpha))]
#print(abs_alpha.index(min(abs_alpha)))
#print(alpha[abs_alpha.index(min(abs_alpha))])
alpha.pop(alpha.index(alpha[abs_alpha.index(min(abs_alpha))]))
phi.pop(phi.index(phi[abs_alpha.index(min(abs_alpha))]))
abs_alpha.pop(abs_alpha.index(min(abs_alpha)))
#display(alpha)
#display(phi)
### The process for generating the correlation coefficients
### between |imag(phi)| and |real(alpha)|
# Imaginary part of phis
abs_Im_phi=[abs(phi[k].imag) for k in range(len(phi))]
#display("|Imag(phi)|",abs_Im_phi)
# Real part of alphas
abs_Re_alpha=[abs(alpha[k].real) for k in range(len(alpha))]
#display("|Real(alpha)|",abs_Re_alpha)
# Calculating the correlation coefficient of between |imag(phi)| and |real(alpha)|
correlcoeff=np.corrcoef(abs_Im_phi,abs_Re_alpha)[0][1]
# Add the correlation coefficient of this year to the list
correlcoeff_s.append(correlcoeff)
#display(correlcoeff_s)
### For plotting signals
## Showing the plots bigger
#plt.rcParams["figure.figsize"] = (20,10)
## Plotting with defined alpha, phi, and omega (steps)
#omega=OMEGA(n)
#plot_signal(alpha,phi,omega,Dates_EURUSD[Cut])
# Add the list
#alpha_list.append(alpha)
#phi_list.append(phi)
## Plotting a graph comparing the correlation coefficients
## of |imag(phi)| and |real(alpha)| and the price level
#f_anu_avrg_s
d_f_anu_avrg_s=[0]
for k in range(1,len(f_anu_avrg_s)):
d_f_anu_avrg_s.append((f_anu_avrg_s[k]-f_anu_avrg_s[k-1]))
#display("ln(Rate_t)-ln(Rate_t-1)","length",len(d_f_anu_avrg_s),d_f_anu_avrg_s)
#display("Correl_phi_alpha","length",len(correlcoeff_s),correlcoeff_s)
#display("Year","length",len(Year_s),Year_s)
## Plottting graph with multiple y-axis with different labels
# https://stackoverflow.com/questions/9103166/multiple-axis-in-matplotlib-with-different-scales
host = host_subplot(111, axes_class=AA.Axes)
plt.subplots_adjust(right=0.75)
par1 = host.twinx()
par2 = host.twinx()
YLabelL="ln(EURUSD(Current Year))-ln(EURUSD(Previous Year))"
YLabelR="Correlation Coefficient of |imag(phi)| and |real(alpha)|"
VariableL=d_f_anu_avrg_s
VariableR=correlcoeff_s
host.set_xlabel("Year")
host.set_ylabel(YLabelL)
par1.set_ylabel(YLabelR)
p1, = host.plot(Year_s, VariableL, label=YLabelL)
p2, = par1.plot(Year_s, VariableR, label=YLabelR)
host.legend()
host.axis["left"].label.set_color(p1.get_color())
par1.axis["right"].label.set_color(p2.get_color())
plt.draw()
plt.xticks(rotation='vertical')
plt.show()
No comments:
Post a Comment
Note: only a member of this blog may post a comment.