#!/usr/bin/env python3
"""
SDR TV & RF Analysis Tool

Multi-mode SDR application for RTL-SDR V4:
  - DVB-T digital TV reception (MPEG-TS output)
  - Analog TV reception (PAL/NTSC via FM demod)
  - Spectrum analyzer (terminal-based power display)
  - Wireless camera scanner (common security camera frequencies)
  - Wireless camera video receiver (wideband FM demod)

Designed for authorized RF security testing and research.

Usage:
  python3 sdr_tv.py dvbt --freq 506e6 | mpv -
  python3 sdr_tv.py analog --freq 175.25e6 --standard pal | mpv --demuxer=rawvideo --rawvideo=w=720:h=576:format=gray -
  python3 sdr_tv.py spectrum --freq 500e6 --span 2.4e6
  python3 sdr_tv.py camera --preset 1.2ghz
"""

import argparse
import os
import signal
import sys
import time
import struct
import threading

import numpy as np

from gnuradio import gr, blocks, fft, dtv, analog, filter
import osmosdr


# =============================================================================
# Constants & Presets
# =============================================================================

# Sample rate used by every mode that is limited by the RTL-SDR itself.
RTL_SDR_MAX_SAMPLE_RATE = 2.4e6

# Upper tuning limit used by the camera scanner to skip unreachable channels.
RTL_SDR_MAX_FREQ = 1766e6

# DVB-T parameters: mode -> (gr-dtv enum, FFT length, active carriers)
TRANSMISSION_MODE_MAP = {
    '2k': (dtv.T2k, 2048, 1705),
    '8k': (dtv.T8k, 8192, 6817),
}

CONSTELLATION_MAP = {
    'qpsk': dtv.MOD_QPSK,
    '16qam': dtv.MOD_16QAM,
    '64qam': dtv.MOD_64QAM,
}

CODE_RATE_MAP = {
    '1/2': dtv.C1_2,
    '2/3': dtv.C2_3,
    '3/4': dtv.C3_4,
    '5/6': dtv.C5_6,
    '7/8': dtv.C7_8,
}

# Guard interval -> (gr-dtv enum, divisor applied to the FFT length)
GUARD_INTERVAL_MAP = {
    '1/4': (dtv.GI_1_4, 4),
    '1/8': (dtv.GI_1_8, 8),
    '1/16': (dtv.GI_1_16, 16),
    '1/32': (dtv.GI_1_32, 32),
}

# Wireless camera frequency presets for authorized security testing.
# These are commonly documented frequencies used by analog wireless cameras.
CAMERA_PRESETS = {
    '900mhz': {
        'name': '900 MHz ISM Band',
        'channels': [
            (910.0e6, '910 MHz Ch1'),
            (920.0e6, '920 MHz Ch2'),
            (930.0e6, '930 MHz Ch3'),
            (940.0e6, '940 MHz Ch4'),
        ],
        'bandwidth': 6.0e6,
        'modulation': 'fm',
    },
    '1.2ghz': {
        'name': '1.2 GHz Band',
        'channels': [
            (1080.0e6, '1080 MHz Ch1'),
            (1120.0e6, '1120 MHz Ch2'),
            (1160.0e6, '1160 MHz Ch3'),
            (1200.0e6, '1200 MHz Ch4'),
            (1240.0e6, '1240 MHz Ch5'),
            (1280.0e6, '1280 MHz Ch6'),
            (1320.0e6, '1320 MHz Ch7'),
        ],
        'bandwidth': 10.0e6,
        'modulation': 'fm',
    },
    '2.4ghz': {
        'name': '2.4 GHz ISM Band',
        'channels': [
            (2411.0e6, '2411 MHz Ch1'),
            (2431.0e6, '2431 MHz Ch2'),
            (2451.0e6, '2451 MHz Ch3'),
            (2473.0e6, '2473 MHz Ch4'),
        ],
        'bandwidth': 10.0e6,
        'modulation': 'fm',
    },
    'fpv': {
        'name': 'FPV/Drone Video (5.8 GHz — RTL-SDR cannot tune here)',
        'channels': [
            (5740.0e6, '5740 MHz R1'),
            (5760.0e6, '5760 MHz R2'),
            (5780.0e6, '5780 MHz R3'),
            (5800.0e6, '5800 MHz R4'),
            (5820.0e6, '5820 MHz R5'),
            (5840.0e6, '5840 MHz R6'),
            (5860.0e6, '5860 MHz R7'),
            (5880.0e6, '5880 MHz R8'),
        ],
        'bandwidth': 20.0e6,
        'modulation': 'fm',
    },
}


