GNU Radio Flowgraph Embedded in Python Applications

From GNU Radio
Revision as of 13:02, 26 August 2025 by Studhamza (talk | contribs) (Tutorial)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

This comprehensive tutorial demonstrates how to integrate GNU Radio flowgraphs into custom Python GUI applications, giving you full programmatic control over signal processing workflows. You'll learn to create professional applications that leverage GNU Radio's powerful signal processing capabilities with custom user interfaces.
GNU Radio uses "flowgraphs" - visual representations of signal processing chains where blocks (filters, modulators, sources, sinks) are connected together. When you create a flowgraph in GNU Radio Companion (GRC), it generates Python code that can be seamlessly integrated into your applications.

Basic Integration

Create Your Flow Graph

Start by building a simple FM receiver flowgraph.

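You can either build it yourself by following the RTL-SDR FM receiver tutorial, or download the ready-made flowgraph directly: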

wget https://raw.githubusercontent.com/StudHamza/GNU-Radio-FM-App/main/src/fm_receiver/flowgraphs/fm_receiver.grc

If you followed the tutorial, your flowgraph should look like this:

RTL-SDR-FM-Receiver-with-volume-control.jpg

If you downloaded the GRC file directly, it will look a little different, but it will still work. Either way, the next step is to generate or run the flowgraph.

Understand Python Generated Code

Generating or running the flowgraph in GRC creates a Python file (for example, fm_receiver.py) in the same directory as the .grc file. Plenty of other tutorials explain the generated code in detail; in this tutorial we focus on how to embed it in your own PyQt Python application.

class simple_fm_receiver(gr.top_block, Qt.QWidget):
    """Abridged excerpt of a generated FM receiver flowgraph class.

    The class is both a GNU Radio top block and a Qt widget. Block constructor
    arguments are elided with ``...`` and only the first connection is shown,
    so this excerpt is for reading, not for running.
    """

    def __init__(self):
        """Initialise both base classes, then create variables, blocks and connections."""
        gr.top_block.__init__(self, "Simple FM Receiver")
        Qt.QWidget.__init__(self)

        # Variables (can be modified at runtime)
        self.freq = freq = 101.1e6  # Center frequency
        self.samp_rate = samp_rate = 2048000  # Sample rate

        # Blocks
        self.rtlsdr_source = osmosdr.source(args="numchan=1")
        self.low_pass_filter = filter.fir_filter_ccf(...)
        self.analog_wfm_rcv = analog.wfm_rcv(...)
        self.audio_sink = audio.sink(48000)

        # Connections
        self.connect((self.rtlsdr_source, 0), (self.low_pass_filter, 0))
        # ... more connections

    def closeEvent(self, event):
        """Save the window geometry, stop the flowgraph, wait for it, then accept the event."""
        self.settings = Qt.QSettings("GNU Radio", "fm_receiver")
        self.settings.setValue("geometry", self.saveGeometry())
        # stop() followed by wait() shuts the flowgraph down before the close is accepted.
        self.stop()
        self.wait()

        event.accept()

    # Getters and Setters
    def get_samp_rate(self):
        """Return the stored sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate; in this excerpt no block is updated."""
        self.samp_rate = samp_rate

    def get_freq(self):
        """Return the stored center frequency."""
        return self.freq

    def set_freq(self, freq):
        """Store a new center frequency and forward it to the GUI sink and the source.

        NOTE(review): ``qtgui_sink_x_0`` and ``soapy_rtlsdr_source_0`` are not
        created in the abridged ``__init__`` shown here (it creates
        ``rtlsdr_source`` instead) -- presumably they come from the full
        generated file; confirm against your own generated code.
        """
        self.freq = freq
        # The second argument is a literal 2.048e6, which equals the samp_rate set in __init__.
        self.qtgui_sink_x_0.set_frequency_range(self.freq, 2.048e6)
        self.soapy_rtlsdr_source_0.set_frequency(0, self.freq)

    def get_fft_size(self):
        """Return the stored FFT size.

        NOTE(review): ``self.fft_size`` is not assigned in the abridged
        ``__init__`` shown here, so this only works after ``set_fft_size``.
        """
        return self.fft_size

    def set_fft_size(self, fft_size):
        """Store a new FFT size; in this excerpt no block is updated."""
        self.fft_size = fft_size

Now if you scroll down a little bit you'll find the main function at the very end of the file.

def main(top_block_cls=simple_fm_receiver, options=None):
    """Run the flowgraph as a stand-alone Qt application.

    Args:
        top_block_cls: Flowgraph class to instantiate. It must be a Qt widget
            that also provides ``start()``, ``stop()`` and ``wait()``.
        options: Unused; kept so existing callers keep working.
    """
    # The flowgraph class is a Qt.QWidget, and Qt requires a QApplication to
    # exist before any widget is constructed.
    qapp = Qt.QApplication(sys.argv)

    tb = top_block_cls()

    # start() spawns the scheduler threads and returns immediately.
    tb.start()
    tb.show()

    def sig_handler(sig=None, frame=None):
        """Stop the flowgraph cleanly on SIGINT/SIGTERM, then leave the event loop."""
        tb.stop()
        tb.wait()

        Qt.QApplication.quit()

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    # Python signal handlers only run while the interpreter executes bytecode;
    # a no-op timer wakes it up regularly while the Qt event loop is blocking.
    timer = Qt.QTimer()
    timer.start(500)
    timer.timeout.connect(lambda: None)

    # Closing the window triggers the widget's closeEvent(), which performs
    # the stop()/wait() shutdown.
    qapp.exec_()


if __name__ == '__main__':
    main()


The parameter top_block_cls=simple_fm_receiver is your flowgraph class, which inherits from gr.top_block. The key methods of interest here are:

  1. Start:
 Start the contained flowgraph. Creates one or more threads to execute the 
 flow graph. Returns to the caller once the threads are created.
  2. Stop:
 Stop the running flowgraph. Notifies each thread created by the scheduler 
 to shutdown, then returns to caller.
  3. Wait:
 Wait for a flowgraph to complete. Flowgraphs complete when either 
 (1) all blocks indicate that they are done, or 
 (2) after stop() has been called to request shutdown.

These three methods allow us to control the flowgraph programmatically.

Setting Up a PyQt5 Application to Control a GNU Radio Flowgraph

If you already have GRC installed, PyQt5 is already installed on your system, so you don’t need to install it separately. Create a new Python virtual environment and make sure it includes the system packages.

Note: Make sure to include system packages so that your interpreter can find the GNU Radio library

Step 1: Create and Activate a Virtual Environment

Use a Python virtual environment with access to system packages so that GNU Radio can be found.

# Create environment
python -m venv .venv --system-site-packages

# Activate (Linux/Mac)
source .venv/bin/activate

# Activate (Windows)
.venv\Scripts\activate

Step 2: Create Your Application File

Create a new file called app.py in the same directory as the Python file generated from your GNU Radio flowgraph (e.g., fm_receiver.py).

Insert the following code to create a simple PyQt5 application with a button that toggles the flowgraph on and off.

import sys
from PyQt5.QtWidgets import QApplication, QPushButton, QWidget, QVBoxLayout
# First Import your flowgraph
from fm_receiver import fm_receiver

class FM(QWidget):
    """Minimal window with a single button that starts and stops the FM flowgraph."""

    def __init__(self):
        """Create the flowgraph instance and build the one-button layout."""
        super().__init__()
        self.setWindowTitle("PyQt5 Simple FM Example")
        self.setMinimumSize(700, 500)

        # Instantiate FM Receiver App
        self.tb = fm_receiver()

        # State variable: True while the flowgraph is running
        self.listening = False

        # Layout
        layout = QVBoxLayout()

        # Button
        self.button = QPushButton("Start Listening")
        self.button.clicked.connect(self.toggle_listening)  # connect click event
        layout.addWidget(self.button)

        self.setLayout(layout)

    def toggle_listening(self):
        """Toggle listening state and update button text.

        The state flag and the button text are only changed after the
        flowgraph call succeeded, so the UI never claims a state the
        flowgraph is not in.
        """
        if self.listening:
            # stop() only requests shutdown; wait() blocks until the threads
            # are gone, which is required before start() may be called again.
            self.tb.stop()
            self.tb.wait()
            self.listening = False
            self.button.setText("Start Listening")
            print("Listening stopped.")
        else:
            self.tb.start()
            self.listening = True
            self.button.setText("Stop Listening")
            print("Listening started...")

    def closeEvent(self, event):
        """Shut the flowgraph down before the window closes.

        Without this, closing the window while listening would leave the
        flowgraph's scheduler threads running.
        """
        if self.listening:
            self.tb.stop()
            self.tb.wait()
            self.listening = False
        event.accept()


if __name__ == "__main__":
    # Qt needs a QApplication before any widget is created.
    qt_app = QApplication(sys.argv)

    main_window = FM()
    main_window.show()

    # Run the event loop and hand its return code back to the shell.
    exit_code = qt_app.exec_()
    sys.exit(exit_code)

Step 3: Import Your Flowgraph

In your application, you must import your flowgraph and instantiate it in the __init__ method so that you can control it from anywhere in the application.

# Module and class names come from your generated file (here fm_receiver.py).
from fm_receiver import fm_receiver

# Instantiate FM Receiver App (inside FM.__init__)
self.tb = fm_receiver()

Step 4: Toggle the Flowgraph

The button toggles the flowgraph, allowing the user to start or stop it on command.

def toggle_listening(self):
    """Flip the listening flag, relabel the button, and start or stop the flowgraph."""
    now_listening = not self.listening
    self.listening = now_listening

    if not now_listening:
        # Stop path: request shutdown, then block until the flowgraph is done.
        self.button.setText("Start Listening")
        print("Listening stopped.")
        self.tb.stop()
        self.tb.wait()
        return

    # Start path.
    self.button.setText("Stop Listening")
    print("Listening started...")
    self.tb.start()

Advanced Features

This section covers more advanced controls that can make your Python app manage every single detail of your flowgraph. This explanation is specific for the FM Application but can be applied elsewhere.

The topics covered in this tutorial are:

  1. Variable control
  2. Using GRC GUI elements in your Python application
  3. Using parameters for your flowgraph
  4. Advanced control over blocks by disconnecting and reconnecting

Setting Variables

If you click the Listen button, there is a good chance you will just hear noise. In order to listen to an actual station, you need to tune to a specific center frequency.

Looking at the Python-generated file, you will find at the very bottom some getters and setters:

def set_freq(self, freq):
    """Store the new center frequency and push it to both GUI sinks and the source."""
    self.freq = freq

    # Both Qt sinks take the same (center frequency, bandwidth) pair.
    for gui_sink in (self.qtgui_freq_sink_x_0, self.qtgui_waterfall_sink_x_0):
        gui_sink.set_frequency_range(self.freq, self.samp_rate)

    # Retune channel 0 of the source.
    self.soapy_rtlsdr_source_0.set_frequency(0, self.freq)

To set frequency, tie this method to a GUI element in your application. You can get as creative as you want with the widget, but for simplicity we will use a QSlider.

Add a Slider widget along with a function to handle changing the frequency:

# Add inside FM.__init__, before self.setLayout(layout).
# Requires: from PyQt5.QtWidgets import QLabel, QSlider
#           from PyQt5.QtCore import Qt

# Frequency Label (initialised from the flowgraph so it matches the slider)
self.freq_label = QLabel(f"Frequency: {self.tb.get_freq()/1e6:.1f} MHz")
layout.addWidget(self.freq_label)

# Frequency Slider
self.freq_slider = QSlider(Qt.Horizontal)
self.freq_slider.setMinimum(88000000)   # 88 MHz
self.freq_slider.setMaximum(108000000)  # 108 MHz
# Set the start value before connecting the signal so that building the UI
# does not retune the receiver.
self.freq_slider.setValue(int(self.tb.get_freq()))
self.freq_slider.setTickPosition(QSlider.TicksBelow)  # ticks are hidden by default
self.freq_slider.setTickInterval(int(1e6))  # one tick mark per 1 MHz
self.freq_slider.setSingleStep(100000)      # arrow keys move 100 kHz
self.freq_slider.valueChanged.connect(self.change_frequency)
layout.addWidget(self.freq_slider)

# Add as a method of the FM class:
def change_frequency(self, freq):
    """Retune the flowgraph to the slider value and show it in MHz."""
    self.tb.set_freq(freq)

    # The slider works in Hz; the label and the log line show MHz.
    mhz = freq / 1e6
    self.freq_label.setText(f"Frequency: {mhz:.1f} MHz")
    print(f"Frequency set to {mhz:.1f} MHz")

We used the get_freq() method to set the default slider value to the current receiver frequency, and the set_freq() method to update the SDR’s center frequency, letting you tune across stations.

Using GUI Blocks

To tune into stations, we can look for peaks in the frequency-domain view of our signal. GNU Radio’s QT Frequency Sink makes this easy by showing the signal spectrum in real time.

When you generate your flowgraph, check the Python file: GUI elements are added to the QT layout with addWidget(). The variables passed here (usually ending in _win) are widget objects you can embed directly into your application.

# Wrap the sink's qwidget() handle as a Qt.QWidget so it can go into a layout.
freq_sink_handle = self.qtgui_freq_sink_x_0.qwidget()
self._qtgui_freq_sink_x_0_win = sip.wrapinstance(freq_sink_handle, Qt.QWidget)
self.top_layout.addWidget(self._qtgui_freq_sink_x_0_win)

Disconnecting and Connecting Blocks

Suppose you want to record a specific stream only when triggered by a button press. GNU Radio allows you to control your flowgraph programmatically from within your Python application. This means you can dynamically connect and disconnect blocks as needed.

You can approach this in two ways:

  1. Create the block directly in Python by checking the block’s constructor in the API reference.
  2. Recommended: Add the block in GRC, then generate the Python code.


By reviewing the generated code, you can see how the block is constructed and how its connections are defined. Example of a WAV file sink from the generated flowgraph:

from gnuradio import audio
from gnuradio import blocks  # wavfile_sink and the FORMAT_* constants are used via blocks.
# ...
# (rest of the generated file omitted)
# ...
self.blocks_wavfile_sink_0 = blocks.wavfile_sink(
    'filename',
    1,
    samp_rate,
    blocks.FORMAT_WAV,
    blocks.FORMAT_PCM_16,
    False
)

##################################################
# Connections
##################################################
self.connect((self.analog_wfm_rcv_0, 0), (self.blocks_wavfile_sink_0, 0))

Adding Recording Control

First initialize the block:

from gnuradio import blocks

# Build the WAV recorder up front; it is wired into the flowgraph later,
# when the user asks to record.
wav_path = 'recording.wav'   # Output file
audio_rate = int(48e3)       # 48000
self.recorder = blocks.wavfile_sink(
    wav_path,
    1,                       # Number of channels
    audio_rate,
    blocks.FORMAT_WAV,
    blocks.FORMAT_PCM_16,
    False,                   # NOTE(review): presumably the append flag -- confirm in the API docs
)

Add a record button:

# Record button: starts out idle and toggles recording on every click.
self.recording = False
record_btn = QPushButton("Start Recording")
record_btn.clicked.connect(self.toggle_recording)
layout.addWidget(record_btn)
self.record_button = record_btn

Then create a function to toggle recording:

def toggle_recording(self):
    """Toggle audio recording by connecting/disconnecting wavfile_sink.

    The flowgraph is only paused and restarted when it is currently running
    (``self.listening``). Restarting it unconditionally would start playback
    behind the Listen button's back, and the next "Start Listening" click
    would call ``start()`` on a flowgraph that is already running.

    The recorder file is reopened for every recording and closed afterwards.
    NOTE(review): this assumes wavfile_sink may close its file when the
    flowgraph stops -- confirm against your GNU Radio version. The explicit
    ``close()``/``open()`` calls are safe either way.
    """
    start_recording = not self.recording
    was_running = self.listening

    # Connections must not be changed while the scheduler is running.
    if was_running:
        self.tb.stop()
        self.tb.wait()

    try:
        if start_recording:
            # Start each recording with a freshly opened file.
            self.recorder.close()
            if not self.recorder.open('recording.wav'):
                print("Could not open recording.wav for writing.")
                return
            # Dynamically connect FM decoder to wavfile sink
            self.tb.connect((self.tb.analog_wfm_rcv_0, 0), (self.recorder, 0))
        else:
            # Disconnect wavfile sink (best effort: report, don't crash the UI)
            try:
                self.tb.disconnect((self.tb.analog_wfm_rcv_0, 0), (self.recorder, 0))
            except Exception as e:
                print("Already disconnected:", e)
            # Closing the file flushes and finalizes the recording.
            self.recorder.close()
    finally:
        # Resume playback even if rewiring failed.
        if was_running:
            self.tb.start()

    # Only reflect the new state in the UI once the rewiring succeeded.
    self.recording = start_recording
    if start_recording:
        self.record_button.setText("Stop Recording")
        print("Recording started...")
    else:
        self.record_button.setText("Start Recording")
        print("Recording stopped.")

Full Application Code

import sys
from PyQt5.QtWidgets import QApplication, QPushButton, QWidget, QVBoxLayout, QSlider, QLabel
from PyQt5.QtCore import Qt

# Import your flowgraph
from fm_receiver import fm_receiver
from gnuradio import audio, blocks


class FM(QWidget):
    """FM receiver window: listen, tune and record from one embedded flowgraph.

    The window owns the flowgraph instance (``self.tb``) and a WAV recorder
    block that is connected to the demodulator output only while recording.
    """

    # Output file written by the recorder.
    RECORDING_FILE = 'recording.wav'
    # Sample rate passed to the recorder; 48 kHz as used throughout this tutorial.
    AUDIO_RATE = 48000

    def __init__(self):
        """Create the flowgraph, the recorder block and all widgets."""
        super().__init__()
        self.setWindowTitle("PyQt5 FM Receiver with Recording")
        self.setMinimumSize(700, 500)

        # Instantiate FM Receiver App
        self.tb = fm_receiver()

        # State variables
        self.listening = False  # True while the flowgraph is running
        self.recording = False  # True while the recorder is connected

        # Initialize recording block (not yet connected)
        self.recorder = blocks.wavfile_sink(
            self.RECORDING_FILE,
            1,
            self.AUDIO_RATE,
            blocks.FORMAT_WAV,
            blocks.FORMAT_PCM_16,
            False
        )

        # Layout
        layout = QVBoxLayout()

        # Listening Button
        self.listen_button = QPushButton("Start Listening")
        self.listen_button.clicked.connect(self.toggle_listening)
        layout.addWidget(self.listen_button)

        # Recording Button
        self.record_button = QPushButton("Start Recording")
        self.record_button.clicked.connect(self.toggle_recording)
        layout.addWidget(self.record_button)

        # Frequency Label (initialised from the flowgraph so it matches the slider)
        self.freq_label = QLabel(self._format_frequency(self.tb.get_freq()))
        layout.addWidget(self.freq_label)

        # Frequency Slider
        self.freq_slider = QSlider(Qt.Horizontal)
        self.freq_slider.setMinimum(88000000)   # 88 MHz
        self.freq_slider.setMaximum(108000000)  # 108 MHz
        # Set the start value before connecting the signal so that building
        # the UI does not retune the receiver.
        self.freq_slider.setValue(int(self.tb.get_freq()))
        self.freq_slider.setTickPosition(QSlider.TicksBelow)  # ticks are hidden by default
        self.freq_slider.setTickInterval(int(1e6))  # one tick mark per 1 MHz
        self.freq_slider.setSingleStep(100000)      # arrow keys move 100 kHz
        self.freq_slider.valueChanged.connect(self.change_frequency)
        layout.addWidget(self.freq_slider)

        # Add the Frequency Sink widget from GNU Radio (Qt GUI block)
        layout.addWidget(self.tb._qtgui_freq_sink_x_0_win)

        self.setLayout(layout)

    @staticmethod
    def _format_frequency(freq):
        """Return the label text for a frequency given in Hz."""
        return f"Frequency: {freq/1e6:.1f} MHz"

    def _stop_flowgraph(self):
        """Request shutdown and block until the flowgraph has finished."""
        self.tb.stop()
        self.tb.wait()

    def _attach_recorder(self):
        """Open a fresh output file and connect the recorder; return True on success.

        Must only be called while the flowgraph is stopped.
        NOTE(review): assumes wavfile_sink may close its file when the
        flowgraph stops, hence the explicit reopen -- confirm against your
        GNU Radio version. The calls are safe either way.
        """
        self.recorder.close()
        if not self.recorder.open(self.RECORDING_FILE):
            print(f"Could not open {self.RECORDING_FILE} for writing.")
            return False
        # Dynamically connect FM decoder to wavfile sink
        self.tb.connect((self.tb.analog_wfm_rcv_0, 0), (self.recorder, 0))
        return True

    def _detach_recorder(self):
        """Disconnect the recorder and close its file.

        Must only be called while the flowgraph is stopped.
        """
        # Best effort: report a failed disconnect instead of crashing the UI.
        try:
            self.tb.disconnect((self.tb.analog_wfm_rcv_0, 0), (self.recorder, 0))
        except Exception as e:
            print("Already disconnected:", e)
        # Closing the file flushes and finalizes the recording.
        self.recorder.close()

    def _set_recording_state(self, active):
        """Store the recording flag and update the record button to match."""
        self.recording = active
        if active:
            self.record_button.setText("Stop Recording")
            print("Recording started...")
        else:
            self.record_button.setText("Start Recording")
            print("Recording stopped.")

    def toggle_listening(self):
        """Toggle listening state and update button text.

        Stopping also ends an active recording, so the UI never shows
        "recording" while no samples can reach the file.
        """
        if self.listening:
            self._stop_flowgraph()
            self.listening = False
            self.listen_button.setText("Start Listening")
            print("Listening stopped.")
            if self.recording:
                self._detach_recorder()
                self._set_recording_state(False)
        else:
            self.tb.start()
            self.listening = True
            self.listen_button.setText("Stop Listening")
            print("Listening started...")

    def toggle_recording(self):
        """Toggle audio recording by connecting/disconnecting wavfile_sink.

        The flowgraph is only paused and restarted when it is currently
        running. Restarting it unconditionally would start playback behind
        the Listen button's back and make the next "Start Listening" click
        call ``start()`` on a running flowgraph.
        """
        start_recording = not self.recording
        was_running = self.listening

        # Connections must not be changed while the scheduler is running.
        if was_running:
            self._stop_flowgraph()

        try:
            if start_recording:
                changed = self._attach_recorder()
            else:
                self._detach_recorder()
                changed = True
        finally:
            # Resume playback even if rewiring failed.
            if was_running:
                self.tb.start()

        if changed:
            self._set_recording_state(start_recording)

    def change_frequency(self, freq):
        """Update FM receiver frequency from slider."""
        self.tb.set_freq(freq)
        self.freq_label.setText(self._format_frequency(freq))
        print(f"Frequency set to {freq/1e6:.1f} MHz")

    def closeEvent(self, event):
        """Stop the flowgraph and close the recording before the window closes."""
        if self.listening:
            self._stop_flowgraph()
            self.listening = False
        self.recorder.close()
        event.accept()

if __name__ == "__main__":
    # Qt needs a QApplication before any widget is created.
    qt_app = QApplication(sys.argv)

    main_window = FM()
    main_window.show()

    # Run the event loop and hand its return code back to the shell.
    exit_code = qt_app.exec_()
    sys.exit(exit_code)

Now your application should look like this:

Python demo app.png