#!/usr/bin/env python3
"""my_filter.py

A numeric, recursive, first-order average filter built on the simplest idea.

  Fin  is the incoming signal / sensor value; here a random number
       [ 50.0 +- noise ]. From rev 0.1.3 two different sine signals can be
       added as well (to show the filter, and its phase / delay, better).
       Optionally the CPU temperature is used as the signal instead.
  F    is the filtered signal used for processing / control / trending ...

       F = Fin * A + F * B
       rule (to keep the amplitude of that average): A + B = 1.0

Additionally the MIN / MAX of the integer part of the filtered value F is
remembered; printing it can be enabled or disabled.

There is also a "max high" filter, i.e. a filtered PEAK VU meter: a second
(not so strong) filter that only uses the Fin values that are higher than the
filtered value F, producing FH. When Fin is not higher than F, FH is dragged
back down towards F by FH - FH_downramp.

links:
http://kll.engineering-news.org/kllfusion01/articles.php?article_id=146
looks like ( when used from PUTTY SSH )
http://kll.engineering-news.org/kllfusion01/downloads/PY3_signal_filter_linechart_putty.jpg
https://www.raspberrypi.org/forums/viewtopic.php?f=91&t=211126#p1303104

rev 0.1.0 use command line parameters ( args / opts )
https://www.tutorialspoint.com/python/python_command_line_arguments.htm
rev 0.2.0 use setup file
use pickle ( makes an unreadable record file at ~/.config/KLL/my_filter )
"""

__author__ = "kll"
__copyright__ = "Copyright 2018, KLL engineering"
# __credits__ = ["1", "2"]
__license__ = "CC BY SA"
__version__ = "0.x.0"  # also see vertxt variable
vertxt = 'rev 0.x.0'  # the effective value now comes via the setup file
__maintainer__ = "kll"
__email__ = "raspberryonpi@gmail.com"
__status__ = "still play"

from random import uniform
from time import sleep
import sys
import getopt
import math
import os  # for setup file
import pickle  # for setup file
import copy  # for setup file
from subprocess import PIPE, Popen  # for read cpu temp

# Keys stored in each setup record ('default' and 'user').
_SETUP_KEYS = (
    'vertxt', 'print_Fi', 'save_CSV', 'outcsv', 'print_FH', 'print_T',
    'FH_downramp', 'FH_A', 'print_chart_line', 'longlist', 'simrealtime',
    'A', 'Fn_noise', 'print_SIN', 'SIN1_a', 'SIN1_f', 'SIN2_a', 'SIN2_f',
)

_FIN_BASE = 50.0        # fixed input level; the random noise is added to it
_SIN_SAMPLES = 320      # the sine sample counter wraps after this many samples
_TEMP_FALLBACK = -5.0   # signal value used when the CPU temperature is unreadable

# Line-chart markers and terminal colours (BASH style escape codes).
_OSCI_F = 'o'
_OSCI_FIN = '+'
_OSCI_FH = '>'
_RED = '\033[47;31m'    # red on white background
_END = '\033[0m'        # end style

_NOT_INT = "That is not an integer number!"
_NOT_FLOAT = "That is not an floating point number!"

# getopt specification: a trailing ':' / '=' means the option takes a value.
_SHORT_OPTS = "hvcips:f:A:H:d:n:Sj:J:k:K:mMto:"
_LONG_OPTS = [
    "help", "version", "chart", "iminmax", "peakon", "ftime=", "samples=",
    "Atune=", "Htune=", "downramp=", "noise=", "sinon", "sin1_a=", "sin1_f=",
    "sin2_a=", "sin2_f=", "mmem", "Mmem", "tempon", "outcsv=",
]


def _expand(table):
    """Map every spelling in each (short, long) key tuple to that entry's value."""
    return {name: value for names, value in table.items() for name in names}


# Switch options: option -> (setting key, value to store).
_FLAG_OPTS = _expand({
    ('-i', '--iminmax'): ('print_Fi', True),
    ('-p', '--peakon'): ('print_FH', True),
    ('-t', '--tempon'): ('print_T', True),
    ('-c', '--chart'): ('print_chart_line', False),
    ('-S', '--sinon'): ('print_SIN', True),
})