def parse_freq(s):
    """Parse a frequency string into integer Hz.

    Goes through ``float`` first so scientific notation such as ``506e6``
    is accepted. Raises ``ValueError`` on unparsable input, which argparse
    turns into a usage error when this is used as a ``type=`` callable.
    """
    return int(float(s))


def log(msg):
    """Print to stderr so stdout stays clean for data output."""
    print(msg, file=sys.stderr)


# =============================================================================
# Source Helpers
# =============================================================================

def create_rtl_source(freq, samp_rate, gain=40.0, if_gain=40.0, ppm=0.0):
    """Create and configure an osmosdr RTL-SDR source.

    Args:
        freq: Center frequency in Hz.
        samp_rate: Requested sample rate in samples/second.
        gain: RF gain passed to ``set_gain``.
        if_gain: IF gain passed to ``set_if_gain``.
        ppm: Frequency correction passed to ``set_freq_corr``.

    Returns:
        The configured ``osmosdr.source`` block.
    """
    source = osmosdr.source(args="numchan=1")
    source.set_sample_rate(samp_rate)
    source.set_center_freq(freq)
    source.set_freq_corr(ppm)
    source.set_gain_mode(False)  # manual gain; the explicit gains below apply
    source.set_gain(gain)
    source.set_if_gain(if_gain)
    source.set_bb_gain(0)
    return source


def open_iq_source(args, samp_rate, repeat=False):
    """Return the IQ source selected on the command line.

    Uses a complex file source when ``args.input_file`` is set, otherwise a
    live RTL-SDR source tuned to ``args.freq``.

    Args:
        args: Parsed CLI namespace (reads ``input_file``, ``freq``, ``gain``,
            ``if_gain`` and ``ppm``).
        samp_rate: Sample rate requested from the live device.
        repeat: Whether a file source loops when it reaches the end.
    """
    if args.input_file:
        log(f"Source: file '{args.input_file}'")
        return blocks.file_source(gr.sizeof_gr_complex, args.input_file, repeat=repeat)
    log("Source: RTL-SDR (live)")
    return create_rtl_source(args.freq, samp_rate, args.gain, args.if_gain, args.ppm)


def _build_fm_video_chain(tb, samp_rate, cutoff, transition):
    """Attach the FM-demod -> low-pass -> 8-bit stdout chain to ``tb``.

    ``tb.source`` must already exist. The created blocks are stored on ``tb``
    as ``fm_demod``, ``video_lpf``, ``scale``, ``offset``, ``f2c`` and
    ``sink`` and are connected in that order after ``tb.source``.
    """
    tb.fm_demod = analog.wfm_rcv(quad_rate=samp_rate, audio_decimation=1)

    # Low-pass filter to isolate the video baseband.
    video_taps = filter.firdes.low_pass(1.0, samp_rate, cutoff, transition)
    tb.video_lpf = filter.fir_filter_fff(1, video_taps)

    # Scale float [-1, 1] to unsigned byte [0, 255] for raw video output.
    tb.scale = blocks.multiply_const_ff(127.0)
    tb.offset = blocks.add_const_ff(128.0)
    tb.f2c = blocks.float_to_uchar()

    # File descriptor 1 is stdout.
    tb.sink = blocks.file_descriptor_sink(gr.sizeof_char, 1)

    tb.connect(tb.source, tb.fm_demod, tb.video_lpf,
               tb.scale, tb.offset, tb.f2c, tb.sink)


