neuropercolation/evaluation/2Layer fft.py
2023-12-10 21:47:15 +00:00

434 lines
16 KiB
Python

# -*- coding: utf-8 -*-
"""
Created on Tue Aug 30 14:25:12 2022
@author: timof
"""
from plot import qtplot
import sys
import os
import json
import pyqtgraph as qt
import pyqtgraph.exporters
import math as m
import numpy as np
from scipy.fftpack import fft, fftfreq
from scipy.stats import cauchy
vect = np.vectorize
@vect
def log2(x):
try:
return m.log2(x)
except ValueError:
if x==0:
return float(0)
else:
raise
basepath = f'/cloud/Public/_data/neuropercolation/2lay/steps=1000100/'
vals = [[],[]]
TOTP_eps=[]
TOTP_eps = []
SIGP_eps = []
SN_eps = []
STON_eps = []
FMAX_eps = []
FWHM_eps = []
Q_eps = []
LHM_eps = []
UHM_eps = []
SOS_eps = []
SPECC_eps=[]
LORENTZ_eps=[]
dims=[9]
runsteps = 1000000
for dim in dims:
path=basepath+f'dim={dim:02}/'
eps_space = np.linspace(0.005, 0.5, 100)
eps_space=eps_space[1::2]
TOTP = []
SIGP = []
SN = []
STON = []
FMAX = []
FWHM = []
Q = []
LHM=[]
UHM=[]
SOS=[]
#%%
for eps in eps_space:
eps=round(eps,3)
halfh_parts = []
sigtonoi_parts = []
noisefloor_parts = []
totalpower_parts = []
sigpower_parts = []
sn_parts = []
freqmax_parts = []
freqfwhm_parts = []
specc_list = []
lorentz_list = []
subfreqmax_parts = []
subfreqfwhm_parts = []
totlorentz_list = []
lhm_list=[]
uhm_list=[]
parts = 1
partlen = int(runsteps/parts)
for part in range(0,runsteps,partlen):
with open(path+f"eps={eps:.3f}_activation.txt", 'r', encoding='utf-8') as f:
activation = json.load(f)[100+part:100+part+partlen]
phases = []
for act in activation:
ph = m.atan2(*act[::-1])
phase = act[0] + act[1]*1j #m.e**(ph*1j)
phases.append(phase)
spectrum = abs(fft(phases))**2
freqs = fftfreq(partlen, 1)
accum = 1#int(500/parts)
if accum==1:
try:
with open(path+f"eps={eps:.3f}_smooth_spectrum.txt", 'r', encoding='utf-8') as f:
specc = json.load(f)
freqqs = np.array([freqs[pos] for pos in range(-int(partlen/2),int(partlen/2))])
print(f'Loaded specc for eps={eps:.3f}')
except:
freqqs = np.array([freqs[pos] for pos in range(-int(partlen/2),int(partlen/2))])
spec = np.array([spectrum[pos] for pos in range(-int(partlen/2),int(partlen/2))])
kernelres=2500
kernelbase = np.linspace(-1,1,2*kernelres+1)
kernel = np.exp(-1/(1-np.abs(kernelbase)**2))
kernel=kernel/np.sum(kernel)
spec_ext = np.hstack((spec[-kernelres:],spec,spec[:kernelres]))
specc=np.convolve(spec_ext,kernel,mode='same')[kernelres:-kernelres]
with open(path+f"eps={eps:.3f}_smooth_spectrum.txt", 'w', encoding='utf-8') as f:
json.dump(list(specc),f,indent=1)
else:
freqqs = np.array([freqs[pos]
for pos in range(int((-partlen+accum)/2), int((partlen+accum)/2),accum)])
specc = np.array([( spectrum[pos-int(accum/2)]/2 + sum(spectrum[pos-int(accum/2)+1:pos+int(accum/2)-1]) + spectrum[pos+int(accum/2)-1] + spectrum[pos+int(accum/2)]/2 )/accum
for pos in range(int((-partlen+accum)/2), int((partlen+accum)/2),accum)])
halfh = (max(specc)+min(specc))/2
#sigtonoi = sum([specc[pos] for pos in range(len(specc)) if specc[pos]>=halfh])/sum([specc[pos] for pos in range(len(specc)) if specc[pos]<halfh])
fars = int(runsteps/accum/100)
noisefloor = min(np.average(specc[:fars]),np.average(specc[-fars:]))
speccnorm = specc - noisefloor
totalpower = sum(specc)
sigpower = sum(speccnorm)
sig = max(specc)
sn = sigpower/(totalpower-sigpower)
sigtonoi = sig/(totalpower-sig)
halfh_parts.append(halfh)
sigtonoi_parts.append(sigtonoi)
noisefloor_parts.append(noisefloor)
totalpower_parts.append(totalpower)
sigpower_parts.append(sig)
sn_parts.append(sn)
specc_list.append(specc)
cumspecc = [speccnorm[0]]
for pos in range(1,len(speccnorm)):
cumspecc.append(cumspecc[pos-1]+speccnorm[pos])
for pos in range(len(specc)):
if cumspecc[pos+1]>=sigpower/2:
median = pos
stepdiff = speccnorm[pos]
halfdiff = sum(speccnorm[:pos+1]) - sigpower/2
freqstep = freqqs[pos] - freqqs[pos-1]
freqmax = freqqs[pos] - freqstep * halfdiff/stepdiff
break
for pos in range(median, len(speccnorm)):
if cumspecc[pos+1]>=sigpower*3/4:
stepdiff = speccnorm[pos]
halfdiff = sum(speccnorm[:pos+1]) - sigpower*3/4
freqstep = freqqs[pos] - freqqs[pos-1]
freqquart3 = freqqs[pos] - freqstep * halfdiff/stepdiff
break
for pos in range(median+1):
if cumspecc[pos+1]>=sigpower/4:
stepdiff = speccnorm[pos]
halfdiff = sum(speccnorm[:pos+1]) - sigpower/4
freqstep = freqqs[pos] - freqqs[pos-1]
freqquart1 = freqqs[pos] - freqstep * halfdiff/stepdiff
break
freqfwhm = freqquart3-freqquart1
gamma = freqfwhm/2
freqmax_parts.append(freqmax)
freqfwhm_parts.append(freqfwhm)
print(gamma**2/median**2)
lorentz = sigpower/m.pi * accum/partlen * gamma/((freqqs-freqmax)**2+gamma**2) + noisefloor
lorentz_list.append(lorentz)
speccpdf = speccnorm/sigpower
deviation = np.sqrt(sum((specc-lorentz)**2)/len(specc))/sigpower
maxpos = 0
for i in range(len(freqqs)):
if abs(freqqs[i]-freqmax)<abs(freqqs[maxpos]-freqmax):
maxpos = i
pos=maxpos
try:
while lorentz[pos]>lorentz[maxpos]/2:
pos-=1
lhm=freqqs[pos]
except:
lhm=freqqs[0]
pos=maxpos
try:
while lorentz[pos]>lorentz[maxpos]/2:
pos+=1
uhm=freqqs[pos]
except:
uhm=freqqs[-1]
SPECC = np.average(specc_list,axis=0)
LORENTZ = np.average(lorentz_list,axis=0)
DEV = np.sqrt(sum((SPECC-LORENTZ)**2)/len(SPECC))/sum(SPECC)
TOTP.append(np.average(totalpower_parts))
SIGP.append(np.average(sigpower_parts))
SN.append(np.average(sn_parts))
STON.append(sigtonoi)
FMAX.append(np.average(freqmax_parts))
FWHM.append(np.average(freqfwhm_parts))
LHM.append(lhm)
UHM.append(uhm)
Q.append(np.average(freqmax_parts)/np.average(freqfwhm_parts))
SOS.append(DEV)
#%%
with open(path+f"signal/SN_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(SN, f, ensure_ascii=False, indent=4)
with open(path+f"signal/FMAX_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(FMAX, f, ensure_ascii=False, indent=4)
with open(path+f"signal/FWHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(FWHM, f, ensure_ascii=False, indent=4)
with open(path+f"signal/LHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(LHM, f, ensure_ascii=False, indent=4)
with open(path+f"signal/UHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(UHM, f, ensure_ascii=False, indent=4)
with open(path+f"signal/TOTP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(TOTP, f, ensure_ascii=False, indent=4)
with open(path+f"signal/SIGP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(SIGP, f, ensure_ascii=False, indent=4)
with open(path+f"signal/Q_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(Q, f, ensure_ascii=False, indent=4)
with open(path+f"signal/STON_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(STON, f, ensure_ascii=False, indent=4)
#%%
plotpath = path + 'spectra/'
# qtplot(f'Frequency spectrum for dim={dim} eps={eps:.3f}',
# [freqqs]*2,
# [LORENTZ,SPECC],
# [f'lorentz regression dev={DEV}','spectrum'],
# x_tag = 'frequency f',
# y_tag = 'spectral power p',
# y_log = False,
# export=True,
# lw=1,
# path=plotpath,
# filename=f'Lorentz and Cauchy fit to spectrum {parts} parts accum={accum} eps={round(eps,3):.3f}.png',
# close=True)
#%%
print("Power {}, Signal {}".format(round(totalpower), round(sigpower)))
print("Done {:.3f}: DEV {}, FMAX {}, FWHM {}".format(eps, round(deviation,3), round(freqmax,5), round(FWHM[-1],5)))
LORENTZ_eps.append(LORENTZ)
SPECC_eps.append(SPECC)
#%%
# vareps=chr(949)
# qtplot(f'Frequency spectra to noise level',
# [freqqs[50::100]]*len(eps_space),
# [SPECC[50::100] for SPECC in SPECC_eps],
# [f'{vareps}={eps:.3f}' for eps in eps_space],
# x_tag = 'frequency f',
# y_tag = 'energy spectral density',
# y_log = True,
# export=False,
# lw=2,
# path=plotpath,
# filename=f'Lorentz and Cauchy fit to spectrum log {parts} parts accum={accum} eps={round(eps,3):.3f}.png',
# close=False)
#%%
TOTP_eps.append(TOTP)
SIGP_eps.append(SIGP)
SN_eps.append(SN)
STON_eps.append(STON)
FMAX_eps.append(FMAX)
FWHM_eps.append(FWHM)
Q_eps.append(Q)
LHM_eps.append(LHM)
UHM_eps.append(UHM)
SOS_eps.append(SOS)
#%%
sigpath = basepath + 'signal/'
if not os.path.exists(sigpath):
os.makedirs(sigpath)
# qtplot(f'Signal to noise for dim={dim} eps={eps:.3f}',
# [eps_space],
# [SN],
# ['signalpower to noisefloor'],
# x_tag = 'epsilon',
# y_tag = 'ratio',
# y_log = False,
# export=True,
# path=sigpath,
# filename=f'SN_plot.png',
# close=True)
qtplot(f'Q facor for odd automaton sizes',
[eps_space]*3,
Q_eps,
[f'dim={dim}x{dim}' for dim in dims],
y_tag = 'purity',
y_log = False,
export=True,
path=sigpath,
filename=f'Q_plot_{parts}parts_accum={accum}.png',
close=False)
#%%
# with open(path+f"signal/FMAX_1_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'r', encoding='utf-8') as f:
# FMAX=json.load(f)
# with open(path+f"signal/FWHM_1_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'r', encoding='utf-8') as f:
# FWHM=json.load(f)
qtplot(f'Dominant frequency',
[[0]+list(eps_space[:20])]*3,
[[0]+FMAX[:20],[0]+LHM[:20],[0]+UHM[:20]],
# [f'dim={dim}x{dim}' for dim in dims],
['FMAX', 'FMAX-FWHM/2', 'FMAX+FWHM/2'],
colors=['r','r','g'],
y_tag = 'frequency f',
y_log = False,
export=True,
path=sigpath,
filename=f'FMAX+FWHMto50_plot_{parts}parts_accum={accum}.png',
close=False)
#%%
qtplot(f'Dominant frequency',
[[0]+list(eps_space[:-1])]*3,
[[0]+FMAX[:-1] for FMAX in FMAX_eps],#, [[0]+LHM[i][:50] for LHM in LHM_eps], [[0]+UHM[i][:50] for UHM in UHM_eps],
[f'dim={dim}x{dim}' for dim in dims],
#[['dominant frequency', 'lesser half maximum', 'upper half maximum'],
#colors=['g','r','r'],
y_tag = 'frequency f',
y_log = True,
export=True,
path=sigpath,
filename=f'FMAX+FWHMto50_plot_{parts}parts_accum={accum}.png',
close=False)
#%%
qtplot(f'Total and signal energy to noise level',
[eps_space]*6,
list(np.array(SIGP_eps)/runsteps*accum)+list(np.array(TOTP_eps)/runsteps*accum),
[f'signal energy for dim={dim:02d}' for dim in dims]+[f'total energy for dim={dim:02d}' for dim in dims],
colors=['b','g','r','c','y','m'],
y_tag = f'energy',
y_log = True,
export=True,
path=sigpath,
filename=f'POWER_plot_{parts}parts_accum={accum}.png',
close=False)
#%%
qtplot(f'Signal-to-noise ratio to noise level',
[eps_space]*len(dims),
STON_eps,
[f'dim={dim:02d}x{dim:02d}' for dim in dims],
y_tag = f'ratio S/N',
y_log = True,
export=True,
path=sigpath,
filename=f'SN_plot_{parts}parts_accum={accum}.png',
close=False)
#%%
qtplot(f'Goodness of fit to Lorentz curve',
[eps_space]*len(dims),
SOS_eps,
[f'dim={dim:02d}x{dim:02d}' for dim in dims],
y_tag = f'mean deviation',
y_log = False,
export=True,
path=sigpath,
filename=f'DEV_plot_{parts}parts_accum={accum}.png',
close=False)
#%%
with open(basepath+f"signal/SN_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(SN_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/FMAX_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(FMAX_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/FWHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(FWHM_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/LHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(LHM_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/UHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(UHM_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/TOTP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(TOTP_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/SIGP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(SIGP_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/Q_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(Q_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/STON_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(STON_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/Q_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(Q_eps, f, ensure_ascii=False, indent=4)
with open(basepath+f"signal/STON_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
json.dump(STON_eps, f, ensure_ascii=False, indent=4)
def plot_execute():
if sys.flags.interactive != 1 or not hasattr(qt.QtCore, 'PYQT_VERSION'):
qt.QtGui.QApplication.exec_()