Understanding ZMQ Blocks: Difference between revisions
m (fix link syntax) |
|||
(8 intermediate revisions by 2 users not shown) | |||
Line 1: | Line 1: | ||
<!-- Understanding_ZMQ_Blocks.mediawiki --> | <!-- Understanding_ZMQ_Blocks.mediawiki --> | ||
This tutorial presents the GNU Radio ZMQ blocks. It is a set of six Source Blocks and six Sink Blocks. The naming convention follows other source and sink blocks in that a source block provides data entering a GNU Radio flowgraph and a sink block sends data out of the flowgraph. It is a flowgraph-oriented perspective. | This tutorial presents the GNU Radio ZMQ blocks. It is a set of six Source Blocks and six Sink Blocks. The naming convention follows other source and sink blocks in that a source block provides data entering a GNU Radio flowgraph and a sink block sends data out of the flowgraph. It is a flowgraph-oriented perspective. | ||
Line 35: | Line 33: | ||
== Using ZMQ Blocks == | == Using ZMQ Blocks == | ||
The cases described below explain the differences in the block addressing. To conform to port addressing defined by the [https://www.iana.org/ Internet Assigned Numbers Authority (IANA)], private ports can be assigned in the range 49152–65535. | Users of ZMQ blocks are expected to have some familiarity with ZeroMQ. In particular, one should be cognizant of the differences between ZMQ sockets and BSD sockets. See the [https://zeromq.org/socket-api/ ZMQ Socket API] for an overview. | ||
ZMQ blocks use '''endpoints''' to describe how ZMQ should pass the data. While the most common endpoint uses TCP to transfer the data, other protocols are possible. See [http://api.zeromq.org/master:zmq_tcp zmq_tcp] and [http://api.zeromq.org/master:zmq-ipc zmq_ipc] for a description of how each protocol defines endpoints. | |||
The cases described below explain the differences in the block addressing. To conform to port addressing defined by the [https://www.iana.org/ Internet Assigned Numbers Authority (IANA)], private ports can be assigned in the range 49152–65535. Within a single flowgraph using ZMQ blocks is not recommended, since [[Virtual_Source]] and [[Virtual_Sink]] blocks are much more efficient. | |||
=== TCP Bind vs Connect === | |||
Some users might be tempted to connect directly to GNU Radio ZMQ Blocks. While this is possible, some caution is needed. First be aware that in any topology, there must be '''exactly one''' <code>bind</code> to a given endpoint, while there may be multiple <code>connect</code>s to the same endpoint. In GNU Radio, stream '''sinks''' <code>bind</code> and stream '''sources''' <code>connect</code>. Message blocks accept a parameter that specify whether the block should <code>bind</code> or <code>connect</code>. | |||
Also be aware that the semantics of TCP endpoints vary between <code>bind</code> and <code>connect</code>. | |||
==== TCP Bind ==== | |||
When binding a TCP endpoint, you specify where you will listen for connections. If you specify an IP address, you are telling the socket to only accept connections on the network associated with that address (e.g., <code>127.0.0.1</code> or <code>192.168.1.123</code>). In some cases, you may want to listen on all networks connected to your node. For GNU Radio, you should use <code>0.0.0.0</code> as the wildcard address; although ZMQ does accept <code>*</code> as a wildcard, it doesn't work well in all cases. So, you may choose to <code>bind</code> to <code>tcp://0.0.0.0:54321</code>. | |||
Be aware that if you don't enter an IP address, <code>bind</code> treats the value as a network adapter name (e.g., <code>eth0</code>). See [http://api.zeromq.org/master:zmq_tcp zmq_tcp]. '''Bind to <code>tcp://localhost:1234</code> will not do what you think!''' Use either <code>tcp://127.0.0.1:1234</code> or <code>tcp://lo:1234</code> (for the loopback network adapter). | |||
==== TCP Connect ==== | |||
When connecting a TCP endpoint, you specify the remote endpoint you want to connect to. You can specify either an IP address or a DNS resolvable name. This difference in semantics between ''connect'' and ''bind'' is confusing, but must be honored, in order to have your flowgraph communicate as you expect. The simplest solution is to use IP addresses everywhere, but that is not an option in some configurations (e.g., deploying into a Kubernetes cluster). | |||
=== Wire Format === | |||
The ZMQ stream blocks have the option to pass tags. In addition, the PUB/SUB blocks support filtering. Both of these options affect the ZMQ wire protocol. | |||
When a filter string is supplied to a PUB/SUB block, GNU Radio uses [https://zeromq.org/messages/ multi-part messages] to send the filter string, followed by the payload. Non-GNU Radio code attempting to interface with GNU Radio ZMQ blocks must be prepared for this part, and discard it. Note that the sender only sends this message part if a non-empty filter has been specified. | |||
Next, if sending tags is enabled, any tags within the window of the data to be sent are encoded in a special format and ''prepended'' to the payload data. If tags are not enabled, this header is elided. | |||
These two features make matching the sender configuration to the receiver configuration essential. Failure to do so will cause runtime errors in your flowgraph. | |||
== Examples == | |||
=== Separate GR flowgraphs on Same Computer === | === Separate GR flowgraphs on Same Computer === | ||
When the ZMQ blocks are in separate flowgraphs but on the same computer, the IP address should be | When the ZMQ blocks are in separate flowgraphs but on the same computer, the IP address should be <code>127.0.0.1</code> for localhost. It has less overhead than a full IP. | ||
These flowgraphs, using the PUB / SUB pair, are taken from [[Simulation_example:_AM_transmitter_and_receiver]]. | These flowgraphs, using the PUB / SUB pair, are taken from [[Simulation_example:_AM_transmitter_and_receiver]]. | ||
Line 51: | Line 78: | ||
=== Separate GR flowgraphs on Different Computers === | === Separate GR flowgraphs on Different Computers === | ||
If the Source and Sink blocks are on two different computers, then the IP and port number of the Sink block must be specified on each end of that connection. For example, if the Sink is on IP 192.168.1.194:50241 and the Source is on IP 192.168.1.85, both Source and Sink blocks must specify the Sink IP and port | If the Source and Sink blocks are on two different computers, then the IP and port number of the Sink block must be specified on each end of that connection. For example, if the Sink is on IP <code>192.168.1.194:50241</code> and the Source is on IP <code>192.168.1.85</code>, both Source and Sink blocks must specify the Sink IP and port <code>192.168.1.194:50241</code>. | ||
[[File:ZMQ_PUSH_msg_test_fg.png]] | [[File:ZMQ_PUSH_msg_test_fg.png]] | ||
Line 57: | Line 84: | ||
[[File:PULL_msg_test_fg.png]] | [[File:PULL_msg_test_fg.png]] | ||
=== Python Program as | === Python Program as a REQ / REP Server === | ||
The following Python program receives a string message on its '''REQ'''uest socket, capitalizes the text, and sends the string on its '''REP'''ly socket. The terminology gets confusing here because the incoming REQ came from a GR [[ZMQ_REP_Message_Sink]] and is returned to a [[ZMQ_REQ_Message_Source]]. Just remember that a '''sink''' is the terminating point of a flowgraph (swallows all data) and a '''source''' is the origin of a flowgraph (produces or ingests data). | |||
<pre> | |||
#!/usr/bin/python3 | |||
# -*- coding: utf-8 -*- | |||
# zmq_REQ_REP_server.py | |||
# This server program capitalizes received strings and returns them. | |||
# NOTES: | |||
# 1) To comply with the GNU Radio view, messages are received on the REQ socket and sent on the REP socket. | |||
# 2) The REQ and REP messages must be on separate port numbers. | |||
import pmt | |||
import zmq | |||
_debug = 0 # set to zero to turn off diagnostics | |||
# create a REQ socket | |||
_PROTOCOL = "tcp://" | |||
_SERVER = "127.0.0.1" # localhost | |||
_REQ_PORT = ":50246" | |||
_REQ_ADDR = _PROTOCOL + _SERVER + _REQ_PORT | |||
if (_debug): | |||
print ("'zmq_REQ_REP_server' version 20056.1 connecting to:", _REQ_ADDR) | |||
req_context = zmq.Context() | |||
if (_debug): | |||
assert (req_context) | |||
req_sock = req_context.socket (zmq.REQ) | |||
if (_debug): | |||
assert (req_sock) | |||
rc = req_sock.connect (_REQ_ADDR) | |||
if (_debug): | |||
assert (rc == None) | |||
# create a REP socket | |||
_PROTOCOL = "tcp://" | |||
_SERVER = "127.0.0.1" # localhost | |||
_REP_PORT = ":50247" | |||
_REP_ADDR = _PROTOCOL + _SERVER + _REP_PORT | |||
if (_debug): | |||
print ("'zmq_REQ_REP_server' version 20056.1 binding to:", _REP_ADDR) | |||
rep_context = zmq.Context() | |||
if (_debug): | |||
assert (rep_context) | |||
rep_sock = rep_context.socket (zmq.REP) | |||
if (_debug): | |||
assert (rep_sock) | |||
rc = rep_sock.bind (_REP_ADDR) | |||
if (_debug): | |||
assert (rc == None) | |||
while True: | |||
# Wait for next request from client | |||
data = req_sock.recv() | |||
message = pmt.to_python(pmt.deserialize_str(data)) | |||
print("Received request: %s" % message) | |||
output = message.upper() | |||
# Send reply back to client | |||
rep_sock.send (pmt.serialize_str(pmt.to_pmt(output))) | |||
</pre> | |||
=== Python Program as a PUSH / PULL Server === | |||
Similar to the example above, the following Python program receives a string message on its ZMQ PULL socket, capitalizes the text, and returns the string on its ZMQ PUSH socket. | |||
<pre> | |||
#!/usr/bin/python3 | |||
# -*- coding: utf-8 -*- | |||
# zmq_PUSH_PULL_server.py | |||
import sys | |||
import pmt | |||
import zmq | |||
_debug = 0 # set to zero to turn off diagnostics | |||
# create a PUSH socket | |||
_PROTOCOL = "tcp://" | |||
_SERVER = "127.0.0.1" # localhost | |||
_PUSH_PORT = ":50252" | |||
_PUSH_ADDR = _PROTOCOL + _SERVER + _PUSH_PORT | |||
if (_debug): | |||
print ("'zmq_PUSH_PULL_server' version 20068.1 binding to:", _PUSH_ADDR) | |||
push_context = zmq.Context() | |||
if (_debug): | |||
assert (push_context) | |||
push_sock = push_context.socket (zmq.PUSH) | |||
if (_debug): | |||
assert (push_sock) | |||
rc = push_sock.bind (_PUSH_ADDR) | |||
if (_debug): | |||
assert (rc == None) | |||
# create a PULL socket | |||
_PROTOCOL = "tcp://" | |||
_SERVER = "127.0.0.1" # localhost | |||
_PULL_PORT = ":50251" | |||
_PULL_ADDR = _PROTOCOL + _SERVER + _PULL_PORT | |||
if (_debug): | |||
print ("'zmq_PUSH_PULL_server' connecting to:", _PULL_ADDR) | |||
pull_context = zmq.Context() | |||
if (_debug): | |||
assert (pull_context) | |||
pull_sock = pull_context.socket (zmq.PULL) | |||
if (_debug): | |||
assert (pull_sock) | |||
rc = pull_sock.connect (_PULL_ADDR) | |||
if (_debug): | |||
assert (rc == None) | |||
while True: | |||
# Wait for next request from client | |||
data = pull_sock.recv() | |||
message = pmt.to_python(pmt.deserialize_str(data)) | |||
# print("Received request: %s" % message) | |||
output = message.upper() # capitalize message | |||
# Send reply back to client | |||
push_sock.send (pmt.serialize_str(pmt.to_pmt(output))) | |||
</pre> | |||
=== Python Program to Process Flowgraph Data === | |||
Here's the code to do GNU Radio --> Python over ZMQ PUB/SUB, which is by far the most useful case. Often you use GNU Radio for the signal processing but then at some point you want the resulting stream to go to a regular Python app. PUB/SUB is just so you can have multiple apps getting the stream. In the [[ZMQ_PUB_Sink]] you can switch <code>*</code> to <code>127.0.0.1</code>, but with <code>*</code>, any device on the LAN would be able to see it. It's essentially broadcasting to any interface, not just the loopback. | |||
An example flowgraph: | |||
[[File:ZMQ_data_PUB_fg.png]] | |||
<hr> | |||
and the Python code: | |||
<pre> | |||
#!/usr/bin/python3 | |||
# -*- coding: utf-8 -*- | |||
# zmq_SUB_proc.py | |||
# Author: Marc Lichtman | |||
import zmq | |||
import numpy as np | |||
import time | |||
import matplotlib.pyplot as plt | |||
context = zmq.Context() | |||
socket = context.socket(zmq.SUB) | |||
socket.connect("tcp://127.0.0.1:55555") # connect, not bind, the PUB will bind, only 1 can bind | |||
socket.setsockopt(zmq.SUBSCRIBE, b'') # subscribe to topic of all (needed or else it won't work) | |||
=== | while True: | ||
if socket.poll(10) != 0: # check if there is a message on the socket | |||
msg = socket.recv() # grab the message | |||
print(len(msg)) # size of msg | |||
data = np.frombuffer(msg, dtype=np.complex64, count=-1) # make sure to use correct data type (complex64 or float32); '-1' means read all data in the buffer | |||
print(data[0:10]) | |||
# plt.plot(np.real(data)) | |||
# plt.plot(np.imag(data)) | |||
# plt.show() | |||
else: | |||
time.sleep(0.1) # wait 100ms and try again | |||
</pre> |
Latest revision as of 23:42, 20 February 2023
This tutorial presents the GNU Radio ZMQ blocks. It is a set of six Source Blocks and six Sink Blocks. The naming convention follows other source and sink blocks in that a source block provides data entering a GNU Radio flowgraph and a sink block sends data out of the flowgraph. It is a flowgraph-oriented perspective.
From the ZeroMQ website: "ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast."
Prerequisites
Types of ZMQ Blocks
For GNU Radio, the two basic groups of ZMQ blocks are those which transport stream data, and those which transport messages. They are described below.
ZMQ blocks come in pairs:
- PUB - SUB
- PUSH - PULL
- REQ - REP
The PUB, PUSH, and REP blocks are always sink blocks; the others are source blocks. Choosing which pair to use depends on your system architecture.
- The PUB - SUB pair can be compared to broadcasting. The PUBlish sink sends out data which can be received by one or more SUBscribers.
- The PUSH - PULL pair is a point to point link of equal peers.
- The REQ - REP pair is a point to point link which operates in lock-step: one REQuest input gives one REPly output. This case changes the perspective somewhat in that the flowgraph is acting as a server for a remote client.
Data Blocks
ZMQ data blocks transport raw stream data; there is no formatting. The data type and the sample rate are determined by the flowgraph feeding the ZMQ Sink. Therefore the flowgraph or program receiving the data must know those parameters in order to interpret the data correctly.
Message Blocks
Unlike the generic ZeroMQ strings, GNU Radio ZMQ Message Blocks utilize Polymorphic_Types_(PMTs) to encode/decode the data. See also Message_Passing.
Using ZMQ Blocks
Users of ZMQ blocks are expected to have some familiarity with ZeroMQ. In particular, one should be cognizant of the differences between ZMQ sockets and BSD sockets. See the ZMQ Socket API for an overview.
ZMQ blocks use endpoints to describe how ZMQ should pass the data. While the most common endpoint uses TCP to transfer the data, other protocols are possible. See zmq_tcp and zmq_ipc for a description of how each protocol defines endpoints.
The cases described below explain the differences in the block addressing. To conform to port addressing defined by the Internet Assigned Numbers Authority (IANA), private ports can be assigned in the range 49152–65535. Within a single flowgraph using ZMQ blocks is not recommended, since Virtual_Source and Virtual_Sink blocks are much more efficient.
TCP Bind vs Connect
Some users might be tempted to connect directly to GNU Radio ZMQ Blocks. While this is possible, some caution is needed. First be aware that in any topology, there must be exactly one bind
to a given endpoint, while there may be multiple connect
s to the same endpoint. In GNU Radio, stream sinks bind
and stream sources connect
. Message blocks accept a parameter that specify whether the block should bind
or connect
.
Also be aware that the semantics of TCP endpoints vary between bind
and connect
.
TCP Bind
When binding a TCP endpoint, you specify where you will listen for connections. If you specify an IP address, you are telling the socket to only accept connections on the network associated with that address (e.g., 127.0.0.1
or 192.168.1.123
). In some cases, you may want to listen on all networks connected to your node. For GNU Radio, you should use 0.0.0.0
as the wildcard address; although ZMQ does accept *
as a wildcard, it doesn't work well in all cases. So, you may choose to bind
to tcp://0.0.0.0:54321
.
Be aware that if you don't enter an IP address, bind
treats the value as a network adapter name (e.g., eth0
). See zmq_tcp. Bind to tcp://localhost:1234
will not do what you think! Use either tcp://127.0.0.1:1234
or tcp://lo:1234
(for the loopback network adapter).
TCP Connect
When connecting a TCP endpoint, you specify the remote endpoint you want to connect to. You can specify either an IP address or a DNS resolvable name. This difference in semantics between connect and bind is confusing, but must be honored, in order to have your flowgraph communicate as you expect. The simplest solution is to use IP addresses everywhere, but that is not an option in some configurations (e.g., deploying into a Kubernetes cluster).
Wire Format
The ZMQ stream blocks have the option to pass tags. In addition, the PUB/SUB blocks support filtering. Both of these options affect the ZMQ wire protocol.
When a filter string is supplied to a PUB/SUB block, GNU Radio uses multi-part messages to send the filter string, followed by the payload. Non-GNU Radio code attempting to interface with GNU Radio ZMQ blocks must be prepared for this part, and discard it. Note that the sender only sends this message part if a non-empty filter has been specified.
Next, if sending tags is enabled, any tags within the window of the data to be sent are encoded in a special format and prepended to the payload data. If tags are not enabled, this header is elided.
These two features make matching the sender configuration to the receiver configuration essential. Failure to do so will cause runtime errors in your flowgraph.
Examples
Separate GR flowgraphs on Same Computer
When the ZMQ blocks are in separate flowgraphs but on the same computer, the IP address should be 127.0.0.1
for localhost. It has less overhead than a full IP.
These flowgraphs, using the PUB / SUB pair, are taken from Simulation_example:_AM_transmitter_and_receiver.
Separate GR flowgraphs on Different Computers
If the Source and Sink blocks are on two different computers, then the IP and port number of the Sink block must be specified on each end of that connection. For example, if the Sink is on IP 192.168.1.194:50241
and the Source is on IP 192.168.1.85
, both Source and Sink blocks must specify the Sink IP and port 192.168.1.194:50241
.
Python Program as a REQ / REP Server
The following Python program receives a string message on its REQuest socket, capitalizes the text, and sends the string on its REPly socket. The terminology gets confusing here because the incoming REQ came from a GR ZMQ_REP_Message_Sink and is returned to a ZMQ_REQ_Message_Source. Just remember that a sink is the terminating point of a flowgraph (swallows all data) and a source is the origin of a flowgraph (produces or ingests data).
#!/usr/bin/python3 # -*- coding: utf-8 -*- # zmq_REQ_REP_server.py # This server program capitalizes received strings and returns them. # NOTES: # 1) To comply with the GNU Radio view, messages are received on the REQ socket and sent on the REP socket. # 2) The REQ and REP messages must be on separate port numbers. import pmt import zmq _debug = 0 # set to zero to turn off diagnostics # create a REQ socket _PROTOCOL = "tcp://" _SERVER = "127.0.0.1" # localhost _REQ_PORT = ":50246" _REQ_ADDR = _PROTOCOL + _SERVER + _REQ_PORT if (_debug): print ("'zmq_REQ_REP_server' version 20056.1 connecting to:", _REQ_ADDR) req_context = zmq.Context() if (_debug): assert (req_context) req_sock = req_context.socket (zmq.REQ) if (_debug): assert (req_sock) rc = req_sock.connect (_REQ_ADDR) if (_debug): assert (rc == None) # create a REP socket _PROTOCOL = "tcp://" _SERVER = "127.0.0.1" # localhost _REP_PORT = ":50247" _REP_ADDR = _PROTOCOL + _SERVER + _REP_PORT if (_debug): print ("'zmq_REQ_REP_server' version 20056.1 binding to:", _REP_ADDR) rep_context = zmq.Context() if (_debug): assert (rep_context) rep_sock = rep_context.socket (zmq.REP) if (_debug): assert (rep_sock) rc = rep_sock.bind (_REP_ADDR) if (_debug): assert (rc == None) while True: # Wait for next request from client data = req_sock.recv() message = pmt.to_python(pmt.deserialize_str(data)) print("Received request: %s" % message) output = message.upper() # Send reply back to client rep_sock.send (pmt.serialize_str(pmt.to_pmt(output)))
Python Program as a PUSH / PULL Server
Similar to the example above, the following Python program receives a string message on its ZMQ PULL socket, capitalizes the text, and returns the string on its ZMQ PUSH socket.
#!/usr/bin/python3 # -*- coding: utf-8 -*- # zmq_PUSH_PULL_server.py import sys import pmt import zmq _debug = 0 # set to zero to turn off diagnostics # create a PUSH socket _PROTOCOL = "tcp://" _SERVER = "127.0.0.1" # localhost _PUSH_PORT = ":50252" _PUSH_ADDR = _PROTOCOL + _SERVER + _PUSH_PORT if (_debug): print ("'zmq_PUSH_PULL_server' version 20068.1 binding to:", _PUSH_ADDR) push_context = zmq.Context() if (_debug): assert (push_context) push_sock = push_context.socket (zmq.PUSH) if (_debug): assert (push_sock) rc = push_sock.bind (_PUSH_ADDR) if (_debug): assert (rc == None) # create a PULL socket _PROTOCOL = "tcp://" _SERVER = "127.0.0.1" # localhost _PULL_PORT = ":50251" _PULL_ADDR = _PROTOCOL + _SERVER + _PULL_PORT if (_debug): print ("'zmq_PUSH_PULL_server' connecting to:", _PULL_ADDR) pull_context = zmq.Context() if (_debug): assert (pull_context) pull_sock = pull_context.socket (zmq.PULL) if (_debug): assert (pull_sock) rc = pull_sock.connect (_PULL_ADDR) if (_debug): assert (rc == None) while True: # Wait for next request from client data = pull_sock.recv() message = pmt.to_python(pmt.deserialize_str(data)) # print("Received request: %s" % message) output = message.upper() # capitalize message # Send reply back to client push_sock.send (pmt.serialize_str(pmt.to_pmt(output)))
Python Program to Process Flowgraph Data
Here's the code to do GNU Radio --> Python over ZMQ PUB/SUB, which is by far the most useful case. Often you use GNU Radio for the signal processing but then at some point you want the resulting stream to go to a regular Python app. PUB/SUB is just so you can have multiple apps getting the stream. In the ZMQ_PUB_Sink you can switch *
to 127.0.0.1
, but with *
, any device on the LAN would be able to see it. It's essentially broadcasting to any interface, not just the loopback.
An example flowgraph:
and the Python code:
#!/usr/bin/python3 # -*- coding: utf-8 -*- # zmq_SUB_proc.py # Author: Marc Lichtman import zmq import numpy as np import time import matplotlib.pyplot as plt context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://127.0.0.1:55555") # connect, not bind, the PUB will bind, only 1 can bind socket.setsockopt(zmq.SUBSCRIBE, b'') # subscribe to topic of all (needed or else it won't work) while True: if socket.poll(10) != 0: # check if there is a message on the socket msg = socket.recv() # grab the message print(len(msg)) # size of msg data = np.frombuffer(msg, dtype=np.complex64, count=-1) # make sure to use correct data type (complex64 or float32); '-1' means read all data in the buffer print(data[0:10]) # plt.plot(np.real(data)) # plt.plot(np.imag(data)) # plt.show() else: time.sleep(0.1) # wait 100ms and try again