# Options carrying a float: option -> setting key.
_FLOAT_OPTS = _expand({
    ('-f', '--ftime'): 'simrealtime',
    ('-A', '--Atune'): 'A',
    ('-H', '--Htune'): 'FH_A',
    ('-d', '--downramp'): 'FH_downramp',
    ('-n', '--noise'): 'Fn_noise',
    ('-j', '--sin1_a'): 'SIN1_a',
    ('-J', '--sin1_f'): 'SIN1_f',
    ('-k', '--sin2_a'): 'SIN2_a',
    ('-K', '--sin2_f'): 'SIN2_f',
})


def _make_default_opts():
    """Return a fresh setup structure with a 'default' and a 'user' record."""
    master = {  # only used when NO setup file exists yet
        'vertxt': 'rev 0.2.4',
        'print_Fi': False,
        'save_CSV': False,
        'outcsv': 'my_filter.csv',
        'print_FH': False,
        'FH_downramp': 2.00,
        'FH_A': 0.60,
        'print_T': False,
        'print_chart_line': True,
        'longlist': 20,
        'simrealtime': 0.1,
        'A': 0.1,
        'Fn_noise': 10.00,
        'print_SIN': False,
        'SIN1_a': 10.00,
        'SIN1_f': 0.10,
        'SIN2_a': 3.00,
        'SIN2_f': 1.00,
    }
    user = copy.deepcopy(master)  # independent copy, not a shared reference
    user['vertxt'] = master['vertxt'] + '_user'
    return {'default': master, 'user': user}


def _write_opts(opts, setup_dir, setup_file):
    """Pickle *opts* to *setup_file*, creating *setup_dir* first if needed."""
    if not os.path.exists(setup_dir):
        os.makedirs(setup_dir)
        print("make path")
    with open(setup_file, "wb") as handle:
        pickle.dump(opts, handle)
    print("write setup file: %s" % (setup_file))


def _read_opts(setup_dir, setup_file):
    """Load the setup structure, creating and writing defaults on first run."""
    if os.path.isfile(setup_file):
        # NOTE(security): unpickling can execute arbitrary code from a crafted
        # file. This is only acceptable because the file is the user's own
        # setup record; never point this at data from an untrusted source.
        with open(setup_file, "rb") as handle:
            return pickle.load(handle)
    print("first run? not find setup file")
    opts = _make_default_opts()
    _write_opts(opts, setup_dir, setup_file)
    return opts


def _record_to_cfg(record):
    """Copy the known settings out of one setup record into a working dict.

    Raises KeyError when the record lacks one of the expected keys.
    """
    return {key: record[key] for key in _SETUP_KEYS}


def _parse_number(text, convert, message):
    """Return convert(text); on ValueError print *message* and exit with status 2."""
    try:
        return convert(text)
    except ValueError:
        print(message)
        sys.exit(2)


