#!/usr/bin/env python3
"""
DVB-T Digital TV Receiver using RTL-SDR and GNU Radio

Receives DVB-T signals and outputs MPEG-TS stream to stdout.

Usage:
    python3 dvbt_rx.py --freq 506000000 | mpv -
    python3 dvbt_rx.py --freq 506e6 --input-file recording.cf32 | vlc -
"""

import argparse
import signal
import sys
import os

from gnuradio import gr, blocks, fft, dtv
import osmosdr


# DVB-T parameter tables

# Channel bandwidth in MHz -> (baseband sample rate in Hz, gr-dtv bandwidth enum).
# The DVB-T sample rate is 8/7 of the nominal channel bandwidth.
BANDWIDTH_MAP = {
    8: (64e6 / 7.0, dtv.BANDWIDTH_8_0_MHZ),  # 9142857.14 Hz
    7: (8e6, dtv.BANDWIDTH_7_0_MHZ),         # 8000000 Hz
    6: (48e6 / 7.0, dtv.BANDWIDTH_6_0_MHZ),  # 6857142.86 Hz
}

# Mode name -> (gr-dtv mode enum, FFT length, active carriers per OFDM symbol)
TRANSMISSION_MODE_MAP = {
    '2k': (dtv.T2k, 2048, 1705),
    '8k': (dtv.T8k, 8192, 6817),
}

# Mode name -> payload (data) carriers per OFDM symbol, i.e. the active
# carriers that remain once pilot and TPS carriers are removed.
PAYLOAD_LENGTH_MAP = {
    '2k': 1512,
    '8k': 6048,
}

CONSTELLATION_MAP = {
    'qpsk': dtv.MOD_QPSK,
    '16qam': dtv.MOD_16QAM,
    '64qam': dtv.MOD_64QAM,
}

CODE_RATE_MAP = {
    '1/2': dtv.C1_2,
    '2/3': dtv.C2_3,
    '3/4': dtv.C3_4,
    '5/6': dtv.C5_6,
    '7/8': dtv.C7_8,
}

# Guard interval name -> (gr-dtv guard interval enum, divisor of the FFT length)
GUARD_INTERVAL_MAP = {
    '1/4': (dtv.GI_1_4, 4),
    '1/8': (dtv.GI_1_8, 8),
    '1/16': (dtv.GI_1_16, 16),
    '1/32': (dtv.GI_1_32, 32),
}

# Viterbi decoder block size. 768 is a multiple of 2, 4, 6 and 8, so it stays
# compatible with every constellation / code-rate combination offered here.
VITERBI_BLOCK_SIZE = 768

# Direction argument of dvbt_symbol_inner_interleaver: 0 selects deinterleaving.
SYMBOL_DEINTERLEAVE = 0


def parse_freq(s: str) -> int:
    """Parse a frequency string in Hz, accepting scientific notation.

    Args:
        s: Frequency as text, e.g. ``"506000000"`` or ``"506e6"``.

    Returns:
        The frequency truncated to an integer number of Hz.

    Raises:
        ValueError: If ``s`` is not a valid number (argparse reports this
            as a usage error).
    """
    return int(float(s))


