Code 2: Unterschied zwischen den Versionen

Aus PhiloWiki
Zur Navigation springen Zur Suche springen
Zeile 733: Zeile 733:
  
 
==Peakerkennung: Peaks werden bei jeden Array [100] ermittelt ==
 
==Peakerkennung: Peaks werden bei jeden Array [100] ermittelt ==
 
+
[[Datei:Bild Plotter mit Peaks.png]]
 
<pre>
 
<pre>
 
import pyqtgraph as pg
 
import pyqtgraph as pg

Version vom 18. Juni 2024, 10:03 Uhr

Plotter der high und low peaks findet

Erklärung:

Das Skript hat einen Worker Thread ( Q Thread) und eine Klasse für das Hauptfenster PulseWindow. Im Pulse Window gibt es eine Funktion add data:

1. Beladen des main_buffers und des consumer_buffers

 def add_data(self, new_data):
        self.main_buffer.extend(new_data)
        print("Data received at", time.monotonic(), ":", new_data)
        print("Total number of data in main buffer:", len(self.main_buffer))
 
        if len(self.main_buffer) >= 500 and not self.consumer_buffer:
            self.consumer_buffer.extend(self.main_buffer[:500])
            del self.main_buffer[:500]
            print("consumer_buffer load 500 numbers")
            self.start_plotting()
 
        if len(self.main_buffer) >= 1500:
            print("Main buffer exceeds 1500 values, clearing...")
            del self.main_buffer[:]

Diese Funktion dient dazu neue Daten in den Hauptpuffer (main_buffer) zu laden. Wenn der main_buffer über oder gleich 500 Einträge hat, dann werden die ersten 500 Einträge des main_buffer in den consumer_buffer geladen ( self.consumer_buffer.extend(self.main_buffer[:500]), sofern dieser leer ist. Danach werden die ersten 500 Daten aus dem main_buffer gelöscht (del self.main_buffer[:500]). Anschließend wird die Funktion start_plotting aufgerufen.

Ist der Main_buffer >= 1500 wird dieser gelöscht.

Plotten starten

Starten des Worker Q Threads und von dort Aufruf der Funktion processed next data

Nachdem der consumer_buffer mit den letzten 500 Daten des main_buffers geladen wurde, wurde die Funktion start_plotting aufgerufen.

   def start_plotting(self):
        self.worker = Worker(self.consumer_buffer)
        self.worker.data_processed.connect(self.process_next_data)
        self.worker.start()
class Worker(QThread):
    data_processed = pyqtSignal(list)
 
    def __init__(self, consumer_buffer):
        super().__init__()
        self.consumer_buffer = consumer_buffer
 
    def run(self):
        while self.consumer_buffer:
            data = [self.consumer_buffer.pop(0)]
            self.data_processed.emit(data)
            time.sleep(0.0182)  # Konsistente Verzögerung

Diese definiert zuerst den Q Thread zusammen mit den consumer_buffer. Danach verbindet er die Funktion data_processed (pyqt Signal) aus dem QThread mit der Funktion process_next_data. Das Signal data_processed wird immer gesendet, sobald ein Datenpunkt im Q Thread Worker geplottet wird (dies erfolgt im Q Thread). Das Plotten und das Aussenden des Signales erfolgt solange Daten im consumer_buffer sind (in einer while Schleife). Im Pulse Window wird nun die Funktion process_next_data aufgerufen, wann immer das Signal processed data gesendet wird.

Anmerkung zum Worker:

1.) self.consumer_buffer.pop(0): Diese Anweisung nimmt das erste Element aus der Liste self.consumer_buffer heraus und entfernt es aus der Liste. Das pop(0)-Methodenaufruf entfernt das Element an der Position 0 (dem Anfang) der Liste und gibt dieses Element zurück.

2.) data = [self.consumer_buffer.pop(0)]: Der Rückgabewert von pop(0) (das entfernte Element) wird in einer Liste data gespeichert. Das Element wird in einer Liste gespeichert, damit es später über das Signal data_processed.emit(data) an andere Teile des Programms übergeben werden kann.


  def process_next_data(self, data):
        value = data[0]
        self.data = np.append(self.data[1:], value)

1.) value = data[0]: Hier wird der erste Wert (data[0]) aus der übergebenen Liste data genommen und in der Variablen value gespeichert. Diese Liste data wird durch das data_processed-Signal übergeben, wenn der Worker Daten verarbeitet hat und diese zurückgibt.

2.) self.data = np.append(self.data[1:], value): In dieser Zeile wird das Attribut self.data aktualisiert. self.data ist ein NumPy-Array mit einer festen Länge von 500 (wie in der __init__-Methode initialisiert). Diese Zeile fügt den neuen Wert value an das Ende von self.data an und entfernt gleichzeitig das erste Element des Arrays (self.data[1:]).

a.) self.data[1:]: Dies ist ein Slicing-Ausdruck in NumPy, der alle Elemente von self.data außer dem ersten Element auswählt.