def _help_text(cfg, m_mem, M_mem):
    """Build the usage text, showing the values in *cfg* as defaults."""
    lines = [
        'my_filter.py',
        ' [-v][--version] version',
        ' [-h][--help] this help',
        ' show:',
        ' [-c][--chart] print chart line OFF',
        ' [-i][--iminmax] print min max ON',
        ' [-p][--peakon] peakfilter ON, default print: ' + str(cfg['print_FH']),
        ' [-s][--samples] <0> run forever .. or fixed length, default ' + str(cfg['longlist']),
        ' ** in any case can stop by [ctrl][z] or [ctrl][c]',
        ' [-f][--ftime] float (sim / sleep), default: ' + str(cfg['simrealtime']),
        ' [-n][--noise] float (0.0 .. 50.0), default: ' + str(cfg['Fn_noise']),
        ' [-t][--tempon] read cpu temp, default : ' + str(cfg['print_T']),
        ' tuning:',
        ' [-A][--Atune] (float) 0.001 .. 1.0, default: ' + str(cfg['A']),
        ' [-H][--Htune] (float) > A, default: ' + str(cfg['FH_A']),
        ' [-d][--downramp] (float), default: ' + str(cfg['FH_downramp']),
        ' simulate 2 additional sinus signals:',
        ' [-S][--sinon] enable two add sinus signals, default: ' + str(cfg['print_SIN']),
        ' [-j][--sin1_a] (float), default: ' + str(cfg['SIN1_a']),
        ' [-J][--sin1_f] (float), default: ' + str(cfg['SIN1_f']),
        ' [-k][--sin2_a] (float), default: ' + str(cfg['SIN2_a']),
        ' [-K][--sin2_f] (float), default: ' + str(cfg['SIN2_f']),
        ' user setting:',
        ' [-m][--mmem] save cli options to user settings, default: ' + str(m_mem),
        ' [-M][--Mmem] read user settings, default: ' + str(M_mem),
        ' or combine: [-M] [-x yy] [-m] for next run with [-M]',
        # The long option accepted by getopt is --outcsv (the help used to say --outcvs).
        ' [-o][--outcsv] save line values to: , default: ' + str(cfg['save_CSV'])
        + ' filename: ' + cfg['outcsv'],
    ]
    return '\n'.join(lines)


def _make_osci(F, Fin, FH, show_FH):
    """Render one 101-character chart line for values expected in 0.0 .. 100.0.

    Every tenth column carries a '|' grid mark. Markers are placed at the
    integer part of each value; on a shared column Fin wins over F, and F
    wins over FH. Values outside 0 .. 100 are simply not drawn.
    """
    cells = ['|' if col % 10 == 0 else ' ' for col in range(101)]
    markers = []
    if show_FH:
        markers.append((FH, _OSCI_FH))
    markers.append((F, _OSCI_F))
    markers.append((Fin, _OSCI_FIN))
    for value, symbol in markers:  # later entries overwrite earlier ones
        pos = int(value)
        if 0 <= pos <= 100:  # guard: a negative index must not wrap around
            cells[pos] = symbol
    return ''.join(cells)


def _read_cpu_temp():
    """Return the CPU temperature reported by `vcgencmd measure_temp`.

    Falls back to _TEMP_FALLBACK when the tool cannot be started or its
    output cannot be parsed, so the run does not abort mid-chart.
    """
    try:
        process = Popen(['vcgencmd', 'measure_temp'], stdout=PIPE)
        output, _error = process.communicate()
        text = output.decode('ascii')
        # Take the text between the first '=' and the last "'".
        return float(text[text.index('=') + 1:text.rindex("'")])
    except (OSError, ValueError):
        return _TEMP_FALLBACK


def _sine(amplitude, frequency, samplefrq, step):
    """Return the sine sample for *step* at *frequency* Hz, sampled at *samplefrq* Hz."""
    omega = (frequency / samplefrq) * (2 * math.pi)
    return amplitude * math.sin(step * omega)


def _append_csv_line(csv_vals, outcsv):
    """Append one line to the CSV file *outcsv*."""
    with open(outcsv, "a") as fcsv:
        fcsv.write(csv_vals + "\n")


