GNU Radio Flowgraph Embedded in Python Applications
This comprehensive tutorial demonstrates how to integrate GNU Radio flowgraphs into custom Python GUI applications, giving you full programmatic control over signal processing workflows. You'll learn to create professional applications that leverage GNU Radio's powerful signal processing capabilities with custom user interfaces. GNU Radio uses "flowgraphs" - visual representations of signal processing chains where blocks (filters, modulators, sources, sinks) are connected together. When you create a flowgraph in GNU Radio Companion (GRC), it generates Python code that can be seamlessly integrated into your applications.
Examples of applications built with GNU Radio are:
Basic Integration
Create Your Flow Graph
Start by building a simple FM receiver flowgraph.
You can either:
- Follow the RTL-SDR FM Receiver tutorial
- Download the example directly:
wget https://raw.githubusercontent.com/StudHamza/GNU-Radio-FM-App/main/src/fm_receiver/flowgraphs/fm_receiver.grc
If you followed the tutorial, your flowgraph should look like this:
If you downloaded the GRC file directly, it will look a little different, but it will still work. Either way, the next step is to generate or run the flowgraph.
Understand Python Generated Code
Running the GRC file will create a .py file in the same directory. While there are plenty of tutorials that explain the generated file in detail, this tutorial focuses on how to embed it in your PyQt Python application.
class simple_fm_receiver(gr.top_block, Qt.QWidget):
    # NOTE: abridged excerpt of the GRC-generated class. `...` stands for
    # constructor arguments and connections omitted here for brevity.

    def __init__(self):
        gr.top_block.__init__(self, "Simple FM Receiver")
        Qt.QWidget.__init__(self)

        # Variables (can be modified at runtime)
        self.freq = freq = 101.1e6  # Center frequency
        self.samp_rate = samp_rate = 2048000  # Sample rate

        # Blocks
        self.rtlsdr_source = osmosdr.source(args="numchan=1")
        self.low_pass_filter = filter.fir_filter_ccf(...)
        self.analog_wfm_rcv = analog.wfm_rcv(...)
        self.audio_sink = audio.sink(48000)

        # Connections
        self.connect((self.rtlsdr_source, 0), (self.low_pass_filter, 0))
        # ... more connections

    def closeEvent(self, event):
        # Persist the window geometry, then shut the flowgraph down before
        # accepting the close event.
        self.settings = Qt.QSettings("GNU Radio", "fm_receiver")
        self.settings.setValue("geometry", self.saveGeometry())
        self.stop()
        self.wait()
        event.accept()

    # Getters and Setters
    # NOTE: the setters below reference blocks (qtgui_sink_x_0,
    # soapy_rtlsdr_source_0) and a variable (fft_size) that are not created
    # in the abridged __init__ shown above; the names in your own generated
    # file depend on the block IDs in your flowgraph.
    def get_samp_rate(self):
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        self.samp_rate = samp_rate

    def get_freq(self):
        return self.freq

    def set_freq(self, freq):
        # Store the new value, then push it to every block that uses it.
        self.freq = freq
        self.qtgui_sink_x_0.set_frequency_range(self.freq, 2.048e6)
        self.soapy_rtlsdr_source_0.set_frequency(0, self.freq)

    def get_fft_size(self):
        return self.fft_size

    def set_fft_size(self, fft_size):
        self.fft_size = fft_size
Now if you scroll down a little bit you'll find the main function at the very end of the file.
def main(top_block_cls=simple_fm_receiver, options=None):
    """Instantiate the flowgraph, run it, and shut it down cleanly.

    top_block_cls: the flowgraph class to instantiate.
    options: unused in this function.
    """
    tb = top_block_cls()

    def sig_handler(sig=None, frame=None):
        # Stop the flowgraph and wait for it to finish before exiting.
        tb.stop()
        tb.wait()
        sys.exit(0)

    # Shut down cleanly on Ctrl+C and on termination requests.
    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    tb.start()
    try:
        input('Press Enter to quit: ')
    except EOFError:
        # No interactive stdin: fall through to the shutdown below.
        pass
    tb.stop()
    tb.wait()


if __name__ == '__main__':
    main()
