0

The code below plots numbers received from a serial port. However, the values obtained through locals()[self.axes_mapping[axis]] do not include all of the data received by the function receive_data; some values seem to be lost. If I print the values of x, they match the received data. How do I make sure that all of the received data is plotted?

import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial
# Number of samples collected by the receiver before a block is queued.
BUFFER_SIZE = 100
# Main window: a background thread reads 2-byte values from the serial port
# and queues them in blocks of BUFFER_SIZE; a Qt timer drains the queue and
# updates two curves ('X' and 'RMS').
class RealTimePlot(QMainWindow):
    # Build the UI, open the serial port, create the plot buffers, then start
    # the receiver thread and the refresh timer.
    def __init__(self):
        super().__init__()
        self.setup_ui()

        # 'COM6' with 115200 as the second argument (the baud rate in pyserial).
        self.serial_port = serial.Serial('COM6', 115200)
        self.N = 100
        self.fs = 1000 # Sampling frequency in Hz (adjust according to your setup)
        self.T = 1/self.fs
        # Horizontal coordinates shared by both curves.
        self.x_values = np.arange(0, self.N*self.T, self.T)
        # Circular buffers for time domain plots
        self.axes = ['X', 'RMS']
        # One fixed-length deque per axis, pre-filled with zeros.
        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        self.z_index = {axis: 0 for axis in self.axes}
        # Axes variable mapping
        # Maps an axis name to the name of a local variable in update_plot.
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}
        # Plotting setup
        self.setup_plots()
        self.data_queue = queue.Queue()
        # Lock for synchronizing access to the data queue
        self.data_queue_lock = threading.Lock()
        # Create and start the receiving thread
        self.receive_thread = threading.Thread(target=self.receive_data)
        self.receive_thread.daemon = True
        self.receive_thread.start()
        # Start the animation
        # update_plot is invoked by the timer, started with an interval of 10.
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start(10)
    # Install a pyqtgraph GraphicsLayoutWidget as the central widget.
    def setup_ui(self):
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)
    # Create one plot (stacked in rows) and one curve per entry of self.axes.
    def setup_plots(self):
        self.plots = {axis: self.central_widget.addPlot(row=i, col=0, title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
                      for i, axis in enumerate(self.axes)}
        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            # plot.setLabel('left', 'Amplitude', 'g')
            plot.setYRange(-2, 5000)
        linha1 = pg.mkPen((52, 255, 52), width=2) # R G B & width
        # linha4 = pg.mkPen((255, 255, 255), width=2)
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # self.lines_fft = {axis: plot.plot(pen=linha4) for axis, plot in self.plots_fft.items()}
        # The RMS plot overrides the Y range set in the loop above.
        self.plots['RMS'].setYRange(0, 5000)
    # Receiver loop (thread target): reads values from the serial port and
    # queues them in blocks of BUFFER_SIZE. Never returns.
    def receive_data(self):
        data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
        data_cnt = 0
        rx_flag = True
        while True:
            if self.serial_port.in_waiting > 0.0: # Check if there is data waiting
                # Two bytes are read and decoded as one unsigned little-endian integer.
                data_str = int.from_bytes(self.serial_port.read(2), byteorder='little', signed = False)
                rx_flag = True
            with self.data_queue_lock:
                if rx_flag == True:
                    data_buffer[data_cnt] = data_str
                    data_cnt = data_cnt+1
                    rx_flag = False
                # Check if the buffer size is reached, then update the plot
                if data_cnt >= BUFFER_SIZE:
                    self.data_queue.put(data_buffer)
                    # A fresh array is allocated so the queued block is not overwritten.
                    data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
                    data_cnt = 0

    # Root-mean-square of x: sqrt(mean(x**2)).
    def calculate_rms(self, x):
        return np.sqrt(np.mean(np.square([x])))
    # Timer callback: drain the queue and refresh both curves.
    # Returns the curve items (self.lines.values()).
    def update_plot(self):
        with self.data_queue_lock:
            while not self.data_queue.empty():
                data_buffer = self.data_queue.get()
                for data_str in data_buffer:
                    # x is rebound on every iteration, so once the loop ends it
                    # holds only the last sample of the block.
                    x = data_str
                    rms = self.calculate_rms(data_buffer)
                for axis in self.axes:
                    # Looks up the local variable named by axes_mapping ('x' or 'rms');
                    # a single value per axis is appended for each queued block.
                    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
                    self.lines[axis].setData(self.x_values, self.z_values[axis])
                    print(locals()[self.axes_mapping[axis]])
        return self.lines.values()
 
# Close handler: closes self.csv_file, then accepts the event.
# NOTE(review): no csv_file attribute is assigned anywhere in the code shown
# here, and this def sits at module level rather than inside RealTimePlot -
# confirm whether it is a leftover from an earlier version.
def closeEvent(self, event):
 self.csv_file.close()
 event.accept()
def main():
    """Create the Qt application, show the plot window and run the event loop."""
    qt_app = QApplication(sys.argv)
    plot_window = RealTimePlot()
    plot_window.show()
    # Hand the event loop's return code back to the interpreter on exit.
    exit_code = qt_app.exec()
    sys.exit(exit_code)


if __name__ == '__main__':
    main()

The example is from here

https://www.reddit.com/r/embedded/comments/18h9smt/attempted_switch_from_dpc_blue_screen_error_while/

asked Dec 13, 2024 at 9:49

1 Answer 1

1

You have a bug. This code

# Original logic: x is overwritten on every pass through the loop.
for data_str in data_buffer:
    x = data_str # x = one item, drops the rest
    rms = self.calculate_rms(data_buffer)
# Only one value per axis reaches the plot buffer for each block.
for axis in self.axes:
    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])