def main(argv):
    """Run the filter demo.

    Reads the pickled setup record (creating it on first run), applies the
    command line options in *argv* on top of it, optionally stores the result
    as the user record, then samples, filters and prints the signal line by
    line.

    Exits with status 2 on a bad option or option value, and with status 0
    after printing help or version.
    """
    setup_dir = os.path.join(os.path.expanduser('~'), '.config', 'KLL')
    setup_file = os.path.join(setup_dir, 'my_filter')

    my_setup = _read_opts(setup_dir, setup_file)
    cfg = _record_to_cfg(my_setup['default'])
    m_mem = False  # -m --mmem : write the user record after parsing

    # The help text shows the values of the default record.
    helptxt = _help_text(cfg, False, False)

    #_________________________check for command line parameters
    try:
        opts, _args = getopt.getopt(argv, _SHORT_OPTS, _LONG_OPTS)
    except getopt.GetoptError:
        print('error: ', helptxt)
        sys.exit(2)

    # Options are applied in the order given, so [-M] replaces everything
    # set before it and later options override the user record again.
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            print(helptxt)
            sys.exit()
        elif opt in ('-v', '--version'):
            print(cfg['vertxt'])
            sys.exit()
        elif opt in ('-m', '--mmem'):
            m_mem = True
        elif opt in ('-M', '--Mmem'):
            cfg = _record_to_cfg(my_setup['user'])
        elif opt in ('-o', '--outcsv'):
            cfg['save_CSV'] = True
            cfg['outcsv'] = arg + '.csv'
            print("save line values to %s" % (cfg['outcsv']))
        elif opt in ('-s', '--samples'):
            cfg['longlist'] = _parse_number(arg, int, _NOT_INT)
        elif opt in _FLAG_OPTS:
            key, value = _FLAG_OPTS[opt]
            cfg[key] = value
        elif opt in _FLOAT_OPTS:
            cfg[_FLOAT_OPTS[opt]] = _parse_number(arg, float, _NOT_FLOAT)

    # The sample time is used as a divisor and as a sleep() duration.
    simrealtime = cfg['simrealtime']
    if not 0.0 < simrealtime < math.inf:
        print("sample time [-f] must be a positive, finite number!")
        sys.exit(2)

    if m_mem:  # store the effective settings to the user record
        user_record = my_setup['user']
        for key in _SETUP_KEYS:
            user_record[key] = cfg[key]
        user_record['vertxt'] = cfg['vertxt'] + '+u'
        _write_opts(my_setup, setup_dir, setup_file)

    #_________________________effective settings
    version_text = cfg['vertxt']
    print_Fi = cfg['print_Fi']
    print_FH = cfg['print_FH']
    print_T = cfg['print_T']
    print_SIN = cfg['print_SIN']
    print_chart_line = cfg['print_chart_line']
    save_CSV = cfg['save_CSV']
    outcsv = cfg['outcsv']
    longlist = cfg['longlist']
    Fn_noise = cfg['Fn_noise']
    FH_downramp = cfg['FH_downramp']
    SIN1_a, SIN1_f = cfg['SIN1_a'], cfg['SIN1_f']
    SIN2_a, SIN2_f = cfg['SIN2_a'], cfg['SIN2_f']
    A = cfg['A']
    B = 1.0 - A                     # A + B = 1.0 keeps the amplitude
    FH_A = cfg['FH_A']
    FH_B = 1.0 - FH_A
    samplefrq = 1.0 / simrealtime   # sampling [Hz]

    #_________________________print headerline
    print("my_filter (%s) : CLI: [ -s %4.0f] samples" % (version_text, longlist))
    print("operation: [ctrl][z] quit / [ctrl][s] hold [ctrl][q] resume")
    print("filter use: F = Fin * %4.1f + F * %4.1f | Fin from %4.1f +- Fn (noise) %4.1f | sampling %4.1f Hz" % (A, B, _FIN_BASE, Fn_noise, samplefrq))
    print("CLI: filter A [ -A %4.2f] | noise [ -n %4.2f] | sampletime [ -f %4.2f] " % (A, Fn_noise, simrealtime))
    if print_SIN:
        print("SIN1 amplitude %.2f, frequency %.2f " % (SIN1_a, SIN1_f))
        print("SIN2 amplitude %.2f, frequency %.2f " % (SIN2_a, SIN2_f))
        print("CLI: [ -S ] SIN enable, SIN1 amp [ -j %4.2f] SIN1 freq [ -J %4.2f] SIN2 amp [ -k %4.2f] SIN2 freq [ -K %4.2f]" % (SIN1_a, SIN1_f, SIN2_a, SIN2_f))
    if print_T:
        print("read temperature of cpu ")
    if print_FH:
        print("PEAKVU use: if (Fin > F ): FH = Fin * %4.1f + FH * %4.1f else: FH = FH - %4.1f" % (FH_A, FH_B, FH_downramp))
        print("CLI: [ -p ] FH filter A [ -H %4.2f] FH downramp [ -d %4.2f]" % (FH_A, FH_downramp))
    if print_Fi:
        print("CLI: [ -i ] enable Fi min, Fi, Fi max")
    if print_chart_line:
        if print_FH:
            print("Line_Chart: F (%s) FH (%s) Fin (%s)" % (_OSCI_F, _OSCI_FH, _OSCI_FIN))
        else:
            print("Line_Chart: F (%s) Fin (%s)" % (_OSCI_F, _OSCI_FIN))
    if save_CSV:
        # The header must name exactly the columns written per sample below;
        # both start with an empty first column (leading comma).
        csv_head = ''
        if print_Fi:
            csv_head = ',Fimin,Fi,Fimax'
        if print_FH:
            csv_head += ',FH'
        csv_head += ',F,Fin'
        if Fn_noise > 0.0:
            csv_head += ',Fn'
        if print_SIN:
            csv_head += ',SIN1_v,SIN2_v,'
        _append_csv_line(csv_head, outcsv)

    #_________________________signal state
    F = 50.0            # start value / easy swing in for the filter
    FH = F
    Fi = int(F)
    Fimin = 100
    Fimax = 0
    SIN1_v = 0.0
    SIN2_v = 0.0
    SIN_loop = 0        # sine sample counter

    #_________________________produce "longlist" signal values ( 0 : run forever )
    i = 0
    while True:
        i += 1

        #_________________________SAMPLE AND FILTER
        Fn = Fn_noise * uniform(-1.0, 1.0)
        sleep(simrealtime)                  # simulate realtime
        Fin = _FIN_BASE + Fn
        if print_T:
            Fin = _read_cpu_temp()          # replaces 50.0 + noise
        if print_SIN:
            SIN1_v = _sine(SIN1_a, SIN1_f, samplefrq, SIN_loop)
            SIN2_v = _sine(SIN2_a, SIN2_f, samplefrq, SIN_loop)
            Fin += SIN1_v + SIN2_v
            SIN_loop += 1
            if SIN_loop >= _SIN_SAMPLES:
                SIN_loop = 0
        F = Fin * A + F * B                 # the filter job

        #_________________________MIN VAL MAX ( integer )
        if print_Fi:
            Fi = int(F)
            if Fi > Fimax:
                Fimax = Fi
            if Fi < Fimin:
                Fimin = Fi

        #_________________________FH : weaker filter, fed by the high values only
        if print_FH:
            if Fin > F:
                FH = Fin * FH_A + FH * FH_B     # filtered high values
            elif FH > F:
                FH = FH - FH_downramp           # drag it back down to F
            if FH < F:
                FH = F                          # the peak value never sits below F

        #_________________________Output ( chart + data )
        chart_line = ''
        if print_chart_line:
            chart_line = _RED + _make_osci(F, Fin, FH, print_FH) + _END

        val_line = ''
        csv_vals = ''
        if print_Fi:
            val_line = 'Fimin {:.1f}_Fi {:.1f}_Fimax {:.1f}_'.format(Fimin, Fi, Fimax)
            csv_vals = ',{:.0f},{:.0f},{:.0f}'.format(Fimin, Fi, Fimax)
        if print_FH:
            val_line += 'FH {:.1f}_'.format(FH)
            csv_vals += ',{:.2f}'.format(FH)
        val_line += 'F {:.1f}_Fin {:.1f}'.format(F, Fin)
        csv_vals += ',{:.2f},{:.2f}'.format(F, Fin)
        if Fn_noise > 0.0:
            val_line += '_Fn {:.1f}'.format(Fn)
            csv_vals += ',{:.2f}'.format(Fn)
        if print_SIN:
            val_line += '_S1 {:.1f}_S2 {:.1f}'.format(SIN1_v, SIN2_v)
            csv_vals += ',{:.2f},{:.2f},'.format(SIN1_v, SIN2_v)

        print(chart_line + val_line)
        if save_CSV:
            _append_csv_line(csv_vals, outcsv)

        if longlist != 0 and i == longlist:
            break


if __name__ == '__main__':
    main(sys.argv[1:])
# end