434 lines
16 KiB
Python
434 lines
16 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
"""
|
||
|
Created on Tue Aug 30 14:25:12 2022
|
||
|
|
||
|
@author: timof
|
||
|
"""
|
||
|
|
||
|
from plot import qtplot
|
||
|
|
||
|
import sys
|
||
|
import os
|
||
|
import json
|
||
|
|
||
|
import pyqtgraph as qt
|
||
|
import pyqtgraph.exporters
|
||
|
|
||
|
|
||
|
import math as m
|
||
|
import numpy as np
|
||
|
from scipy.fftpack import fft, fftfreq
|
||
|
from scipy.stats import cauchy
|
||
|
|
||
|
vect = np.vectorize
|
||
|
|
||
|
|
||
|
|
||
|
@vect
|
||
|
def log2(x):
|
||
|
try:
|
||
|
return m.log2(x)
|
||
|
except ValueError:
|
||
|
if x==0:
|
||
|
return float(0)
|
||
|
else:
|
||
|
raise
|
||
|
|
||
|
basepath = f'/cloud/Public/_data/neuropercolation/2lay/steps=1000100/'
|
||
|
vals = [[],[]]
|
||
|
|
||
|
|
||
|
TOTP_eps=[]
|
||
|
TOTP_eps = []
|
||
|
SIGP_eps = []
|
||
|
SN_eps = []
|
||
|
STON_eps = []
|
||
|
FMAX_eps = []
|
||
|
FWHM_eps = []
|
||
|
Q_eps = []
|
||
|
LHM_eps = []
|
||
|
UHM_eps = []
|
||
|
SOS_eps = []
|
||
|
SPECC_eps=[]
|
||
|
LORENTZ_eps=[]
|
||
|
|
||
|
dims=[9]
|
||
|
runsteps = 1000000
|
||
|
for dim in dims:
|
||
|
path=basepath+f'dim={dim:02}/'
|
||
|
eps_space = np.linspace(0.005, 0.5, 100)
|
||
|
eps_space=eps_space[1::2]
|
||
|
TOTP = []
|
||
|
SIGP = []
|
||
|
SN = []
|
||
|
STON = []
|
||
|
FMAX = []
|
||
|
FWHM = []
|
||
|
Q = []
|
||
|
LHM=[]
|
||
|
UHM=[]
|
||
|
SOS=[]
|
||
|
#%%
|
||
|
for eps in eps_space:
|
||
|
eps=round(eps,3)
|
||
|
halfh_parts = []
|
||
|
sigtonoi_parts = []
|
||
|
noisefloor_parts = []
|
||
|
totalpower_parts = []
|
||
|
sigpower_parts = []
|
||
|
sn_parts = []
|
||
|
freqmax_parts = []
|
||
|
freqfwhm_parts = []
|
||
|
specc_list = []
|
||
|
lorentz_list = []
|
||
|
|
||
|
subfreqmax_parts = []
|
||
|
subfreqfwhm_parts = []
|
||
|
totlorentz_list = []
|
||
|
lhm_list=[]
|
||
|
uhm_list=[]
|
||
|
|
||
|
parts = 1
|
||
|
partlen = int(runsteps/parts)
|
||
|
for part in range(0,runsteps,partlen):
|
||
|
with open(path+f"eps={eps:.3f}_activation.txt", 'r', encoding='utf-8') as f:
|
||
|
activation = json.load(f)[100+part:100+part+partlen]
|
||
|
phases = []
|
||
|
for act in activation:
|
||
|
ph = m.atan2(*act[::-1])
|
||
|
phase = act[0] + act[1]*1j #m.e**(ph*1j)
|
||
|
phases.append(phase)
|
||
|
|
||
|
spectrum = abs(fft(phases))**2
|
||
|
freqs = fftfreq(partlen, 1)
|
||
|
|
||
|
accum = 1#int(500/parts)
|
||
|
|
||
|
|
||
|
if accum==1:
|
||
|
try:
|
||
|
with open(path+f"eps={eps:.3f}_smooth_spectrum.txt", 'r', encoding='utf-8') as f:
|
||
|
specc = json.load(f)
|
||
|
freqqs = np.array([freqs[pos] for pos in range(-int(partlen/2),int(partlen/2))])
|
||
|
print(f'Loaded specc for eps={eps:.3f}')
|
||
|
except:
|
||
|
freqqs = np.array([freqs[pos] for pos in range(-int(partlen/2),int(partlen/2))])
|
||
|
spec = np.array([spectrum[pos] for pos in range(-int(partlen/2),int(partlen/2))])
|
||
|
|
||
|
kernelres=2500
|
||
|
kernelbase = np.linspace(-1,1,2*kernelres+1)
|
||
|
kernel = np.exp(-1/(1-np.abs(kernelbase)**2))
|
||
|
kernel=kernel/np.sum(kernel)
|
||
|
|
||
|
spec_ext = np.hstack((spec[-kernelres:],spec,spec[:kernelres]))
|
||
|
|
||
|
specc=np.convolve(spec_ext,kernel,mode='same')[kernelres:-kernelres]
|
||
|
with open(path+f"eps={eps:.3f}_smooth_spectrum.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(list(specc),f,indent=1)
|
||
|
else:
|
||
|
freqqs = np.array([freqs[pos]
|
||
|
for pos in range(int((-partlen+accum)/2), int((partlen+accum)/2),accum)])
|
||
|
specc = np.array([( spectrum[pos-int(accum/2)]/2 + sum(spectrum[pos-int(accum/2)+1:pos+int(accum/2)-1]) + spectrum[pos+int(accum/2)-1] + spectrum[pos+int(accum/2)]/2 )/accum
|
||
|
for pos in range(int((-partlen+accum)/2), int((partlen+accum)/2),accum)])
|
||
|
|
||
|
halfh = (max(specc)+min(specc))/2
|
||
|
|
||
|
#sigtonoi = sum([specc[pos] for pos in range(len(specc)) if specc[pos]>=halfh])/sum([specc[pos] for pos in range(len(specc)) if specc[pos]<halfh])
|
||
|
|
||
|
fars = int(runsteps/accum/100)
|
||
|
noisefloor = min(np.average(specc[:fars]),np.average(specc[-fars:]))
|
||
|
speccnorm = specc - noisefloor
|
||
|
|
||
|
totalpower = sum(specc)
|
||
|
sigpower = sum(speccnorm)
|
||
|
sig = max(specc)
|
||
|
sn = sigpower/(totalpower-sigpower)
|
||
|
sigtonoi = sig/(totalpower-sig)
|
||
|
|
||
|
halfh_parts.append(halfh)
|
||
|
sigtonoi_parts.append(sigtonoi)
|
||
|
noisefloor_parts.append(noisefloor)
|
||
|
totalpower_parts.append(totalpower)
|
||
|
sigpower_parts.append(sig)
|
||
|
sn_parts.append(sn)
|
||
|
specc_list.append(specc)
|
||
|
|
||
|
|
||
|
cumspecc = [speccnorm[0]]
|
||
|
for pos in range(1,len(speccnorm)):
|
||
|
cumspecc.append(cumspecc[pos-1]+speccnorm[pos])
|
||
|
|
||
|
for pos in range(len(specc)):
|
||
|
if cumspecc[pos+1]>=sigpower/2:
|
||
|
median = pos
|
||
|
stepdiff = speccnorm[pos]
|
||
|
halfdiff = sum(speccnorm[:pos+1]) - sigpower/2
|
||
|
|
||
|
freqstep = freqqs[pos] - freqqs[pos-1]
|
||
|
freqmax = freqqs[pos] - freqstep * halfdiff/stepdiff
|
||
|
|
||
|
break
|
||
|
|
||
|
for pos in range(median, len(speccnorm)):
|
||
|
if cumspecc[pos+1]>=sigpower*3/4:
|
||
|
stepdiff = speccnorm[pos]
|
||
|
halfdiff = sum(speccnorm[:pos+1]) - sigpower*3/4
|
||
|
|
||
|
|
||
|
freqstep = freqqs[pos] - freqqs[pos-1]
|
||
|
freqquart3 = freqqs[pos] - freqstep * halfdiff/stepdiff
|
||
|
|
||
|
break
|
||
|
for pos in range(median+1):
|
||
|
if cumspecc[pos+1]>=sigpower/4:
|
||
|
stepdiff = speccnorm[pos]
|
||
|
halfdiff = sum(speccnorm[:pos+1]) - sigpower/4
|
||
|
|
||
|
freqstep = freqqs[pos] - freqqs[pos-1]
|
||
|
freqquart1 = freqqs[pos] - freqstep * halfdiff/stepdiff
|
||
|
|
||
|
break
|
||
|
|
||
|
|
||
|
|
||
|
freqfwhm = freqquart3-freqquart1
|
||
|
gamma = freqfwhm/2
|
||
|
|
||
|
freqmax_parts.append(freqmax)
|
||
|
freqfwhm_parts.append(freqfwhm)
|
||
|
|
||
|
print(gamma**2/median**2)
|
||
|
|
||
|
lorentz = sigpower/m.pi * accum/partlen * gamma/((freqqs-freqmax)**2+gamma**2) + noisefloor
|
||
|
lorentz_list.append(lorentz)
|
||
|
|
||
|
speccpdf = speccnorm/sigpower
|
||
|
deviation = np.sqrt(sum((specc-lorentz)**2)/len(specc))/sigpower
|
||
|
|
||
|
maxpos = 0
|
||
|
for i in range(len(freqqs)):
|
||
|
if abs(freqqs[i]-freqmax)<abs(freqqs[maxpos]-freqmax):
|
||
|
maxpos = i
|
||
|
pos=maxpos
|
||
|
try:
|
||
|
while lorentz[pos]>lorentz[maxpos]/2:
|
||
|
pos-=1
|
||
|
lhm=freqqs[pos]
|
||
|
except:
|
||
|
lhm=freqqs[0]
|
||
|
pos=maxpos
|
||
|
try:
|
||
|
while lorentz[pos]>lorentz[maxpos]/2:
|
||
|
pos+=1
|
||
|
uhm=freqqs[pos]
|
||
|
except:
|
||
|
uhm=freqqs[-1]
|
||
|
|
||
|
SPECC = np.average(specc_list,axis=0)
|
||
|
LORENTZ = np.average(lorentz_list,axis=0)
|
||
|
DEV = np.sqrt(sum((SPECC-LORENTZ)**2)/len(SPECC))/sum(SPECC)
|
||
|
|
||
|
TOTP.append(np.average(totalpower_parts))
|
||
|
SIGP.append(np.average(sigpower_parts))
|
||
|
SN.append(np.average(sn_parts))
|
||
|
STON.append(sigtonoi)
|
||
|
|
||
|
FMAX.append(np.average(freqmax_parts))
|
||
|
FWHM.append(np.average(freqfwhm_parts))
|
||
|
LHM.append(lhm)
|
||
|
UHM.append(uhm)
|
||
|
Q.append(np.average(freqmax_parts)/np.average(freqfwhm_parts))
|
||
|
SOS.append(DEV)
|
||
|
#%%
|
||
|
with open(path+f"signal/SN_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(SN, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/FMAX_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(FMAX, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/FWHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(FWHM, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/LHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(LHM, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/UHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(UHM, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/TOTP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(TOTP, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/SIGP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(SIGP, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/Q_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(Q, f, ensure_ascii=False, indent=4)
|
||
|
with open(path+f"signal/STON_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(STON, f, ensure_ascii=False, indent=4)
|
||
|
|
||
|
#%%
|
||
|
plotpath = path + 'spectra/'
|
||
|
# qtplot(f'Frequency spectrum for dim={dim} eps={eps:.3f}',
|
||
|
# [freqqs]*2,
|
||
|
# [LORENTZ,SPECC],
|
||
|
# [f'lorentz regression dev={DEV}','spectrum'],
|
||
|
# x_tag = 'frequency f',
|
||
|
# y_tag = 'spectral power p',
|
||
|
# y_log = False,
|
||
|
# export=True,
|
||
|
# lw=1,
|
||
|
# path=plotpath,
|
||
|
# filename=f'Lorentz and Cauchy fit to spectrum {parts} parts accum={accum} eps={round(eps,3):.3f}.png',
|
||
|
# close=True)
|
||
|
|
||
|
|
||
|
#%%
|
||
|
|
||
|
print("Power {}, Signal {}".format(round(totalpower), round(sigpower)))
|
||
|
|
||
|
print("Done {:.3f}: DEV {}, FMAX {}, FWHM {}".format(eps, round(deviation,3), round(freqmax,5), round(FWHM[-1],5)))
|
||
|
|
||
|
LORENTZ_eps.append(LORENTZ)
|
||
|
SPECC_eps.append(SPECC)
|
||
|
|
||
|
#%%
|
||
|
# vareps=chr(949)
|
||
|
# qtplot(f'Frequency spectra to noise level',
|
||
|
# [freqqs[50::100]]*len(eps_space),
|
||
|
# [SPECC[50::100] for SPECC in SPECC_eps],
|
||
|
# [f'{vareps}={eps:.3f}' for eps in eps_space],
|
||
|
# x_tag = 'frequency f',
|
||
|
# y_tag = 'energy spectral density',
|
||
|
# y_log = True,
|
||
|
# export=False,
|
||
|
# lw=2,
|
||
|
# path=plotpath,
|
||
|
# filename=f'Lorentz and Cauchy fit to spectrum log {parts} parts accum={accum} eps={round(eps,3):.3f}.png',
|
||
|
# close=False)
|
||
|
#%%
|
||
|
TOTP_eps.append(TOTP)
|
||
|
SIGP_eps.append(SIGP)
|
||
|
SN_eps.append(SN)
|
||
|
STON_eps.append(STON)
|
||
|
FMAX_eps.append(FMAX)
|
||
|
FWHM_eps.append(FWHM)
|
||
|
Q_eps.append(Q)
|
||
|
LHM_eps.append(LHM)
|
||
|
UHM_eps.append(UHM)
|
||
|
SOS_eps.append(SOS)
|
||
|
|
||
|
#%%
|
||
|
sigpath = basepath + 'signal/'
|
||
|
if not os.path.exists(sigpath):
|
||
|
os.makedirs(sigpath)
|
||
|
|
||
|
# qtplot(f'Signal to noise for dim={dim} eps={eps:.3f}',
|
||
|
# [eps_space],
|
||
|
# [SN],
|
||
|
# ['signalpower to noisefloor'],
|
||
|
# x_tag = 'epsilon',
|
||
|
# y_tag = 'ratio',
|
||
|
# y_log = False,
|
||
|
# export=True,
|
||
|
# path=sigpath,
|
||
|
# filename=f'SN_plot.png',
|
||
|
# close=True)
|
||
|
|
||
|
qtplot(f'Q facor for odd automaton sizes',
|
||
|
[eps_space]*3,
|
||
|
Q_eps,
|
||
|
[f'dim={dim}x{dim}' for dim in dims],
|
||
|
y_tag = 'purity',
|
||
|
y_log = False,
|
||
|
export=True,
|
||
|
path=sigpath,
|
||
|
filename=f'Q_plot_{parts}parts_accum={accum}.png',
|
||
|
close=False)
|
||
|
#%%
|
||
|
|
||
|
# with open(path+f"signal/FMAX_1_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'r', encoding='utf-8') as f:
|
||
|
# FMAX=json.load(f)
|
||
|
# with open(path+f"signal/FWHM_1_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'r', encoding='utf-8') as f:
|
||
|
# FWHM=json.load(f)
|
||
|
|
||
|
qtplot(f'Dominant frequency',
|
||
|
[[0]+list(eps_space[:-1])]*3,
|
||
|
[[0]+FMAX[-1],[0]+LHM[-1],[0]+UHM[-1]],
|
||
|
# [f'dim={dim}x{dim}' for dim in dims],
|
||
|
['dominant frequency', 'lesser half maximum', 'upper half maximum'],
|
||
|
#colors=['g','r','r'],
|
||
|
y_tag = 'frequency f',
|
||
|
y_log = True,
|
||
|
export=True,
|
||
|
path=sigpath,
|
||
|
filename=f'FMAX+FWHMto50_plot_{parts}parts_accum={accum}.png',
|
||
|
close=False)
|
||
|
#%%
|
||
|
qtplot(f'Dominant frequency',
|
||
|
[[0]+list(eps_space[:-1])]*3,
|
||
|
[[0]+FMAX[:-1] for FMAX in FMAX_eps],#, [[0]+LHM[i][:50] for LHM in LHM_eps], [[0]+UHM[i][:50] for UHM in UHM_eps],
|
||
|
[f'dim={dim}x{dim}' for dim in dims],
|
||
|
#[['dominant frequency', 'lesser half maximum', 'upper half maximum'],
|
||
|
#colors=['g','r','r'],
|
||
|
y_tag = 'frequency f',
|
||
|
y_log = True,
|
||
|
export=True,
|
||
|
path=sigpath,
|
||
|
filename=f'FMAX+FWHMto50_plot_{parts}parts_accum={accum}.png',
|
||
|
close=False)
|
||
|
#%%
|
||
|
qtplot(f'Total and signal energy to noise level',
|
||
|
[eps_space]*6,
|
||
|
list(np.array(SIGP_eps)/runsteps*accum)+list(np.array(TOTP_eps)/runsteps*accum),
|
||
|
[f'signal energy for dim={dim:02d}' for dim in dims]+[f'total energy for dim={dim:02d}' for dim in dims],
|
||
|
colors=['b','g','r','c','y','m'],
|
||
|
y_tag = f'energy',
|
||
|
y_log = True,
|
||
|
export=True,
|
||
|
path=sigpath,
|
||
|
filename=f'POWER_plot_{parts}parts_accum={accum}.png',
|
||
|
close=False)
|
||
|
#%%
|
||
|
qtplot(f'Signal-to-noise ratio to noise level',
|
||
|
[eps_space]*len(dims),
|
||
|
STON_eps,
|
||
|
[f'dim={dim:02d}x{dim:02d}' for dim in dims],
|
||
|
y_tag = f'ratio S/N',
|
||
|
y_log = True,
|
||
|
export=True,
|
||
|
path=sigpath,
|
||
|
filename=f'SN_plot_{parts}parts_accum={accum}.png',
|
||
|
close=False)
|
||
|
#%%
|
||
|
qtplot(f'Goodness of fit to Lorentz curve',
|
||
|
[eps_space]*len(dims),
|
||
|
SOS_eps,
|
||
|
[f'dim={dim:02d}x{dim:02d}' for dim in dims],
|
||
|
y_tag = f'mean deviation',
|
||
|
y_log = False,
|
||
|
export=True,
|
||
|
path=sigpath,
|
||
|
filename=f'DEV_plot_{parts}parts_accum={accum}.png',
|
||
|
close=False)
|
||
|
#%%
|
||
|
with open(basepath+f"signal/SN_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(SN_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/FMAX_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(FMAX_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/FWHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(FWHM_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/LHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(LHM_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/UHM_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(UHM_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/TOTP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(TOTP_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/SIGP_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(SIGP_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/Q_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(Q_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/STON_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(STON_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/Q_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(Q_eps, f, ensure_ascii=False, indent=4)
|
||
|
with open(basepath+f"signal/STON_{parts}_accum={accum}_dim={dim}_runsteps={runsteps}.txt", 'w', encoding='utf-8') as f:
|
||
|
json.dump(STON_eps, f, ensure_ascii=False, indent=4)
|
||
|
|
||
|
def plot_execute():
|
||
|
if sys.flags.interactive != 1 or not hasattr(qt.QtCore, 'PYQT_VERSION'):
|
||
|
qt.QtGui.QApplication.exec_()
|
||
|
|