should be

x = data_buffer # x = all items
# The single RMS value is wrapped in a list so that extend() can iterate it.
rms = [self.calculate_rms(data_buffer)]
for axis in self.axes: # note the use of extend for list/array
    self.z_values[axis].extend(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])

Also, the two plots should have different time scales: the second one shows a single RMS value computed from every 100 points of the first one.


Lastly, using locals() this way is strongly discouraged: it is brittle and gets in the way of future refactoring or code modification. I'd probably just use a dictionary instead.

# Keys match the values of self.axes_mapping ('x' and 'rms').
new_plot_data = {} # replaces locals()
new_plot_data["x"] = data_buffer
# Wrapped in a list so it can be passed to deque.extend().
new_plot_data["rms"] = [self.calculate_rms(data_buffer)]

full example (without pyserial use)

import random
import time
import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial
BUFFER_SIZE = 100  # samples per block handed from the receiver thread to the GUI


class RealTimePlot(QMainWindow):
    """Window with a scrolling raw-sample plot and a per-block RMS plot.

    A daemon thread produces blocks of ``BUFFER_SIZE`` samples (simulated
    with random numbers here, standing in for the serial port) and puts them
    on a ``queue.Queue``.  A Qt timer drains that queue on the GUI thread and
    redraws the curves.  ``queue.Queue`` does its own locking, so no extra
    lock is needed around it.
    """

    def __init__(self):
        super().__init__()
        self.setup_ui()
        self.serial_port = None  # no hardware is used in this example
        self.N = 100  # number of points kept per curve
        self.fs = 1000  # Sampling frequency in Hz (adjust according to your setup)
        self.T = 1 / self.fs
        # Scale an integer range instead of stepping np.arange by a float:
        # the length of a float-stepped arange is subject to rounding, and
        # setData() needs x and y to have exactly the same length.
        self.x_values = np.arange(self.N) * self.T
        self.axes = ['X', 'RMS']
        # One RMS point is produced per BUFFER_SIZE raw samples, so the RMS
        # curve advances BUFFER_SIZE times more slowly along the time axis.
        self.x_values_by_axis = {
            'X': self.x_values,
            'RMS': self.x_values * BUFFER_SIZE,
        }
        # Circular buffers for time domain plots
        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        self.z_index = {axis: 0 for axis in self.axes}
        # Maps an axis name to its key in the per-block data dictionary.
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}
        self.setup_plots()
        self.data_queue = queue.Queue()
        # Daemon thread, so it does not keep the process alive after the window closes.
        self.receive_thread = threading.Thread(target=self.receive_data, daemon=True)
        self.receive_thread.start()
        # Periodic GUI refresh.
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start(10)

    def setup_ui(self):
        """Install the pyqtgraph layout widget as the central widget."""
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)

    def setup_plots(self):
        """Create one plot and one curve per entry of ``self.axes``."""
        self.plots = {
            axis: self.central_widget.addPlot(
                row=i, col=0,
                title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
            for i, axis in enumerate(self.axes)
        }
        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            plot.setYRange(-2, 5000)
        linha1 = pg.mkPen((52, 255, 52), width=2)  # R G B & width
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # RMS is never negative, so start its Y range at zero.
        self.plots['RMS'].setYRange(0, 5000)

    def receive_data(self):
        """Producer loop for the background thread; never returns.

        Generates one simulated sample every 10 ms and queues a block each
        time ``BUFFER_SIZE`` samples have been collected.
        """
        data_buffer = np.zeros(BUFFER_SIZE, dtype=int)
        data_cnt = 0
        while True:
            time.sleep(0.01)
            data_buffer[data_cnt] = random.random() * 256
            data_cnt += 1
            if data_cnt >= BUFFER_SIZE:
                self.data_queue.put(data_buffer)
                # Allocate a new array so the queued block is not overwritten.
                data_buffer = np.zeros(BUFFER_SIZE, dtype=int)
                data_cnt = 0

    def calculate_rms(self, x):
        """Return the root-mean-square of *x* (0.0 for empty input)."""
        # Work in floating point so squaring large integer samples cannot
        # overflow a fixed-width integer dtype.
        samples = np.asarray(x, dtype=float)
        if samples.size == 0:
            return 0.0
        return np.sqrt(np.mean(np.square(samples)))

    def update_plot(self):
        """Timer callback: move every queued block into the plot buffers.

        Returns the curve items (``self.lines.values()``).
        """
        received = False
        while True:
            try:
                data_buffer = self.data_queue.get_nowait()
            except queue.Empty:
                break
            new_plot_data = {
                'x': data_buffer,
                'rms': [self.calculate_rms(data_buffer)],
            }
            for axis in self.axes:
                # extend() pushes every sample of the block, not just one.
                self.z_values[axis].extend(new_plot_data[self.axes_mapping[axis]])
            received = True
        if received:
            # Redraw once per refresh rather than once per queued block.
            for axis in self.axes:
                self.lines[axis].setData(self.x_values_by_axis[axis],
                                         np.array(self.z_values[axis]))
        return self.lines.values()
def closeEvent(self, event):
    """Close the optional CSV log file, then accept the close event.

    Nothing in this example creates ``csv_file``, so the attribute is looked
    up defensively instead of raising ``AttributeError`` when it is absent.
    """
    csv_file = getattr(self, "csv_file", None)
    if csv_file is not None:
        csv_file.close()
    event.accept()
def main():
    """Start the Qt application, display the plot window and block in the event loop."""
    qt_app = QApplication(sys.argv)
    plot_window = RealTimePlot()
    plot_window.show()
    # Propagate the event loop's return code as the process exit status.
    exit_code = qt_app.exec()
    sys.exit(exit_code)


if __name__ == '__main__':
    main()
answered Dec 13, 2024 at 11:33
Sign up to request clarification or add additional context in comments.

2 Comments

Thanks, I got it. I have two more questions; sorry if they are obvious, I am new to Python. First, what do the square brackets around self.calculate_rms(data_buffer) do when the result is assigned to rms? Second, what does with self.data_queue_lock: do? The code seems to run without it.
@JOSEPH129009 The brackets wrap the single RMS value in a list so that it can be passed to extend, which needs an iterable (a list, an array, ...) rather than a bare number; x is already an array. As for the lock, you can remove it: Python's queue.Queue is already thread-safe. I guess whoever originally wrote that code wasn't too aware of how Python works.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.