Understanding ZMQ Blocks: Difference between revisions

From GNU Radio
Jump to navigation Jump to search
(add Python PUSH / PULL example)
(8 intermediate revisions by the same user not shown)
Line 1: Line 1:
<!-- Understanding_ZMQ_Blocks.mediawiki -->
<!-- Understanding_ZMQ_Blocks.mediawiki -->
<b>This tutorial is under construction.</b>
This tutorial presents the GNU Radio ZMQ blocks. It is a set of six Source Blocks and six Sink Blocks. The naming convention follows other source and sink blocks in that a source block provides data entering a GNU Radio flowgraph and a sink block sends data out of the flowgraph. It is a flowgraph-oriented perspective.
This tutorial presents the GNU Radio ZMQ blocks. It is a set of six Source Blocks and six Sink Blocks. The naming convention follows other source and sink blocks in that a source block provides data entering a GNU Radio flowgraph and a sink block sends data out of the flowgraph. It is a flowgraph-oriented perspective.


Line 35: Line 33:
== Using ZMQ Blocks ==
== Using ZMQ Blocks ==


The cases described below explain the differences in the block addressing. To conform to port addressing defined by the [https://www.iana.org/ Internet Assigned Numbers Authority (IANA)], private ports can be assigned in the range 49152–65535.
The cases described below explain the differences in the block addressing. To conform to port addressing defined by the [https://www.iana.org/ Internet Assigned Numbers Authority (IANA)], private ports can be assigned in the range 49152–65535. Within a single flowgraph there is no point in using ZMQ blocks. Using [[Virtual_Source]] and [[Virtual_Sink]] blocks is much more efficient.


=== Separate GR flowgraphs on Same Computer ===
=== Separate GR flowgraphs on Same Computer ===
When the ZMQ blocks are in separate flowgraphs but on the same computer, the IP address should be <code>127.0.0.1</code> for localhost. It has less overhead than a full IP.
These flowgraphs, using the PUB / SUB pair, are taken from [[Simulation_example:_AM_transmitter_and_receiver]].
[[File:AM_transmit_fg.png|800px]]
<hr>
[[File:AM_receive_fg.png|800px]]


=== Separate GR flowgraphs on Different Computers ===
=== Separate GR flowgraphs on Different Computers ===


=== Python Program as Destination of ZMQ Block ===
If the Source and Sink blocks are on two different computers, then the IP and port number of the Sink block must be specified on each end of that connection. For example, if the Sink is on IP <code>192.168.1.194:50241</code> and the Source is on IP <code>192.168.1.85</code>, both Source and Sink blocks must specify the Sink IP and port <code>192.168.1.194:50241</code>.
 
[[File:ZMQ_PUSH_msg_test_fg.png]]
<hr>
[[File:PULL_msg_test_fg.png]]
 
=== Python Program as a REQ / REP Server ===
 
The following Python program receives a string message on its REQ socket, capitalizes the text, and sends the string on its REP socket. The terminology gets confusing here because the incoming REQ came from a GR [[ZMQ_REP_Message_Sink]] and is returned to a [[ZMQ_REQ_Message_Source]].
 
<pre>
#!/usr/bin/python3
# -*- coding: utf-8 -*-
 
# zmq_REQ_REP_server.py
 
# This server program capitalizes received strings and returns them.
# NOTES:
#  1) To comply with the GNU Radio view, messages are received on the REQ socket and sent on the REP socket.
#  2) The REQ and REP messages must be on separate port numbers.
 
import pmt
import zmq
 
_debug = 0          # set to zero to turn off diagnostics
 
# create a REQ socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_REQ_PORT = ":50246"
_REQ_ADDR = _PROTOCOL + _SERVER + _REQ_PORT
if (_debug):
    print ("'zmq_REQ_REP_server' version 20056.1 connecting to:", _REQ_ADDR)
req_context = zmq.Context()
if (_debug):
    assert (req_context)
req_sock = req_context.socket (zmq.REQ)
if (_debug):
    assert (req_sock)
rc = req_sock.connect (_REQ_ADDR)
if (_debug):
    assert (rc == None)
 
# create a REP socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_REP_PORT = ":50247"
_REP_ADDR = _PROTOCOL + _SERVER + _REP_PORT
if (_debug):
    print ("'zmq_REQ_REP_server' version 20056.1 binding to:", _REP_ADDR)
rep_context = zmq.Context()
if (_debug):
    assert (rep_context)
rep_sock = rep_context.socket (zmq.REP)
if (_debug):
    assert (rep_sock)
rc = rep_sock.bind (_REP_ADDR)
if (_debug):
    assert (rc == None)
 
while True:
    #  Wait for next request from client
    data = req_sock.recv()
    message = pmt.to_python(pmt.deserialize_str(data))
    print("Received request: %s" % message)
 
    output = message.upper()
 
    #  Send reply back to client
    rep_sock.send (pmt.serialize_str(pmt.to_pmt(output)))
</pre>
 
=== Python Program as a PUSH / PULL Server ===
 
Similar to the example above, the following Python program receives a string message on its ZMQ PULL socket, capitalizes the text, and returns the string on its ZMQ PUSH socket.
 
<pre>
#!/usr/bin/python3
# -*- coding: utf-8 -*-
 
# zmq_PUSH_PULL_server.py
 
import sys
import pmt
import zmq
 
_debug = 0          # set to zero to turn off diagnostics
 
# create a PUSH socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_PUSH_PORT = ":50252"
_PUSH_ADDR = _PROTOCOL + _SERVER + _PUSH_PORT
if (_debug):
    print ("'zmq_PUSH_PULL_server' version 20068.1 binding to:", _PUSH_ADDR)
push_context = zmq.Context()
if (_debug):
    assert (push_context)
push_sock = push_context.socket (zmq.PUSH)
if (_debug):
    assert (push_sock)
rc = push_sock.bind (_PUSH_ADDR)
if (_debug):
    assert (rc == None)
 
# create a PULL socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_PULL_PORT = ":50251"
_PULL_ADDR = _PROTOCOL + _SERVER + _PULL_PORT
if (_debug):
    print ("'zmq_PUSH_PULL_server' connecting to:", _PULL_ADDR)
pull_context = zmq.Context()
if (_debug):
    assert (pull_context)
pull_sock = pull_context.socket (zmq.PULL)
if (_debug):
    assert (pull_sock)
rc = pull_sock.connect (_PULL_ADDR)
if (_debug):
    assert (rc == None)
 
while True:
    #  Wait for next request from client
    data = pull_sock.recv()
    message = pmt.to_python(pmt.deserialize_str(data))
    # print("Received request: %s" % message)
 
    output = message.upper()    # capitalize message
 
    #  Send reply back to client
    push_sock.send (pmt.serialize_str(pmt.to_pmt(output)))
 
</pre>
 
=== Python Program to Process Flowgraph Data ===
 
Here's the code to do GNU Radio --> Python over ZMQ PUB/SUB, which is by far the most useful case. Often you use GNU Radio for the signal processing but then at some point you want the resulting stream to go to a regular Python app.  PUB/SUB is just so you can have multiple apps getting the stream. In the [[ZMQ_PUB_Sink]] you can switch <code>*</code> to <code>127.0.0.1</code>, but with <code>*</code>, any device on the LAN would be able to see it. It's essentially broadcasting to any interface, not just the loopback.
 
An example flowgraph:
 
[[File:ZMQ_data_PUB_fg.png]]
<hr>
 
and the Python code:
 
<pre>
#!/usr/bin/python3
# -*- coding: utf-8 -*-
 
# zmq_SUB_proc.py
# Author: Marc Lichtman
 
import zmq
import numpy as np
import time
import matplotlib.pyplot as plt
 
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:55555") # connect, not bind, the PUB will bind, only 1 can bind
socket.setsockopt(zmq.SUBSCRIBE, b'') # subscribe to topic of all (needed or else it won't work)


=== Python Program as Source to ZMQ Block ===
while True:
    if socket.poll(10) != 0: # check if there is a message on the socket
        msg = socket.recv() # grab the message
        print(len(msg)) # size of msg
        data = np.frombuffer(msg, dtype=np.complex64, count=-1) # make sure to use correct data type (complex64 or float32); '-1' means read all data in the buffer
        print(data[0:10])
        # plt.plot(np.real(data))
        # plt.plot(np.imag(data))
        # plt.show()
    else:
        time.sleep(0.1) # wait 100ms and try again
</pre>

Revision as of 17:54, 15 November 2021

This tutorial presents the GNU Radio ZMQ blocks. It is a set of six Source Blocks and six Sink Blocks. The naming convention follows other source and sink blocks in that a source block provides data entering a GNU Radio flowgraph and a sink block sends data out of the flowgraph. It is a flowgraph-oriented perspective.

From the ZeroMQ website: "ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast."

Prerequisites

Types of ZMQ Blocks

For GNU Radio, the two basic groups of ZMQ blocks are those which transport stream data, and those which transport messages. They are described below.

ZMQ blocks come in pairs:

  • PUB - SUB
  • PUSH - PULL
  • REQ - REP

The PUB, PUSH, and REP blocks are always sink blocks; the others are source blocks. Choosing which pair to use depends on your system architecture.

  • The PUB - SUB pair can be compared to broadcasting. The PUBlish sink sends out data which can be received by one or more SUBscribers.
  • The PUSH - PULL pair is a point to point link of equal peers.
  • The REQ - REP pair is a point to point link which operates in lock-step: one REQuest input gives one REPly output. This case changes the perspective somewhat in that the flowgraph is acting as a server for a remote client.

Data Blocks

ZMQ data blocks transport raw stream data; there is no formatting. The data type and the sample rate are determined by the flowgraph feeding the ZMQ Sink. Therefore the flowgraph or program receiving the data must know those parameters in order to interpret the data correctly.

Message Blocks

Unlike the generic ZeroMQ strings, GNU Radio ZMQ Message Blocks utilize Polymorphic_Types_(PMTs) to encode/decode the data. See also Message_Passing.

Using ZMQ Blocks

The cases described below explain the differences in the block addressing. To conform to port addressing defined by the Internet Assigned Numbers Authority (IANA), private ports can be assigned in the range 49152–65535. Within a single flowgraph there is no point in using ZMQ blocks. Using Virtual_Source and Virtual_Sink blocks is much more efficient.

Separate GR flowgraphs on Same Computer

When the ZMQ blocks are in separate flowgraphs but on the same computer, the IP address should be 127.0.0.1 for localhost. It has less overhead than a full IP.

These flowgraphs, using the PUB / SUB pair, are taken from Simulation_example:_AM_transmitter_and_receiver.

AM transmit fg.png


AM receive fg.png

Separate GR flowgraphs on Different Computers

If the Source and Sink blocks are on two different computers, then the IP and port number of the Sink block must be specified on each end of that connection. For example, if the Sink is on IP 192.168.1.194:50241 and the Source is on IP 192.168.1.85, both Source and Sink blocks must specify the Sink IP and port 192.168.1.194:50241.

ZMQ PUSH msg test fg.png


PULL msg test fg.png

Python Program as a REQ / REP Server

The following Python program receives a string message on its REQ socket, capitalizes the text, and sends the string on its REP socket. The terminology gets confusing here because the incoming REQ came from a GR ZMQ_REP_Message_Sink and is returned to a ZMQ_REQ_Message_Source.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

# zmq_REQ_REP_server.py

# This server program capitalizes received strings and returns them.
# NOTES:
#   1) To comply with the GNU Radio view, messages are received on the REQ socket and sent on the REP socket.
#   2) The REQ and REP messages must be on separate port numbers.

import pmt
import zmq

_debug = 0          # set to zero to turn off diagnostics

# create a REQ socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_REQ_PORT = ":50246"
_REQ_ADDR = _PROTOCOL + _SERVER + _REQ_PORT
if (_debug):
    print ("'zmq_REQ_REP_server' version 20056.1 connecting to:", _REQ_ADDR)
req_context = zmq.Context()
if (_debug):
    assert (req_context)
req_sock = req_context.socket (zmq.REQ)
if (_debug):
    assert (req_sock)
rc = req_sock.connect (_REQ_ADDR)
if (_debug):
    assert (rc == None)

# create a REP socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_REP_PORT = ":50247"
_REP_ADDR = _PROTOCOL + _SERVER + _REP_PORT
if (_debug):
    print ("'zmq_REQ_REP_server' version 20056.1 binding to:", _REP_ADDR)
rep_context = zmq.Context()
if (_debug):
    assert (rep_context)
rep_sock = rep_context.socket (zmq.REP)
if (_debug):
    assert (rep_sock)
rc = rep_sock.bind (_REP_ADDR)
if (_debug):
    assert (rc == None)

while True:
    #  Wait for next request from client
    data = req_sock.recv()
    message = pmt.to_python(pmt.deserialize_str(data))
    print("Received request: %s" % message)

    output = message.upper()

    #  Send reply back to client
    rep_sock.send (pmt.serialize_str(pmt.to_pmt(output)))

Python Program as a PUSH / PULL Server

Similar to the example above, the following Python program receives a string message on its ZMQ PULL socket, capitalizes the text, and returns the string on its ZMQ PUSH socket.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

# zmq_PUSH_PULL_server.py

import sys
import pmt
import zmq

_debug = 0          # set to zero to turn off diagnostics

# create a PUSH socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_PUSH_PORT = ":50252"
_PUSH_ADDR = _PROTOCOL + _SERVER + _PUSH_PORT
if (_debug):
    print ("'zmq_PUSH_PULL_server' version 20068.1 binding to:", _PUSH_ADDR)
push_context = zmq.Context()
if (_debug):
    assert (push_context)
push_sock = push_context.socket (zmq.PUSH)
if (_debug):
    assert (push_sock)
rc = push_sock.bind (_PUSH_ADDR)
if (_debug):
    assert (rc == None)

# create a PULL socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"          # localhost
_PULL_PORT = ":50251"
_PULL_ADDR = _PROTOCOL + _SERVER + _PULL_PORT
if (_debug):
    print ("'zmq_PUSH_PULL_server' connecting to:", _PULL_ADDR)
pull_context = zmq.Context()
if (_debug):
    assert (pull_context)
pull_sock = pull_context.socket (zmq.PULL)
if (_debug):
    assert (pull_sock)
rc = pull_sock.connect (_PULL_ADDR)
if (_debug):
    assert (rc == None)

while True:
    #  Wait for next request from client
    data = pull_sock.recv()
    message = pmt.to_python(pmt.deserialize_str(data))
    # print("Received request: %s" % message)

    output = message.upper()    # capitalize message

    #  Send reply back to client
    push_sock.send (pmt.serialize_str(pmt.to_pmt(output)))

Python Program to Process Flowgraph Data

Here's the code to do GNU Radio --> Python over ZMQ PUB/SUB, which is by far the most useful case. Often you use GNU Radio for the signal processing but then at some point you want the resulting stream to go to a regular Python app. PUB/SUB is just so you can have multiple apps getting the stream. In the ZMQ_PUB_Sink you can switch * to 127.0.0.1, but with *, any device on the LAN would be able to see it. It's essentially broadcasting to any interface, not just the loopback.

An example flowgraph:

ZMQ data PUB fg.png


and the Python code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

# zmq_SUB_proc.py
# Author: Marc Lichtman

import zmq
import numpy as np
import time
import matplotlib.pyplot as plt

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:55555") # connect, not bind, the PUB will bind, only 1 can bind
socket.setsockopt(zmq.SUBSCRIBE, b'') # subscribe to topic of all (needed or else it won't work)

while True:
    if socket.poll(10) != 0: # check if there is a message on the socket
        msg = socket.recv() # grab the message
        print(len(msg)) # size of msg
        data = np.frombuffer(msg, dtype=np.complex64, count=-1) # make sure to use correct data type (complex64 or float32); '-1' means read all data in the buffer
        print(data[0:10])
        # plt.plot(np.real(data))
        # plt.plot(np.imag(data))
        # plt.show()
    else:
        time.sleep(0.1) # wait 100ms and try again