# =============================================================================
# Mode 1: DVB-T Digital TV Receiver
# =============================================================================

class DVBTReceiver(gr.top_block):
    """DVB-T receiver flowgraph that writes the decoded stream to stdout."""

    def __init__(self, args):
        gr.top_block.__init__(self, "DVB-T Receiver")

        tx_mode_enum, fft_length, active_carriers = TRANSMISSION_MODE_MAP[args.transmission_mode]
        constellation = CONSTELLATION_MAP[args.constellation]
        code_rate = CODE_RATE_MAP[args.code_rate]
        gi_enum, gi_divisor = GUARD_INTERVAL_MAP[args.guard_interval]
        cp_length = fft_length // gi_divisor

        # Sample rate depends on the channel bandwidth (MHz).
        if args.bandwidth == 8:
            samp_rate = 64e6 / 7.0
        elif args.bandwidth == 7:
            samp_rate = 8e6
        else:
            samp_rate = 48e6 / 7.0

        log("=== DVB-T Digital Receiver ===")
        log(f"Frequency: {args.freq / 1e6:.3f} MHz")
        log(f"Bandwidth: {args.bandwidth} MHz")
        log(f"Sample rate: {samp_rate:.0f} Hz")
        log(f"Mode: {args.transmission_mode} (FFT={fft_length})")
        log(f"Constellation: {args.constellation}")
        log(f"Code rate: {args.code_rate}")
        log(f"Guard interval: {args.guard_interval} (CP={cp_length})")

        # Source
        self.source = open_iq_source(args, samp_rate, repeat=False)
        if not args.input_file:
            # The device may silently pick a different rate than requested.
            actual = self.source.get_sample_rate()
            if abs(actual - samp_rate) > 1000:
                log(f"WARNING: Got {actual:.0f} Hz instead of {samp_rate:.0f} Hz")
                log("WARNING: RTL-SDR can't sample fast enough for DVB-T.")
                log("WARNING: Use --input-file with a pre-recorded IQ file.")

        # OFDM demodulation chain
        # NOTE(review): the stage order and vector lengths below have not been
        # verified against gr-dtv's reference DVB-T receiver flowgraph —
        # confirm item sizes between stages before relying on this mode.
        self.s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, fft_length)
        self.ofdm_acq = dtv.dvbt_ofdm_sym_acquisition(
            blocks=1,
            fft_length=fft_length,
            occupied_tones=active_carriers,
            cp_length=cp_length,
            snr=100.0
        )
        self.fft_block = fft.fft_vcc(fft_length, True, [], False)
        self.demod_ref = dtv.dvbt_demod_reference_signals(
            gr.sizeof_gr_complex, fft_length, active_carriers,
            constellation, dtv.NH, code_rate, dtv.C1_2,
            gi_enum, tx_mode_enum, 0, 0
        )
        self.v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, active_carriers)
        self.demap = dtv.dvbt_demap(active_carriers, constellation, dtv.NH, tx_mode_enum, 1.0)
        self.bit_deintlv = dtv.dvbt_bit_inner_deinterleaver(
            active_carriers, constellation, dtv.NH, tx_mode_enum
        )
        self.viterbi = dtv.dvbt_viterbi_decoder(constellation, dtv.NH, code_rate, active_carriers)
        self.conv_deintlv = dtv.dvbt_convolutional_deinterleaver(136, 12, 17)
        self.rs_dec = dtv.dvbt_reed_solomon_dec(2, 8, 0x11d, 255, 239, 8, 51, 8)
        self.descramble = dtv.dvbt_energy_descramble(8)

        # File descriptor 1 is stdout.
        self.sink = blocks.file_descriptor_sink(gr.sizeof_char, 1)

        # Wire it up
        self.connect(
            self.source, self.s2v, self.ofdm_acq, self.fft_block,
            self.demod_ref, self.v2s, self.demap, self.bit_deintlv,
            self.viterbi, self.conv_deintlv, self.rs_dec,
            self.descramble, self.sink
        )

        log("Flowgraph ready. MPEG-TS output on stdout.")


