Stream Tags

From GNU Radio
Revision as of 00:24, 23 May 2021 by Solomonbstoner

Introduction

GNU Radio was originally a streaming system with no other mechanism to pass data between blocks. Streams of data are a model that works well for samples, bits, etc., but they are poorly suited to carrying control information and metadata.

Part of this is solved using the existing message passing interface, which allows blocks to subscribe to messages published by any other block in the flowgraph (see Message Passing). The main drawback to the message passing system is that it works asynchronously, meaning that there is no guarantee when a message may arrive relative to the data stream.

Stream tags are an isosynchronous data stream that runs parallel to the main data stream. A stream tag is generated by a block's work function and from there on flows downstream alongside a particular sample, until it reaches a sink or is forced to stop propagating by another block.

Stream tags are defined for a specific item in the data stream and are formed as a key:value pair. The key identifies what the value represents while the value holds the data that the tag contains. Both key and value are Polymorphic Types (PMTs) where the key is a PMT symbol while the value is any type of PMT and can therefore handle any data we wish to pass. An additional part of the tag is the srcid, which is a PMT symbol and is used to identify the block that created the tag (which is usually the block's alias).

API Extensions to the gr::block

To enable the stream tags, we have extended the API of gr::block to understand absolute item numbers. In the data stream model, each block's work function is given a buffer in the data stream that is referenced from 0 to N-1. This is a relative offset into the data stream. The absolute reference starts from the beginning of the flowgraph and continues to count up with every item. Each input stream is associated with a concept of the 'number of items read' and each output stream has a 'number of items written'. These are retrieved during runtime using the two API calls:

 uint64_t nitems_read(unsigned int which_input);
 uint64_t nitems_written(unsigned int which_output);

Each tag is associated with some item in this absolute time scale that is calculated using these functions.

Like the rest of the data stream, the number of items read/written is only updated once per call to work. So within a work function, nitems_read()/nitems_written() refer to the state of the data stream at the start of the call, and we must add the current relative offset to this value. If we iterate i over all output items, the tag for output item i on output port 0 therefore goes at offset nitems_written(0) + i.
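The bookkeeping described above can be illustrated with a small plain-Python model. It does not use GNU Radio: CounterModel is a hypothetical stand-in for a block's nitems_written(0) counter, which (as in the text) only advances after a work call returns.

```python
# Pure-Python model of absolute vs. relative item offsets (no GNU Radio).
# CounterModel is a hypothetical stand-in, not a GNU Radio class.

class CounterModel:
    def __init__(self):
        self._nitems_written = 0  # absolute count of items produced so far

    def nitems_written(self):
        return self._nitems_written

    def work(self, n_items, tag_every):
        """Return absolute offsets for tags on every `tag_every`-th relative item."""
        offsets = []
        for i in range(n_items):  # i is the relative offset within this call
            if i % tag_every == 0:
                # The counter still reflects the start of this call,
                # so the absolute offset is counter + relative index.
                offsets.append(self.nitems_written() + i)
        # Modeling the scheduler: the count advances only after work returns.
        self._nitems_written += n_items
        return offsets


blk = CounterModel()
first = blk.work(8, tag_every=4)   # absolute items 0..7
second = blk.work(8, tag_every=4)  # relative 0..7 again, absolute 8..15
print(first, second)  # [0, 4] [8, 12]
```

The same relative indices produce different absolute offsets in the second call, which is why the tag offset must always include nitems_written().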

Stream Tags API

The stream tags API is split into two parts: adding tags to a stream, and getting tags from a stream. Note that the functions described below are only meant to be accessed within a call to general_work/work. While they can be called at other points in time by a block, the behavior outside of work is undefined without exact knowledge of the item counts in the buffers.

Adding a Tag to a Stream

We add a tag to a particular output stream of the block using:

  • gr::block::add_item_tag: Adds an item tag to a particular output port using a gr::tag_t data type or by specifying the tag values.

We can add tags to multiple output streams if we want, but doing so means calling this function once for each port.

Again, a tag is defined as:

  1. offset: The offset, in absolute item time, of the tag in the data stream.
  2. key: the PMT symbol identifying the type of tag.
  3. value: the PMT holding the data of the tag.
  4. srcid: (optional) the PMT symbol identifying the block which created the tag.

We can create a gr::tag_t structure to hold all of the above information of a tag, which is probably the easiest/best way to do it. The gr::tag_t struct is defined as having the same members as in the above list. To add a gr::tag_t tag to a stream, use the function:

 void add_item_tag(unsigned int which_output, const tag_t &tag);

The secondary API allows us to create a tag by explicitly listing all of the tag information in the function call:

 void add_item_tag(unsigned int which_output,
                   uint64_t abs_offset,
                   const pmt::pmt_t &key,
                   const pmt::pmt_t &value,
                   const pmt::pmt_t &srcid=pmt::PMT_F);

In Python, we can add a tag to a stream from within the block using one of the following:

 self.add_item_tag(which_output, abs_offset, key, value)
 self.add_item_tag(which_output, abs_offset, key, value, srcid)

Note that the key and value are both PMTs. To create a PMT symbol from a Python string, you can use pmt.intern("example_key").

Consider the following flowgraph as an example. We will have an embedded python block insert stream tags at random intervals, and view these tags on the QT GUI Time sink.

Epy stream tags example flowchart.png

Add the following code into the embedded python block. This code outputs the same signal as in the input, except with tags on randomly selected input samples:

 import numpy as np
 from gnuradio import gr
 import pmt
 
 class blk(gr.sync_block):
     def __init__(self):
         gr.sync_block.__init__(
             self,
             name='Embedded Python Block',
             in_sig=[np.complex64],
             out_sig=[np.complex64]
         )
 
     def work(self, input_items, output_items):
         for indx, sample in enumerate(input_items[0]): # Enumerate through the input samples in port in0
             if np.random.rand() > 0.95: # 5% chance this sample is chosen
                 key = pmt.intern("example_key")
                 value = pmt.intern("example_value")
                 self.add_item_tag(0, # Write to output port 0
                          self.nitems_written(0) + indx, # Index of the tag in absolute terms
                          key, # Key of the tag
                          value # Value of the tag
                 )
                 # note: (self.nitems_written(0) + indx) is our current sample, in absolute time
         output_items[0][:] = input_items[0] # copy input to output
         return len(output_items[0])

You may expect an output similar to the screenshot of the time sink below.

Epy stream tags example timesink.png

Getting tags from a Stream

To get tags from a particular input stream, we have two functions we can use:

  1. gr::block::get_tags_in_range: Gets all tags from a particular input port between a certain range of items (in absolute item time).
  2. gr::block::get_tags_in_window: Gets all tags from a particular input port between a certain range of items (in relative item time within the work function).

The difference between these functions is working in absolute item time versus relative item time. Both of these pass back vectors of gr::tag_t, and they both allow specifying a particular key (as a PMT symbol) to filter against (or the fifth argument can be left out to search for all keys). Filtering for a certain key reduces the effort inside the work function for getting the right tag's data.
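To make the absolute/relative distinction concrete, here is a plain-Python model of the two queries. Tag is a stand-in for gr::tag_t with strings instead of PMTs, and the half-open range abs_start <= offset < abs_end is an assumption of this sketch; the window version simply adds the number of items already read to its relative bounds.

```python
from dataclasses import dataclass


@dataclass
class Tag:
    """Plain-Python stand-in for gr::tag_t (strings instead of PMTs)."""
    offset: int  # absolute item offset
    key: str
    value: object


def tags_in_range(tags, abs_start, abs_end, key=None):
    """Tags with abs_start <= offset < abs_end, optionally filtered by key."""
    return [t for t in tags
            if abs_start <= t.offset < abs_end and (key is None or t.key == key)]


def tags_in_window(tags, nitems_read, rel_start, rel_end, key=None):
    """Same query, with bounds relative to the first item of this work() call."""
    return tags_in_range(tags, nitems_read + rel_start, nitems_read + rel_end, key)


stream = [Tag(5, "a", 1), Tag(12, "b", 2), Tag(14, "a", 3), Tag(20, "a", 4)]
# Suppose the current work() call starts at absolute item 10 and sees 8 items.
window = tags_in_window(stream, nitems_read=10, rel_start=0, rel_end=8)
only_a = tags_in_window(stream, 10, 0, 8, key="a")
print([t.offset for t in window], [t.offset for t in only_a])  # [12, 14] [14]
```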

For example, this call just returns any tags between the given range of items:

 void get_tags_in_range(std::vector<tag_t> &v,
                        unsigned int which_input,
                        uint64_t abs_start,
                        uint64_t abs_end);

Adding a fifth argument to this function allows us to filter on a given key:

 void get_tags_in_range(std::vector<tag_t> &v,
                        unsigned int which_input,
                        uint64_t abs_start,
                        uint64_t abs_end,
                        const pmt::pmt_t &key);

In Python, the main difference from the C++ function is that instead of having the first argument be a vector where the tags are stored, the Python version just returns a list of tags. We would use it like this:

 def work(self, input_items, output_items):
     ....
     tags = self.get_tags_in_window(which_input, rel_start, rel_end)
     ....

If you want to grab all tags on the samples currently being processed by work(), on input port 0, here's a minimal example of doing that:

 import numpy as np
 from gnuradio import gr
 import pmt
 
 class blk(gr.sync_block):
     def __init__(self):
         gr.sync_block.__init__(self,name='Read Tags', in_sig=[np.float32], out_sig=None)
 
     def work(self, input_items, output_items):
         tags = self.get_tags_in_window(0, 0, len(input_items[0]))
         for tag in tags:
             key = pmt.to_python(tag.key) # convert from PMT to python string
             value = pmt.to_python(tag.value) # Note that the type(value) can be several things, it depends what PMT type it was
             print('key:', key)
             print('value:', value, type(value))
             print('')
         return len(input_items[0])

Tag Propagation

We now know how to add tags to streams, and how to read them. But what happens to tags after they were read? And what happens to tags that aren't used? After all, there are many blocks that don't care about tags at all.

What happens to tags that enter a block depends on that block's tag propagation policy.
There are three policies to choose from:

  1. All-to-All (TPP_ALL_TO_ALL): all tags from any input port are replicated to all output ports.
  2. One-to-One (TPP_ONE_TO_ONE): tags from input port i are only copied to output port i (this requires the number of inputs to equal the number of outputs).
  3. Don't (TPP_DONT): does not propagate tags. Tags either stop here or the work function recreates them in some manner.
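The three policies can be summarized with a toy model of which tag offsets reach which output port. The string policy names stand in for the TPP_* enum values; this is plain Python, ignores rate changes and delays, and is not how the scheduler is implemented.

```python
def propagate(policy, tags_per_input, n_outputs):
    """Model which tag offsets reach each output port under a policy.

    policy is "all_to_all", "one_to_one" or "dont", standing in for
    TPP_ALL_TO_ALL, TPP_ONE_TO_ONE and TPP_DONT. Rate changes are ignored.
    """
    if policy == "all_to_all":
        # every input tag is replicated to every output (sorted for readability)
        merged = sorted(off for port in tags_per_input for off in port)
        return [list(merged) for _ in range(n_outputs)]
    if policy == "one_to_one":
        if len(tags_per_input) != n_outputs:
            raise ValueError("one_to_one requires as many inputs as outputs")
        return [list(port) for port in tags_per_input]
    if policy == "dont":
        return [[] for _ in range(n_outputs)]  # nothing propagated automatically
    raise ValueError(f"unknown policy: {policy}")


inputs = [[3, 10], [7]]  # tag offsets arriving on input ports 0 and 1
print(propagate("all_to_all", inputs, 2))  # [[3, 7, 10], [3, 7, 10]]
print(propagate("one_to_one", inputs, 2))  # [[3, 10], [7]]
print(propagate("dont", inputs, 2))        # [[], []]
```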

The default behavior of a block is the 'All-to-All' method of propagation.

We generally set the tag propagation policy in the block's constructor using set_tag_propagation_policy():

 void set_tag_propagation_policy(tag_propagation_policy_t p);

See the gr::block::tag_propagation_policy_t documentation for details on this enum type.

When the tag propagation policy is set to TPP_ALL_TO_ALL or TPP_ONE_TO_ONE, the GNU Radio scheduler uses any information available to figure out which output item corresponds to which input item. The block may still read tags and add new ones, but existing tags are automatically moved downstream in a manner deemed appropriate.

When a tag is propagated through a block that has a rate change, the item's offset in the data stream will change. The scheduler uses the block's gr::block::relative_rate concept to perform the update on the tag's offset value. The relative rate of a block determines the relationship between the input rate and output rate. Decimators that decimate by a factor of D have a relative rate of 1/D.
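As a rough sketch of the offset update, assume the input offset is multiplied by the relative rate (output items per input item) and rounded to the nearest item. This plain-Python model reproduces the interpolation and decimation examples discussed below; it is an approximation for illustration, not the scheduler's source code. Fraction keeps the arithmetic exact.

```python
import math
from fractions import Fraction


def map_offset(in_offset, relative_rate):
    """Model of a tag's output offset after a rate change.

    Assumption: the input offset is scaled by the relative rate
    (output items per input item) and rounded to the nearest item.
    """
    return math.floor(in_offset * Fraction(relative_rate) + Fraction(1, 2))


# Interpolate by 100: tags every 10 input items land 1000 items apart.
interp = [map_offset(o, 100) for o in (0, 10, 20)]
# Decimate by 10 (relative rate 1/10): tags every 100 items land 10 apart.
decim = [map_offset(o, Fraction(1, 10)) for o in (0, 100, 200)]
print(interp, decim)  # [0, 1000, 2000] [0, 10, 20]
```

When the scaled offset is not an integer (for example input offset 7 through a decimate-by-10 block), the model rounds to the closest output item.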

Synchronous blocks (gr::sync_block), decimators (gr::sync_decimator), and interpolators (gr::sync_interpolator) all have pre-defined and well-understood relative rates. A standard gr::block has a default relative rate of 1.0, which must be changed (with set_relative_rate()) if the block's input-to-output ratio is different. Often, however, we use a gr::block precisely because there is no fixed relationship between the number of input and output items. If tags passing through such a block must land on the output items that actually correspond to their input items, we have to use the TPP_DONT tag propagation policy and handle the propagation inside the block.
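Here is a sketch of what handling propagation internally can look like. gate_work is a hypothetical block, modeled in plain Python, that keeps only samples above a threshold, so its input/output ratio depends on the data. Because it tracks which input item becomes which output item, it can re-attach each tag at the right absolute output offset. Tags are (absolute_offset, key) tuples here instead of gr::tag_t.

```python
def gate_work(samples, in_tags, nitems_read, nitems_written, threshold):
    """Hypothetical block that keeps only samples with abs(x) >= threshold.

    Its input/output ratio depends on the data, so no fixed relative rate
    describes it. With TPP_DONT, the block re-attaches tags itself.
    in_tags and the returned out_tags are lists of (absolute_offset, key).
    """
    keys_at = {}
    for off, key in in_tags:
        keys_at.setdefault(off, []).append(key)

    out, out_tags = [], []
    for i, x in enumerate(samples):
        if abs(x) >= threshold:
            # This input item becomes output item len(out) of this call.
            for key in keys_at.get(nitems_read + i, []):
                out_tags.append((nitems_written + len(out), key))
            out.append(x)
        # Design choice in this sketch: tags on dropped samples are discarded.
    return out, out_tags


out, out_tags = gate_work([0.5, 0.01, 0.02, 0.9, 0.7],
                          in_tags=[(100, "a"), (101, "b"), (103, "c")],
                          nitems_read=100, nitems_written=40, threshold=0.1)
print(out, out_tags)  # [0.5, 0.9, 0.7] [(40, 'a'), (41, 'c')]
```

Discarding tags on dropped samples is only one possible policy; a block could instead move them to the next kept sample, which would require remembering pending tags across work calls.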

In no case is the value of the tag modified when propagating through a block. This becomes relevant when using Tagged Stream Blocks.

As an example, consider an interpolating block. See the following flow graph:

tags_interp.grc.png

tags_interp.png

As you can tell, we produce tags on every 10th sample, and then pass them through a block that repeats every sample 100 times. Tags do not get repeated with the standard tag propagation policies (after all, they're not tag manipulation policies), so the scheduler takes care that every tag is put on the output item that corresponds to the input item it was on before. Here, the scheduler makes an educated guess and puts the tag on the first of 100 items.

Note: We can't use one QT GUI Time Sink for both signals here, because they run at different rates. Note the time difference on the x-axis!

On decimating blocks, the behavior is similar. Consider this very simple flow graph, and the position of the tags:

tags_decim.grc.png

tags_decim.png


We can see that no tags are destroyed, and tags are indeed spaced at one-tenth of the original spacing of 100 items. Of course, the actual item that was passed through the block might be destroyed, or modified (think of a decimating FIR filter).

In fact, this works with any rate-changing block. Where the relation between input and output tag positions is ambiguous, the GNU Radio scheduler places the tag as close as possible to the corresponding position.

Here's another interesting example: Consider this flow graph, which has a delay block, and the position of the tags after it:

tags_ramp_delay.grc.png

tags_ramp_delay.png


Before the delay block, tags were positioned at the beginning of the ramp. After the delay, they are still at the beginning of the ramp! If we inspected the source code of the delay block, we would find that it contains no tag handling code at all. Instead, the block declares a delay to the scheduler, which then propagates tags with this delay.
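The effect of a declared delay can be modeled in a few lines of plain Python: the data is shifted by delay items, and shifting each tag offset by the same amount keeps the tags on the same samples. delay_block is a hypothetical helper that processes a whole stream at once, not the GNU Radio delay block.

```python
def delay_block(samples, tag_offsets, delay):
    """Model of a delay: output[n] = input[n - delay], zero-filled at the start.

    Tag offsets are shifted by the same declared delay, so each tag stays
    on the sample it was attached to.
    """
    out = [0] * delay + list(samples)
    out_tags = [off + delay for off in tag_offsets]
    return out, out_tags


ramp = [1, 2, 3, 4, 1, 2, 3, 4]
tags = [0, 4]  # tags at the start of each ramp
out, out_tags = delay_block(ramp, tags, delay=3)
print(out_tags, [out[t] for t in out_tags])  # [3, 7] [1, 1]
```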

Using these mechanisms, we can let GNU Radio handle tag propagation for a large set of cases. For specialized or corner cases, there is no option other than setting the tag propagation policy to TPP_DONT and propagating tags manually. (Actually, there is another way: say we want most tags to propagate normally, but a select few should be treated differently. We can use remove_item_tag() (DEPRECATED, will be removed in 3.8) to remove these tags from the input; they will then no longer be propagated, even if the tag propagation policy is set to something other than TPP_DONT. But that is more advanced use and will not be elaborated on here.)

Notes on How to Use Tags

Tags can be very useful to an application, and their use is spreading. USRP sources generate tags with the time, sample rate, and frequency of the board when any of these changes. We have a metadata file source/sink (see Metadata_Information) that uses tags to store information about the data stream. But there are things to think about when using tags in a block.

First, when tags are not being used, there is almost no effect on the scheduler. However, when we use tags, getting and extracting them from a data stream adds overhead, and so does propagating them: for each tag, each block must copy a vector of tags from the output port(s) of one block to the input port(s) of the next block(s). These copy operations can add up.

The key is to minimize the use of tags. Use them only when necessary, and try to provide some control over how tags are generated to limit their frequency. A good example is the USRP source, which generates a time tag. If it generated a tag with every sample, we would have thousands of tags per second, which would add a significant amount of overhead. It would also be pointless: if we started at time t0 at sample rate sr, then after N samples we know that we are at time t0 + N/sr, so continuously producing new tags adds no information.
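The arithmetic behind this argument is a one-liner. In this sketch, sample_time is an illustrative helper and the values (a time tag of 1.5 s, a sample rate of 1 MS/s) are made up for the example.

```python
def sample_time(t0, sample_rate, n):
    """Time of sample n, given that sample 0 carried a time tag of t0 seconds."""
    return t0 + n / sample_rate


# Illustrative values: last time tag said 1.5 s, sample rate 1 MS/s.
print(sample_time(1.5, 1_000_000.0, 250_000))  # 1.75
```

As long as no samples are lost, a single time tag plus the sample rate determines the time of every later sample, so repeating the tag adds nothing.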

The main issue we need to deal with in the above situation is a discontinuity in the packets received from the USRP. Since the flowgraph has no way of knowing how many samples were lost, we have lost track of the timing information. The USRP driver recognizes when packets have been dropped and uses this to queue another tag, which allows us to resync. Likewise, whenever the sample rate or frequency changes, a new tag is issued.

Example Flowgraph

Let's have a look at a simple example:

tut5_tagstest_fg.png

In this flow graph, we have two sources: a sinusoid and a tag strobe. A tag strobe is a block that outputs a constant tag, in this case on every 1000th item (the actual value of the items is always zero). The two sources are added together. The signal after the adder is identical to the sine wave we produced, because we are always adding zeros. However, the tags stay attached to the same positions they had when coming from the tag strobe! This means every 1000th sample of the sinusoid now has a tag. The QT scope can display tags, and even trigger on them.

tut5_tagstest_scope.png

We now have a mechanism to attach arbitrary metadata to specific items. There are several blocks that use tags. One of them is the UHD Sink block, the driver used for transmitting with USRP devices. It reacts to tags with certain keys, one of them being tx_freq, which can be used to set the transmit frequency of a USRP while streaming.

Adding tags to the QPSK demodulator

Going back to our QPSK demodulation example, we might want to add a feature to tell downstream blocks that the demodulation is not going well. Remember the output of our block is always hard-decision, and we have to output something. So we could use tags to notify that the input is not well formed, and that the output is not reliable.

As a failure criterion, we consider the case where the input amplitude is too small, say smaller than 0.01. When the amplitude drops below this value, we output one tag. No further warning is sent until the amplitude has recovered (at which point we output an "amplitude_recovered" tag) and then falls below the threshold again. We extend our work function like this:

if (std::abs(in[i]) < 0.01 and not d_low_ampl_state) {
    add_item_tag(0, // Port number
                 nitems_written(0) + i, // Offset
                 pmt::mp("amplitude_warning"), // Key
                 pmt::from_double(std::abs(in[i])) // Value
    );
    d_low_ampl_state = true;
}
else if (std::abs(in[i]) >= 0.01 and d_low_ampl_state) {
    add_item_tag(0, // Port number
                 nitems_written(0) + i, // Offset
                 pmt::mp("amplitude_recovered"), // Key
                 pmt::PMT_T // Value
    );
    d_low_ampl_state = false; // Reset state
}

In Python, the code would look like this (assuming our block class has a member called d_low_ampl_state, and that numpy and pmt have been imported):

# The vector 'in' is called 'in0' here because 'in' is a Python keyword
if abs(in0[i]) < 0.01 and not self.d_low_ampl_state:
    self.add_item_tag(0, # Port number
                      self.nitems_written(0) + i, # Offset
                      pmt.intern("amplitude_warning"), # Key
                      pmt.from_double(numpy.double(abs(in0[i]))) # Value
                      # Note: We need to explicitly create a 'double' here,
                      # because in0[i] is an explicit 32-bit float here
    )
    self.d_low_ampl_state = True
elif abs(in0[i]) >= 0.01 and self.d_low_ampl_state:
    self.add_item_tag(0, # Port number
                      self.nitems_written(0) + i, # Offset
                      pmt.intern("amplitude_recovered"), # Key
                      pmt.PMT_T # Value
    )
    self.d_low_ampl_state = False # Reset state

We can also create a tag data type tag_t and directly pass this along:

if (std::abs(in[i]) < 0.01 and not d_low_ampl_state) {
    tag_t tag;
    tag.offset = nitems_written(0) + i;
    tag.key = pmt::mp("amplitude_warning");
    tag.value = pmt::from_double(std::abs(in[i]));
    add_item_tag(0, tag);
    d_low_ampl_state = true;
}
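The warning/recovery logic can be checked outside GNU Radio with a plain-Python model of one work call. amplitude_tags is a hypothetical helper: tuples stand in for tags and plain values stand in for PMTs. The low_state flag plays the role of d_low_ampl_state and is carried from one call to the next.

```python
def amplitude_tags(samples, nitems_written, low_state, threshold=0.01):
    """Model of the tagging logic for one work() call.

    Returns (tags, low_state). tags holds (absolute_offset, key, value)
    tuples; plain strings and numbers stand in for PMTs. low_state plays the
    role of the d_low_ampl_state member and must be carried between calls.
    """
    tags = []
    for i, x in enumerate(samples):
        mag = abs(x)
        if mag < threshold and not low_state:
            tags.append((nitems_written + i, "amplitude_warning", mag))
            low_state = True
        elif mag >= threshold and low_state:
            tags.append((nitems_written + i, "amplitude_recovered", True))
            low_state = False  # reset state
    return tags, low_state


tags1, state = amplitude_tags([1 + 1j, 0.005, 0j, 0.002], 0, False)
tags2, state = amplitude_tags([0.001, 1 - 1j, 0.003], 4, state)
print(tags1)  # [(1, 'amplitude_warning', 0.005)]
print(tags2)  # [(5, 'amplitude_recovered', True), (6, 'amplitude_warning', 0.003)]
```

Only one warning is emitted for the run of small samples, even though it spans two work calls, because the state survives between calls.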

Here's a flow graph that uses the tagged version of the demodulator. We input 20 valid QPSK symbols, then 10 zeros. Since the output of this block is always either 0, 1, 2 or 3, we normally have no way to see if the input was not clearly one of these values.

Demod_with_tags.grc.png

Here's the output. You can see we have tags on those values which were not from a valid QPSK symbol, but from something unreliable.

Demod_with_tags.png

Use case: FIR filters

Assume we have a block that is actually an FIR filter. We want to let GNU Radio handle the tag propagation. How do we configure the block?

Now, an FIR filter has one input and one output. So, it doesn't matter if we set the propagation policy to TPP_ALL_TO_ALL or TPP_ONE_TO_ONE, and we can leave it as the default. The items going in and those coming out are different, so how do we match input to output? Since we want to preserve the timing of the tag position, we need to use the filter's group delay as a delay for tags (which for a symmetric, i.e. linear-phase, FIR filter is (N-1)/2, where N is the number of filter taps). Finally, we might be interpolating, decimating or both (say, for sample rate changes), and we need to tell the scheduler about this as well.
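Putting the two ingredients together, here is a plain-Python model of where a tag might land after such a filter. fir_tag_offset is a hypothetical helper, and its ordering is an assumption of this sketch: the group delay is counted in input items (rounded down for an even number of taps), applied first, and the result is then scaled by interpolation/decimation and rounded. It illustrates the reasoning above; it does not reproduce GNU Radio's implementation.

```python
import math
from fractions import Fraction


def fir_tag_offset(in_offset, ntaps, interpolation=1, decimation=1):
    """Hypothetical helper: model where a tag lands after a linear-phase FIR.

    Assumptions of this sketch: the group delay (ntaps - 1) / 2 is counted in
    input items and rounded down for even ntaps; it is applied first, then
    the offset is scaled by interpolation/decimation and rounded.
    """
    delay = (ntaps - 1) // 2
    rate = Fraction(interpolation, decimation)
    return math.floor((in_offset + delay) * rate + Fraction(1, 2))


print(fir_tag_offset(100, ntaps=31))                # 115
print(fir_tag_offset(100, ntaps=31, decimation=5))  # 23
```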