b.) np.append(self.data[1:], value): Fügt value am Ende des Slices self.data[1:] an.

c.) self.data = ...: Weist dann self.data das aktualisierte Array zu, das nun einen neuen Wert enthält, wobei die Länge des Arrays konstant bei 500 bleibt.

Diese Operation stellt sicher, dass self.data immer die neuesten 500 Datenpunkte enthält, wobei ältere Daten kontinuierlich aus dem Array entfernt werden, wenn neue hinzukommen. Dies ist besonders wichtig für die Echtzeit-Datenvisualisierung und -verarbeitung, die in dieser Anwendung durchgeführt wird.

Prüfen ob erster Durchlauf (Index 0) [Start time wird gesetzt]

Jetzt wird geprüft, ob der Index (der beim Initialisieren des PulseWindow auf 0 gesetzt wird) auf 0 ist. Dies erfolgt ebenfalls in der process_next_data Funktion. Ist der Index auf 0, wird die Start time gesetzt und geprintet, dass ein neue Zyklus beginnt. Die Zeit des letzten Plotts wird zudem auf die start time gelegt:

    if self.current_index == 0:
            self.start_time = time.monotonic()
            self.last_plot_time = self.start_time
            print(f"Consumer buffer processing started at: {self.start_time}")

Aktualisierung des Plot Items mit neuen Daten

    self.plot_item.setData(self.data)
    self.plot_peaks_and_troughs(self.data)

    self.current_index += 1

1.) self.plot_item.setData(self.data): Aktualisiert das Plot-Element (plot_item) mit den neuen Daten (self.data), um die Darstellung zu aktualisieren.

2.) self.plot_peaks_and_troughs(self.data): Ruft die Methode plot_peaks_and_troughs auf, um Peaks und Tiefpunkte in den Daten zu finden und zu plotten.

3.) self.current_index += 1: Erhöht den current_index um 1, um anzuzeigen, dass ein weiterer Datenpunkt verarbeitet wurde.

Das Plot Item wurde im Pulse Window in der _init_ Initalisiert: "self.plot_item = self.graphWidget.plot(pen='w')". Die FUnktion plot_peaks_and_troghts sehen wir uns gleich an, zuerst schauen wir was ab den Wert 500 beim Index passiert.

Beenden eines Zyklus bei Index 500 sowie Pause

      if self.current_index == 500:
            end_time = time.monotonic()
            duration = end_time - self.start_time
            print(f"Consumer buffer processing ended at: {end_time}")
            print(f"Processing duration: {duration} seconds")
            print("consumer_buffer consumed")
            self.consumer_buffer = []
            self.current_index = 0
 
            time_diff = end_time - self.last_plot_time
 
            if time_diff < 0.02:
                time.sleep(0.02 - time_diff)
 
            print(f"Time between plots: {time_diff} seconds")
            self.last_plot_time = time.monotonic()

Wenn der Index 500 erreicht hat, dann wird die end_time gesetzt und die start_time wird von der end_time abgezogen. Es wird geprintet wann der Zyklus aus ist und wie lange dieser gedauert hat. Der Index wird nun auf 0 gesetzt und von der end_time wird die last_plot_time abgezogen, wodurch sich die Zeitdifferenz errechnet. Zuvor wird auch noch der consumer_buffer geleert. Sollte nun die Zeit-Differenz über 0,02 sein, so wird eine Pause eingelegt die die Zeit-Differenz - 0.02 beträgt.

Der Unterschied zwischen der start time und der last plot time

Zweck: start_time wird verwendet, um den Beginn der Verarbeitung eines Batches von Daten (in diesem Fall 500 Datenpunkte) zu markieren.

Verwendung: Es dient hauptsächlich zur Berechnung der gesamten Dauer, die benötigt wird, um alle 500 Datenpunkte aus dem consumer_buffer zu verarbeiten.

Setzen: start_time wird zu Beginn der Verarbeitung des consumer_buffer gesetzt (wenn self.current_index == 0). last_plot_time:


Zweck: last_plot_time wird verwendet, um die Zeit des letzten Plot-Updates zu verfolgen.

Verwendung: Es dient zur Berechnung der Zeitdifferenz zwischen zwei aufeinanderfolgenden Plot-Updates, um sicherzustellen, dass die Plots mit einer konsistenten Rate aktualisiert werden.

Setzen: last_plot_time wird ebenfalls zu Beginn der Verarbeitung des consumer_buffer gesetzt (um initialisiert zu werden), aber es wird auch nach jedem Plot-Update aktualisiert.

BUG last_plot_time wird nicht jedes mal beim Durchlauf aktualisiert

Das Programm oben ist fehlerhaft, da die last_plot_time nur bei Index == 0 und bei Index == 500 aktualisiert wird. Die last plot time wird deshalb gelöscht und die Zeitdifferenz wird so errechnet, dass die Pause zwischen den plots ermittelt wird und die mal die Anzahl der Plots und danach wird dies von der Zeitdauer abgezogen.