# =============================================================================
# Mode 2: Analog TV Receiver (PAL/NTSC via Wideband FM Demod)
# =============================================================================

class AnalogTVReceiver(gr.top_block):
    """
    Analog TV receiver using wideband FM demodulation.

    Outputs raw baseband video as unsigned 8-bit samples to stdout.

    Analog TV transmits composite video via AM on the picture carrier,
    with FM audio offset +4.5 MHz (NTSC) or +5.5 MHz (PAL).
    This receiver demodulates the composite video baseband signal.

    NOTE(review): the paragraph above describes AM video while the flowgraph
    uses an FM demodulator (``analog.wfm_rcv``) — confirm which is intended.
    """

    def __init__(self, args):
        gr.top_block.__init__(self, "Analog TV Receiver")

        samp_rate = RTL_SDR_MAX_SAMPLE_RATE  # RTL-SDR max stable rate

        # Nominal video bandwidth of the selected standard.
        video_bw = 5.0e6 if args.standard == 'pal' else 4.2e6  # else: ntsc

        log(f"=== Analog TV Receiver ({args.standard.upper()}) ===")
        log(f"Frequency: {args.freq / 1e6:.3f} MHz")
        log(f"Sample rate: {samp_rate:.0f} Hz")
        log(f"Standard: {args.standard.upper()}")

        self.source = open_iq_source(args, samp_rate, repeat=False)

        # The cutoff cannot exceed Nyquist minus the transition width, so at
        # 2.4 MS/s the sample rate, not the standard, sets the bandwidth.
        cutoff = min(video_bw, samp_rate / 2 - 100e3)
        _build_fm_video_chain(self, samp_rate, cutoff, 100e3)

        log("Flowgraph ready. Raw video on stdout.")
        log("Play with: python3 sdr_tv.py analog ... | mpv --demuxer=rawvideo --rawvideo=w=720:h=576:format=gray -")


# =============================================================================
# Mode 3: Spectrum Analyzer (Terminal-based)
# =============================================================================

