How to display an output from ZMQ to a Qt GUI in run time?

I am trying to take the output of the client of ‘ZeroMQ’ and display it through Qt designer python script. The code already receives data correctly but I can’t display it on the GUI window and receiving it. It only displays the first line. I am trying threading and ‘subprocess’ but it doesn’t work. i receive data well here in the function (consumer) in the class(Ui_Form)

class Ui_Form(object): def setupUi(self, Form):      def consumer(self):     consumer_id = random.randrange(1, 10005)     # print("I am consumer #%s" % (consumer_id))     context = zmq.Context()     consumer_receiver = context.socket(zmq.PULL)     consumer_receiver.connect("tcp://127.0.0.1:5557")     while True:         buff = consumer_receiver.recv()         # print(time.time())         data = np.frombuffer(buff, dtype="float32")         l = np.where(data == 0)         data = data[0:l[0][0]:1]         print(data)          bandwidth = []         signals = []         for i in range(0, len(data), 2):             signals.append(data[i])             bandwidth.append(data[i + 1])          for i in range(0, len(signals)):             self.textEdit.append(str(signals[i]) + "\n")             self.textEdit_2.append(str(bandwidth[i]) + "\n") 

and when i run the main:

if __name__ == "__main__":    app = QtWidgets.QApplication(sys.argv)    Form = QtWidgets.QWidget()    ui = Ui_Form()    ui.setupUi(Form)    Form.show()    ui.consumer()    sys.exit(app.exec_()) 

the date receive it correct but can’t display it .

Add Comment
1 Answer(s)

Explanation:

The cause of the problem is that the infinite loop is blocking the Qt eventloop preventing it from handling tasks like updating the gui, listening to OS events, etc.

Solution:

You must implement some logic that allows to communicate with ZMQ and does not block the eventloop, and for this there are several options (in the following code I show the example of the server of my test):

server.py

import random import time  import zmq  import numpy as np  import logging  logging.basicConfig(level=logging.DEBUG)  context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557")   while True:     d = []     start = random.randint(0, 10)     for i in range(10):         x = start + i         d.append(x)         d.append(x ** 2)     arr = np.array(d, dtype="float32")     buf = arr.tobytes()     logging.debug(buf)     socket.send(buf)     time.sleep(1) 

1. threading.Thread

import sys import threading  from PyQt5 import QtCore, QtWidgets import zmq import numpy as np   class ZMQReceiver(QtCore.QObject):     dataChanged = QtCore.pyqtSignal(bytes)      def start(self):         threading.Thread(target=self._execute, daemon=True).start()      def _execute(self):         context = zmq.Context()         consumer_receiver = context.socket(zmq.PULL)         consumer_receiver.connect("tcp://127.0.0.1:5557")         while True:             buff = consumer_receiver.recv()             self.dataChanged.emit(buff)   class Widget(QtWidgets.QWidget):     def __init__(self, parent=None):         super().__init__(parent)          self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)         self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)          lay = QtWidgets.QHBoxLayout(self)         lay.addWidget(self.logedit_1)         lay.addWidget(self.logedit_2)          zmq_receiver = ZMQReceiver(self)         zmq_receiver.dataChanged.connect(self.on_data_changed)         zmq_receiver.start()      @QtCore.pyqtSlot(bytes)     def on_data_changed(self, buff):         data = np.frombuffer(buff, dtype="float32")         bandwidth = []         signals = []         for i in range(0, len(data), 2):             signals.append(data[i])             bandwidth.append(data[i + 1])          for sg, bw in zip(signals, bandwidth):             self.logedit_1.append(str(sg))             self.logedit_2.append(str(bw))   if __name__ == "__main__":     app = QtWidgets.QApplication(sys.argv)     w = Widget()     w.show()     sys.exit(app.exec_()) 

2. QSocketNotifier

import sys  from PyQt5 import QtCore, QtWidgets import zmq import numpy as np   class Widget(QtWidgets.QWidget):     def __init__(self, parent=None):         super().__init__(parent)          self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)         self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)          lay = QtWidgets.QHBoxLayout(self)         lay.addWidget(self.logedit_1)         lay.addWidget(self.logedit_2)          context = zmq.Context()         self.consumer_receiver = context.socket(zmq.PULL)         self.consumer_receiver.connect("tcp://127.0.0.1:5556")          self.read_notifier = QtCore.QSocketNotifier(             self.consumer_receiver.getsockopt(zmq.FD), QtCore.QSocketNotifier.Read, self         )         self.read_notifier.activated.connect(self.on_read_msg)      @QtCore.pyqtSlot()     def on_read_msg(self):         self.read_notifier.setEnabled(False)         if self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:             while self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:                 buff = self.consumer_receiver.recv()                 data = np.frombuffer(buff, dtype="float32")                 bandwidth = []                 signals = []                 for i in range(0, len(data), 2):                     signals.append(data[i])                     bandwidth.append(data[i + 1])                  for sg, bw in zip(signals, bandwidth):                     self.logedit_1.append(str(sg))                     self.logedit_2.append(str(bw))          self.read_notifier.setEnabled(True)   if __name__ == "__main__":     app = QtWidgets.QApplication(sys.argv)     w = Widget()     w.show()     sys.exit(app.exec_()) 

3. asyncio

import sys import asyncio  from PyQt5 import QtCore, QtWidgets  from asyncqt import QEventLoop, asyncClose # from qasync import QEventLoop, asyncClose  import zmq from zmq.asyncio import Context, Poller  import numpy as np   class Widget(QtWidgets.QWidget):     def __init__(self, parent=None):         super().__init__(parent)          self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)         self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)          lay = QtWidgets.QHBoxLayout(self)         lay.addWidget(self.logedit_1)         lay.addWidget(self.logedit_2)          context = Context()         self.consumer_receiver = context.socket(zmq.PULL)         self.consumer_receiver.connect("tcp://127.0.0.1:5557")          self.poller = Poller()         self.poller.register(self.consumer_receiver, zmq.POLLIN)      async def start_consumer(self):         while True:             events = await self.poller.poll()             if self.consumer_receiver in dict(events):                 buff = await self.consumer_receiver.recv()                 data = np.frombuffer(buff, dtype="float32")                 bandwidth = []                 signals = []                 for i in range(0, len(data), 2):                     signals.append(data[i])                     bandwidth.append(data[i + 1])                  for sg, bw in zip(signals, bandwidth):                     self.logedit_1.append(str(sg))                     self.logedit_2.append(str(bw))      @asyncClose     async def closeEvent(self, event):         self.consumer_receiver.close()   if __name__ == "__main__":     app = QtWidgets.QApplication(sys.argv)     loop = QEventLoop(app)     asyncio.set_event_loop(loop)     w = Widget()     w.show()     try:         loop.run_until_complete(w.start_consumer())     except asyncio.CancelledError:         print("start_consumer is cancelled now")     finally:         loop.close() 
Add Comment

Your Answer

By posting your answer, you agree to the privacy policy and terms of service.