Plot Peaks Funktion

Die ganze Funktion:

def plot_peaks_and_troughs(self, data):
        # Hochpeaks finden
        peaks, _ = find_peaks(data, height=np.mean(data) + np.std(data))
 
        # Tiefs finden
        troughs = []
        for i in range(1, len(peaks)):
            segment = data[peaks[i-1]:peaks[i]]
            trough_index = np.argmin(segment) + peaks[i-1]
            troughs.append(trough_index)
 
        peak_data = data[peaks]
        trough_data = data[troughs]
 
        self.graphWidget.clear()
        self.graphWidget.plot(data, pen='w')
        self.graphWidget.plot(peaks, peak_data, pen=None, symbol='x', symbolBrush='r')
        self.graphWidget.plot(troughs, trough_data, pen=None, symbol='o', symbolBrush='b')
 
        if len(peaks) > 1:
            rr_intervals = np.diff(peaks) / 50.0
            bpm = 60.0 / np.mean(rr_intervals)
            print(f"Herzfrequenz: {bpm:.2f} BPM")
        else:
            print("Nicht genug Peaks zur Berechnung der Herzfrequenz gefunden.")

Im Detail:

def plot_peaks_and_troughs(self, data):
    # Hochpeaks finden
    peaks, _ = find_peaks(data, height=np.mean(data) + np.std(data))

Zweck: Diese Zeile sucht nach Hochpunkten (Peaks) in den Daten.

Details: find_peaks ist eine Funktion aus scipy.signal, die Peaks in den Daten findet, die höher als der Durchschnitt plus eine Standardabweichung sind.

 # Tiefs finden
    troughs = []
    for i in range(1, len(peaks)):
        segment = data[peaks[i-1]:peaks[i]]
        trough_index = np.argmin(segment) + peaks[i-1]
        troughs.append(trough_index)

Zweck: Diese Schleife findet die Tiefpunkte (Troughs) zwischen den Peaks.

Details: troughs ist eine leere Liste, die die Indizes der Tiefpunkte speichern wird. Die Schleife läuft über jedes Paar benachbarter Peaks. segment extrahiert den Datenabschnitt zwischen zwei aufeinanderfolgenden Peaks. trough_index findet den Index des Minimums in diesem Segment und korrigiert den Index relativ zur Gesamtdatenreihe. Dieser Index wird zu troughs hinzugefügt.

 peak_data = data[peaks]
 trough_data = data[troughs]

Zweck: Extrahiert die Werte der Daten an den Positionen der Peaks und Troughs.

Details: peak_data enthält die Werte der Daten an den Indizes, die in peaks gefunden wurden. trough_data enthält die Werte der Daten an den Indizes, die in troughs gefunden wurden.

self.graphWidget.clear()
self.graphWidget.plot(data, pen='w')
self.graphWidget.plot(peaks, peak_data, pen=None, symbol='x', symbolBrush='r')
self.graphWidget.plot(troughs, trough_data, pen=None, symbol='o', symbolBrush='b')


Zweck: Zeichnet die Daten, Peaks und Troughs auf dem Graphen.

Details: self.graphWidget.clear() löscht den Graphen. self.graphWidget.plot(data, pen='w') zeichnet die gesamten Daten in Weiß. self.graphWidget.plot(peaks, peak_data, pen=None, symbol='x', symbolBrush='r') zeichnet die Peaks mit roten 'x'-Symbolen. self.graphWidget.plot(troughs, trough_data, pen=None, symbol='o', symbolBrush='b') zeichnet die Troughs mit blauen 'o'-Symbolen.


Zweck: Berechnet die Herzfrequenz (BPM - Beats Per Minute).

Details: Falls mehr als ein Peak vorhanden ist, werden die RR-Intervalle (die Differenzen zwischen aufeinanderfolgenden Peaks) berechnet und durch 50.0 geteilt (wahrscheinlich aufgrund der Abtastrate der Daten). Der Mittelwert der RR-Intervalle wird verwendet, um die Herzfrequenz zu berechnen: 60.0 / np.mean(rr_intervals). Die Herzfrequenz wird ausgegeben. Falls nicht genügend Peaks vorhanden sind, wird eine entsprechende Nachricht ausgegeben.

Der ganze bisherige Code

import sys
import time
import numpy as np
import pyqtgraph as pg
from PyQt6.QtWidgets import QMainWindow, QApplication
from PyQt6 import uic
from PyQt6.QtCore import pyqtSignal, QThread
from scipy.signal import find_peaks
 
uiclass, baseclass = uic.loadUiType("pulse_live_window.ui")
 
class Worker(QThread):
    data_processed = pyqtSignal(list)
 
    def __init__(self, consumer_buffer):
        super().__init__()
        self.consumer_buffer = consumer_buffer
 
    def run(self):
        while self.consumer_buffer:
            data = [self.consumer_buffer.pop(0)]
            self.data_processed.emit(data)
            time.sleep(0.0182)  # Konsistente Verzögerung
 