class SpectrumAnalyzer(gr.top_block):
    """
    Real-time terminal spectrum analyzer.

    Captures IQ, computes FFT, and displays power spectrum in the terminal.
    """

    # Number of text rows used for the dB axis.
    DISPLAY_HEIGHT = 20

    def __init__(self, args):
        gr.top_block.__init__(self, "Spectrum Analyzer")

        self.fft_size = args.fft_size
        self.samp_rate = min(args.span, RTL_SDR_MAX_SAMPLE_RATE)  # RTL-SDR limited
        self.center_freq = args.freq
        self.args = args

        log("=== Spectrum Analyzer ===")
        log(f"Center: {args.freq / 1e6:.3f} MHz")
        log(f"Span: {self.samp_rate / 1e6:.1f} MHz")
        log(f"FFT size: {self.fft_size}")
        log(f"Bin width: {self.samp_rate / self.fft_size:.0f} Hz")

        # A recorded file is looped so the display keeps updating.
        self.source = open_iq_source(args, self.samp_rate, repeat=True)

        # FFT -> magnitude squared
        self.s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, self.fft_size)
        self.fft_block = fft.fft_vcc(self.fft_size, True,
                                     fft.window.blackmanharris(self.fft_size), True)
        self.mag = blocks.complex_to_mag_squared(self.fft_size)

        # Probe to read FFT data from Python
        self.probe = blocks.probe_signal_vf(self.fft_size)

        self.connect(self.source, self.s2v, self.fft_block, self.mag, self.probe)

    @staticmethod
    def _terminal_columns(default=80):
        """Return the width of the terminal attached to stderr.

        The display is drawn on stderr, so that is the stream queried; stdout
        may be a pipe. Falls back to ``default`` when no terminal is attached.
        """
        try:
            return os.get_terminal_size(sys.stderr.fileno()).columns
        except (OSError, ValueError):
            return default

    @staticmethod
    def _bin_spectrum(power_db, num_bins):
        """Average ``power_db`` down to ``num_bins`` display columns.

        ``np.array_split`` tolerates lengths that are not a multiple of
        ``num_bins``; a plain ``reshape`` would raise ``ValueError`` for
        almost every terminal width.
        """
        if num_bins >= len(power_db):
            return power_db
        return np.array([chunk.mean() for chunk in np.array_split(power_db, num_bins)])

    def _render_frame(self, binned):
        """Build the full text of one display frame for ``binned`` dB values."""
        num_bins = len(binned)
        db_min = self.args.db_min
        db_max = self.args.db_max
        height = self.DISPLAY_HEIGHT
        freq_start = (self.center_freq - self.samp_rate / 2) / 1e6
        freq_end = (self.center_freq + self.samp_rate / 2) / 1e6

        # Clear screen, home the cursor, then the header.
        out = ['\033[2J\033[H']
        out.append(f' Spectrum: {freq_start:.2f} - {freq_end:.2f} MHz '
                   f'(center {self.center_freq/1e6:.3f} MHz)\n')
        out.append(f' Range: {db_min} to {db_max} dB\n\n')

        # One text row per dB step, strongest level first.
        for row in range(height - 1, -1, -1):
            db_level = db_min + (db_max - db_min) * row / height
            cells = []
            for value in binned:
                if value >= db_level:
                    intensity = min(1.0, (value - db_level) / ((db_max - db_min) / height))
                    if intensity > 0.7:
                        cells.append('\033[91m\u2588\033[0m')  # red = strong
                    elif intensity > 0.3:
                        cells.append('\033[93m\u2593\033[0m')  # yellow = medium
                    else:
                        cells.append('\033[92m\u2591\033[0m')  # green = weak
                else:
                    cells.append(' ')
            out.append(f'{db_level:6.0f}|' + ''.join(cells) + '\n')

        # Frequency axis
        out.append(' ' * 7 + '\u2514' + '\u2500' * num_bins + '\n')
        start_label = f'{freq_start:.1f}'
        end_label = f'{freq_end:.1f}'
        mid_pos = num_bins // 2 - 4
        axis = ' ' * 7 + start_label
        axis += ' ' * (mid_pos - len(start_label))  # negative count -> no padding
        axis += f'{self.center_freq/1e6:.1f}'
        axis += ' ' * max(1, num_bins - len(end_label) - mid_pos)
        axis += end_label
        out.append(axis + ' MHz\n')

        # Peak info
        peak_bin = np.argmax(binned)
        peak_freq = freq_start + (freq_end - freq_start) * peak_bin / num_bins
        out.append(f'\n Peak: {binned[peak_bin]:.1f} dB @ {peak_freq:.3f} MHz\n')
        out.append(' [Ctrl+C to quit]\n')
        return ''.join(out)

    def display_loop(self):
        """Terminal display loop — runs in main thread until Ctrl+C."""
        cols = self._terminal_columns()
        # 12 columns are reserved for the dB labels and margin.
        num_bins = max(1, min(cols - 12, self.fft_size))

        log("")
        while True:
            try:
                data = self.probe.level()
                if len(data) == 0:
                    # Flowgraph has not produced a vector yet.
                    time.sleep(0.1)
                    continue

                # Convert to dB, avoiding log(0)
                power_db = 10.0 * np.log10(np.maximum(np.array(data), 1e-12))
                binned = self._bin_spectrum(power_db, num_bins)

                sys.stderr.write(self._render_frame(binned))
                time.sleep(0.1)
            except (ValueError, RuntimeError):
                # Best effort: skip a bad frame and retry shortly.
                time.sleep(0.2)
            except KeyboardInterrupt:
                break


