Python Block Tags: Difference between revisions
Mattcarrick (talk | contribs) |
m (rephrased) |
||
(75 intermediate revisions by 4 users not shown) | |||
Line 1: | Line 1: | ||
{{ | <div style="float:right"> | ||
{{Template:BeginnerTutorials}} | |||
</div> | |||
This tutorial demonstrates how to create two ''Embedded Python Blocks'' for detecting when the input signal crosses the threshold and writing a tag for it and then reading the tag in a separate block and updating the output with the time since the last detection. | |||
The previous tutorial, [[Python_Block_Message_Passing|Python Block Message Passing]] demonstrates how to send and receive messages using the ''Embedded Python Block''. The next tutorial, [[Low_Pass_Filter_Example|Low Pass Filter Example]], demonstrates how to use filtering blocks in GNU Radio. | The previous tutorial, [[Python_Block_Message_Passing|Python Block Message Passing]] demonstrates how to send and receive messages using the ''Embedded Python Block''. The next tutorial, [[Low_Pass_Filter_Example|Low Pass Filter Example]], demonstrates how to use filtering blocks in GNU Radio. | ||
Line 17: | Line 11: | ||
Where [[Python_Block_Message_Passing|messages]] convey information in an asynchronous fashion with no clock-based time guarantee, tags are information which are associated with specific RF samples. Tags ride alongside digitized RF samples in data streams and vectors, including ''Complex Float 32'', ''Float 32'', ''Byte'' and all of the other formats. | Where [[Python_Block_Message_Passing|messages]] convey information in an asynchronous fashion with no clock-based time guarantee, tags are information which are associated with specific RF samples. Tags ride alongside digitized RF samples in data streams and vectors, including ''Complex Float 32'', ''Float 32'', ''Byte'' and all of the other formats. | ||
Tags are added using the line: | |||
<pre>self.add_item_tag(outputPortNumber, absoluteIndex, key, value)</pre> | |||
The ''outputPortNumber'' determines which output stream the tag is added to. The ''absoluteIndex'' is the sample index the tag is added to. The flowgraph counts each sample and the first sample produced is at absolute sample index ''0''. The ''key'' is a PMT type containing the name of the variable to be stored and ''value'' is another PMT type that contains the information to be stored. | |||
[[File:AddItemTag.png|700px]] | |||
Reading tags can be done with the function: | |||
<pre>tagTuple = self.get_tags_in_window(inputPortNumber, relativeIndexStart, relativeIndexStop))</pre> | |||
Reading tags in a ''window'' reads them based on the ''relative index'' within the current ''input_items'' vector. The simplest way to get all of the tags corresponding to the current ''input_items'' samples is with the function call: | |||
<pre>tagTuple = self.get_tags_in_window(inputPortNumber, 0, len(input_items[inputPortNumber])))</pre> | |||
[[File:GetTagsInWindow.png|900px]] | |||
More information about tags can be found here: [[Stream_Tags|Stream Tags]] | More information about tags can be found here: [[Stream_Tags|Stream Tags]] | ||
Line 22: | Line 32: | ||
== Creating Test Signal == | == Creating Test Signal == | ||
A test signal is needed. | A test signal is needed. Drag in the blocks for the input signal: | ||
* ''GLFSR Source'' | * ''GLFSR Source'' | ||
* ''Repeat'' | * ''Repeat'' | ||
Line 40: | Line 50: | ||
** Number of Points: 2048 | ** Number of Points: 2048 | ||
** Autoscale: Yes | ** Autoscale: Yes | ||
* ''samp_rate'' Variable, Value: 3200 | * ''samp_rate'' Variable, Value: 3200 | ||
Line 47: | Line 56: | ||
[[File:TestSignalFlowgraph.png|900px]] | [[File:TestSignalFlowgraph.png|900px]] | ||
Run the flowgraph. A randomized sequence of filtered ''0''s and ''1''s is generated: | Run the flowgraph. A pseudo-randomized sequence of filtered ''0''s and ''1''s is generated: | ||
[[File:TestSignalTimeSink.png|700px]] | [[File:TestSignalTimeSink.png|700px]] | ||
Line 53: | Line 62: | ||
== Threshold Detector: Defining the Block == | == Threshold Detector: Defining the Block == | ||
Drag in a ''Python Block'' and double-click it to edit the source code. | Drag in a ''Python Block'' and double-click it to edit the source code. Recall that ''Embedded Python Blocks'' use indentation in multiples of 4 spaces. | ||
Change the ''example_param'' variable name and add a new parameter ''report_period'': | Change the ''example_param'' variable name and add a new parameter ''report_period'': | ||
< | <pre>def __init__(self, threshold=1.0, report_period=128):</pre> | ||
Update the block name: | Update the block name: | ||
< | <pre>name='Threshold Detector',</pre> | ||
Change the input and output types to ''Float'': | Change the input and output types to ''Float'': | ||
< | <pre>in_sig=[np.float32], | ||
out_sig=[np.float32]</pre> | |||
Change the variable name from ''self.example_param'': | Change the variable name from ''self.example_param'': | ||
< | <pre>self.threshold = threshold | ||
self.report_period = report_period</pre> | |||
Remove the multiplication by ''self.example_param'': | Remove the multiplication by ''self.example_param'': | ||
< | <pre>output_items[0][:] = input_items[0]</pre> | ||
The code | The code looks like the following: | ||
[[File:DefineBlockThresholdDetector.png|800px]] | [[File:DefineBlockThresholdDetector.png|800px]] | ||
Save the code (CTRL + S) and return to GRC. The block | Save the code (CTRL + S) and return to GRC. The block looks like the following: | ||
[[File: | [[File:ThresholdDetectorBlockDefined.png|300px]] | ||
If the block did not update properly there may be a problem with the Python syntax. Double-click the ''Embedded Python Block'' to view any potential syntax errors. The following image gives an example of where the synax errors are located: | If the block did not update properly there may be a problem with the Python syntax. Double-click the ''Embedded Python Block'' to view any potential syntax errors. The following image gives an example of where the synax errors are located: | ||
Line 97: | Line 104: | ||
* ''Virtual Sink'', Stream ID: signal | * ''Virtual Sink'', Stream ID: signal | ||
* ''Virtual Source'', Stream ID: signal | * ''Virtual Source'', Stream ID: signal | ||
* ''QT GUI Time Sink'', name: "Threshold Detector" | |||
Connect the blocks according to the flowgraph: | Connect the blocks according to the flowgraph: | ||
Line 104: | Line 112: | ||
== Threshold Detector: Writing Tags == | == Threshold Detector: Writing Tags == | ||
The internals of the ''Threshold Detector'' | The internals of the ''Threshold Detector'' need to be written. Recall that ''Embedded Python Blocks'' use indentation in multiples of 4 spaces. | ||
Import the ''pmt'' library: | Import the ''pmt'' library: | ||
< | <pre>import pmt</pre> | ||
Add two new variables, ''self.timer'' and ''self.readyForTag'' under the ''__init__()'' function: | |||
<pre>self.timer = 0 | |||
self.readyForTag = True</pre> | |||
[[File:AddTwoVariables.png|700px]] | |||
The ''work()'' function needs to be modified. Create a for loop to iterate through all of the input samples: | |||
<pre>for index in range(len(input_items[0])):</pre> | |||
Three sections of code need to be written. The first block writes the amplitude level into a tag named ''detect'' once the threshold is met or exceeded. The tag is only written if the ''self.readyForTag'' state variable is True. Once a tag is written the state variable ''self.readyForTag'' is set to False. | |||
<pre># write the tag | |||
if (input_items[0][index] >= self.threshold and self.readyForTag == True): | |||
# define the key as 'detect' | |||
key = pmt.intern('detect') | |||
# get the detection value | |||
value = pmt.from_float(np.round(float(input_items[0][index]),2)) | |||
# tag index to be written | |||
writeIndex = self.nitems_written(0) + index | |||
# add the tag object (key, value pair) | |||
self.add_item_tag(0, writeIndex, key, value ) | |||
# tag has been written, set state | |||
self.readyForTag = False</pre> | |||
The next block of code is used to run the timer. The timer increases by 1 for each input sample as long as ''self.readyForTag'' is False: | |||
< | <pre># increase the timer by 1 | ||
if (self.readyForTag == False): | |||
self.timer = self.timer + 1</pre> | |||
< | The third block of code controls the state variable ''self.readyForTag''. Once ''self.timer'' reaches the maximum value the timer is reset and the state variable ''self.readyForTag'' is set to True: | ||
<pre># set flag to write | |||
if (self.timer >= self.report_period): | |||
# reset timer | |||
self.timer = 0 | |||
# reset state once timer hits max value | |||
self.readyForTag = True</pre> | |||
[[File:WriteDetectionTags.png|700px]] | |||
Run the flowgraph. Tags are displayed in the ''QT GUI Time Sink'': | |||
[[File:TimeSinkTags.png|700px]] | |||
== Detection Counter: Defining the Block == | == Detection Counter: Defining the Block == | ||
A new ''Embedded Python Block'' is created to read the tags, count the number of samples since the last tag, and produce that number as an output. | |||
Drag and drop a <u>new</u> ''Python Block'' into the GRC workspace. Do <u>not</u> copy and paste the existing python block, it only creates a second copy of ''Threshold Detector''. | |||
Double-click on the ''Embedded Python Block'' and edit the code. | |||
Remove the ''self.example_param'' parameter: | |||
<pre>def __init__(self):</pre> | |||
Change the name: | |||
<pre>name='Detection Counter',</pre> | |||
Make the input and output ports ''Floats'': | |||
<pre>in_sig=[np.float32], | |||
out_sig=[np.float32]</pre> | |||
Remove the line ''self.example_param = example_param'' and the multiplication by ''self.example_param'': | |||
<pre>output_items[0][:] = input_items[0]</pre> | |||
[[File:DefineBlockDetectionCounter.png|700px]] | |||
Save the code (CTRL + S). Add another ''QT GUI Time Sink'' and change the properties: | |||
* ''QT GUI Time Sink'' | |||
** Name: "Detection Counter" | |||
** Number of Points: 2048 | |||
** Autoscale: Yes | |||
Connect the ''Detection Counter'' block after the ''Threshold Detection''. | |||
[[File:AddedDetectionCounterToFlowgraph.png|900px]] | |||
== Detection Counter: Reading Tags == | == Detection Counter: Reading Tags == | ||
The ''Detection Counter'' block needs to be modified to read the tags. | |||
Import the ''pmt'' library: | |||
<pre>import pmt</pre> | |||
Add a new variable under ''__init__()'': | |||
<pre>self.samplesSinceDetection = 0</pre> | |||
[[File:AddVariableDetectionCounter.png|700px]] | |||
Modify the ''work()'' function to read the tags: | |||
<pre># get all tags associated with input_items[0] | |||
tagTuple = self.get_tags_in_window(0, 0, len(input_items[0]))</pre> | |||
Loop through all of the tags, calculate the relative offset of those with the key equal to ''detect'' and store it in a list: | |||
<pre># declare a list | |||
relativeOffsetList = [] | |||
# loop through all 'detect' tags and store their relative offset | |||
for tag in tagTuple: | |||
if (pmt.to_python(tag.key) == 'detect'): | |||
relativeOffsetList.append( tag.offset - self.nitems_read(0) )</pre> | |||
Sort the offsets from lowest to highest: | |||
<pre># sort list of relative offsets | |||
relativeOffsetList.sort()</pre> | |||
Loop through all of the output samples: | |||
<pre># loop through all output samples | |||
for index in range(len(output_items[0])):</pre> | |||
Produce an output sample with the current count of samples since the last ''detect'' tag: | |||
<pre># output is now samples since detection counter | |||
output_items[0][index] = self.samplesSinceDetection</pre> | |||
If the current output sample index is greater than or equal to the index of the current ''detect'' tag, then remove the offset value from the list and reset the sample counter ''self.samplesSinceDetection.'' Otherwise, increase the sample counter by 1. | |||
<pre># make sure the list is not-empty, and if the current input sample | |||
# is greater than or equal to the next | |||
if (len(relativeOffsetList) > 0 and index >= relativeOffsetList[0]): | |||
# clear the offset | |||
relativeOffsetList.pop(0) | |||
# reset the output counter | |||
self.samplesSinceDetection = 0 | |||
else: | |||
# a detect tag has not been seen, so continue to increase | |||
# the output counter | |||
self.samplesSinceDetection = self.samplesSinceDetection + 1</pre> | |||
Remove the output assignment: | |||
<pre>output_items[0][:] = input_items[0]</pre> | |||
The work function looks like: | |||
[[File:DetectionCounterWorkFunction.png|700px]] | |||
Save the code (CTRL + S). Run the flowgraph. The output looks like the following: | |||
[[File:DetectionCounterTimeSink.png|700px]] | |||
Notice that all of the tags from the input of ''Detection Counter'' are automatically conveyed to its output. | |||
== Tag Propagation == | |||
By default all input tags are propagated to all output tags. It can be useful to reduce or completely remove tags from certain streams. The ''Tag Gate'' block does exactly that. Connect the ''Tag Gate'' after the ''Detection Counter'' block: | |||
[[File:TagGateFlowgraph.png|900px]] | |||
Run the flowgraph. The tags are removed from the ''QT GUI Time Sink'': | |||
[[File:TagGateTimeSink.png|700px]] | |||
The next tutorial, [[Low_Pass_Filter_Example|Low Pass Filter Example]], demonstrates how to use filtering blocks in GNU Radio. | The next tutorial, [[Low_Pass_Filter_Example|Low Pass Filter Example]], demonstrates how to use filtering blocks in GNU Radio. |
Latest revision as of 16:30, 19 January 2023
Beginner Tutorials
Introducing GNU Radio Flowgraph Fundamentals
Creating and Modifying Python Blocks DSP Blocks
SDR Hardware |
This tutorial demonstrates how to create two Embedded Python Blocks for detecting when the input signal crosses the threshold and writing a tag for it and then reading the tag in a separate block and updating the output with the time since the last detection.
The previous tutorial, Python Block Message Passing demonstrates how to send and receive messages using the Embedded Python Block. The next tutorial, Low Pass Filter Example, demonstrates how to use filtering blocks in GNU Radio.
Tags Overview
Tags are a way to convey information alongside digitized RF samples in a time-synchronous fashion. Tags are particularly useful when downstream blocks need to know upon which sample the receiver was tuned to a new frequency, or for including timestamps with specific samples.
Where messages convey information in an asynchronous fashion with no clock-based time guarantee, tags are information which are associated with specific RF samples. Tags ride alongside digitized RF samples in data streams and vectors, including Complex Float 32, Float 32, Byte and all of the other formats.
Tags are added using the line:
self.add_item_tag(outputPortNumber, absoluteIndex, key, value)
The outputPortNumber determines which output stream the tag is added to. The absoluteIndex is the sample index the tag is added to. The flowgraph counts each sample and the first sample produced is at absolute sample index 0. The key is a PMT type containing the name of the variable to be stored and value is another PMT type that contains the information to be stored.
Reading tags can be done with the function:
tagTuple = self.get_tags_in_window(inputPortNumber, relativeIndexStart, relativeIndexStop))
Reading tags in a window reads them based on the relative index within the current input_items vector. The simplest way to get all of the tags corresponding to the current input_items samples is with the function call:
tagTuple = self.get_tags_in_window(inputPortNumber, 0, len(input_items[inputPortNumber])))
More information about tags can be found here: Stream Tags
Creating Test Signal
A test signal is needed. Drag in the blocks for the input signal:
- GLFSR Source
- Repeat
- Multiply Const
- Add Const
- Single Pole IIR Filter
- Throttle
- QT GUI Time Sink
Change the following parameters:
- GLFSR Source, Degree: 32
- Repeat, Interpolation: 128
- Multiply Const, Constant: 0.5
- Add Const, Constant: 0.5
- Single Pole IIR Filter, Alpha: 0.05
- QT GUI Time Sink
- Number of Points: 2048
- Autoscale: Yes
- samp_rate Variable, Value: 3200
Change all of the blocks to be Float input and output. Connect them all according to the following flowgraph:
Run the flowgraph. A pseudo-randomized sequence of filtered 0s and 1s is generated:
Threshold Detector: Defining the Block
Drag in a Python Block and double-click it to edit the source code. Recall that Embedded Python Blocks use indentation in multiples of 4 spaces.
Change the example_param variable name and add a new parameter report_period:
def __init__(self, threshold=1.0, report_period=128):
Update the block name:
name='Threshold Detector',
Change the input and output types to Float:
in_sig=[np.float32], out_sig=[np.float32]
Change the variable name from self.example_param:
self.threshold = threshold self.report_period = report_period
Remove the multiplication by self.example_param:
output_items[0][:] = input_items[0]
The code looks like the following:
Save the code (CTRL + S) and return to GRC. The block looks like the following:
If the block did not update properly there may be a problem with the Python syntax. Double-click the Embedded Python Block to view any potential syntax errors. The following image gives an example of where the synax errors are located:
Add a Virtual Sink and Virtual Source block to the flowgraph. Change the following block properties:
- Threshold Detector
- Threshold: 0.75
- Report Period: 128
- Virtual Sink, Stream ID: signal
- Virtual Source, Stream ID: signal
- QT GUI Time Sink, name: "Threshold Detector"
Connect the blocks according to the flowgraph:
Threshold Detector: Writing Tags
The internals of the Threshold Detector need to be written. Recall that Embedded Python Blocks use indentation in multiples of 4 spaces.
Import the pmt library:
import pmt
Add two new variables, self.timer and self.readyForTag under the __init__() function:
self.timer = 0 self.readyForTag = True
The work() function needs to be modified. Create a for loop to iterate through all of the input samples:
for index in range(len(input_items[0])):
Three sections of code need to be written. The first block writes the amplitude level into a tag named detect once the threshold is met or exceeded. The tag is only written if the self.readyForTag state variable is True. Once a tag is written the state variable self.readyForTag is set to False.
# write the tag if (input_items[0][index] >= self.threshold and self.readyForTag == True): # define the key as 'detect' key = pmt.intern('detect') # get the detection value value = pmt.from_float(np.round(float(input_items[0][index]),2)) # tag index to be written writeIndex = self.nitems_written(0) + index # add the tag object (key, value pair) self.add_item_tag(0, writeIndex, key, value ) # tag has been written, set state self.readyForTag = False
The next block of code is used to run the timer. The timer increases by 1 for each input sample as long as self.readyForTag is False:
# increase the timer by 1 if (self.readyForTag == False): self.timer = self.timer + 1
The third block of code controls the state variable self.readyForTag. Once self.timer reaches the maximum value the timer is reset and the state variable self.readyForTag is set to True:
# set flag to write if (self.timer >= self.report_period): # reset timer self.timer = 0 # reset state once timer hits max value self.readyForTag = True
Run the flowgraph. Tags are displayed in the QT GUI Time Sink:
Detection Counter: Defining the Block
A new Embedded Python Block is created to read the tags, count the number of samples since the last tag, and produce that number as an output.
Drag and drop a new Python Block into the GRC workspace. Do not copy and paste the existing python block, it only creates a second copy of Threshold Detector.
Double-click on the Embedded Python Block and edit the code.
Remove the self.example_param parameter:
def __init__(self):
Change the name:
name='Detection Counter',
Make the input and output ports Floats:
in_sig=[np.float32], out_sig=[np.float32]
Remove the line self.example_param = example_param and the multiplication by self.example_param:
output_items[0][:] = input_items[0]
Save the code (CTRL + S). Add another QT GUI Time Sink and change the properties:
- QT GUI Time Sink
- Name: "Detection Counter"
- Number of Points: 2048
- Autoscale: Yes
Connect the Detection Counter block after the Threshold Detection.
Detection Counter: Reading Tags
The Detection Counter block needs to be modified to read the tags.
Import the pmt library:
import pmt
Add a new variable under __init__():
self.samplesSinceDetection = 0
Modify the work() function to read the tags:
# get all tags associated with input_items[0] tagTuple = self.get_tags_in_window(0, 0, len(input_items[0]))
Loop through all of the tags, calculate the relative offset of those with the key equal to detect and store it in a list:
# declare a list relativeOffsetList = [] # loop through all 'detect' tags and store their relative offset for tag in tagTuple: if (pmt.to_python(tag.key) == 'detect'): relativeOffsetList.append( tag.offset - self.nitems_read(0) )
Sort the offsets from lowest to highest:
# sort list of relative offsets relativeOffsetList.sort()
Loop through all of the output samples:
# loop through all output samples for index in range(len(output_items[0])):
Produce an output sample with the current count of samples since the last detect tag:
# output is now samples since detection counter output_items[0][index] = self.samplesSinceDetection
If the current output sample index is greater than or equal to the index of the current detect tag, then remove the offset value from the list and reset the sample counter self.samplesSinceDetection. Otherwise, increase the sample counter by 1.
# make sure the list is not-empty, and if the current input sample # is greater than or equal to the next if (len(relativeOffsetList) > 0 and index >= relativeOffsetList[0]): # clear the offset relativeOffsetList.pop(0) # reset the output counter self.samplesSinceDetection = 0 else: # a detect tag has not been seen, so continue to increase # the output counter self.samplesSinceDetection = self.samplesSinceDetection + 1
Remove the output assignment:
output_items[0][:] = input_items[0]
The work function looks like:
Save the code (CTRL + S). Run the flowgraph. The output looks like the following:
Notice that all of the tags from the input of Detection Counter are automatically conveyed to its output.
Tag Propagation
By default all input tags are propagated to all output tags. It can be useful to reduce or completely remove tags from certain streams. The Tag Gate block does exactly that. Connect the Tag Gate after the Detection Counter block:
Run the flowgraph. The tags are removed from the QT GUI Time Sink:
The next tutorial, Low Pass Filter Example, demonstrates how to use filtering blocks in GNU Radio.