class PulseWindow(baseclass, uiclass):
    closed = pyqtSignal()
 
    def __init__(self):
        super().__init__()
        self.setupUi(self)
        self.graphWidget.setXRange(0, 500)
        
        self.data = np.zeros(500)
        self.plot_item = self.graphWidget.plot(pen='w')
        self.main_buffer = []  
        self.consumer_buffer = []  
        self.current_index = 0
        self.start_time = None

 
    def add_data(self, new_data):
        self.main_buffer.extend(new_data)
        print("Data received at", time.monotonic(), ":", new_data)
        print("Total number of data in main buffer:", len(self.main_buffer))
 
        if len(self.main_buffer) >= 500 and not self.consumer_buffer:
            self.consumer_buffer.extend(self.main_buffer[:500])
            del self.main_buffer[:500]
            print("consumer_buffer load 500 numbers")
            self.start_plotting()
 
        if len(self.main_buffer) >= 1500:
            print("Main buffer exceeds 1500 values, clearing...")
            del self.main_buffer[:]
 
    def start_plotting(self):
        self.worker = Worker(self.consumer_buffer)
        self.worker.data_processed.connect(self.process_next_data)
        self.worker.start()
 
    def process_next_data(self, data):
        value = data[0]
        self.data = np.append(self.data[1:], value)
 
        if self.current_index == 0:
            self.start_time = time.monotonic()
            print(f"Consumer buffer processing started at: {self.start_time}")
 
        self.plot_item.setData(self.data)
        self.plot_peaks_and_troughs(self.data)
 
        self.current_index += 1
 
        if self.current_index == 500:
            end_time = time.monotonic()
            duration = end_time - self.start_time
            print(f"Consumer buffer processing ended at: {end_time}")
            print(f"Processing duration: {duration} seconds")
            print("consumer_buffer consumed")
            self.consumer_buffer = []
            self.current_index = 0
 
            time_diff = 9,1 - duration #0,0182 x 500 
 
            if time_diff < 0.02:
                time.sleep(0.02 - time_diff)
 
 
    def plot_peaks_and_troughs(self, data):
        # Hochpeaks finden
        peaks, _ = find_peaks(data, height=np.mean(data) + np.std(data))
 
        # Tiefs finden
        troughs = []
        for i in range(1, len(peaks)):
            segment = data[peaks[i-1]:peaks[i]]
            trough_index = np.argmin(segment) + peaks[i-1]
            troughs.append(trough_index)
 
        peak_data = data[peaks]
        trough_data = data[troughs]
 
        self.graphWidget.clear()
        self.graphWidget.plot(data, pen='w')
        self.graphWidget.plot(peaks, peak_data, pen=None, symbol='x', symbolBrush='r')
        self.graphWidget.plot(troughs, trough_data, pen=None, symbol='o', symbolBrush='b')
 
        if len(peaks) > 1:
            rr_intervals = np.diff(peaks) / 50.0
            bpm = 60.0 / np.mean(rr_intervals)
            print(f"Herzfrequenz: {bpm:.2f} BPM")
        else:
            print("Nicht genug Peaks zur Berechnung der Herzfrequenz gefunden.")
 
    def closeEvent(self, event):
        self.closed.emit()
 
if __name__ == "__main__":
    app = QApplication([])
    pulse_plotter_window = PulseWindow()
    pulse_plotter_window.showFullScreen()
    app.exec()

Plotter der von links nach rechts überschreibt mit schwarzen Strich dazwischen

Der Plotter plottet die Daten von links nach rechts und wenn er ganz rechts angekommen ist, beginnt er wieder bei links. Zwischen dem neuen und dem alten Plot ist ein Abstand von 15 schwarzen Punkten.

Wie der Code genau arbeitet:

a) Neue Daten über add_data dem main_buffer hinzugefügen und den DC Wert bereinigen über numpy mean bei allen 100 Daten

Über die Funktion add data werden jeweils 100 Daten übergeben. Diese Daten (new_data) werden in ein numpy array convertiert, danach wird der durchschnittliche Wert aus dem Array ermittelt (np-mean). Danach wird der Durchschnitt des Arrays von den Daten abgezogen und die DC bereinigten Werte in das Array new_data_detrended geschrieben, das Array wird danach in den main_buffer hinzugefügt. Jetzt wird geprintet wie viele Werte in dem Mainbuffer vorhanden sind.

Wenn der main_buffer 500 erreicht hat und der consumer_buffer leer ist, dann werden die ersten 500 Daten des main_buffers in den consumer_buffer geladen und geprintet "...load 500 numbers". Danach wird start plotting aufgerufen. Andernfalls wird bei > 1500 Daten im main_buffer dieser gelöscht.

b) Starten des Worker Threads der die erste Nummer der consumer_buffers der Funktion processed_next_data übergiebt