# =============================================================================
# Mode 4: Wireless Camera Scanner
# =============================================================================

class CameraScanner(gr.top_block):
    """
    Scans known wireless camera frequencies and reports signal strength.

    For authorized security testing only.
    """

    # Width of the signal-strength bar column, in characters.
    BAR_WIDTH = 20

    def __init__(self, args):
        gr.top_block.__init__(self, "Camera Scanner")

        self.samp_rate = RTL_SDR_MAX_SAMPLE_RATE
        self.fft_size = 1024
        self.args = args

        if getattr(args, 'input_file', None):
            log("WARNING: --input-file is ignored in camera scan mode "
                "(scanning retunes a live SDR).")

        # Start with a dummy frequency, we'll retune
        self.source = create_rtl_source(100e6, self.samp_rate,
                                        args.gain, args.if_gain, args.ppm)

        self.s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, self.fft_size)
        self.fft_block = fft.fft_vcc(self.fft_size, True,
                                     fft.window.blackmanharris(self.fft_size), True)
        self.mag = blocks.complex_to_mag_squared(self.fft_size)
        self.probe = blocks.probe_signal_vf(self.fft_size)

        self.connect(self.source, self.s2v, self.fft_block, self.mag, self.probe)

    def measure_power(self, freq, dwell=0.5):
        """Tune to ``freq``, wait ``dwell`` seconds, and return mean power in dB.

        Returns -100.0 when the probe has no data or reading it fails.
        """
        self.source.set_center_freq(freq)
        time.sleep(dwell)  # let the tuner settle and the probe refresh
        try:
            data = np.array(self.probe.level())
            if len(data) == 0:
                return -100.0
            return 10.0 * np.log10(np.maximum(np.mean(data), 1e-12))
        except (ValueError, RuntimeError):
            return -100.0

    @staticmethod
    def _classify_power(power):
        """Map a dB reading to an (ANSI color, status label) pair."""
        if power > -20:
            return '\033[91m', 'STRONG'    # red = strong signal
        if power > -35:
            return '\033[93m', 'MODERATE'  # yellow = moderate
        if power > -50:
            return '\033[92m', 'WEAK'      # green = weak
        return '\033[90m', 'noise'         # gray = noise floor

    def scan_loop(self):
        """Scan the selected camera presets repeatedly until Ctrl+C."""
        if self.args.preset == 'all':
            scan_presets = list(CAMERA_PRESETS)
        else:
            scan_presets = [self.args.preset]

        log("\n=== Wireless Camera Scanner ===")
        log("For authorized security testing only.\n")

        # RTL-SDR V4 tunes roughly 24 MHz - 1766 MHz
        max_freq = RTL_SDR_MAX_FREQ

        while True:
            try:
                sys.stderr.write('\033[2J\033[H')
                sys.stderr.write('=== Wireless Camera Scanner ===\n')
                sys.stderr.write('Authorized security testing only.\n\n')

                for preset_name in scan_presets:
                    preset = CAMERA_PRESETS[preset_name]
                    sys.stderr.write(f'--- {preset["name"]} ---\n')

                    for freq, label in preset['channels']:
                        if freq > max_freq:
                            sys.stderr.write(f' {label:25s} [OUT OF RANGE for RTL-SDR]\n')
                            continue

                        power = self.measure_power(freq, dwell=0.3)

                        # Signal strength bar: 2 dB per cell above -60 dB,
                        # clamped so strong signals keep the columns aligned.
                        bar_len = min(self.BAR_WIDTH, max(0, int((power + 60) / 2)))
                        bar = '\u2588' * bar_len
                        color, status = self._classify_power(power)

                        sys.stderr.write(
                            f' {label:25s} {power:7.1f} dB '
                            f'{color}{bar:20s} {status}\033[0m\n'
                        )

                    sys.stderr.write('\n')

                sys.stderr.write('[Scanning... Ctrl+C to quit]\n')
                time.sleep(1.0)

            except KeyboardInterrupt:
                break