class DVBTReceiver(gr.top_block):
    """GNU Radio flowgraph that demodulates DVB-T into an MPEG-TS byte stream.

    Samples come either from a complex-float32 IQ file or from a live
    osmosdr source; the decoded transport stream is written to file
    descriptor 1 (stdout).
    """

    def __init__(self, args: argparse.Namespace) -> None:
        """Build and wire the receiver flowgraph.

        Args:
            args: Parsed command-line options. Reads ``freq``, ``bandwidth``,
                ``transmission_mode``, ``constellation``, ``code_rate``,
                ``guard_interval``, ``gain``, ``if_gain``, ``ppm`` and
                ``input_file``.

        Raises:
            KeyError: If any option is not a key of the parameter tables.
        """
        gr.top_block.__init__(self, "DVB-T Receiver")

        # Resolve parameters
        tx_mode_enum, fft_length, active_carriers = (
            TRANSMISSION_MODE_MAP[args.transmission_mode]
        )
        payload_length = PAYLOAD_LENGTH_MAP[args.transmission_mode]
        constellation = CONSTELLATION_MAP[args.constellation]
        code_rate = CODE_RATE_MAP[args.code_rate]
        gi_enum, gi_divisor = GUARD_INTERVAL_MAP[args.guard_interval]
        cp_length = fft_length // gi_divisor

        # Sample rate comes from the bandwidth table; an unsupported
        # bandwidth raises KeyError instead of silently assuming 6 MHz.
        bw_mhz = args.bandwidth
        samp_rate = BANDWIDTH_MAP[bw_mhz][0]

        print("--- DVB-T Receiver ---", file=sys.stderr)
        print(f"Frequency: {args.freq / 1e6:.3f} MHz", file=sys.stderr)
        print(f"Bandwidth: {bw_mhz} MHz", file=sys.stderr)
        print(f"Sample rate: {samp_rate:.0f} Hz", file=sys.stderr)
        print(f"Mode: {args.transmission_mode} (FFT={fft_length})", file=sys.stderr)
        print(f"Constellation: {args.constellation}", file=sys.stderr)
        print(f"Code rate: {args.code_rate}", file=sys.stderr)
        print(f"Guard interval:{args.guard_interval} (CP={cp_length})", file=sys.stderr)
        print("", file=sys.stderr)

        # --- Source ---
        self.source = self._build_source(args, samp_rate, bw_mhz)

        # --- OFDM Demodulation Chain ---

        # OFDM symbol acquisition (timing sync, CP removal). With blocks=1 it
        # consumes the raw complex sample stream and emits fft_length vectors,
        # so no stream-to-vector conversion is placed in front of it.
        self.ofdm_acq = dtv.dvbt_ofdm_sym_acquisition(
            blocks=1,
            fft_length=fft_length,
            occupied_tones=active_carriers,
            cp_length=cp_length,
            snr=100.0
        )

        # FFT: time domain -> frequency domain. The spectrum is shifted so DC
        # sits in the centre, as the reference-signal demodulator expects.
        self.fft = fft.fft_vcc(
            fft_size=fft_length,
            forward=True,
            window=[],
            shift=True
        )

        # Channel estimation and equalization using reference signals.
        # Emits only the payload carriers of each symbol.
        self.demod_ref = dtv.dvbt_demod_reference_signals(
            itemsize=gr.sizeof_gr_complex,
            ninput=fft_length,
            noutput=payload_length,
            constellation=constellation,
            hierarchy=dtv.NH,
            code_rate_HP=code_rate,
            code_rate_LP=dtv.C1_2,
            guard_interval=gi_enum,
            transmission_mode=tx_mode_enum,
            include_cell_id=0,
            cell_id=0
        )

        # --- Demapping and Decoding Chain ---

        # Constellation demapping; takes one vector of payload carriers
        # per OFDM symbol.
        self.demap = dtv.dvbt_demap(
            nsize=payload_length,
            constellation=constellation,
            hierarchy=dtv.NH,
            transmission=tx_mode_enum,
            gain=1.0
        )

        # Symbol-level inner deinterleaver (undoes the per-symbol carrier
        # permutation applied by the transmitter).
        self.sym_deintlv = dtv.dvbt_symbol_inner_interleaver(
            payload_length, tx_mode_enum, SYMBOL_DEINTERLEAVE
        )

        # Bit-level inner deinterleaver
        self.bit_deintlv = dtv.dvbt_bit_inner_deinterleaver(
            nsize=payload_length,
            constellation=constellation,
            hierarchy=dtv.NH,
            transmission=tx_mode_enum
        )

        # Viterbi decoder (inner FEC)
        self.viterbi = dtv.dvbt_viterbi_decoder(
            constellation=constellation,
            hierarchy=dtv.NH,
            coderate=code_rate,
            bsize=VITERBI_BLOCK_SIZE
        )

        # Byte-level convolutional deinterleaver
        self.conv_deintlv = dtv.dvbt_convolutional_deinterleaver(
            nsize=136, I=12, M=17
        )

        # Reed-Solomon decoder (outer FEC) - RS(204,188,t=8) shortened from RS(255,239)
        self.rs_dec = dtv.dvbt_reed_solomon_dec(
            p=2, m=8, gfpoly=0x11d, n=255, k=239, t=8, s=51, blocks=8
        )

        # Energy descrambler - restores MPEG-TS sync bytes
        self.descramble = dtv.dvbt_energy_descramble(nblocks=8)

        # --- Output: MPEG-TS to stdout ---
        self.sink = blocks.file_descriptor_sink(gr.sizeof_char, 1)  # fd=1 = stdout

        # --- Wire the flowgraph ---
        self.connect(
            self.source,
            self.ofdm_acq,
            self.fft,
            self.demod_ref,
            self.demap,
            self.sym_deintlv,
            self.bit_deintlv,
            self.viterbi,
            self.conv_deintlv,
            self.rs_dec,
            self.descramble,
            self.sink
        )

        print("Flowgraph built. Starting...", file=sys.stderr)

    def _build_source(self, args: argparse.Namespace, samp_rate: float, bw_mhz: int):
        """Create the sample source: an IQ file reader or a live osmosdr source.

        Args:
            args: Parsed command-line options (``input_file``, ``freq``,
                ``ppm``, ``gain``, ``if_gain`` are read here).
            samp_rate: Required sample rate in Hz.
            bw_mhz: Channel bandwidth in MHz, used only in warning text.

        Returns:
            The GNU Radio source block producing complex samples.
        """
        if args.input_file:
            print(f"Source: file '{args.input_file}'", file=sys.stderr)
            return blocks.file_source(
                gr.sizeof_gr_complex, args.input_file, repeat=False
            )

        print("Source: RTL-SDR (live)", file=sys.stderr)
        source = osmosdr.source(args="numchan=1")
        source.set_sample_rate(samp_rate)
        source.set_center_freq(args.freq)
        source.set_freq_corr(args.ppm)
        # NOTE(review): gain mode True requests automatic gain; confirm that
        # the manual --gain value is still honored on the target hardware.
        source.set_gain_mode(True)
        source.set_gain(args.gain)
        source.set_if_gain(args.if_gain)
        source.set_bb_gain(0)

        # The device may not support the requested rate; warn when the rate
        # it reports differs noticeably from what DVB-T needs.
        actual_rate = source.get_sample_rate()
        if abs(actual_rate - samp_rate) > 1000:
            print(f"WARNING: Requested sample rate {samp_rate:.0f} Hz but got {actual_rate:.0f} Hz", file=sys.stderr)
            print(f"WARNING: RTL-SDR max is ~2.4 Msps. DVB-T {bw_mhz}MHz needs {samp_rate:.0f} Hz.", file=sys.stderr)
            print("WARNING: Signal decoding will likely fail. Use --input-file for testing.", file=sys.stderr)
        return source