Nachdem der consumer_buffer geladen wurde, wurde der Worker Thread gestartet und das Signal in diesem mit einer Funktion verbunden.

Im Worker Thread (Q Thread) wird das Signal data_processed definiert. In der run wird eine Schleife gesetzt die läuft solange Daten im consumer_buffer vorhanden sind. Zuerst wird die erste Zahl im consumer_buffer in eine Liste gespeichert (data). Diese Liste wird nun mit dem Signal data_processed-emit(data) übermittelt. Das Signal wurde in der Funktion start_plotting mit der Funktion processed_next_data verbunden.

c) Verarbeitung der Daten im process_next_data

Plotten des einzelnen weißen Punktes

Die übergebene Liste (data[0]) wird der Variable Value übergeben. Danach wird der Index errechnet (plot_index = self.current_index % 500). Nachdem der Index übermittelt wurde, wird der neue Datenpunkt in self.data in dem aktuellen Index geschrieben. Ist der Index zudem 0, wird nun die Startzeit per Zeitstempel festgehalten. Anschließend wird bei plot_item das Daten_Array self-data gesetzt und mit set.data die Daten in den Plot geschrieben.

Plotten der schwarzen Punkte

Nun wird die Anzahl der schwarzen Punkte definiert (hier 25). Anschließend wird der Index der schwarzen Punkte ermittelt und die letzten Daten werden ermittelt, dies sind die Daten, die aktuell auf den Index im array self.data liegen, der gerade ermittelt wurde. Danach werden die Daten mit dem last_indices und den Werten last_data über das Plot Item black_plot_item.setData schwarz geplottet.


    # Anzahl der schwarzen Datenpunkte
        num_black_points = 25

        if self.current_index >= num_black_points:
            last_indices = [(plot_index + i - 1) % 500 for i in range(num_black_points)]
            self.last_data = self.data[last_indices]
            self.black_plot_item.setData(last_indices, self.last_data)

        self.current_index += 1

Anschließend wird bei Index == 500 die Endzeit des Zyklus als Zeitstempel gesetzt und die Dauer des Zyklus ermittelt, zudem wird der Index auf 0 gesetzt und geprintet dass der consumer_buffer konsumiert wurde.

Ganzer Code

import pyqtgraph as pg
from PyQt6.QtWidgets import QMainWindow, QApplication
from PyQt6 import uic
from PyQt6.QtCore import pyqtSignal, QTimer, Qt, QThread
import time
import numpy as np

uiclass, baseclass = uic.loadUiType("pulse_live_window.ui")

class Worker(QThread):
    data_processed = pyqtSignal(list)

    def __init__(self, consumer_buffer):
        super().__init__()
        self.consumer_buffer = consumer_buffer

    def run(self):
        while self.consumer_buffer:
            data = [self.consumer_buffer.pop(0)]  # Datum in eine Liste verpacken
            self.data_processed.emit(data)
            time.sleep(0.0182)  # Konsistente Verzögerung

class PulseWindow(baseclass, uiclass):
    closed = pyqtSignal() 

    def __init__(self):
        super().__init__()
        self.setupUi(self)
        self.graphWidget.setXRange(0, 500)
        
        self.data = np.zeros(500)  # Initialisierung als 1D-ndarray
        self.last_data = np.zeros(15)  # Letzte 15 Datenpunkte

        self.plot_item = self.graphWidget.plot(pen='w')
        self.black_plot_item = self.graphWidget.plot(pen='k')  # Schwarzer Plot für die letzten 15 Daten
        self.main_buffer = []  
        self.consumer_buffer = []  
        self.current_index = 0
        self.start_time = None  # Start time initialisieren
        self.last_plot_time = None  # Zeit des letzten Plots

    def add_data(self, new_data):
        # Konvertiere new_data zu einem numpy Array
        new_data_array = np.array(new_data)

        # Berechne den Durchschnitt der neuen Daten
        mean_value = np.mean(new_data_array)

        # Subtrahiere den Durchschnitt von jedem Datenpunkt
        new_data_detrended = new_data_array - mean_value

        # Füge die bereinigten Daten dem main_buffer hinzu
        self.main_buffer.extend(new_data_detrended.tolist())

        print("Data received at", time.monotonic(), ":", new_data)
        print("Total number of data in main buffer:", len(self.main_buffer))

        if len(self.main_buffer) >= 500 and not self.consumer_buffer:
            self.consumer_buffer.extend(self.main_buffer[:500])
            del self.main_buffer[:500]  
            print("consumer_buffer load 500 numbers")
            self.start_plotting()

        if len(self.main_buffer) >= 1500:
            print("Main buffer exceeds 1500 values, clearing...")
            del self.main_buffer[:]

    def start_plotting(self):
        self.worker = Worker(self.consumer_buffer)
        self.worker.data_processed.connect(self.process_next_data)
        self.worker.start()

    def process_next_data(self, data):
        value = data[0]  # Konvertierung von list zu ndarray

        # Berechne den aktuellen Index im Ringpuffer
        plot_index = self.current_index % 500
        
        # Überschreibe das Daten-Array an der aktuellen Index-Position
        self.data[plot_index] = value

        if self.current_index == 0:
            self.start_time = time.monotonic()
            self.last_plot_time = self.start_time
            print(f"Consumer buffer processing started at: {self.start_time}")

        # Daten an den Plot übergeben
        self.plot_item.setData(self.data)

        # Anzahl der schwarzen Datenpunkte
        num_black_points = 25

        if self.current_index >= num_black_points:
            last_indices = [(plot_index + i - 1) % 500 for i in range(num_black_points)]
            self.last_data = self.data[last_indices]
            self.black_plot_item.setData(last_indices, self.last_data)

        self.current_index += 1

        if self.current_index == 500:
            end_time = time.monotonic()
            duration = end_time - self.start_time
            print(f"Consumer buffer processing ended at: {end_time}")
            print(f"Processing duration: {duration} seconds")
            print("consumer_buffer consumed")
            self.consumer_buffer = []  # Puffer leeren
            self.current_index = 0  # Index zurücksetzen

            time_diff = end_time - self.last_plot_time

            if time_diff < 0.02:
                time.sleep(0.02 - time_diff)

            print(f"Time between plots: {time_diff} seconds")

            self.last_plot_time = time.monotonic()

    def closeEvent(self, event):
        self.closed.emit()

