Python Block Tags
This tutorial demonstrates how to create two Embedded Python Blocks. The first block detects when the input signal crosses a threshold and writes a tag at that sample. The second block reads the tag and updates its output with the time since the last detection.
The previous tutorial, Python Block Message Passing, demonstrates how to send and receive messages using the Embedded Python Block. The next tutorial, Low Pass Filter Example, demonstrates how to use filtering blocks in GNU Radio.
Tags Overview
Tags are a way to convey information alongside digitized RF samples in a time-synchronous fashion. Tags are particularly useful when downstream blocks need to know upon which sample the receiver was tuned to a new frequency, or for including timestamps with specific samples.
Where messages convey information asynchronously with no clock-based time guarantee, tags are information associated with specific RF samples. Tags ride alongside digitized RF samples in data streams and vectors, including Complex Float 32, Float 32, Byte and all of the other formats.
Tags are added using the line:
self.add_item_tag(outputPortNumber, absoluteIndex, key, value)
The outputPortNumber determines which output stream the tag is added to. The absoluteIndex is the sample index the tag is added to. The flowgraph counts each sample and the first sample produced is at absolute sample index 0. The key is a PMT type containing the name of the variable to be stored and value is another PMT type that contains the information to be stored.
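To make the absolute index concrete, the following plain-Python sketch models a single output stream without importing GNU Radio. FakeOutputStream, its produce() helper and the tuple layout of a tag are hypothetical stand-ins. The only idea taken from the tutorial is that the absolute index equals the number of samples already written plus the position inside the current buffer (in a real block that running count is self.nitems_written(outputPortNumber), which is used later in this tutorial).

```python
class FakeOutputStream:
    """Hypothetical stand-in for one output port of a block."""

    def __init__(self):
        self.items_written = 0   # plays the role of self.nitems_written(0)
        self.tags = []           # (absolute_index, key, value) tuples

    def add_item_tag(self, absolute_index, key, value):
        self.tags.append((absolute_index, key, value))

    def produce(self, count):
        # Called at the end of a work() call: 'count' samples were emitted.
        self.items_written += count


stream = FakeOutputStream()

# Three successive work() calls with different buffer sizes. Each call tags
# the sample at relative position 1 inside its own buffer.
for buffer_length in (4, 3, 5):
    relative_index = 1
    absolute_index = stream.items_written + relative_index
    stream.add_item_tag(absolute_index, 'detect', 1.0)
    stream.produce(buffer_length)

print([offset for offset, _, _ in stream.tags])  # [1, 5, 8]
```

The same relative position lands on a different absolute index in every call, which is why the running sample count has to be added before writing the tag.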
Reading tags can be done with the function:
tagTuple = self.get_tags_in_window(inputPortNumber, relativeIndexStart, relativeIndexStop)
Reading tags in a window reads them based on the relative index within the current input_items vector. The simplest way to get all of the tags corresponding to the current input_items samples is with the function call:
tagTuple = self.get_tags_in_window(inputPortNumber, 0, len(input_items[inputPortNumber]))
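The relationship between the relative window and the absolute tag offsets can be sketched in plain Python. The function tags_in_window and the (absolute_offset, key, value) tuples below are hypothetical and only imitate the behavior described above. The sketch also assumes the window is half-open, meaning the start index is included and the stop index is not.

```python
def tags_in_window(tags, items_read, rel_start, rel_stop):
    """Return the tags whose absolute offset falls inside a relative window.

    tags       : list of (absolute_offset, key, value) tuples
    items_read : samples consumed before the current buffer
                 (the role of self.nitems_read(inputPortNumber))
    The window is treated as half-open: rel_start included, rel_stop excluded.
    """
    abs_start = items_read + rel_start
    abs_stop = items_read + rel_stop
    return [tag for tag in tags if abs_start <= tag[0] < abs_stop]


all_tags = [(90, 'detect', 0.76), (130, 'detect', 0.81), (260, 'detect', 0.75)]

# Suppose 100 samples were consumed earlier and the current buffer holds 128.
items_read = 100
buffer_length = 128
window = tags_in_window(all_tags, items_read, 0, buffer_length)

# Convert absolute offsets back to positions inside the current buffer.
relative_positions = [offset - items_read for offset, _, _ in window]
print(relative_positions)  # [30]
```

Only the tag at absolute offset 130 belongs to the current buffer, and subtracting the number of samples already read gives its position inside input_items.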
More information about tags can be found here: Stream Tags
Creating Test Signal
A test signal is needed. Drag in the blocks for the input signal:
- GLFSR Source
- Repeat
- Multiply Const
- Add Const
- Single Pole IIR Filter
- Throttle
- QT GUI Time Sink
Change the following parameters:
- GLFSR Source, Degree: 32
- Repeat, Interpolation: 128
- Multiply Const, Constant: 0.5
- Add Const, Constant: 0.5
- Single Pole IIR Filter, Alpha: 0.05
- QT GUI Time Sink
  - Number of Points: 2048
  - Autoscale: Yes
- samp_rate Variable, Value: 3200
Change all of the blocks to be Float input and output. Connect them all according to the following flowgraph:
Run the flowgraph. A pseudo-randomized sequence of filtered 0s and 1s is generated:
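For readers who want to inspect the signal without running GRC, the chain can be approximated offline in plain Python. This is a rough sketch under stated assumptions: Python's random module stands in for the GLFSR Source (so the bit sequence differs from the real block), the source is assumed to emit -1.0 and +1.0, and the Single Pole IIR Filter is modeled as y[n] = (1 - alpha) * y[n-1] + alpha * x[n]. The function name make_test_signal is hypothetical.

```python
import random


def make_test_signal(num_bits, repeat=128, alpha=0.05, seed=0):
    """Approximate the GLFSR -> Repeat -> scale/offset -> IIR chain."""
    rng = random.Random(seed)      # stand-in for the GLFSR Source
    filtered = []
    previous = 0.0                 # filter state y[n-1]
    for _ in range(num_bits):
        symbol = rng.choice((-1.0, 1.0))    # assumed pseudo-random +/-1 output
        level = symbol * 0.5 + 0.5          # Multiply Const, Add Const: 0 or 1
        for _ in range(repeat):             # Repeat block
            # Single-pole IIR: y[n] = (1 - alpha) * y[n-1] + alpha * x[n]
            previous = (1.0 - alpha) * previous + alpha * level
            filtered.append(previous)
    return filtered


signal = make_test_signal(num_bits=16)
print(len(signal))  # 2048
```

Sixteen bits repeated 128 times give 2048 samples, which matches the Number of Points set in the QT GUI Time Sink.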
Threshold Detector: Defining the Block
Drag in a Python Block and double-click it to edit the source code. Recall that Embedded Python Blocks use indentation in multiples of 4 spaces.
Change the example_param variable name and add a new parameter report_period:
def __init__(self, threshold=1.0, report_period=128):
Update the block name:
name='Threshold Detector',
Change the input and output types to Float:
in_sig=[np.float32], out_sig=[np.float32]
Change the variable name from self.example_param:
self.threshold = threshold
self.report_period = report_period
Remove the multiplication by self.example_param:
output_items[0][:] = input_items[0]
The code looks like the following:
Save the code (CTRL + S) and return to GRC. The block looks like the following:
If the block did not update properly there may be a problem with the Python syntax. Double-click the Embedded Python Block to view any potential syntax errors. The following image gives an example of where the syntax errors are located:
Add a Virtual Sink and Virtual Source block to the flowgraph. Change the following block properties:
- Threshold Detector
  - Threshold: 0.75
  - Report Period: 128
- Virtual Sink, Stream ID: signal
- Virtual Source, Stream ID: signal
- QT GUI Time Sink, name: "Threshold Detector"
Connect the blocks according to the flowgraph:
Threshold Detector: Writing Tags
The internals of the Threshold Detector need to be written. Recall that Embedded Python Blocks use indentation in multiples of 4 spaces.
Import the pmt library:
import pmt
Add two new variables, self.timer and self.readyForTag under the __init__() function:
self.timer = 0
self.readyForTag = True
The work() function needs to be modified. Create a for loop to iterate through all of the input samples:
for index in range(len(input_items[0])):
Three sections of code need to be written. The first block writes the amplitude level into a tag named detect once the threshold is met or exceeded. The tag is only written if the self.readyForTag state variable is True. Once a tag is written the state variable self.readyForTag is set to False.
    # write the tag
    if (input_items[0][index] >= self.threshold and self.readyForTag == True):
        # define the key as 'detect'
        key = pmt.intern('detect')
        # get the detection value
        value = pmt.from_float(np.round(float(input_items[0][index]),2))
        # tag index to be written
        writeIndex = self.nitems_written(0) + index
        # add the tag object (key, value pair)
        self.add_item_tag(0, writeIndex, key, value )
        # tag has been written, set state
        self.readyForTag = False
The next block of code is used to run the timer. The timer increases by 1 for each input sample as long as self.readyForTag is False:
    # increase the timer by 1
    if (self.readyForTag == False):
        self.timer = self.timer + 1
The third block of code controls the state variable self.readyForTag. Once self.timer reaches the maximum value the timer is reset and the state variable self.readyForTag is set to True:
    # set flag to write
    if (self.timer >= self.report_period):
        # reset timer
        self.timer = 0
        # reset state once timer hits max value
        self.readyForTag = True
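The three sections form a small state machine, and it can help to check its behavior outside GNU Radio. The sketch below is a plain-Python model, not the Embedded Python Block itself: FakeSyncBlock and its run() method are hypothetical stand-ins for gr.sync_block and the scheduler, and plain strings and floats replace the PMT key and value. The body of work() follows the three sections above.

```python
class FakeSyncBlock:
    """Hypothetical stand-in for gr.sync_block with one input and one output."""

    def __init__(self):
        self.items_written = 0
        self.tags = []               # (absolute_index, key, value) tuples

    def nitems_written(self, port):
        return self.items_written

    def add_item_tag(self, port, absolute_index, key, value):
        self.tags.append((absolute_index, key, value))

    def run(self, samples, buffer_length):
        """Feed samples to work() in chunks, roughly as a scheduler would."""
        output = []
        for start in range(0, len(samples), buffer_length):
            chunk = samples[start:start + buffer_length]
            out = [0.0] * len(chunk)
            produced = self.work([chunk], [out])
            self.items_written += produced
            output.extend(out)
        return output


class ThresholdDetector(FakeSyncBlock):
    def __init__(self, threshold=1.0, report_period=128):
        super().__init__()
        self.threshold = threshold
        self.report_period = report_period
        self.timer = 0
        self.readyForTag = True

    def work(self, input_items, output_items):
        for index in range(len(input_items[0])):
            # section 1: write the tag once the threshold is met or exceeded
            if input_items[0][index] >= self.threshold and self.readyForTag:
                value = round(float(input_items[0][index]), 2)
                writeIndex = self.nitems_written(0) + index
                self.add_item_tag(0, writeIndex, 'detect', value)
                self.readyForTag = False
            # section 2: run the timer while tagging is disabled
            if not self.readyForTag:
                self.timer = self.timer + 1
            # section 3: re-enable tagging after report_period samples
            if self.timer >= self.report_period:
                self.timer = 0
                self.readyForTag = True
        output_items[0][:] = input_items[0]
        return len(output_items[0])


# 10 samples below the threshold, then 300 samples above it.
samples = [0.0] * 10 + [1.0] * 300
detector = ThresholdDetector(threshold=0.75, report_period=128)
output = detector.run(samples, buffer_length=64)
print([offset for offset, _, _ in detector.tags])  # [10, 138, 266]
```

While the signal stays above the threshold the model writes one tag every report_period samples, and the tag positions do not depend on how the samples are split into buffers because the timer and the state variable are stored on the object.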
Run the flowgraph. Tags are displayed in the QT GUI Time Sink:
Detection Counter: Defining the Block
A new Embedded Python Block is created to read the tags, count the number of samples since the last tag, and produce that number as an output.
Drag and drop a new Python Block into the GRC workspace. Do not copy and paste the existing Python Block, because that only creates a second copy of Threshold Detector.
Double-click on the Embedded Python Block and edit the code.
Remove the self.example_param parameter:
def __init__(self):
Change the name:
name='Detection Counter',
Make the input and output ports Floats:
in_sig=[np.float32], out_sig=[np.float32]
Remove the line self.example_param = example_param and the multiplication by self.example_param:
output_items[0][:] = input_items[0]
Save the code (CTRL + S). Add another QT GUI Time Sink and change the properties:
- QT GUI Time Sink
  - Name: "Detection Counter"
  - Number of Points: 2048
  - Autoscale: Yes
Connect the Detection Counter block after the Threshold Detector block.
Detection Counter: Reading Tags
The Detection Counter block needs to be modified to read the tags.
Import the pmt library:
import pmt
Add a new variable under __init__():
self.samplesSinceDetection = 0
Modify the work() function to read the tags:
# get all tags associated with input_items[0]
tagTuple = self.get_tags_in_window(0, 0, len(input_items[0]))
Loop through all of the tags, calculate the relative offset of those with the key equal to detect and store it in a list:
# declare a list
relativeOffsetList = []
# loop through all 'detect' tags and store their relative offset
for tag in tagTuple:
    if (pmt.to_python(tag.key) == 'detect'):
        relativeOffsetList.append( tag.offset - self.nitems_read(0) )
Sort the offsets from lowest to highest:
# sort list of relative offsets
relativeOffsetList.sort()
Loop through all of the output samples:
# loop through all output samples
for index in range(len(output_items[0])):
Produce an output sample with the current count of samples since the last detect tag:
    # output is now samples since detection counter
    output_items[0][index] = self.samplesSinceDetection
If the current output sample index is greater than or equal to the index of the current detect tag, then remove the offset value from the list and reset the sample counter self.samplesSinceDetection. Otherwise, increase the sample counter by 1.
    # make sure the list is not empty, and check whether the current output
    # sample index is greater than or equal to the next tag offset
    if (len(relativeOffsetList) > 0 and index >= relativeOffsetList[0]):
        # clear the offset
        relativeOffsetList.pop(0)
        # reset the output counter
        self.samplesSinceDetection = 0
    else:
        # a detect tag has not been seen, so continue to increase
        # the output counter
        self.samplesSinceDetection = self.samplesSinceDetection + 1
Remove the output assignment:
output_items[0][:] = input_items[0]
The work function looks like:
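As a cross-check of the logic, the steps above can be assembled into a plain-Python model that runs without GNU Radio. The function detection_counter and its arguments are hypothetical: tag_offsets is a list of absolute offsets of detect tags, and the loop over buffers imitates repeated calls to work(). The per-sample logic follows the snippets in this section.

```python
def detection_counter(num_samples, tag_offsets, buffer_length):
    """Model of the Detection Counter output for a given set of tag offsets."""
    samples_since_detection = 0     # role of self.samplesSinceDetection
    items_read = 0                  # role of self.nitems_read(0)
    output = []
    while items_read < num_samples:
        length = min(buffer_length, num_samples - items_read)
        # tags inside the current buffer, converted to relative offsets
        relative_offsets = sorted(
            offset - items_read
            for offset in tag_offsets
            if items_read <= offset < items_read + length
        )
        for index in range(length):
            # the output is written before the counter is updated
            output.append(float(samples_since_detection))
            if relative_offsets and index >= relative_offsets[0]:
                relative_offsets.pop(0)
                samples_since_detection = 0
            else:
                samples_since_detection += 1
        items_read += length
    return output


counts = detection_counter(num_samples=12, tag_offsets=[3, 8], buffer_length=5)
print([int(v) for v in counts])  # [0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1, 2]
```

Because the output sample is written before the counter is reset, the tagged sample still carries the old count and the output drops to 0 on the sample after the tag.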
Save the code (CTRL + S). Run the flowgraph. The output looks like the following:
Notice that all of the tags from the input of Detection Counter are automatically conveyed to its output.
Tag Propagation
By default all input tags are propagated to all outputs. It can be useful to reduce or completely remove tags from certain streams. The Tag Gate block does exactly that. Connect the Tag Gate after the Detection Counter block:
Run the flowgraph. The tags are removed from the QT GUI Time Sink:
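The effect of the Tag Gate can be pictured with a very small plain-Python model. The function tag_gate below is hypothetical and only illustrates the idea: samples pass through unchanged, and tags are forwarded only when propagation is enabled. The sketch assumes propagation is disabled by default, which matches the behavior seen in this flowgraph.

```python
def tag_gate(samples, tags, propagate_tags=False):
    """Pass samples through unchanged; forward tags only if propagate_tags."""
    forwarded_tags = list(tags) if propagate_tags else []
    return list(samples), forwarded_tags


samples_in = [3.0, 4.0, 0.0, 1.0]
tags_in = [(1, 'detect', 0.76)]      # (absolute_offset, key, value)

samples_out, tags_out = tag_gate(samples_in, tags_in)
print(samples_out == samples_in, tags_out)  # True []
```

The sample values reaching the QT GUI Time Sink are the same as before, only the tag markers are gone.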