The parameter top_block_cls=simple_fm_receiver is your flowgraph class, which inherits from gr.top_block. The key methods of interest here are:
- Start:
Start the contained flowgraph. Creates one or more threads to execute the flow graph. Returns to the caller once the threads are created.
- Stop:
Stop the running flowgraph. Notifies each thread created by the scheduler to shutdown, then returns to caller.
- Wait:
Wait for a flowgraph to complete. Flowgraphs complete when either (1) all blocks indicate that they are done, or (2) after stop() has been called to request shutdown.
These three methods will allow us to programmatically control the flowgraph.
Setting Up a PyQt5 Application to Control a GNU Radio Flowgraph
If you already have GRC installed, PyQt5 is already installed on your system; you don’t need to install it separately. Create a new Python virtual environment, and make sure it includes system packages.
Note: Make sure to include system packages so that your interpreter can find the GNU Radio library
Step 1: Create and Activate a Virtual Environment
Use a Python virtual environment with access to system packages so that GNU Radio can be found.
# Create environment
python -m venv .venv --system-site-packages
# Activate (Linux/Mac)
source .venv/bin/activate
# Activate (Windows)
.venv\Scripts\activate
Step 2: Create Your Application File
Create a new file called app.py in the same directory as your GNU Radio flowgraph (e.g., fm_receiver.py).
Insert the following code to create a simple PyQt5 application with a button that toggles the flowgraph on and off.
import sys
from PyQt5.QtWidgets import QApplication, QPushButton, QWidget, QVBoxLayout
# First Import your flowgraph
from fm_receiver import fm_receiver
class FM(QWidget):
    """Minimal window with one button that starts and stops the flowgraph."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("PyQt5 Simple FM Example")

        # Instantiate FM Receiver App
        self.tb = fm_receiver()
        self.setMinimumSize(700, 500)

        # State variable
        self.listening = False

        # Layout
        layout = QVBoxLayout()

        # Button
        self.button = QPushButton("Start Listening")
        self.button.clicked.connect(self.toggle_listening)  # connect click event
        layout.addWidget(self.button)

        self.setLayout(layout)

    def toggle_listening(self):
        """Toggle listening state and update button text."""
        self.listening = not self.listening
        if self.listening:
            self.button.setText("Stop Listening")
            print("Listening started...")
            self.tb.start()
        else:
            self.button.setText("Start Listening")
            print("Listening stopped.")
            # stop() only requests shutdown; wait() returns once the
            # flowgraph has actually completed.
            self.tb.stop()
            self.tb.wait()
if __name__ == "__main__":
    # Create the Qt application, show the window, and run the event loop;
    # the event loop's return value becomes the process exit code.
    app = QApplication(sys.argv)
    window = FM()
    window.show()
    sys.exit(app.exec_())
Step 3: Import Your Flowgraph
In your application, you must import your flowgraph and instantiate it in the __init__
method so that you can control it from anywhere in the application.
# The module name is the generated file's name (fm_receiver.py) and the class
# name is the flowgraph's ID.
from fm_receiver import fm_receiver
# Instantiate FM Receiver App (inside __init__)
self.tb = fm_receiver()
Step 4: Toggle the Flowgraph
The button toggles the flowgraph, allowing the user to start or stop it on command.
def toggle_listening(self):
    """Toggle listening state and update button text."""
    self.listening = not self.listening
    if self.listening:
        self.button.setText("Stop Listening")
        print("Listening started...")
        self.tb.start()
    else:
        self.button.setText("Start Listening")
        print("Listening stopped.")
        # stop() only requests shutdown; wait() returns once the flowgraph
        # has actually completed.
        self.tb.stop()
        self.tb.wait()
Advanced Features
This section covers more advanced controls that can make your Python app manage every single detail of your flowgraph. This explanation is specific for the FM Application but can be applied elsewhere.
The topics covered in this tutorial are:
- Variable control
- Using GRC GUI elements in your Python application
- Using parameters for your flowgraph
- Advanced control over blocks by disconnecting and reconnecting
Setting Variables
If you click the Listen button, there is a good chance you will just hear noise. In order to listen to an actual station, you need to tune to a specific center frequency.
Looking at the Python-generated file, you will find at the very bottom some getters and setters:
def set_freq(self, freq):
    # Store the new center frequency, then push it to every block that uses
    # it: both spectrum displays and the SDR source.
    self.freq = freq
    self.qtgui_freq_sink_x_0.set_frequency_range(self.freq, self.samp_rate)
    self.qtgui_waterfall_sink_x_0.set_frequency_range(self.freq, self.samp_rate)
    self.soapy_rtlsdr_source_0.set_frequency(0, self.freq)
To set frequency, tie this method to a GUI element in your application.
You can get as creative as you want with the widget, but for simplicity we will use a QSlider.
Add a Slider widget along with a function to handle changing the frequency:
# Frequency Label
# Show the receiver's actual start frequency. The slider below is initialised
# before valueChanged is connected, so change_frequency() does not run for
# the initial value and a hard-coded label would be stale.
self.freq_label = QLabel(f"Frequency: {self.tb.get_freq()/1e6:.1f} MHz")
layout.addWidget(self.freq_label)

# Frequency Slider
self.freq_slider = QSlider(Qt.Horizontal)
self.freq_slider.setMinimum(88000000)  # 88 MHz
self.freq_slider.setMaximum(108000000)  # 108 MHz
self.freq_slider.setValue(int(self.tb.get_freq()))
self.freq_slider.setTickInterval(int(1e6))  # 1 MHz step
self.freq_slider.setSingleStep(100000)
self.freq_slider.valueChanged.connect(self.change_frequency)
layout.addWidget(self.freq_slider)

def change_frequency(self, freq):
    """Update FM receiver frequency from slider.

    freq: new center frequency in Hz, as emitted by the slider.
    """
    self.tb.set_freq(freq)
    self.freq_label.setText(f"Frequency: {freq/1e6:.1f} MHz")
    print(f"Frequency set to {freq/1e6:.1f} MHz")
We used the get_freq() method to set the default slider value to the current receiver frequency, and the set_freq() method to update the SDR’s center frequency, letting you tune across stations.
Using GUI Blocks
To tune into stations, we can look for peaks in the frequency-domain view of our signal. GNU Radio’s QT Frequency Sink makes this easy by showing the signal spectrum in real time.
When you generate your flowgraph, check the Python file: GUI elements are added to the QT layout with addWidget(). The variables passed here (usually ending in _win) are widget objects you can embed directly into your application.
# Wrap the frequency sink's qwidget() as a Qt.QWidget and add it to the
# flowgraph's own layout; the resulting *_win attribute is the object you can
# pass to addWidget() in your application.
self._qtgui_freq_sink_x_0_win = sip.wrapinstance(
    self.qtgui_freq_sink_x_0.qwidget(), Qt.QWidget
)
self.top_layout.addWidget(self._qtgui_freq_sink_x_0_win)
Disconnecting and Connecting Blocks
Suppose you want to record a specific stream only when triggered by a button press. GNU Radio allows you to control your flowgraph programmatically from within your Python application. This means you can dynamically connect and disconnect blocks as needed.
You can approach this in two ways:
- Create the block directly in Python by checking the block’s constructor in the API reference.
- Recommended: Add the block in GRC, then generate the Python code.
By reviewing the code, you can see how the block is constructed and how connections are defined.
Example of a WAV file sink from the generated flowgraph:
# wavfile_sink and the FORMAT_* constants are used from the `blocks` module
from gnuradio import blocks
# ...
self.blocks_wavfile_sink_0 = blocks.wavfile_sink(
    'filename',  # Output file
    1,  # Number of channels
    samp_rate,  # Sample rate
    blocks.FORMAT_WAV,
    blocks.FORMAT_PCM_16,
    False
)
##################################################
# Connections
##################################################
self.connect((self.analog_wfm_rcv_0, 0), (self.blocks_wavfile_sink_0, 0))
Adding Recording Control
First initialize the block:
from gnuradio import blocks
# Initialize wavfile sink (but don't connect yet)
# The block is only created here; it is connected in toggle_recording().
self.recorder = blocks.wavfile_sink(
    'recording.wav', # Output file
    1, # Number of channels
    int(48e3), # Sample rate (sits where the generated code passes samp_rate)
    blocks.FORMAT_WAV,
    blocks.FORMAT_PCM_16,
    False # presumably the "append" flag - confirm against the wavfile_sink docs
)
Add a record button:
# Record button: build and wire the widget first, then publish it on self
# together with the "not recording" initial state.
record_button = QPushButton("Start Recording")
record_button.clicked.connect(self.toggle_recording)
layout.addWidget(record_button)
self.record_button = record_button
self.recording = False
Then create a function to toggle recording:
def toggle_recording(self):
    """Toggle audio recording by connecting/disconnecting wavfile_sink.

    The flowgraph is stopped for the topology change and restarted
    afterwards only if it was running (self.listening). Starting it
    unconditionally would leave the Listen button out of sync, and the next
    "Start Listening" click would call start() on a running flowgraph.
    """
    self.recording = not self.recording
    decoder_out = (self.tb.analog_wfm_rcv_0, 0)
    recorder_in = (self.recorder, 0)
    flowgraph_running = self.listening

    if self.recording:
        self.record_button.setText("Stop Recording")
        print("Recording started...")
    else:
        self.record_button.setText("Start Recording")
        print("Recording stopped.")

    # Halt a running flowgraph before changing its connections
    if flowgraph_running:
        self.tb.stop()
        self.tb.wait()

    if self.recording:
        # Dynamically connect FM decoder to wavfile sink
        self.tb.connect(decoder_out, recorder_in)
    else:
        # Disconnect wavfile sink (best effort: it may already be detached)
        try:
            self.tb.disconnect(decoder_out, recorder_in)
        except Exception as e:
            print("Already disconnected:", e)

    # Resume only if the user was listening before the change
    if flowgraph_running:
        self.tb.start()
Full Application Code
import sys
from PyQt5.QtWidgets import QApplication, QPushButton, QWidget, QVBoxLayout, QSlider, QLabel
from PyQt5.QtCore import Qt
# Import your flowgraph
from fm_receiver import fm_receiver
from gnuradio import audio, blocks
class FM(QWidget):
    """Main window: listen to, tune, and record the FM receiver flowgraph."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("PyQt5 FM Receiver with Recording")

        # Instantiate FM Receiver App
        self.tb = fm_receiver()
        self.setMinimumSize(700, 500)

        # State variables
        self.listening = False
        self.recording = False

        # Initialize recording block (not yet connected)
        self.recorder = blocks.wavfile_sink(
            'recording.wav',
            1,
            int(48e3),
            blocks.FORMAT_WAV,
            blocks.FORMAT_PCM_16,
            False
        )

        # Layout
        layout = QVBoxLayout()

        # Listening Button
        self.listen_button = QPushButton("Start Listening")
        self.listen_button.clicked.connect(self.toggle_listening)
        layout.addWidget(self.listen_button)

        # Recording Button
        self.record_button = QPushButton("Start Recording")
        self.record_button.clicked.connect(self.toggle_recording)
        layout.addWidget(self.record_button)

        # Frequency Label
        # Show the receiver's actual start frequency. The slider below is
        # initialised before valueChanged is connected, so
        # change_frequency() does not run for the initial value.
        self.freq_label = QLabel(f"Frequency: {self.tb.get_freq()/1e6:.1f} MHz")
        layout.addWidget(self.freq_label)

        # Frequency Slider
        self.freq_slider = QSlider(Qt.Horizontal)
        self.freq_slider.setMinimum(88000000)  # 88 MHz
        self.freq_slider.setMaximum(108000000)  # 108 MHz
        self.freq_slider.setValue(int(self.tb.get_freq()))
        self.freq_slider.setTickInterval(int(1e6))  # 1 MHz step
        self.freq_slider.setSingleStep(100000)
        self.freq_slider.valueChanged.connect(self.change_frequency)
        layout.addWidget(self.freq_slider)

        # Add the Frequency Sink widget from GNU Radio (Qt GUI block)
        layout.addWidget(self.tb._qtgui_freq_sink_x_0_win)

        self.setLayout(layout)

    def toggle_listening(self):
        """Toggle listening state and update button text."""
        self.listening = not self.listening
        if self.listening:
            self.listen_button.setText("Stop Listening")
            print("Listening started...")
            self.tb.start()
        else:
            self.listen_button.setText("Start Listening")
            print("Listening stopped.")
            # stop() only requests shutdown; wait() returns once the
            # flowgraph has actually completed.
            self.tb.stop()
            self.tb.wait()

    def toggle_recording(self):
        """Toggle audio recording by connecting/disconnecting wavfile_sink.

        The flowgraph is stopped for the topology change and restarted
        afterwards only if it was running (self.listening). Starting it
        unconditionally would leave the Listen button out of sync, and the
        next "Start Listening" click would call start() on a running
        flowgraph.
        """
        self.recording = not self.recording
        decoder_out = (self.tb.analog_wfm_rcv_0, 0)
        recorder_in = (self.recorder, 0)
        flowgraph_running = self.listening

        if self.recording:
            self.record_button.setText("Stop Recording")
            print("Recording started...")
        else:
            self.record_button.setText("Start Recording")
            print("Recording stopped.")

        # Halt a running flowgraph before changing its connections
        if flowgraph_running:
            self.tb.stop()
            self.tb.wait()

        if self.recording:
            # Dynamically connect FM decoder to wavfile sink
            self.tb.connect(decoder_out, recorder_in)
        else:
            # Disconnect wavfile sink (best effort: it may already be detached)
            try:
                self.tb.disconnect(decoder_out, recorder_in)
            except Exception as e:
                print("Already disconnected:", e)

        # Resume only if the user was listening before the change
        if flowgraph_running:
            self.tb.start()

    def change_frequency(self, freq):
        """Update FM receiver frequency from slider.

        freq: new center frequency in Hz, as emitted by the slider.
        """
        self.tb.set_freq(freq)
        self.freq_label.setText(f"Frequency: {freq/1e6:.1f} MHz")
        print(f"Frequency set to {freq/1e6:.1f} MHz")

    def closeEvent(self, event):
        """Stop a running flowgraph before the window closes.

        Mirrors the stop()/wait() shutdown that the GRC-generated class
        performs in its own closeEvent.
        """
        if self.listening:
            self.tb.stop()
            self.tb.wait()
            self.listening = False
        event.accept()
if __name__ == "__main__":
    # Create the Qt application, show the window, and run the event loop;
    # the event loop's return value becomes the process exit code.
    app = QApplication(sys.argv)
    window = FM()
    window.show()
    sys.exit(app.exec_())
Now your application should look like this: