How to display an output from ZMQ to a Qt GUI in run time?
I am trying to take the output of a ‘ZeroMQ’ client and display it in a GUI built from a Qt Designer Python script. The code already receives the data correctly, but I can’t display it in the GUI window while it is being received; only the first line is displayed. I have tried threading and ‘subprocess’, but neither works. The data is received correctly in the function (consumer) of the class (Ui_Form):
class Ui_Form(object):
    # Question's original version, kept exactly as posted: consumer() runs a
    # blocking receive loop on whatever thread calls it.

    def setupUi(self, Form):
        """Widget construction; the body is not shown in this excerpt."""

    def consumer(self):
        """Receive ZeroMQ messages forever and append them to the text edits.

        Each message is decoded as float32, truncated at its first zero
        value, and split into alternating (signal, bandwidth) entries.
        This method never returns because of the ``while True`` loop.
        """
        consumer_id = random.randrange(1, 10005)
        # print("I am consumer #%s" % (consumer_id))
        context = zmq.Context()
        consumer_receiver = context.socket(zmq.PULL)
        consumer_receiver.connect("tcp://127.0.0.1:5557")
        while True:
            # Blocks until the next message arrives.
            buff = consumer_receiver.recv()
            # print(time.time())
            data = np.frombuffer(buff, dtype="float32")
            # Keep only the values before the first 0 in the buffer.
            l = np.where(data == 0)
            data = data[0:l[0][0]:1]
            print(data)
            bandwidth = []
            signals = []
            # Even indices are signals, odd indices are bandwidths.
            for i in range(0, len(data), 2):
                signals.append(data[i])
                bandwidth.append(data[i + 1])
            for i in range(0, len(signals)):
                self.textEdit.append(str(signals[i]) + "\n")
                self.textEdit_2.append(str(bandwidth[i]) + "\n")
And when I run the main block:
if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    Form = QtWidgets.QWidget()
    ui = Ui_Form()
    ui.setupUi(Form)
    Form.show()
    # consumer() loops forever (``while True``), so control never reaches
    # app.exec_() below and the Qt event loop is never started.
    ui.consumer()
    sys.exit(app.exec_())
The data is received correctly, but I can’t display it.
Explanation:
The cause of the problem is that the infinite loop blocks the Qt event loop, preventing it from handling tasks such as updating the GUI, listening to OS events, etc.
Solution:
You must implement some logic that communicates with ZMQ without blocking the event loop. There are several options for this (the following code first shows the server I used for my test):
server.py
"""Test server: pushes one float32 frame of (x, x**2) pairs every second."""
import logging
import random
import time

import numpy as np
import zmq

logging.basicConfig(level=logging.DEBUG)

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    start = random.randint(0, 10)
    # Interleave ten consecutive integers with their squares:
    # [x0, x0**2, x1, x1**2, ...].
    d = [value for x in range(start, start + 10) for value in (x, x ** 2)]
    buf = np.array(d, dtype="float32").tobytes()
    logging.debug(buf)
    socket.send(buf)
    time.sleep(1)
1. threading.Thread
"""Option 1: receive in a Python thread and hand the data to the GUI by signal."""
import sys
import threading

import numpy as np
import zmq
from PyQt5 import QtCore, QtWidgets


class ZMQReceiver(QtCore.QObject):
    """Pulls messages from ZeroMQ in a background thread and emits them."""

    dataChanged = QtCore.pyqtSignal(bytes)

    def start(self):
        """Start the blocking receive loop in a daemon thread."""
        worker = threading.Thread(target=self._execute, daemon=True)
        worker.start()

    def _execute(self):
        """Connect the PULL socket and emit every received buffer, forever."""
        context = zmq.Context()
        consumer_receiver = context.socket(zmq.PULL)
        consumer_receiver.connect("tcp://127.0.0.1:5557")
        while True:
            # recv() blocks this worker thread only; the GUI thread stays free.
            self.dataChanged.emit(consumer_receiver.recv())


class Widget(QtWidgets.QWidget):
    """Two read-only logs side by side: signals (left), bandwidths (right)."""

    def __init__(self, parent=None):
        super().__init__(parent)

        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        lay = QtWidgets.QHBoxLayout(self)
        for editor in (self.logedit_1, self.logedit_2):
            lay.addWidget(editor)

        zmq_receiver = ZMQReceiver(self)
        zmq_receiver.dataChanged.connect(self.on_data_changed)
        zmq_receiver.start()

    @QtCore.pyqtSlot(bytes)
    def on_data_changed(self, buff):
        """Decode a float32 buffer and append its (signal, bandwidth) pairs."""
        data = np.frombuffer(buff, dtype="float32")
        # Even indices hold signals, odd indices the matching bandwidths.
        pairs = [(data[i], data[i + 1]) for i in range(0, len(data), 2)]
        for sg, bw in pairs:
            self.logedit_1.append(str(sg))
            self.logedit_2.append(str(bw))


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)

    w = Widget()
    w.show()

    sys.exit(app.exec_())
2. QSocketNotifier
"""Option 2: let Qt watch the ZeroMQ socket's file descriptor (QSocketNotifier)."""
import sys

import numpy as np
import zmq
from PyQt5 import QtCore, QtWidgets


class Widget(QtWidgets.QWidget):
    """Two read-only logs side by side: signals (left), bandwidths (right)."""

    def __init__(self, parent=None):
        super().__init__(parent)

        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        lay = QtWidgets.QHBoxLayout(self)
        lay.addWidget(self.logedit_1)
        lay.addWidget(self.logedit_2)

        context = zmq.Context()
        self.consumer_receiver = context.socket(zmq.PULL)
        # Must match the port the test server binds (5557); the other
        # client examples connect to the same endpoint.
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        # Qt wakes us through the event loop whenever the socket's file
        # descriptor becomes readable, so no thread or blocking recv is needed.
        self.read_notifier = QtCore.QSocketNotifier(
            self.consumer_receiver.getsockopt(zmq.FD),
            QtCore.QSocketNotifier.Read,
            self,
        )
        self.read_notifier.activated.connect(self.on_read_msg)

    @QtCore.pyqtSlot()
    def on_read_msg(self):
        """Drain every pending message and append its pairs to the logs."""
        # Disable the notifier while reading so it does not re-fire mid-drain.
        self.read_notifier.setEnabled(False)
        try:
            # One notification may cover several queued messages, so keep
            # reading until ZeroMQ reports nothing more to receive.
            while self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
                buff = self.consumer_receiver.recv()
                data = np.frombuffer(buff, dtype="float32")
                # Even indices hold signals, odd indices the bandwidths.
                pairs = [
                    (data[i], data[i + 1]) for i in range(0, len(data), 2)
                ]
                for sg, bw in pairs:
                    self.logedit_1.append(str(sg))
                    self.logedit_2.append(str(bw))
        finally:
            # Always re-arm, otherwise a single bad frame would silence the
            # notifier for the rest of the session.
            self.read_notifier.setEnabled(True)


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)

    w = Widget()
    w.show()

    sys.exit(app.exec_())
3. asyncio
"""Option 3: run the consumer as a coroutine on an asyncio-enabled Qt loop."""
import asyncio
import sys

import numpy as np
import zmq
from asyncqt import QEventLoop, asyncClose
# from qasync import QEventLoop, asyncClose
from PyQt5 import QtCore, QtWidgets
from zmq.asyncio import Context, Poller


class Widget(QtWidgets.QWidget):
    """Two read-only logs side by side: signals (left), bandwidths (right)."""

    def __init__(self, parent=None):
        super().__init__(parent)

        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        lay = QtWidgets.QHBoxLayout(self)
        for editor in (self.logedit_1, self.logedit_2):
            lay.addWidget(editor)

        context = Context()
        self.consumer_receiver = context.socket(zmq.PULL)
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        self.poller = Poller()
        self.poller.register(self.consumer_receiver, zmq.POLLIN)

    async def start_consumer(self):
        """Poll the socket forever and log each received (signal, bandwidth) pair."""
        while True:
            ready = dict(await self.poller.poll())
            if self.consumer_receiver not in ready:
                continue

            buff = await self.consumer_receiver.recv()
            data = np.frombuffer(buff, dtype="float32")
            # Even indices hold signals, odd indices the matching bandwidths.
            pairs = [(data[i], data[i + 1]) for i in range(0, len(data), 2)]
            for sg, bw in pairs:
                self.logedit_1.append(str(sg))
                self.logedit_2.append(str(bw))

    @asyncClose
    async def closeEvent(self, event):
        """Close the ZeroMQ socket when the window is closed."""
        self.consumer_receiver.close()


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    w = Widget()
    w.show()

    try:
        loop.run_until_complete(w.start_consumer())
    except asyncio.CancelledError:
        print("start_consumer is cancelled now")
    finally:
        loop.close()