# =============================================================================
# Mode 5: Camera Video Receiver (tune to a specific camera freq and demod)
# =============================================================================

class CameraVideoReceiver(gr.top_block):
    """
    Receives analog wireless camera video via wideband FM demodulation.

    Most analog wireless cameras use FM modulated composite video.
    Raw unsigned 8-bit samples are written to stdout.
    """

    def __init__(self, args):
        gr.top_block.__init__(self, "Camera Video Receiver")

        samp_rate = RTL_SDR_MAX_SAMPLE_RATE

        log("=== Camera Video Receiver ===")
        log(f"Frequency: {args.freq / 1e6:.3f} MHz")
        log("Demodulation: Wideband FM")

        # Honors --input-file like the other receivers.
        self.source = open_iq_source(args, samp_rate, repeat=False)

        # Wideband FM demod, then low-pass to get composite video.
        _build_fm_video_chain(self, samp_rate, 1.0e6, 200e3)

        log("Raw composite video on stdout.")
        log("View with: python3 sdr_tv.py camera-rx ... | ffplay -f rawvideo -pix_fmt gray -video_size 720x576 -")


# =============================================================================
# CLI
# =============================================================================

# Modes that simply stream decoded data to stdout until the flowgraph ends.
_STREAMING_RECEIVERS = {
    'dvbt': DVBTReceiver,
    'analog': AnalogTVReceiver,
    'camera-rx': CameraVideoReceiver,
}


def _add_common_args(p):
    """Add the options shared by every mode to sub-parser ``p``."""
    p.add_argument('--gain', type=float, default=40.0, help='RF gain dB (default: 40)')
    p.add_argument('--if-gain', type=float, default=40.0, help='IF gain dB (default: 40)')
    p.add_argument('--ppm', type=float, default=0.0, help='Freq correction PPM (default: 0)')
    p.add_argument('--input-file', type=str, default=None, help='IQ file instead of live SDR')


