ZMQ REQ Message Source
The ZMQ REQ Message Source block receives messages from a ZMQ REQ socket and outputs async messages. This block will connect to a ZMQ REP Message Sink.
The zeromq.org website says:
"The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call." Likewise, the server "issues zmq_recv() and then zmq_send() in that order, as often as it needs to."
Parameters
(R): Run-time adjustable
- Address
- ZMQ socket address specifier. The format of the address is
tcp://*:port
where * should be 127.0.0.1 for localhost.
- Timeout
- Socket timeout in milliseconds, default is 100ms.
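The address format described above can be assembled programmatically. The sketch below is only an illustration: `zmq_tcp_address` is a hypothetical helper, not part of GNU Radio or pyzmq, and port 49201 is simply the port used in the client example further down this page.

```python
def zmq_tcp_address(host, port):
    """Build a 'tcp://host:port' string for the Address parameter."""
    # Hypothetical helper: reject ports outside the valid TCP range.
    if not 0 < port < 65536:
        raise ValueError("port must be in the range 1-65535")
    return "tcp://{}:{}".format(host, port)


print(zmq_tcp_address("127.0.0.1", 49201))  # tcp://127.0.0.1:49201
```

The resulting string can be pasted into the Address field of the block.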
Example Flowgraph
Inter-flowgraph
Request/Reply pairs can exchange messages within a single flowgraph or between two separate flowgraphs.
External Python client (send only)
An external Python program can send messages to a ZMQ REQ Message Source block. An example flowgraph and Python code follow.
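A minimal sketch of such a send-only client is shown here. It assumes the ZMQ REQ Message Source block is configured with the address tcp://127.0.0.1:49201 (an assumed port) and that the `pmt` and `zmq` Python modules are installed. The names `reply_once` and `main` are hypothetical. Because the flowgraph side is a REQ socket, the Python side is a REP socket: it waits for each request and answers it with a serialized PMT.

```python
import time


def reply_once(rep_sock, payload):
    """Wait for one request on a REP-style socket, then answer with payload."""
    request = rep_sock.recv()  # blocks until the flowgraph asks for a message
    rep_sock.send(payload)     # the reply carries the actual message
    return request


def main():
    # Third-party modules are imported here so the helper above stays
    # usable without GNU Radio installed.
    import pmt
    import zmq

    context = zmq.Context()
    rep_sock = context.socket(zmq.REP)
    rep_sock.bind("tcp://127.0.0.1:49201")  # assumed address and port
    try:
        while True:
            text = "Local time is " + time.ctime()
            reply_once(rep_sock, pmt.serialize_str(pmt.to_pmt(text)))
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        rep_sock.close()
        context.term()
```

Calling `main()` starts the loop; Ctrl-C stops it and closes the socket.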
GNU Radio as a server
If the GNU Radio flowgraph is configured as a server, it processes the incoming REQ message and sends its response back in a REP message. An example flowgraph and Python code follow. Note that two different ports are used.
The Embedded Python block "Server demo" contains the following code:
from gnuradio import gr
import pmt

class my_sync_block (gr.sync_block):
    # accepts message string from input port
    # capitalizes the string
    # sends message to output port
    def __init__(self):
        gr.sync_block.__init__(self,
            name = "Server demo",
            in_sig = None,
            out_sig = None)
        self.message_port_register_in(pmt.intern('msg_in'))
        self.message_port_register_out(pmt.intern('msg_out'))
        self.set_msg_handler(pmt.intern('msg_in'), self.handle_msg)

    def handle_msg(self, msg):
        inputText = pmt.symbol_to_string (msg)
        # print (inputText)
        if (len (inputText) > 0):
            # capitalize the string
            outputText = inputText.upper()
            # print (outputText)
            # Send reply back to client
            self.message_port_pub(pmt.intern('msg_out'), pmt.intern(outputText))

    def work(self, input_items, output_items):
        # with no data ports, there is nothing to do
        return (0)
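The text transformation performed by the "Server demo" handler can be checked without a running flowgraph by isolating it in a plain function. This is only a sketch; `make_reply` is a hypothetical name and is not part of the block. Note that `upper()` converts every letter to upper case, not just the first.

```python
def make_reply(text):
    """Return the upper-cased reply, or None when there is nothing to send."""
    if len(text) > 0:
        return text.upper()
    return None


print(make_reply("hello gnu radio"))  # HELLO GNU RADIO
print(make_reply(""))                 # None
```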
The Python client code looks like this:
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# zmq_REQ_client.py
#
# The REQest / REPly nomenclature of the GNU Radio message blocks is from
# the perspective of the flowgraph. So, to send a 'request' to GNU Radio, the message
# must be sent as a 'reply' from the Python client, Likewise, a 'reply' from GNU Radio
# must be received as a 'request' to the Python client! Therefore, send on the reply socket
# and receive on the request socket.
#
# The zeromq.org website says:
# "The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(),
# in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row)
# will result in a return code of -1 from the send or recv call." Likewise, the server "issues zmq_recv()
# and then zmq_send() in that order, as often as it needs to."
#
# To conform to that requirement, a non-standard "kludge" is used (see below).

import datetime
import time
import signal
import pmt
import zmq

_debug = 1    # set to zero to turn off diagnostics

# create a REQ socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"    # localhost
_REQ_PORT = ":49202"
_REQ_ADDR = _PROTOCOL + _SERVER + _REQ_PORT
if (_debug):
    print ("'zmq_REQ_client' connecting to:", _REQ_ADDR)
req_context = zmq.Context()
if (_debug):
    assert (req_context)
req_sock = req_context.socket (zmq.REQ)
if (_debug):
    assert (req_sock)
rc = req_sock.connect (_REQ_ADDR)
if (_debug):
    assert (rc == None)

# create a REP socket
_PROTOCOL = "tcp://"
_SERVER = "127.0.0.1"    # localhost
_REP_PORT = ":49201"
_REP_ADDR = _PROTOCOL + _SERVER + _REP_PORT
if (_debug):
    print ("'zmq_REQ_client' binding to:", _REP_ADDR)
rep_context = zmq.Context()
if (_debug):
    assert (rep_context)
rep_sock = rep_context.socket (zmq.REP)
if (_debug):
    assert (rep_sock)
rc = rep_sock.bind (_REP_ADDR)
if (_debug):
    assert (rc == None)

while True:
    # generate an outgoing message
    _tim = datetime.datetime.now()
    _out = ("Local time is " + str (_tim))
    try:
        print(_out)
        rep_sock.recv()    # this is the non-standard "kludge"
        rep_sock.send (pmt.serialize_str(pmt.to_pmt(_out)))    # send on the 'reply' socket
        time.sleep(1)
        req_sock.send_string("\x01\x00\x00\x00")    # this is the non-standard "kludge"
        msg = req_sock.recv()    # receive on the 'request' socket
        print (pmt.to_python(pmt.deserialize_str(msg)))
        time.sleep(4)
    except KeyboardInterrupt:
        if (_debug):
            print (" Interrupt received. shutting down.")
        # clean up
        req_sock.close()
        req_context.term()
        rep_sock.close()
        rep_context.term()
        exit()
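The two calls marked as a "kludge" in the client exist only to keep each socket in its required send/recv order. The toy model below sketches the REQ-side rule in pure Python. It is not the ZMQ API: `LockstepReq` and `try_sequence` are hypothetical names, and a `RuntimeError` stands in for the -1 return code mentioned in the zeromq.org quote.

```python
class LockstepReq:
    """Toy model of the REQ side: send, then recv, strictly alternating."""

    def __init__(self):
        self.awaiting_reply = False  # True between send() and recv()

    def send(self, message):
        if self.awaiting_reply:
            # Real ZMQ reports this as a -1 return code from zmq_send()
            raise RuntimeError("send() called twice in a row")
        self.awaiting_reply = True
        return message

    def recv(self, reply):
        if not self.awaiting_reply:
            raise RuntimeError("recv() called before send()")
        self.awaiting_reply = False
        return reply


def try_sequence(calls):
    """Run a list of 'send'/'recv' calls; return True if the order is legal."""
    sock = LockstepReq()
    try:
        for name in calls:
            getattr(sock, name)(b"payload")
    except RuntimeError:
        return False
    return True


print(try_sequence(["send", "recv", "send", "recv"]))  # True
print(try_sequence(["send", "send"]))                  # False
```

A REP socket follows the mirror-image rule (recv first, then send), which is why the client calls `rep_sock.recv()` before it can send its message.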
Source Files
- C++ files
- TODO
- Header files
- TODO
- Public header files
- TODO
- Block definition
- TODO