if __name__ == "__main__":
    app = QApplication([])
    pulse_plotter_window = PulseWindow()
    pulse_plotter_window.showFullScreen()
    app.exec()

BUG schwarzer Strich läuft bei 500 im Ringpuffer über (fix mit min und max)

Der Bug: Es wurde ein Ringpuffer verwendet um die Index des schwarzen Striches zu berechnen. Wenn dieser aber (auf 25 eingestellt) so hoch gezählt wird, dass er bei -1 über 499 kommt, dann beginnt er wieder bei 0 und so schiebt sich der schwarze Strich am Ende des Durchlaufes noch an den Anfang des Plottes nach vorne, dieser Fehler wird durch diesen Code verursacht:

        # Anzahl der schwarzen Datenpunkte
        num_black_points = 25

        if self.current_index >= num_black_points:
            last_indices = [(plot_index + i - 1) % 500 for i in range(num_black_points)]
            self.last_data = self.data[last_indices]
            self.black_plot_item.setData(last_indices, self.last_data)

Dieser Code wird nun durch folgenden Code ersetzt:

  if self.current_index: #Hat keine Funktion, sieht nur übersichtlicher aus
            self.last_indices = [
                min(max(plot_index + i - 1, 0), 499) 
                for i in range(num_black_points)
            ]
            self.last_data = self.data[self.last_indices]
            self.black_plot_item.setData(self.last_indices, self.last_data)
        print("last_indices : ",self.last_indices)

Der Code sorgt dafür, dass ein Wert bei den errechneten Index 499 nicht überschreitet und 0 nicht unterschreitet. last_indices wird hier in der init mit self initalisiert.

max(plot_index + i - 1, 0):

max wird verwendet, um sicherzustellen, dass der Index nicht kleiner als 0 wird. Dies ist notwendig, da Indizes in Python-Listen nicht negativ sein können. Es bedeutet, dass der berechnete Index mindestens 0 ist (niedrigster gültiger Index).

min(..., 499):

min wird verwendet, um sicherzustellen, dass der Index nicht größer als 500 wird. Das bedeutet, dass der berechnete Index höchstens 500 ist (höchster gültiger Index in diesem Fall).

Genauere Erklärung:

max(plot_index + i - 1, 0)

max(a, b) gibt den größeren der beiden Werte zurück.

min(max(plot_index + i - 1, 0), 500)

min(a, b) gibt den kleineren der beiden Werte zurück.

Ganzer Code

import pyqtgraph as pg
from PyQt6.QtWidgets import QMainWindow, QApplication
from PyQt6 import uic
from PyQt6.QtCore import pyqtSignal, QTimer, Qt, QThread
import time
import numpy as np

uiclass, baseclass = uic.loadUiType("pulse_live_window.ui")

class Worker(QThread):
    data_processed = pyqtSignal(list)

    def __init__(self, consumer_buffer):
        super().__init__()
        self.consumer_buffer = consumer_buffer

    def run(self):
        while self.consumer_buffer:
            data = [self.consumer_buffer.pop(0)]  # Datum in eine Liste verpacken
            self.data_processed.emit(data)
            time.sleep(0.0182)  # Konsistente Verzögerung

