Autarch/core/cve.py
DigiJ ffe47c51b5 Initial public release — AUTARCH v1.0.0
Full security platform with web dashboard, 16 Flask blueprints, 26 modules,
autonomous AI agent, WebUSB hardware support, and Archon Android companion app.

Includes Hash Toolkit, debug console, anti-stalkerware shield, Metasploit/RouterSploit
integration, WireGuard VPN, OSINT reconnaissance, and multi-backend LLM support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 03:57:32 -08:00

870 lines
30 KiB
Python

"""
AUTARCH CVE Database Module
SQLite-based local CVE database with NVD API synchronization
https://nvd.nist.gov/developers/vulnerabilities
"""
import os
import json
import time
import sqlite3
import platform
import subprocess
import urllib.request
import urllib.parse
import threading
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Callable
from .banner import Colors
from .config import get_config
class CVEDatabase:
"""SQLite-based CVE Database with NVD API synchronization."""
NVD_API_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"
DB_VERSION = 1
RESULTS_PER_PAGE = 2000 # NVD max is 2000
# OS to CPE mapping for common systems
OS_CPE_MAP = {
'ubuntu': 'cpe:2.3:o:canonical:ubuntu_linux',
'debian': 'cpe:2.3:o:debian:debian_linux',
'fedora': 'cpe:2.3:o:fedoraproject:fedora',
'centos': 'cpe:2.3:o:centos:centos',
'rhel': 'cpe:2.3:o:redhat:enterprise_linux',
'rocky': 'cpe:2.3:o:rockylinux:rocky_linux',
'alma': 'cpe:2.3:o:almalinux:almalinux',
'arch': 'cpe:2.3:o:archlinux:arch_linux',
'opensuse': 'cpe:2.3:o:opensuse:opensuse',
'suse': 'cpe:2.3:o:suse:suse_linux',
'kali': 'cpe:2.3:o:kali:kali_linux',
'mint': 'cpe:2.3:o:linuxmint:linux_mint',
'windows': 'cpe:2.3:o:microsoft:windows',
'macos': 'cpe:2.3:o:apple:macos',
'darwin': 'cpe:2.3:o:apple:macos',
}
# SQL Schema
SCHEMA = """
-- CVE main table
CREATE TABLE IF NOT EXISTS cves (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cve_id TEXT UNIQUE NOT NULL,
description TEXT,
published TEXT,
modified TEXT,
cvss_v3_score REAL,
cvss_v3_severity TEXT,
cvss_v3_vector TEXT,
cvss_v2_score REAL,
cvss_v2_severity TEXT,
cvss_v2_vector TEXT
);
-- CPE (affected products) table
CREATE TABLE IF NOT EXISTS cve_cpes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cve_id TEXT NOT NULL,
cpe_criteria TEXT NOT NULL,
vulnerable INTEGER DEFAULT 1,
version_start TEXT,
version_end TEXT,
FOREIGN KEY (cve_id) REFERENCES cves(cve_id)
);
-- References table
CREATE TABLE IF NOT EXISTS cve_references (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cve_id TEXT NOT NULL,
url TEXT NOT NULL,
source TEXT,
FOREIGN KEY (cve_id) REFERENCES cves(cve_id)
);
-- Weaknesses (CWE) table
CREATE TABLE IF NOT EXISTS cve_weaknesses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cve_id TEXT NOT NULL,
cwe_id TEXT NOT NULL,
FOREIGN KEY (cve_id) REFERENCES cves(cve_id)
);
-- Metadata table
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
value TEXT
);
-- Indexes for fast queries
CREATE INDEX IF NOT EXISTS idx_cve_id ON cves(cve_id);
CREATE INDEX IF NOT EXISTS idx_cve_severity ON cves(cvss_v3_severity);
CREATE INDEX IF NOT EXISTS idx_cve_score ON cves(cvss_v3_score);
CREATE INDEX IF NOT EXISTS idx_cve_published ON cves(published);
CREATE INDEX IF NOT EXISTS idx_cpe_cve ON cve_cpes(cve_id);
CREATE INDEX IF NOT EXISTS idx_cpe_criteria ON cve_cpes(cpe_criteria);
CREATE INDEX IF NOT EXISTS idx_ref_cve ON cve_references(cve_id);
CREATE INDEX IF NOT EXISTS idx_weakness_cve ON cve_weaknesses(cve_id);
"""
def __init__(self, db_path: str = None):
"""Initialize CVE database.
Args:
db_path: Path to SQLite database. Defaults to data/cve/cve.db
"""
if db_path is None:
from core.paths import get_data_dir
self.data_dir = get_data_dir() / "cve"
self.db_path = self.data_dir / "cve.db"
else:
self.db_path = Path(db_path)
self.data_dir = self.db_path.parent
self.data_dir.mkdir(parents=True, exist_ok=True)
self.system_info = self._detect_system()
self._conn = None
self._lock = threading.Lock()
self._init_database()
def _get_connection(self) -> sqlite3.Connection:
"""Get thread-safe database connection."""
if self._conn is None:
self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
self._conn.row_factory = sqlite3.Row
return self._conn
def _init_database(self):
"""Initialize database schema."""
with self._lock:
conn = self._get_connection()
conn.executescript(self.SCHEMA)
conn.commit()
def _detect_system(self) -> Dict[str, str]:
"""Detect the current system information."""
info = {
'os_type': platform.system().lower(),
'os_name': '',
'os_version': '',
'os_id': '',
'kernel': platform.release(),
'arch': platform.machine(),
'cpe_prefix': '',
}
if info['os_type'] == 'linux':
os_release = Path("/etc/os-release")
if os_release.exists():
content = os_release.read_text()
for line in content.split('\n'):
if line.startswith('ID='):
info['os_id'] = line.split('=')[1].strip('"').lower()
elif line.startswith('VERSION_ID='):
info['os_version'] = line.split('=')[1].strip('"')
elif line.startswith('PRETTY_NAME='):
info['os_name'] = line.split('=', 1)[1].strip('"')
if not info['os_id']:
if Path("/etc/debian_version").exists():
info['os_id'] = 'debian'
elif Path("/etc/redhat-release").exists():
info['os_id'] = 'rhel'
elif Path("/etc/arch-release").exists():
info['os_id'] = 'arch'
elif info['os_type'] == 'darwin':
info['os_id'] = 'macos'
try:
result = subprocess.run(['sw_vers', '-productVersion'],
capture_output=True, text=True, timeout=5)
info['os_version'] = result.stdout.strip()
except:
pass
elif info['os_type'] == 'windows':
info['os_id'] = 'windows'
info['os_version'] = platform.version()
info['os_name'] = platform.platform()
for os_key, cpe in self.OS_CPE_MAP.items():
if os_key in info['os_id']:
info['cpe_prefix'] = cpe
break
return info
def get_system_info(self) -> Dict[str, str]:
"""Get detected system information."""
return self.system_info.copy()
def get_db_stats(self) -> Dict[str, Any]:
"""Get database statistics."""
with self._lock:
conn = self._get_connection()
cursor = conn.cursor()
stats = {
'db_path': str(self.db_path),
'db_size_mb': round(self.db_path.stat().st_size / 1024 / 1024, 2) if self.db_path.exists() else 0,
'total_cves': 0,
'total_cpes': 0,
'last_sync': None,
'last_modified': None,
}
try:
cursor.execute("SELECT COUNT(*) FROM cves")
stats['total_cves'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM cve_cpes")
stats['total_cpes'] = cursor.fetchone()[0]
cursor.execute("SELECT value FROM metadata WHERE key = 'last_sync'")
row = cursor.fetchone()
if row:
stats['last_sync'] = row[0]
cursor.execute("SELECT value FROM metadata WHERE key = 'last_modified'")
row = cursor.fetchone()
if row:
stats['last_modified'] = row[0]
# Count by severity
cursor.execute("""
SELECT cvss_v3_severity, COUNT(*)
FROM cves
WHERE cvss_v3_severity IS NOT NULL
GROUP BY cvss_v3_severity
""")
stats['by_severity'] = {row[0]: row[1] for row in cursor.fetchall()}
except sqlite3.Error:
pass
return stats
# =========================================================================
# NVD API METHODS
# =========================================================================
def _make_nvd_request(self, params: Dict[str, str], verbose: bool = False) -> Optional[Dict]:
"""Make a request to the NVD API."""
url = f"{self.NVD_API_BASE}?{urllib.parse.urlencode(params)}"
if verbose:
print(f"{Colors.DIM} API: {url[:80]}...{Colors.RESET}")
headers = {
'User-Agent': 'AUTARCH-Security-Framework/1.0',
'Accept': 'application/json',
}
config = get_config()
api_key = config.get('nvd', 'api_key', fallback='')
if api_key:
headers['apiKey'] = api_key
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=60) as response:
return json.loads(response.read().decode('utf-8'))
except urllib.error.HTTPError as e:
if verbose:
print(f"{Colors.RED}[X] NVD API error: {e.code} - {e.reason}{Colors.RESET}")
return None
except urllib.error.URLError as e:
if verbose:
print(f"{Colors.RED}[X] Network error: {e.reason}{Colors.RESET}")
return None
except Exception as e:
if verbose:
print(f"{Colors.RED}[X] Request failed: {e}{Colors.RESET}")
return None
def _parse_cve_data(self, vuln: Dict) -> Dict:
"""Parse CVE data from NVD API response."""
cve_data = vuln.get('cve', {})
cve_id = cve_data.get('id', '')
# Description
descriptions = cve_data.get('descriptions', [])
description = ''
for desc in descriptions:
if desc.get('lang') == 'en':
description = desc.get('value', '')
break
# CVSS scores
metrics = cve_data.get('metrics', {})
cvss_v3 = metrics.get('cvssMetricV31', metrics.get('cvssMetricV30', []))
cvss_v2 = metrics.get('cvssMetricV2', [])
cvss_v3_score = None
cvss_v3_severity = None
cvss_v3_vector = None
cvss_v2_score = None
cvss_v2_severity = None
cvss_v2_vector = None
if cvss_v3:
cvss_data = cvss_v3[0].get('cvssData', {})
cvss_v3_score = cvss_data.get('baseScore')
cvss_v3_severity = cvss_data.get('baseSeverity')
cvss_v3_vector = cvss_data.get('vectorString')
if cvss_v2:
cvss_data = cvss_v2[0].get('cvssData', {})
cvss_v2_score = cvss_data.get('baseScore')
cvss_v2_severity = cvss_v2[0].get('baseSeverity')
cvss_v2_vector = cvss_data.get('vectorString')
# CPEs (affected products)
cpes = []
for config in cve_data.get('configurations', []):
for node in config.get('nodes', []):
for match in node.get('cpeMatch', []):
cpes.append({
'criteria': match.get('criteria', ''),
'vulnerable': match.get('vulnerable', True),
'version_start': match.get('versionStartIncluding') or match.get('versionStartExcluding'),
'version_end': match.get('versionEndIncluding') or match.get('versionEndExcluding'),
})
# References
references = [
{'url': ref.get('url', ''), 'source': ref.get('source', '')}
for ref in cve_data.get('references', [])
]
# Weaknesses
weaknesses = []
for weakness in cve_data.get('weaknesses', []):
for desc in weakness.get('description', []):
if desc.get('lang') == 'en' and desc.get('value', '').startswith('CWE-'):
weaknesses.append(desc.get('value'))
return {
'cve_id': cve_id,
'description': description,
'published': cve_data.get('published', ''),
'modified': cve_data.get('lastModified', ''),
'cvss_v3_score': cvss_v3_score,
'cvss_v3_severity': cvss_v3_severity,
'cvss_v3_vector': cvss_v3_vector,
'cvss_v2_score': cvss_v2_score,
'cvss_v2_severity': cvss_v2_severity,
'cvss_v2_vector': cvss_v2_vector,
'cpes': cpes,
'references': references,
'weaknesses': weaknesses,
}
def _insert_cve(self, conn: sqlite3.Connection, cve_data: Dict):
"""Insert or update a CVE in the database."""
cursor = conn.cursor()
# Insert/update main CVE record
cursor.execute("""
INSERT OR REPLACE INTO cves
(cve_id, description, published, modified,
cvss_v3_score, cvss_v3_severity, cvss_v3_vector,
cvss_v2_score, cvss_v2_severity, cvss_v2_vector)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
cve_data['cve_id'],
cve_data['description'],
cve_data['published'],
cve_data['modified'],
cve_data['cvss_v3_score'],
cve_data['cvss_v3_severity'],
cve_data['cvss_v3_vector'],
cve_data['cvss_v2_score'],
cve_data['cvss_v2_severity'],
cve_data['cvss_v2_vector'],
))
cve_id = cve_data['cve_id']
# Clear existing related data
cursor.execute("DELETE FROM cve_cpes WHERE cve_id = ?", (cve_id,))
cursor.execute("DELETE FROM cve_references WHERE cve_id = ?", (cve_id,))
cursor.execute("DELETE FROM cve_weaknesses WHERE cve_id = ?", (cve_id,))
# Insert CPEs
for cpe in cve_data['cpes']:
cursor.execute("""
INSERT INTO cve_cpes (cve_id, cpe_criteria, vulnerable, version_start, version_end)
VALUES (?, ?, ?, ?, ?)
""", (cve_id, cpe['criteria'], 1 if cpe['vulnerable'] else 0,
cpe['version_start'], cpe['version_end']))
# Insert references (limit to 10)
for ref in cve_data['references'][:10]:
cursor.execute("""
INSERT INTO cve_references (cve_id, url, source)
VALUES (?, ?, ?)
""", (cve_id, ref['url'], ref['source']))
# Insert weaknesses
for cwe in cve_data['weaknesses']:
cursor.execute("""
INSERT INTO cve_weaknesses (cve_id, cwe_id)
VALUES (?, ?)
""", (cve_id, cwe))
# =========================================================================
# DATABASE SYNC
# =========================================================================
def sync_database(
self,
days_back: int = 120,
full_sync: bool = False,
progress_callback: Callable[[str, int, int], None] = None,
verbose: bool = True
) -> Dict[str, Any]:
"""Synchronize database with NVD.
Args:
days_back: For incremental sync, get CVEs from last N days.
full_sync: If True, download entire database (WARNING: slow, 200k+ CVEs).
progress_callback: Callback function(message, current, total).
verbose: Show progress messages.
Returns:
Sync statistics dictionary.
"""
stats = {
'started': datetime.now().isoformat(),
'cves_processed': 0,
'cves_added': 0,
'cves_updated': 0,
'errors': 0,
'completed': False,
}
if verbose:
print(f"{Colors.CYAN}[*] Starting CVE database sync...{Colors.RESET}")
# Determine date range
if full_sync:
# Start from 1999 (first CVEs)
start_date = datetime(1999, 1, 1)
if verbose:
print(f"{Colors.YELLOW}[!] Full sync requested - this may take a while...{Colors.RESET}")
else:
start_date = datetime.utcnow() - timedelta(days=days_back)
end_date = datetime.utcnow()
# Calculate total CVEs to fetch (estimate)
params = {
'pubStartDate': start_date.strftime('%Y-%m-%dT00:00:00.000'),
'pubEndDate': end_date.strftime('%Y-%m-%dT23:59:59.999'),
'resultsPerPage': '1',
}
response = self._make_nvd_request(params, verbose)
if not response:
if verbose:
print(f"{Colors.RED}[X] Failed to connect to NVD API{Colors.RESET}")
return stats
total_results = response.get('totalResults', 0)
if verbose:
print(f"{Colors.CYAN}[*] Found {total_results:,} CVEs to process{Colors.RESET}")
if total_results == 0:
stats['completed'] = True
return stats
# Process in batches
start_index = 0
batch_num = 0
total_batches = (total_results + self.RESULTS_PER_PAGE - 1) // self.RESULTS_PER_PAGE
with self._lock:
conn = self._get_connection()
while start_index < total_results:
batch_num += 1
if verbose:
pct = int((start_index / total_results) * 100)
print(f"{Colors.CYAN}[*] Batch {batch_num}/{total_batches} ({pct}%) - {start_index:,}/{total_results:,}{Colors.RESET}")
if progress_callback:
progress_callback(f"Downloading batch {batch_num}/{total_batches}", start_index, total_results)
params = {
'pubStartDate': start_date.strftime('%Y-%m-%dT00:00:00.000'),
'pubEndDate': end_date.strftime('%Y-%m-%dT23:59:59.999'),
'resultsPerPage': str(self.RESULTS_PER_PAGE),
'startIndex': str(start_index),
}
response = self._make_nvd_request(params, verbose=False)
if not response:
stats['errors'] += 1
if verbose:
print(f"{Colors.YELLOW}[!] Batch {batch_num} failed, retrying...{Colors.RESET}")
time.sleep(6) # NVD rate limit
continue
vulnerabilities = response.get('vulnerabilities', [])
for vuln in vulnerabilities:
try:
cve_data = self._parse_cve_data(vuln)
self._insert_cve(conn, cve_data)
stats['cves_processed'] += 1
stats['cves_added'] += 1
except Exception as e:
stats['errors'] += 1
if verbose:
print(f"{Colors.RED}[X] Error processing CVE: {e}{Colors.RESET}")
conn.commit()
start_index += self.RESULTS_PER_PAGE
# Rate limiting - NVD allows 5 requests per 30 seconds without API key
config = get_config()
if not config.get('nvd', 'api_key', fallback=''):
time.sleep(6)
else:
time.sleep(0.6) # With API key: 50 requests per 30 seconds
# Update metadata
conn.execute("""
INSERT OR REPLACE INTO metadata (key, value) VALUES ('last_sync', ?)
""", (datetime.now().isoformat(),))
conn.execute("""
INSERT OR REPLACE INTO metadata (key, value) VALUES ('last_modified', ?)
""", (end_date.isoformat(),))
conn.commit()
stats['completed'] = True
stats['finished'] = datetime.now().isoformat()
if verbose:
print(f"{Colors.GREEN}[+] Sync complete: {stats['cves_processed']:,} CVEs processed{Colors.RESET}")
return stats
def sync_recent(self, days: int = 7, verbose: bool = True) -> Dict[str, Any]:
"""Quick sync of recent CVEs only."""
return self.sync_database(days_back=days, full_sync=False, verbose=verbose)
# =========================================================================
# QUERY METHODS
# =========================================================================
def search_cves(
self,
keyword: str = None,
cpe_pattern: str = None,
severity: str = None,
min_score: float = None,
max_results: int = 100,
days_back: int = None
) -> List[Dict]:
"""Search CVEs in local database.
Args:
keyword: Search in CVE ID or description.
cpe_pattern: CPE pattern to match (uses LIKE).
severity: Filter by severity (LOW, MEDIUM, HIGH, CRITICAL).
min_score: Minimum CVSS v3 score.
max_results: Maximum results to return.
days_back: Only return CVEs from last N days.
Returns:
List of matching CVE dictionaries.
"""
with self._lock:
conn = self._get_connection()
cursor = conn.cursor()
query = "SELECT DISTINCT c.* FROM cves c"
conditions = []
params = []
if cpe_pattern:
query += " LEFT JOIN cve_cpes cp ON c.cve_id = cp.cve_id"
conditions.append("cp.cpe_criteria LIKE ?")
params.append(f"%{cpe_pattern}%")
if keyword:
conditions.append("(c.cve_id LIKE ? OR c.description LIKE ?)")
params.extend([f"%{keyword}%", f"%{keyword}%"])
if severity:
conditions.append("c.cvss_v3_severity = ?")
params.append(severity.upper())
if min_score is not None:
conditions.append("c.cvss_v3_score >= ?")
params.append(min_score)
if days_back:
cutoff = (datetime.utcnow() - timedelta(days=days_back)).strftime('%Y-%m-%d')
conditions.append("c.published >= ?")
params.append(cutoff)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY c.cvss_v3_score DESC NULLS LAST, c.published DESC"
query += f" LIMIT {max_results}"
cursor.execute(query, params)
rows = cursor.fetchall()
return [self._row_to_dict(row) for row in rows]
def get_cve(self, cve_id: str) -> Optional[Dict]:
"""Get detailed information about a specific CVE.
Args:
cve_id: The CVE ID (e.g., CVE-2024-1234).
Returns:
CVE details dictionary or None if not found.
"""
with self._lock:
conn = self._get_connection()
cursor = conn.cursor()
# Get main CVE data
cursor.execute("SELECT * FROM cves WHERE cve_id = ?", (cve_id,))
row = cursor.fetchone()
if not row:
return None
cve = self._row_to_dict(row)
# Get CPEs
cursor.execute("SELECT * FROM cve_cpes WHERE cve_id = ?", (cve_id,))
cve['cpes'] = [dict(r) for r in cursor.fetchall()]
# Get references
cursor.execute("SELECT url, source FROM cve_references WHERE cve_id = ?", (cve_id,))
cve['references'] = [dict(r) for r in cursor.fetchall()]
# Get weaknesses
cursor.execute("SELECT cwe_id FROM cve_weaknesses WHERE cve_id = ?", (cve_id,))
cve['weaknesses'] = [r['cwe_id'] for r in cursor.fetchall()]
return cve
def get_system_cves(
self,
severity_filter: str = None,
max_results: int = 100
) -> List[Dict]:
"""Get CVEs relevant to the detected system.
Args:
severity_filter: Filter by severity.
max_results: Maximum results.
Returns:
List of relevant CVEs.
"""
cpe_prefix = self.system_info.get('cpe_prefix', '')
if not cpe_prefix:
return []
# Build CPE pattern for this system
cpe_pattern = cpe_prefix
if self.system_info.get('os_version'):
version = self.system_info['os_version'].split('.')[0]
cpe_pattern = f"{cpe_prefix}:{version}"
return self.search_cves(
cpe_pattern=cpe_pattern,
severity=severity_filter,
max_results=max_results
)
def get_software_cves(
self,
software: str,
vendor: str = None,
version: str = None,
max_results: int = 100
) -> List[Dict]:
"""Search CVEs for specific software.
Args:
software: Software/product name.
vendor: Vendor name (optional).
version: Software version (optional).
max_results: Maximum results.
Returns:
List of CVEs.
"""
# Try CPE-based search first
cpe_pattern = software.lower().replace(' ', '_')
if vendor:
cpe_pattern = f"{vendor.lower()}:{cpe_pattern}"
if version:
cpe_pattern = f"{cpe_pattern}:{version}"
results = self.search_cves(cpe_pattern=cpe_pattern, max_results=max_results)
# Also search by keyword if CPE search returns few results
if len(results) < 10:
keyword = software
if vendor:
keyword = f"{vendor} {software}"
keyword_results = self.search_cves(keyword=keyword, max_results=max_results)
# Merge results, avoiding duplicates
seen = {r['cve_id'] for r in results}
for r in keyword_results:
if r['cve_id'] not in seen:
results.append(r)
seen.add(r['cve_id'])
return results[:max_results]
def get_cves_by_severity(self, severity: str, max_results: int = 100) -> List[Dict]:
"""Get CVEs by severity level."""
return self.search_cves(severity=severity, max_results=max_results)
def get_recent_cves(self, days: int = 30, max_results: int = 100) -> List[Dict]:
"""Get recently published CVEs."""
return self.search_cves(days_back=days, max_results=max_results)
def _row_to_dict(self, row: sqlite3.Row) -> Dict:
"""Convert database row to dictionary."""
return {
'cve_id': row['cve_id'],
'id': row['cve_id'], # Alias for compatibility
'description': row['description'],
'published': row['published'],
'modified': row['modified'],
'cvss_score': row['cvss_v3_score'] or row['cvss_v2_score'] or 0,
'cvss_v3_score': row['cvss_v3_score'],
'cvss_v3_severity': row['cvss_v3_severity'],
'cvss_v3_vector': row['cvss_v3_vector'],
'cvss_v2_score': row['cvss_v2_score'],
'cvss_v2_severity': row['cvss_v2_severity'],
'cvss_v2_vector': row['cvss_v2_vector'],
'severity': row['cvss_v3_severity'] or row['cvss_v2_severity'] or 'UNKNOWN',
}
# =========================================================================
# ONLINE FALLBACK
# =========================================================================
def fetch_cve_online(self, cve_id: str, verbose: bool = False) -> Optional[Dict]:
"""Fetch a specific CVE from NVD API (online fallback).
Args:
cve_id: The CVE ID.
verbose: Show progress.
Returns:
CVE details or None.
"""
params = {'cveId': cve_id}
if verbose:
print(f"{Colors.CYAN}[*] Fetching {cve_id} from NVD...{Colors.RESET}")
response = self._make_nvd_request(params, verbose)
if not response or not response.get('vulnerabilities'):
return None
cve_data = self._parse_cve_data(response['vulnerabilities'][0])
# Store in database
with self._lock:
conn = self._get_connection()
self._insert_cve(conn, cve_data)
conn.commit()
return self.get_cve(cve_id)
def search_online(
self,
keyword: str = None,
cpe_name: str = None,
severity: str = None,
days_back: int = 120,
max_results: int = 100,
verbose: bool = False
) -> List[Dict]:
"""Search NVD API directly (online mode).
Use this when local database is empty or for real-time results.
"""
params = {
'resultsPerPage': str(min(max_results, 2000)),
}
if keyword:
params['keywordSearch'] = keyword
if cpe_name:
params['cpeName'] = cpe_name
if severity:
params['cvssV3Severity'] = severity.upper()
if days_back > 0:
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days_back)
params['pubStartDate'] = start_date.strftime('%Y-%m-%dT00:00:00.000')
params['pubEndDate'] = end_date.strftime('%Y-%m-%dT23:59:59.999')
if verbose:
print(f"{Colors.CYAN}[*] Searching NVD online...{Colors.RESET}")
response = self._make_nvd_request(params, verbose)
if not response:
return []
cves = []
for vuln in response.get('vulnerabilities', []):
cve_data = self._parse_cve_data(vuln)
cves.append({
'cve_id': cve_data['cve_id'],
'id': cve_data['cve_id'],
'description': cve_data['description'][:200] + '...' if len(cve_data['description']) > 200 else cve_data['description'],
'cvss_score': cve_data['cvss_v3_score'] or cve_data['cvss_v2_score'] or 0,
'severity': cve_data['cvss_v3_severity'] or cve_data['cvss_v2_severity'] or 'UNKNOWN',
'published': cve_data['published'][:10] if cve_data['published'] else '',
})
return cves
def close(self):
"""Close database connection."""
if self._conn:
self._conn.close()
self._conn = None
# Global instance
_cve_db: Optional[CVEDatabase] = None
def get_cve_db() -> CVEDatabase:
"""Get the global CVE database instance."""
global _cve_db
if _cve_db is None:
_cve_db = CVEDatabase()
return _cve_db