ZMQ REQ Message Source: Difference between revisions
(expand Example Flowgraph section) |
|||
Line 27: | Line 27: | ||
=== GNU Radio as a server === | === GNU Radio as a server === | ||
If the GNU Radio flowgraph(s) is configured as a server, the REQ message is processed by the flowgraph and a message is sent back in a REP message as the response. An example flowgraph and Python code follow.<br> | If the GNU Radio flowgraph(s) is configured as a server, the REQ message is processed by the flowgraph and a message is sent back in a REP message as the response. An example flowgraph and Python code follow. Note that two different ports are used.<br> | ||
[[File:Server_demo_fg.png]] | [[File:Server_demo_fg.png]] | ||
Line 36: | Line 36: | ||
from gnuradio import gr | from gnuradio import gr | ||
import pmt | import pmt | ||
class my_sync_block (gr.sync_block): | class my_sync_block (gr.sync_block): | ||
Line 53: | Line 51: | ||
def handle_msg(self, msg): | def handle_msg(self, msg): | ||
inputText = pmt.symbol_to_string (msg) | inputText = pmt.symbol_to_string (msg) | ||
# print (inputText) | # print (inputText) | ||
Line 66: | Line 63: | ||
# with no data ports, there is nothing to do | # with no data ports, there is nothing to do | ||
return (0) | return (0) | ||
</pre> | |||
The Python client code looks like this:<br> | |||
<pre> | |||
#!/usr/bin/python3 | |||
# -*- coding: utf-8 -*- | |||
# zmq_REQ_client.py | |||
# | |||
# The REQest / REPly nomenclature of the GNU Radio message blocks is from | |||
# the perspective of the flowgraph. So, to send a 'request' to GNU Radio, the message | |||
# must be sent as a 'reply' from the Python client, Likewise, a 'reply' from GNU Radio | |||
# must be received as a 'request' to the Python client! Therefore, send on the reply socket | |||
# and receive on the request socket. | |||
# | |||
# The zeromq.org website says: | |||
# "The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), | |||
# in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row) | |||
# will result in a return code of -1 from the send or recv call." Likewise, the server "issues zmq_recv() | |||
# and then zmq_send() in that order, as often as it needs to." | |||
# | |||
# To conform to that requirement, a non-standard "kludge" is used (see below). | |||
import datetime | |||
import time | |||
import signal | |||
import pmt | |||
import zmq | |||
_debug = 1 # set to zero to turn off diagnostics | |||
# create a REQ socket | |||
_PROTOCOL = "tcp://" | |||
_SERVER = "127.0.0.1" # localhost | |||
_REQ_PORT = ":49202" | |||
_REQ_ADDR = _PROTOCOL + _SERVER + _REQ_PORT | |||
if (_debug): | |||
print ("'zmq_REQ_client' connecting to:", _REQ_ADDR) | |||
req_context = zmq.Context() | |||
if (_debug): | |||
assert (req_context) | |||
req_sock = req_context.socket (zmq.REQ) | |||
if (_debug): | |||
assert (req_sock) | |||
rc = req_sock.connect (_REQ_ADDR) | |||
if (_debug): | |||
assert (rc == None) | |||
# create a REP socket | |||
_PROTOCOL = "tcp://" | |||
_SERVER = "127.0.0.1" # localhost | |||
_REP_PORT = ":49201" | |||
_REP_ADDR = _PROTOCOL + _SERVER + _REP_PORT | |||
if (_debug): | |||
print ("'zmq_REQ_client' binding to:", _REP_ADDR) | |||
rep_context = zmq.Context() | |||
if (_debug): | |||
assert (rep_context) | |||
rep_sock = rep_context.socket (zmq.REP) | |||
if (_debug): | |||
assert (rep_sock) | |||
rc = rep_sock.bind (_REP_ADDR) | |||
if (_debug): | |||
assert (rc == None) | |||
while True: | |||
# generate an outgoing message | |||
_tim = datetime.datetime.now() | |||
_out = ("Local time is " + str (_tim)) | |||
try: | |||
print(_out) | |||
rep_sock.recv() # this is the non-standard "kludge" | |||
rep_sock.send (pmt.serialize_str(pmt.to_pmt(_out))) # send on the 'reply' socket | |||
time.sleep(1) | |||
req_sock.send_string("\x01\x00\x00\x00") # this is the non-standard "kludge" | |||
msg = req_sock.recv() # receive on the 'request' socket | |||
print (pmt.to_python(pmt.deserialize_str(msg))) | |||
time.sleep(4) | |||
except KeyboardInterrupt: | |||
if (_debug): | |||
print (" Interrupt received. shutting down.") | |||
# clean up | |||
req_sock.close() | |||
req_context.term() | |||
rep_sock.close() | |||
rep_context.term() | |||
exit() | |||
</pre> | </pre> | ||
Revision as of 01:57, 12 March 2020
The ZMQ REQ Message Source block receives messages from a ZMQ REQ socket and outputs async messages. This block will connect to a ZMQ REP Message Sink.
The zeromq.org website says:
"The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call." Likewise, the server "issues zmq_recv() and then zmq_send() in that order, as often as it needs to."
Parameters
(R): Run-time adjustable
- Address
- ZMQ socket address specifier. The format of the address is
tcp://*:port
where * should be 127.0.0.1 for localhost.
- Timeout
- Socket timeout in milliseconds, default is 100ms.
Example Flowgraph
Inter-flowgraph
Request/Reply pairs can be used on one, or two separate, flowgraphs to exchange messages.
External Python client (send only)
An external Python program can send messages to a ZMQ REQ Message Source block. An example flowgraph and Python code follow.
GNU Radio as a server
If the GNU Radio flowgraph(s) is configured as a server, the REQ message is processed by the flowgraph and a message is sent back in a REP message as the response. An example flowgraph and Python code follow. Note that two different ports are used.
The Embedded Python block "Server demo" contains the following code:
from gnuradio import gr import pmt class my_sync_block (gr.sync_block): # accepts message string from input port # capitalizes the string # sends message to output port def __init__(self): gr.sync_block.__init__(self, name = "Server demo", in_sig = None, out_sig = None) self.message_port_register_in(pmt.intern('msg_in')) self.message_port_register_out(pmt.intern('msg_out')) self.set_msg_handler(pmt.intern('msg_in'), self.handle_msg) def handle_msg(self, msg): inputText = pmt.symbol_to_string (msg) # print (inputText) if (len (inputText) > 0): # capitalize the string outputText = inputText.upper() # print (outputText) # Send reply back to client self.message_port_pub(pmt.intern('msg_out'), pmt.intern(outputText)) def work(self, input_items, output_items): # with no data ports, there is nothing to do return (0)
The Python client code looks like this:
#!/usr/bin/python3 # -*- coding: utf-8 -*- # zmq_REQ_client.py # # The REQest / REPly nomenclature of the GNU Radio message blocks is from # the perspective of the flowgraph. So, to send a 'request' to GNU Radio, the message # must be sent as a 'reply' from the Python client, Likewise, a 'reply' from GNU Radio # must be received as a 'request' to the Python client! Therefore, send on the reply socket # and receive on the request socket. # # The zeromq.org website says: # "The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), # in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row) # will result in a return code of -1 from the send or recv call." Likewise, the server "issues zmq_recv() # and then zmq_send() in that order, as often as it needs to." # # To conform to that requirement, a non-standard "kludge" is used (see below). import datetime import time import signal import pmt import zmq _debug = 1 # set to zero to turn off diagnostics # create a REQ socket _PROTOCOL = "tcp://" _SERVER = "127.0.0.1" # localhost _REQ_PORT = ":49202" _REQ_ADDR = _PROTOCOL + _SERVER + _REQ_PORT if (_debug): print ("'zmq_REQ_client' connecting to:", _REQ_ADDR) req_context = zmq.Context() if (_debug): assert (req_context) req_sock = req_context.socket (zmq.REQ) if (_debug): assert (req_sock) rc = req_sock.connect (_REQ_ADDR) if (_debug): assert (rc == None) # create a REP socket _PROTOCOL = "tcp://" _SERVER = "127.0.0.1" # localhost _REP_PORT = ":49201" _REP_ADDR = _PROTOCOL + _SERVER + _REP_PORT if (_debug): print ("'zmq_REQ_client' binding to:", _REP_ADDR) rep_context = zmq.Context() if (_debug): assert (rep_context) rep_sock = rep_context.socket (zmq.REP) if (_debug): assert (rep_sock) rc = rep_sock.bind (_REP_ADDR) if (_debug): assert (rc == None) while True: # generate an outgoing message _tim = datetime.datetime.now() _out = ("Local time is " + str (_tim)) try: print(_out) rep_sock.recv() # this is the non-standard "kludge" rep_sock.send (pmt.serialize_str(pmt.to_pmt(_out))) # send on the 'reply' socket time.sleep(1) req_sock.send_string("\x01\x00\x00\x00") # this is the non-standard "kludge" msg = req_sock.recv() # receive on the 'request' socket print (pmt.to_python(pmt.deserialize_str(msg))) time.sleep(4) except KeyboardInterrupt: if (_debug): print (" Interrupt received. shutting down.") # clean up req_sock.close() req_context.term() rep_sock.close() rep_context.term() exit()
Source Files
- C++ files
- TODO
- Header files
- TODO
- Public header files
- TODO
- Block definition
- TODO