def main():
    """Parse command-line options, then build and run the receiver until it
    finishes (end of input file) or is interrupted with Ctrl-C."""
    parser = argparse.ArgumentParser(
        description='DVB-T Digital TV Receiver',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  Live reception (requires SDR with sufficient sample rate):
    python3 dvbt_rx.py --freq 506000000 | mpv -

  From recorded IQ file:
    python3 dvbt_rx.py --freq 506e6 --input-file recording.cf32 | vlc -

  Record IQ samples first (if you have a capable SDR):
    rtl_sdr -f 506000000 -s 9142857 -g 40 recording.raw
"""
    )
    parser.add_argument('--freq', type=parse_freq, required=True,
                        help='Center frequency in Hz (e.g., 506000000 or 506e6)')
    parser.add_argument('--bandwidth', type=int, default=8, choices=[6, 7, 8],
                        help='Channel bandwidth in MHz (default: 8)')
    parser.add_argument('--transmission-mode', default='8k', choices=['2k', '8k'],
                        help='Transmission mode (default: 8k)')
    parser.add_argument('--constellation', default='64qam',
                        choices=['qpsk', '16qam', '64qam'],
                        help='Constellation (default: 64qam)')
    parser.add_argument('--code-rate', default='2/3',
                        choices=['1/2', '2/3', '3/4', '5/6', '7/8'],
                        help='Code rate (default: 2/3)')
    parser.add_argument('--guard-interval', default='1/32',
                        choices=['1/4', '1/8', '1/16', '1/32'],
                        help='Guard interval (default: 1/32)')
    parser.add_argument('--gain', type=float, default=40.0,
                        help='RF gain in dB (default: 40)')
    parser.add_argument('--if-gain', type=float, default=40.0,
                        help='IF gain in dB (default: 40)')
    parser.add_argument('--ppm', type=float, default=0.0,
                        help='Frequency correction in PPM (default: 0)')
    parser.add_argument('--input-file', type=str, default=None,
                        help='Read IQ from file instead of RTL-SDR (complex float32)')
    args = parser.parse_args()

    # The MPEG-TS bytes are written straight to file descriptor 1 by the
    # flowgraph sink, so Python's sys.stdout object is left untouched.
    tb = DVBTReceiver(args)

    def signal_handler(sig, frame):
        print("\nStopping...", file=sys.stderr)
        tb.stop()
        tb.wait()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    # Exit quietly when the downstream player closes the pipe. SIGPIPE is
    # not defined on every platform.
    if hasattr(signal, 'SIGPIPE'):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)

    tb.start()
    # Ctrl-C is handled by signal_handler above, so no KeyboardInterrupt
    # reaches this call.
    tb.wait()


if __name__ == '__main__':
    main()