class PulseWindow(baseclass, uiclass):
    closed = pyqtSignal() 

    def __init__(self):
        super().__init__()
        self.setupUi(self)
        self.graphWidget.setXRange(0, 500)
        
        self.data = np.zeros(500)  # Initialisierung als 1D-ndarray
        self.last_data = np.zeros(15)  # Letzte 15 Datenpunkte

        self.plot_item = self.graphWidget.plot(pen='w')
        self.black_plot_item = self.graphWidget.plot(pen='k')  # Schwarzer Plot für die letzten 15 Daten
        self.main_buffer = []  
        self.consumer_buffer = []  
        self.current_index = 0
        self.start_time = None  # Start time initialisieren
        self.last_plot_time = None  # Zeit des letzten Plots
        self.last_indices  = [] 

    def add_data(self, new_data):
        # Konvertiere new_data zu einem numpy Array
        new_data_array = np.array(new_data)

        # Berechne den Durchschnitt der neuen Daten
        mean_value = np.mean(new_data_array)

        # Subtrahiere den Durchschnitt von jedem Datenpunkt
        new_data_detrended = new_data_array - mean_value

        # Füge die bereinigten Daten dem main_buffer hinzu
        self.main_buffer.extend(new_data_detrended.tolist())

        print("Data received at", time.monotonic(), ":", new_data)
        print("Total number of data in main buffer:", len(self.main_buffer))

        if len(self.main_buffer) >= 500 and not self.consumer_buffer:
            self.consumer_buffer.extend(self.main_buffer[:500])
            del self.main_buffer[:500]  
            print("consumer_buffer load 500 numbers")
            self.start_plotting()

        if len(self.main_buffer) >= 1500:
            print("Main buffer exceeds 1500 values, clearing...")
            del self.main_buffer[:]

    def start_plotting(self):
        self.worker = Worker(self.consumer_buffer)
        self.worker.data_processed.connect(self.process_next_data)
        self.worker.start()

    def process_next_data(self, data):
        value = data[0]  # Konvertierung von list zu ndarray

        # Berechne den aktuellen Index im Ringpuffer
        plot_index = self.current_index % 500
        
        # Überschreibe das Daten-Array an der aktuellen Index-Position
        self.data[plot_index] = value

        if self.current_index == 0:
            self.start_time = time.monotonic()
            self.last_plot_time = self.start_time
            print(f"Consumer buffer processing started at: {self.start_time}")

        # Daten an den Plot übergeben
        self.plot_item.setData(self.data)

        # Anzahl der schwarzen Datenpunkte
        num_black_points = 25

        if self.current_index: #Hat keine Funktion, sieht nur übersichtlicher aus
            self.last_indices = [
                min(max(plot_index + i - 1, 0), 499) 
                for i in range(num_black_points)
            ]
            self.last_data = self.data[self.last_indices]
            self.black_plot_item.setData(self.last_indices, self.last_data)
        print("last_indices : ",self.last_indices)
        
        self.current_index += 1

        if self.current_index == 500:
            end_time = time.monotonic()
            duration = end_time - self.start_time
            print(f"Consumer buffer processing ended at: {end_time}")
            print(f"Processing duration: {duration} seconds")
            print("consumer_buffer consumed")
            self.consumer_buffer = []  # Puffer leeren
            self.current_index = 0  # Index zurücksetzen

            time_diff = end_time - self.last_plot_time

            if time_diff < 0.02:
                time.sleep(0.02 - time_diff)

            print(f"Time between plots: {time_diff} seconds")

            self.last_plot_time = time.monotonic()

    def closeEvent(self, event):
        self.closed.emit()

if __name__ == "__main__":
    app = QApplication([])
    pulse_plotter_window = PulseWindow()
    pulse_plotter_window.showFullScreen()
    app.exec()

Peakerkennung: Peaks werden bei jeden Array [100] ermittelt

Bild Plotter mit Peaks.png

import pyqtgraph as pg
from PyQt6.QtWidgets import QMainWindow, QApplication
from PyQt6 import uic
from PyQt6.QtCore import pyqtSignal, QThread
import time
import numpy as np
from scipy.signal import find_peaks

uiclass, baseclass = uic.loadUiType("pulse_live_window.ui")

class Worker(QThread):
    data_processed = pyqtSignal(list)

    def __init__(self, consumer_buffer):
        super().__init__()
        self.consumer_buffer = consumer_buffer

    def run(self):
        while self.consumer_buffer:
            data = [self.consumer_buffer.pop(0)]  # Datum in eine Liste verpacken
            self.data_processed.emit(data)
            time.sleep(0.0182)  # Konsistente Verzögerung

