Wednesday, June 28, 2023

Synthetic complex time series data: Python generator

The following Python code generates the synthetic time series based on real numbers and complex numbers.

This is useful to simulate the business cycles, the investment returns, and the other wave formed time series models of finance and macroeconomics.

You will need to adjust some parameters depending on your usage. 

Running this Python programme saves a csv file to Download folder in your PC but you may change owing to your preference.

# importing necessary tools
import os

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy.optimize import newton
from scipy import stats
import statsmodels.formula.api as sm

import math
import cmath

from scipy.linalg import hankel

# Showing the plots bigger
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (20,10)

# Initializing real numbers
x = -1.0
y = 0.001
 
# converting x and y into complex number
z = complex(x,y);

# defining complex number
i=complex(0,1)

# printing phase of a complex number using phase()
print ("The phase of complex number is : ",end="")
print (cmath.phase(z))

# signal Business Cycle
phi = [0.1, 0.3*i, 1*i, -1+3*i, -1-2*i, 0.03+0.5*i, 11*i, -9*i, 30*i]
alpha = [4, 3, 2, 2, 2, 1, 1, 1, 0.1]

# Defining the function generating the complex time series
def syn_exp(amp,freq,x):
    t=len(amp)
    y=[]
    for omg in range(len(x)):
        yi=0
        for ap in range(len(amp)):
            yi+=amp[ap]*cmath.exp(freq[ap]*x[omg])
        y.append(yi)
    return y

# sampling rate
M = 100

# number of exponentials in the signal
n = 600

# Generating the time trend for the complex time series
_2n_1=(2*n-1)+1
omega = [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(_2n_1)]
omega

# Generating the synthetic time series data of the phytical part (without noise)
f_syn = syn_exp(alpha, phi, omega)

# Generating the noise of the time trend
noise=[np.random.normal(0) for k in range(len(f_syn))]
noise=noise/LA.norm(noise)

# eps: noise level, also try 10^(-0)
eps =  10^(-1);
#eps = 0

# Combining the physical parts and the noise parts of the time series
# f = f_syn added with random noise
f = f_syn + eps*noise;

# plot the real part of the signal
def plot_signal(amp,freq,x):
    Mrend = 2*math.pi;
    Mint = math.pi/2;
    t=np.linspace(0, Mrend, 10000)
    M=len(t)
    x= [2*math.pi/M*(_0__2n_1) for _0__2n_1 in range(len(t))]
    F=syn_exp(amp,freq,x)
    F_real=[F[k].real for k in range(len(x))]
    plt.plot(F_real)
    plt.title('Signal: real part')
    plt.xlabel("'pi/2'__'pi'__'3pi/2'__'2pi'")
    plt.ylabel('')
    plt.show

plot_signal(alpha,phi,omega)

# Generating the time stamp
# ref: https://stackoverflow.com/questions/4479800/python-generate-dates-series
import datetime
stY=2023
stM=6
stD=1
fLen=len(f)

edY=stY+math.floor(fLen/365)+math.floor((stM+(fLen%365)/30)/12)
edM=math.floor((stM+(fLen%365)/30)%12)
edD=math.floor((stD+fLen%365%30))

dt = datetime.datetime(stY, stM, stD)
end = datetime.datetime(edY, edM, edD, 0, 0, 0)
step = datetime.timedelta(days=1)

TimeLine0 = []

while dt < end:
    TimeLine0.append(dt.strftime('%Y-%m-%d %H:%M:%S'))
    dt += step

TimeLine=TimeLine0[:fLen].copy()
len(TimeLine)

SensorId=[1]*fLen
len(f)

# Combining the time stamp and the time series data in Panda Dataframe
dict = {'Timestamp': TimeLine, 'SensorId': SensorId, 'Value': f, }
df = pd.DataFrame(dict)
df.tail()

# Download this time series data to your PC

FileLocation_Folder="C:\\Users\\OWNER\\Downloads" ## Change here for your PC!!!
Direct = (r"","\\ComplexTimeSeries.csv")
os.makedirs(FileLocation_Folder, exist_ok=True)  
df.to_csv(FileLocation_Folder.join(Direct))






Monday, June 19, 2023

Python: Hankel Matrices H0, H1, Generalised Eigenvalue of H0 and H1, and Singular Value Decomposition (SVD) used for the Matrix Pencil Method (MPM) and Singular Spectrum Analysis (SSA)

 


