Stream Tags
Introduction
GNU Radio was originally a streaming system with no other mechanism to pass data between blocks. Streams of data are a model that works well for samples, bits, etc., but is poorly suited to control data and metadata.
Part of this problem is solved by the message passing interface, which allows blocks to subscribe to messages published by any other block in the flowgraph (see Message Passing). The main drawback of the message passing system is that it works asynchronously, meaning that there is no guarantee when a message will arrive relative to the data stream.
Stream tags are an isosynchronous data stream that runs parallel to the main data stream. A stream tag is generated by a block's work function and from there on flows downstream alongside a particular sample, until it reaches a sink or is forced to stop propagating by another block.
Stream tags are defined for a specific item in the data stream and are formed as a key:value pair. The key identifies what the value represents while the value holds the data that the tag contains. Both key and value are Polymorphic Types (PMTs) where the key is a PMT symbol while the value is any type of PMT and can therefore handle any data we wish to pass. An additional part of the tag is the srcid, which is a PMT symbol and is used to identify the block that created the tag (which is usually the block's alias).
API Extensions to the gr::block
To enable the stream tags, we have extended the API of gr::block to understand absolute item numbers. In the data stream model, each block's work function is given a buffer in the data stream that is referenced from 0 to N-1. This is a relative offset into the data stream. The absolute reference starts from the beginning of the flowgraph and continues to count up with every item. Each input stream is associated with a concept of the 'number of items read' and each output stream has a 'number of items written'. These are retrieved during runtime using the two API calls:
unsigned long int nitems_read(unsigned int which_input);
unsigned long int nitems_written(unsigned int which_output);
Each tag is associated with some item in this absolute time scale that is calculated using these functions.
Like the rest of the data stream, the number of items read/written is updated only once per call to work. So inside a work function, nitems_read/written refer to the state of the data stream at the start of that call, and we must add the current relative offset within the buffer. For example, when iterating i over all output items, the tag for item i on output port 0 goes at absolute offset nitems_written(0)+i.
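To make the absolute/relative distinction concrete, here is a small pure-Python simulation that needs no GNU Radio installation. The FakeScheduler class is hypothetical: it only mimics the per-port item counter, so that several successive work calls with different buffer sizes can be compared. It is a sketch of the bookkeeping described above, not GNU Radio's implementation.

```python
class FakeScheduler:
    """Hypothetical stand-in for the scheduler's item counter on output 0."""

    def __init__(self):
        self._written = 0  # items produced on output port 0 so far

    def nitems_written(self, which_output):
        # Only one output port is modelled in this sketch.
        assert which_output == 0
        return self._written

    def run_work(self, n_items, tag_every):
        """Simulate one work() call producing n_items; return tag offsets.

        A tag is placed on every relative index i with i % tag_every == 0,
        at absolute offset nitems_written(0) + i. The counter is advanced
        only once, after all offsets for this call have been computed.
        """
        base = self.nitems_written(0)  # state at the start of this call
        offsets = [base + i for i in range(n_items) if i % tag_every == 0]
        self._written += n_items  # single update per call
        return offsets


sched = FakeScheduler()
print(sched.run_work(10, 4))  # [0, 4, 8]
print(sched.run_work(6, 4))   # [10, 14]
```

Note that the relative index i restarts at 0 on every call, while the absolute offsets keep counting up across calls.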
Stream Tags API
The stream tags API is split into two parts: adding tags to a stream, and getting tags from a stream. Note that the functions described below are only meant to be accessed within a call to general_work/work. While they can be called at other points in time by a block, the behavior outside of work is undefined without exact knowledge of the item counts in the buffers.
Adding a Tag to a Stream
We add a tag to a particular output stream of the block using:
- gr::block::add_item_tag: Adds an item tag to a particular output port using a gr::tag_t data type or by specifying the tag values.
To tag more than one output stream, call this function once for each port.
Again, a tag is defined as:
- offset: The offset, in absolute item time, of the tag in the data stream.
- key: the PMT symbol identifying the type of tag.
- value: the PMT holding the data of the tag.
- srcid: (optional) the PMT symbol identifying the block which created the tag.
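As an illustration of this structure, the four fields can be mirrored by a plain Python dataclass. This Tag class is a hypothetical stand-in for gr::tag_t: real tags hold PMTs, whereas here the key and srcid are ordinary strings, the value is any Python object, and a missing srcid is None rather than PMT_F.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Tag:
    """Illustrative stand-in for gr::tag_t (PMTs replaced by Python types)."""
    offset: int                  # absolute item index the tag is attached to
    key: str                     # identifies what the value represents
    value: Any                   # the data the tag carries
    srcid: Optional[str] = None  # optional: block that created the tag


t = Tag(offset=1024, key="example_key", value=42, srcid="my_block")
print(t.offset, t.key, t.value)  # 1024 example_key 42
```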
The simplest approach is usually to create a gr::tag_t structure holding all of the above information; its members are the same as in the list above. To add a gr::tag_t tag to a stream, use the function:
void add_item_tag(unsigned int which_output, const tag_t &tag);
The secondary API allows us to create a tag by explicitly listing all of the tag information in the function call:
void add_item_tag(unsigned int which_output, uint64_t abs_offset, const pmt::pmt_t &key, const pmt::pmt_t &value, const pmt::pmt_t &srcid=pmt::PMT_F);
In Python, we can add a tag to a stream using one of the following:
self.add_item_tag(which_output, abs_offset, key, value)
self.add_item_tag(which_output, abs_offset, key, value, srcid)
Note that the key and value are both PMTs. To create a key and value using Python strings, you can do:
import pmt
key = pmt.intern("example_key")
value = pmt.intern("example_value")
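Here is an example of a Python block that loops through samples and tags roughly 5% of them at random (np.random.rand() > 0.95). In a real block, the random test would be replaced by a meaningful condition, such as a threshold: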
import numpy as np
from gnuradio import gr
import pmt

class blk(gr.sync_block):  # other base classes are basic_block, decim_block, interp_block

    def __init__(self):  # only default arguments here
        gr.sync_block.__init__(
            self,
            name='Embedded Python Block',  # will show up in GRC
            in_sig=[np.complex64],
            out_sig=[np.complex64]
        )

    def work(self, input_items, output_items):
        for indx, sample in enumerate(input_items[0]):
            if np.random.rand() > 0.95:
                key = pmt.intern("example_key")
                value = pmt.intern("example_value")
                # note: (self.nitems_written(0) + indx) is our current sample, in absolute time
                self.add_item_tag(0, self.nitems_written(0) + indx, key, value)
        output_items[0][:] = input_items[0]  # copy input to output
        return len(output_items[0])
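The block above decides at random which samples to tag. To tag samples whose magnitude exceeds a threshold instead, the offset computation can be factored into a plain helper and called from work(), for example as threshold_offsets(input_items[0], 0.5, self.nitems_written(0)). The helper name and the threshold value are hypothetical; the helper only computes absolute offsets, and the block would still call self.add_item_tag once for each one.

```python
def threshold_offsets(samples, threshold, nitems_written):
    """Return absolute offsets of samples whose magnitude exceeds threshold.

    samples:        the current work() buffer (any iterable of numbers,
                    complex values included; abs() gives the magnitude)
    threshold:      magnitude limit; only strictly greater values are tagged
    nitems_written: value of self.nitems_written(0) at the start of work()
    """
    return [nitems_written + i
            for i, s in enumerate(samples)
            if abs(s) > threshold]


buf = [0.1 + 0j, 0.9 + 0.2j, 0.3 - 0.3j, -1.0 + 0j]
print(threshold_offsets(buf, 0.5, 100))  # [101, 103]
```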
Getting tags from a Stream
To get tags from a particular input stream, we have two functions we can use:
- gr::block::get_tags_in_range: Gets all tags from a particular input port between a certain range of items (in absolute item time).
- gr::block::get_tags_in_window: Gets all tags from a particular input port between a certain range of items (in relative item time within the work function).
The difference between these functions is working in absolute item time versus relative item time. Both of these pass back vectors of gr::tag_t, and they both allow specifying a particular key (as a PMT symbol) to filter against (or the fifth argument can be left out to search for all keys). Filtering for a certain key reduces the effort inside the work function for getting the right tag's data.
For example, this call just returns any tags between the given range of items:
void get_tags_in_range(std::vector<tag_t> &v, unsigned int which_input, uint64_t abs_start, uint64_t abs_end);
Adding a fifth argument to this function allows us to filter on a given key:
void get_tags_in_range(std::vector<tag_t> &v, unsigned int which_input, uint64_t abs_start, uint64_t abs_end, const pmt::pmt_t &key);
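The selection logic of these two overloads can be modelled in a few lines of standalone Python. The Tag tuple and tags_in_range function are hypothetical stand-ins, with keys as plain strings instead of PMT symbols. The half-open interval [abs_start, abs_end) matches the boundary convention used by GNU Radio's buffer reader as far as we know; treat it as an assumption if your code depends on the exact edges.

```python
from collections import namedtuple

# Hypothetical stand-in for gr::tag_t; keys are plain strings, not PMTs.
Tag = namedtuple("Tag", ["offset", "key", "value"])


def tags_in_range(tags, abs_start, abs_end, key=None):
    """Mimic get_tags_in_range on a single input port.

    Keeps tags whose offset lies in [abs_start, abs_end). If key is given,
    only tags with that key are kept, like the five-argument overload.
    """
    return [t for t in tags
            if abs_start <= t.offset < abs_end
            and (key is None or t.key == key)]


port_tags = [Tag(3, "a", 1), Tag(7, "b", 2), Tag(9, "a", 3), Tag(12, "a", 4)]
print([t.offset for t in tags_in_range(port_tags, 5, 12)])       # [7, 9]
print([t.offset for t in tags_in_range(port_tags, 0, 20, "a")])  # [3, 9, 12]
```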
In Python, the main difference from the C++ function is that, instead of taking a vector as the first argument to store the tags in, the Python version simply returns a list of tags. We would use it like this:
def work(self, input_items, output_items):
    ...
    tags = self.get_tags_in_window(which_input, rel_start, rel_end)
    ...
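The two lookup functions are closely related: a window lookup is a range lookup whose bounds are shifted by nitems_read(which_input). The sketch below models that relationship in plain Python; the Tag tuple and tags_in_window helper are hypothetical. It also shows that the returned offsets are still absolute, so to index into input_items you subtract nitems_read.

```python
from collections import namedtuple

Tag = namedtuple("Tag", ["offset", "key", "value"])  # hypothetical stand-in


def tags_in_window(tags, nitems_read, rel_start, rel_end):
    """Mimic get_tags_in_window: shift relative bounds by nitems_read.

    Equivalent to a range lookup on [nitems_read + rel_start,
    nitems_read + rel_end). Offsets in the result remain absolute.
    """
    abs_start = nitems_read + rel_start
    abs_end = nitems_read + rel_end
    return [t for t in tags if abs_start <= t.offset < abs_end]


tags = [Tag(98, "x", 0), Tag(100, "x", 1), Tag(104, "x", 2), Tag(110, "x", 3)]
# This work() call sees 8 input items, the first being absolute item 100.
window = tags_in_window(tags, nitems_read=100, rel_start=0, rel_end=8)
print([t.offset for t in window])        # [100, 104]
print([t.offset - 100 for t in window])  # [0, 4] (indices into the buffer)
```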
Tag Propagation
Tags are propagated downstream from block to block like the normal data streams. How tags are actually moved depends on the block's propagation policy. Three policies are defined:
- All-to-All: all tags from any input port are replicated to all output ports
- One-to-One: tags from input port i are only copied to output port i (requires the number of inputs to equal the number of outputs).
- Don't: tags are not propagated. They either stop at this block, or the work function recreates them in some manner.
The default behavior of a block is the 'All-to-All' method of propagation.
To set a different propagation policy, use the function:
void set_tag_propagation_policy(tag_propagation_policy_t p);
See the gr::block::tag_propagation_policy_t documentation for details on this enum type.
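The effect of each policy on which output ports receive which tags can be sketched in plain Python. The Policy enum and propagate function are hypothetical illustrations (the C++ enum values are TPP_ALL_TO_ALL, TPP_ONE_TO_ONE and TPP_DONT). The sketch ignores offset adjustment for rate changes, and raising ValueError on mismatched port counts is this sketch's choice, not necessarily what the scheduler does.

```python
from enum import Enum


class Policy(Enum):
    """Mirrors the three policies above; illustrative only."""
    ALL_TO_ALL = "all_to_all"
    ONE_TO_ONE = "one_to_one"
    DONT = "dont"


def propagate(tags_per_input, n_outputs, policy):
    """Return one list of tags per output port.

    tags_per_input: list indexed by input port, each a list of tags
    n_outputs:      number of output ports of the block
    """
    outputs = [[] for _ in range(n_outputs)]
    if policy is Policy.ALL_TO_ALL:
        for port_tags in tags_per_input:
            for out in outputs:
                out.extend(port_tags)  # every input tag reaches every output
    elif policy is Policy.ONE_TO_ONE:
        if len(tags_per_input) != n_outputs:
            raise ValueError("One-to-One needs as many inputs as outputs")
        for out, port_tags in zip(outputs, tags_per_input):
            out.extend(port_tags)  # input i goes to output i only
    # Policy.DONT: nothing is copied, so every output list stays empty
    return outputs


ins = [["t0"], ["t1a", "t1b"]]
print(propagate(ins, 2, Policy.ALL_TO_ALL))  # [['t0', 't1a', 't1b'], ['t0', 't1a', 't1b']]
print(propagate(ins, 2, Policy.ONE_TO_ONE))  # [['t0'], ['t1a', 't1b']]
print(propagate(ins, 2, Policy.DONT))        # [[], []]
```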
Tag Propagation through Rate Changes
When a tag is propagated through a block that has a rate change, the item's offset in the data stream will change. The scheduler uses the block's gr::block::relative_rate concept to perform the update on the tag's offset value. The relative rate of a block determines the relationship between the input rate and output rate. Decimators that decimate by a factor of D have a relative rate of 1/D.
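The offset update amounts to multiplying the input-side offset by the relative rate. The sketch below assumes the product is truncated to an integer; the exact rounding the scheduler applies is an assumption here, and propagate_offset is a hypothetical helper. Exact rational arithmetic (Fraction) is used so that rates such as 1/3 introduce no floating-point error.

```python
from fractions import Fraction


def propagate_offset(offset, relative_rate):
    """Map an input-side tag offset to the output side of a rate-changing block.

    relative_rate is output items per input item: 1/D for a decimate-by-D
    block, I for an interpolate-by-I block. The scaled offset is truncated
    toward zero, which is an assumption of this sketch.
    """
    return int(offset * Fraction(relative_rate))


print(propagate_offset(10, Fraction(1, 4)))  # 2  (decimate by 4)
print(propagate_offset(12, Fraction(1, 4)))  # 3
print(propagate_offset(5, 3))                # 15 (interpolate by 3)
```

With decimation, several input offsets (8, 9, 10 and 11 for D = 4) land on the same output item, so tags that were distinct on the input can end up on one output sample.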
Synchronous blocks (gr::sync_block), decimators (gr::sync_decimator), and interpolators (gr::sync_interpolator) all have predefined, well-understood relative rates. A standard gr::block has a default relative rate of 1.0; if the block does not behave that way, the rate must be set explicitly (with gr::block::set_relative_rate). Often, we use a gr::block precisely because there is no fixed ratio between the number of input and output items. If tags passing through such a block must respect the change in item count, we have to use the TPP_DONT tag propagation policy and handle propagation inside the block.
In no case is the value of the tag modified when propagating through a block. This becomes relevant when using Tagged Stream Blocks.
Notes on How to Use Tags
Tags can be very useful to an application, and their use is spreading. USRP sources generate tags carrying the time, sample rate, and frequency of the board whenever any of these change. The metadata file source/sink (see Metadata) uses tags to store information about the data stream. But there are things to think about when using tags in a block.
First, when tags are not being used, there is almost no effect on the scheduler. When we do use tags, however, we add overhead by getting and extracting them from a data stream, and further overhead by propagating them: tags must be copied from the output port(s) of one block to the input port(s) of the next block(s). These copy operations add up.
The key is to minimize the use of tags: use them only when necessary, and provide some control over how often they are generated. A good example is the time tag produced by the USRP source. If we start at time t0 with sample rate sr, then after N samples we know that we are at time t0 + N/sr, so a new time tag on every sample would add no information. It would, however, mean thousands or even millions of tags per second and a significant amount of overhead.
The situation that does need handling is a discontinuity in the packets received from the USRP. Since the flowgraph has no way of knowing how many samples were lost, the timing information derived from the last tag is no longer valid. The USRP driver recognizes when packets have been dropped and queues another time tag, which allows us to resync. Likewise, whenever the sample rate or frequency changes, a new tag is issued.
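The t0 + N/sr relationship is simple enough to state as code, which makes it clear why one time tag is enough while the stream is continuous. The function name is hypothetical and the numbers are arbitrary.

```python
def sample_time(t0, sample_rate, n):
    """Time of sample n, given a time tag saying sample 0 occurred at t0.

    Valid only while no samples are dropped and the sample rate does not
    change, which is why a source need not tag every sample.
    """
    return t0 + n / sample_rate


print(sample_time(t0=5.0, sample_rate=1e6, n=250_000))  # 5.25
```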
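A downstream block can resync by always extrapolating from the most recent time tag at or before the sample of interest. The sketch below assumes time tags have already been reduced to (offset, time) pairs sorted by offset; resync_time is a hypothetical helper, and the values are chosen only to illustrate a jump after dropped packets.

```python
import bisect


def resync_time(time_tags, sample_rate, n):
    """Estimate the time of absolute sample n from a list of time tags.

    time_tags:   (offset, time) pairs sorted by offset, one per time tag
    sample_rate: samples per second, assumed constant between tags
    The latest tag at or before n is used, so a fresh tag issued after a
    discontinuity overrides extrapolation from an older, stale tag.
    """
    offsets = [off for off, _ in time_tags]
    idx = bisect.bisect_right(offsets, n) - 1
    if idx < 0:
        raise ValueError("no time tag at or before sample %d" % n)
    tag_offset, tag_time = time_tags[idx]
    return tag_time + (n - tag_offset) / sample_rate


# At 1 kHz, sample 1000 would be at t=11.0 by extrapolation, but packets were
# dropped and the new tag reports t=12.0 for that sample.
tags = [(0, 10.0), (1000, 12.0)]
print(resync_time(tags, 1000.0, 500))   # 10.5
print(resync_time(tags, 1000.0, 1500))  # 12.5
```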