class PulseWindow(baseclass, uiclass):
    closed = pyqtSignal()

    def __init__(self):
        super().__init__()
        self.setupUi(self)
        self.graphWidget.setXRange(0, 500)
        
        self.data = np.zeros(500)  # Initialisierung als 1D-ndarray
        self.last_data = np.zeros(25)  # Letzte 25 Datenpunkte

        self.plot_item = self.graphWidget.plot(pen='w')
        self.black_plot_item = self.graphWidget.plot(pen='k')  # Schwarzer Plot für die letzten 25 Daten
        self.main_buffer = []  
        self.consumer_buffer = []  
        self.current_index = 0
        self.start_time = None  # Start time initialisieren
        self.last_plot_time = None  # Zeit des letzten Plots
        self.peak_plot_item = None  # Initialisierung für Peak-Plot-Item
        self.previous_peaks = []  # Vorherige Peaks
        self.previous_peak_values = []  # Vorherige Peak-Werte
        self.current_peaks = []  # Aktuelle Peaks
        self.current_peak_values = []  # Aktuelle Peak-Werte

        self.last_indices = [] 

    def add_data(self, new_data):
        # Konvertiere new_data zu einem numpy Array
        new_data_array = np.array(new_data)

        # Berechne den Durchschnitt der neuen Daten
        mean_value = np.mean(new_data_array)

        # Subtrahiere den Durchschnitt von jedem Datenpunkt
        new_data_detrended = new_data_array - mean_value

        # Füge die bereinigten Daten dem main_buffer hinzu
        self.main_buffer.extend(new_data_detrended.tolist())

        print("Data received at", time.monotonic(), ":", new_data)
        print("Total number of data in main buffer:", len(self.main_buffer))

        if len(self.main_buffer) >= 500 and not self.consumer_buffer:
            # Peaks vor dem Laden der 500 Daten in den Consumer Buffer ermitteln
            data_to_process = np.array(self.main_buffer[:500])
            peaks, _ = find_peaks(data_to_process, height=0.5)

            self.current_peaks = peaks.tolist()
            self.current_peak_values = data_to_process[peaks].tolist()

            self.consumer_buffer.extend(self.main_buffer[:500])
            del self.main_buffer[:500]  
            print("consumer_buffer load 500 numbers")
            self.start_plotting()

        if len(self.main_buffer) >= 1500:
            print("Main buffer exceeds 1500 values, clearing...")
            del self.main_buffer[:]

    def start_plotting(self):
        self.worker = Worker(self.consumer_buffer)
        self.worker.data_processed.connect(self.process_next_data)
        self.worker.start()

    def process_next_data(self, data):
        value = data[0]  # Konvertierung von list zu ndarray

        # Berechne den aktuellen Index im Ringpuffer
        plot_index = self.current_index % 500
        
        # Überschreibe das Daten-Array an der aktuellen Index-Position
        self.data[plot_index] = value

        if self.current_index == 0:
            self.start_time = time.monotonic()
            self.last_plot_time = self.start_time
            print(f"Consumer buffer processing started at: {self.start_time}")

        # Daten an den Plot übergeben
        self.plot_item.setData(self.data)

        # Anzahl der schwarzen Datenpunkte
        num_black_points = 25



        if self.current_index: #Hat keine Funktion, sieht nur übersichtlicher aus
            self.last_indices = [

                min(max(plot_index + i - 1, 0), 499) 
                for i in range(num_black_points)
                ]
        
            self.last_data = self.data[self.last_indices]
            self.black_plot_item.setData(self.last_indices, self.last_data)
            

            # Prüfen, ob das Peak-Plot-Item vorhanden ist und entfernen, wenn nötig
            if self.peak_plot_item:
                self.graphWidget.removeItem(self.peak_plot_item)
                self.peak_plot_item = None

            # Peaks plotten
            peak_x = [p for p in self.current_peaks if p <= self.current_index]
            peak_y = [self.data[p] for p in peak_x]

            if self.previous_peaks:
                prev_peak_x = [p for p in self.previous_peaks if p >= self.current_index + num_black_points]
                prev_peak_y = [self.data[p % 500] for p in prev_peak_x]
                peak_x.extend(prev_peak_x)
                peak_y.extend(prev_peak_y)

            self.peak_plot_item = self.graphWidget.plot(peak_x, peak_y, pen=None, symbol='o', symbolPen='r', symbolBrush='r', symbolSize=10)

        self.current_index += 1

        if self.current_index == 500:
            end_time = time.monotonic()
            duration = end_time - self.start_time
            print(f"Consumer buffer processing ended at: {end_time}")
            print(f"Processing duration: {duration} seconds")
            print("consumer_buffer consumed")
            self.consumer_buffer = []  # Puffer leeren

            # Speichern der aktuellen Peaks als vorherige Peaks
            self.previous_peaks = self.current_peaks
            self.previous_peak_values = self.current_peak_values
            self.current_peaks = []
            self.current_peak_values = []

            self.current_index = 0  # Index zurücksetzen

            time_diff = end_time - self.last_plot_time

            if time_diff < 0.02:
                time.sleep(0.02 - time_diff)

            print(f"Time between plots: {time_diff} seconds")

            self.last_plot_time = time.monotonic()

    def closeEvent(self, event):
        self.closed.emit()

if __name__ == "__main__":
    app = QApplication([])
    pulse_plotter_window = PulseWindow()
    pulse_plotter_window.showFullScreen()
    app.exec()