This process is the part of the Python code for Prony method introduced in the last post. The process of finding Hankel matrices, Generalised Eigenvalues, and Singular Value Decomposition (SVD) are the essential processes for both the modern Prony method (Matrix Pencil Method (MPM) and Singular Spectrum Analysis (SSA). 

 SSA is the non-parametric method which is said to be influenced by Prony method. For financial time series analyses, SSA is popularly used nowadays. SSA is better at capturing non-linearly time series with erratic fluctuations. SSA is popular due to the multi-resolution decomposition of time series into time trends and volatilities. 

The following Python codes operates the following actions:

- Downloads the foreign exchange rates of Euro based on USD

- Generating a time series variable EUR/USD

- Creating Hankel matrices H0 and H1 based on this time series

- Finding Generalised Eigenvalue and Eigenvectors of H0 and H1

- Singular Value Decomposition (SVD) of the Hankel matrix H0

 

# Execute this cell to have access to the API of Frankfurter App
import requests
import json

# importing necessary tools
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import random
import statistics
import scipy.linalg
from numpy import linalg as LA
from scipy.stats import qmc
from scipy.optimize import newton
from scipy import stats
import statsmodels.formula.api as sm

import math
import cmath

from scipy.linalg import hankel

import scipy.odr
from scipy import odr

# Showing the plots bigger
plt.rcParams["figure.figsize"] = (12,6)

# defining complex number
i=complex(0,1)
i

# Connecting to the API https://www.frankfurter.app/docs/ to get the EUR/USD rates
url = "http://api.frankfurter.app/1999-01-01.."
# Write your code here
resp = requests.get(url,{'to':'USD'})
#print(resp.url)
#print(resp.status_code)
#print("Type:", type(resp.content))
#print(resp.content)

print("Type:", type(resp))

# Preparing for the right encoding
import ast
byte_str = resp.content
dict_str = byte_str.decode("UTF-8")
mydata = ast.literal_eval(dict_str)
print("Type:", type(mydata))

#mydata #show the accessed data
EURUSD=(mydata["rates"])
#EURUSD

# Sorting the data into the Python dictionary form
Dates_EURUSD=[]
Rates_EURUSD=[]

for key, value in EURUSD.items():
    Dates_EURUSD.append(key)
    for k,v in value.items():
        Rates_EURUSD.append(v)

EURUSD=dict(zip(Dates_EURUSD,Rates_EURUSD))
#EURUSD

#Rates_EURUSD
print("Length of data:",len(Rates_EURUSD))

# Converting the rate to natural logarithm
ln_Rates_EURUSD=np.log(Rates_EURUSD)
ln_Rates_EURUSD
#plt.plot(ln_Rates_EURUSD)   # Uncomment to show plot!

# Creating the log-difference (Daily return) of EUR/USD
d_ln_Rates_EURUSD=np.diff(np.log(Rates_EURUSD))
len(d_ln_Rates_EURUSD)
#plt.plot(d_ln_Rates_EURUSD)   # Uncomment to show plot!

# Obtaining the length which is half of the total length of this variable.
# This is needed for creating the matrix of this variable i.e. n/2 * n/2 matrix
X=d_ln_Rates_EURUSD.copy()
d=(int(len(X)/2-len(X)%2*0.5)) # Subtracting the remainder if it is an odd number
d


f=d_ln_Rates_EURUSD.copy()
n=len(f)
M=100  # sampling rate.
# M is supposed to be twice of the actual signal frequency.
# But, we have never known the true frequency of the financial data.
# So, it is assumed to be 100 because it seems to provide an appropriate answer
print(n) # Checking the total length of this variable.

# Creating Hankel matrix which is needed to
def mat_ge(samples):
    n_samples = len(samples)
    d = int(n_samples/2)
    H0=hankel(samples[0:-1])[0:d,0:d]
    H1=hankel(samples[1:])[0:d,0:d]
    return H0, H1
H0, H1 =mat_ge(f)
H=(H0,H1)

for h in range(len(H)):
    print(f'\n\n \033[1m Hankel Matrix H{h} \033[0m')
    panda_df = pd.DataFrame(data = H[h])
    display(panda_df)

# Finding the Generalised Eigenvalue of these Hankel matrices H0 and H1
print('\n\n Finding the Generalised Eigenvalue and Eigenvectors of these Hankel matrices H0 and H1 ')
Eig = scipy.linalg.eig(H0, H1)
Evalue= Eig[0]
Evectors = Eig[1:]
Evalue_real=Evalue.real
print('\n\n \033[1m Eigenvalue \033[0m')
panda_df = pd.DataFrame(data = Evalue)
display(panda_df)
print('\n\n \033[1m Eigenvectors \033[0m')
display(Eig[1:])
   
# Showing Verndermonde Matrix
Veig=np.transpose(np.vander(Evalue, N=None, increasing=True))
#print('\n\n Vandermonde Matrix of the eigenvalues')
#panda_df = pd.DataFrame(data = Veig)
#display(panda_df)


# Defnining the function plotting Singular Value Decomposition (SDV)
# This apprximate the number of the physical/true signals i.e. the number of true alpha s and phi s
def plot_SVD(A):
    SDV=LA.svd(A, full_matrices=True, compute_uv=True, hermitian=False)[1]
    Ln_SDV=[math.log(SDV[k]) for k in range(len(SDV)) ]
    plt.plot(Ln_SDV)
    plt.title('Singular Values')
    plt.xlabel('')
    plt.ylabel('Natural Log')
    plt.show
print('\n\n Singular Value decomposition (SVD) of the Hankel matrix H0')
plot_SVD(H0)

 

 

P.S. This following website shows the useful code to simulate Singular Spectrum Analysis (SSA) with Python

Introducing SSA for Time Series Decomposition
Python · MotionSense Dataset : Smartphone Sensor Data - HAR
https://www.kaggle.com/code/jdarcy/introducing-ssa-for-time-series-decomposition