def _build_parser():
    """Create the argparse parser with one sub-command per mode."""
    parser = argparse.ArgumentParser(
        description='SDR TV & RF Analysis Tool for RTL-SDR V4',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Modes:
  dvbt       DVB-T digital TV receiver (outputs MPEG-TS to stdout)
  analog     Analog TV receiver PAL/NTSC (outputs raw video to stdout)
  spectrum   Real-time terminal spectrum analyzer
  camera     Scan wireless camera frequencies for signals
  camera-rx  Receive/demodulate a wireless camera signal

Examples:
  python3 sdr_tv.py dvbt --freq 506e6 | mpv -
  python3 sdr_tv.py analog --freq 175.25e6 | ffplay -f rawvideo -pix_fmt gray -video_size 720x576 -
  python3 sdr_tv.py spectrum --freq 500e6
  python3 sdr_tv.py camera --preset all
  python3 sdr_tv.py camera --preset 1.2ghz
  python3 sdr_tv.py camera-rx --freq 1200e6 | ffplay -f rawvideo -pix_fmt gray -video_size 720x576 -
"""
    )

    subparsers = parser.add_subparsers(dest='mode', required=True)

    # --- DVB-T ---
    p_dvbt = subparsers.add_parser('dvbt', help='DVB-T digital TV receiver')
    p_dvbt.add_argument('--freq', type=parse_freq, required=True, help='Center freq Hz')
    p_dvbt.add_argument('--bandwidth', type=int, default=8, choices=[6, 7, 8], help='BW MHz')
    p_dvbt.add_argument('--transmission-mode', default='8k', choices=['2k', '8k'])
    p_dvbt.add_argument('--constellation', default='64qam', choices=['qpsk', '16qam', '64qam'])
    p_dvbt.add_argument('--code-rate', default='2/3', choices=['1/2', '2/3', '3/4', '5/6', '7/8'])
    p_dvbt.add_argument('--guard-interval', default='1/32', choices=['1/4', '1/8', '1/16', '1/32'])
    _add_common_args(p_dvbt)

    # --- Analog TV ---
    p_analog = subparsers.add_parser('analog', help='Analog TV receiver (PAL/NTSC)')
    p_analog.add_argument('--freq', type=parse_freq, required=True, help='Picture carrier freq Hz')
    p_analog.add_argument('--standard', default='pal', choices=['pal', 'ntsc'], help='TV standard')
    _add_common_args(p_analog)

    # --- Spectrum Analyzer ---
    p_spec = subparsers.add_parser('spectrum', help='Terminal spectrum analyzer')
    p_spec.add_argument('--freq', type=parse_freq, required=True, help='Center freq Hz')
    p_spec.add_argument('--span', type=float, default=2.4e6, help='Span Hz (max 2.4M for RTL-SDR)')
    p_spec.add_argument('--fft-size', type=int, default=1024, help='FFT size (default: 1024)')
    p_spec.add_argument('--db-min', type=float, default=-60.0, help='Min dB for display')
    p_spec.add_argument('--db-max', type=float, default=0.0, help='Max dB for display')
    _add_common_args(p_spec)

    # --- Camera Scanner ---
    p_cam = subparsers.add_parser('camera', help='Scan wireless camera frequencies')
    p_cam.add_argument('--preset', default='all',
                       choices=list(CAMERA_PRESETS.keys()) + ['all'],
                       help='Camera freq preset to scan')
    _add_common_args(p_cam)

    # --- Camera Video Receiver ---
    p_camrx = subparsers.add_parser('camera-rx', help='Receive wireless camera video')
    p_camrx.add_argument('--freq', type=parse_freq, required=True, help='Camera freq Hz')
    _add_common_args(p_camrx)

    return parser


def _run_flowgraph(tb, loop=None):
    """Start ``tb``, run until done or Ctrl+C, then always stop it cleanly.

    Args:
        tb: A ``gr.top_block`` instance.
        loop: Optional foreground callable (e.g. a display loop). When omitted
            the function blocks in ``tb.wait()`` until the flowgraph finishes.
    """
    tb.start()
    try:
        if loop is None:
            tb.wait()
        else:
            loop()
    except KeyboardInterrupt:
        pass
    finally:
        # Runs for Ctrl+C, normal completion and unexpected errors alike, so
        # the flowgraph is always stopped before the process exits.
        log("\nStopping...")
        tb.stop()
        tb.wait()


def main():
    """Parse the command line and run the selected mode."""
    parser = _build_parser()
    args = parser.parse_args()

    if args.mode == 'spectrum':
        if args.fft_size < 1:
            parser.error('--fft-size must be a positive integer')
        if args.span <= 0:
            parser.error('--span must be positive')
        if args.db_max <= args.db_min:
            parser.error('--db-max must be greater than --db-min')

    # SIGINT keeps Python's default behavior (KeyboardInterrupt) so the
    # cleanup in _run_flowgraph runs; exiting from a signal handler would
    # bypass it.
    try:
        # Exit quietly when the downstream player closes the pipe.
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
    except AttributeError:
        pass  # SIGPIPE does not exist on this platform

    # Dispatch. The streaming receivers write straight to file descriptor 1
    # through file_descriptor_sink, so Python's sys.stdout is not involved.
    if args.mode == 'spectrum':
        tb = SpectrumAnalyzer(args)
        _run_flowgraph(tb, tb.display_loop)
    elif args.mode == 'camera':
        tb = CameraScanner(args)
        _run_flowgraph(tb, tb.scan_loop)
    else:
        _run_flowgraph(_STREAMING_RECEIVERS[args.mode](args))


if __name__ == '__main__':
    main()