175 lines
5.7 KiB
Python
175 lines
5.7 KiB
Python
"""Contents file parser for identifying packages with man pages."""
|
|
|
|
import gzip
import logging
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional, Set
from urllib.parse import urljoin

import requests
|
|
|
|
# Module-level logger, named after this module so that applications can
# configure its output through the standard logging hierarchy.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ContentsParser:
    """Parse repository metadata to identify packages containing man pages.

    Instead of downloading every package, the parser reads ``repomd.xml`` to
    locate the filelists metadata, downloads (and caches) that file, and
    scans it for packages that ship at least one file below a man page
    directory.
    """

    # XML namespace used for the namespaced lookup in repomd.xml.
    _REPOMD_NS = {'repo': 'http://linux.duke.edu/metadata/repo'}

    def __init__(self, repo_url: str, cache_dir: Path) -> None:
        """Initialize the contents parser.

        Args:
            repo_url: Base URL of the repository (e.g., .../BaseOS/x86_64/os/).
                It is normalized to end with exactly one trailing slash so
                that ``urljoin`` treats it as a directory.
            cache_dir: Directory to cache downloaded metadata. It is created
                (including parents) if it does not exist yet.
        """
        self.repo_url = repo_url.rstrip('/') + '/'
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_packages_with_manpages(self) -> Set[str]:
        """Get set of package names that contain man pages.

        Returns:
            Set of package names (e.g., {'bash', 'coreutils', ...}). An empty
            set is returned when the filelists metadata cannot be located or
            downloaded.
        """
        logger.info("Fetching filelists for %s", self.repo_url)

        filelists_path = self._get_filelists_path()
        if not filelists_path:
            logger.warning("Could not find filelists in repository metadata")
            return set()

        filelists_file = self._download_filelists(filelists_path)
        if not filelists_file:
            logger.warning("Could not download filelists")
            return set()

        packages = self._parse_filelists(filelists_file)
        logger.info("Found %d packages with man pages", len(packages))
        return packages

    def _get_filelists_path(self) -> Optional[str]:
        """Parse repomd.xml to get the filelists.xml location.

        Returns:
            The ``href`` of the filelists entry (a path relative to the
            repository root), or None when repomd.xml cannot be fetched or
            parsed, or contains no filelists entry.
        """
        repomd_url = urljoin(self.repo_url, 'repodata/repomd.xml')

        try:
            response = requests.get(repomd_url, timeout=30)
            response.raise_for_status()
            root = ET.fromstring(response.content)
            return self._find_filelists_href(root)
        except Exception as e:
            # Deliberate best-effort boundary: any failure is logged and
            # reported to the caller as "not found".
            logger.error("Error parsing repomd.xml: %s", e)

        return None

    @classmethod
    def _find_filelists_href(cls, root) -> Optional[str]:
        """Return the href of the filelists <location> below *root*, if any.

        The namespaced element names are tried first; plain element names
        are used as a fallback for metadata without a namespace.
        """
        lookups = (
            ('repo:data', 'repo:location', cls._REPOMD_NS),
            ('data', 'location', None),
        )
        for data_tag, location_tag, namespaces in lookups:
            for data in root.findall(data_tag, namespaces):
                if data.get('type') != 'filelists':
                    continue
                location = data.find(location_tag, namespaces)
                if location is not None:
                    return location.get('href')
        return None

    def _download_filelists(self, relative_path: str) -> Optional[Path]:
        """Download filelists.xml.gz file.

        The file is streamed into a temporary ``.part`` file next to the
        final cache location and renamed only after the download completed,
        so an interrupted transfer never leaves a truncated file that would
        later be mistaken for a valid cache entry.

        Args:
            relative_path: Relative path from repo root
                (e.g., 'repodata/...-filelists.xml.gz')

        Returns:
            Path to the cached file, or None if the download failed or the
            path does not name a file.
        """
        filename = relative_path.rsplit('/', 1)[-1]
        if filename in ('', '.', '..'):
            # Without a real file name the cache path would resolve to a
            # directory, which "exists" and would be returned as the cache.
            logger.error(
                "Error downloading filelists: no file name in %r", relative_path
            )
            return None

        url = urljoin(self.repo_url, relative_path)
        cache_file = self.cache_dir / filename

        if cache_file.exists():
            logger.debug("Using cached filelists: %s", cache_file)
            return cache_file

        partial_file = cache_file.with_name(cache_file.name + '.part')
        try:
            logger.info("Downloading %s", url)
            cache_file.parent.mkdir(parents=True, exist_ok=True)

            # The context manager releases the streamed connection even when
            # the transfer is aborted half-way.
            with requests.get(url, timeout=60, stream=True) as response:
                response.raise_for_status()
                with open(partial_file, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            partial_file.replace(cache_file)
            return cache_file

        except Exception as e:
            # Deliberate best-effort boundary: log and signal failure.
            logger.error("Error downloading filelists: %s", e)
            try:
                partial_file.unlink()
            except OSError:
                # Nothing was written, or the file cannot be removed; either
                # way the final cache path was never created.
                pass
            return None

    def _parse_filelists(self, filelists_path: Path) -> Set[str]:
        """Parse filelists.xml.gz to find packages with man pages.

        The document is parsed incrementally. A package is reported when at
        least one of its non-directory ``<file>`` entries lies below a man
        page directory. On a parse or read error the error is logged and the
        packages collected so far are returned.

        Args:
            filelists_path: Path to filelists.xml.gz file

        Returns:
            Set of package names containing man pages
        """
        packages: Set[str] = set()

        try:
            with gzip.open(filelists_path, 'rb') as f:
                root = None
                current_package = None
                has_manpage = False

                for event, elem in ET.iterparse(f, events=('start', 'end')):
                    if root is None:
                        # The first event is the start of the document root.
                        root = elem

                    # Strip a '{namespace}' prefix to get the local tag name.
                    tag = elem.tag.rsplit('}', 1)[-1]

                    if event == 'start':
                        if tag == 'package':
                            current_package = elem.get('name')
                            has_manpage = False
                    elif tag == 'file':
                        # Directory entries (type="dir") are not man pages
                        # themselves; a package that merely owns the man
                        # directories must not be reported.
                        if (
                            not has_manpage
                            and elem.get('type') != 'dir'
                            and elem.text
                            and self._is_manpage_path(elem.text)
                        ):
                            has_manpage = True
                    elif tag == 'package':
                        if has_manpage and current_package:
                            packages.add(current_package)
                        # Release the finished package and drop the root's
                        # reference to it so memory use stays flat.
                        elem.clear()
                        root.clear()
                        current_package = None
                        has_manpage = False

        except Exception as e:
            # Deliberate best-effort boundary: keep what was parsed so far.
            logger.error("Error parsing filelists: %s", e)

        return packages

    @staticmethod
    def _is_manpage_path(file_path: str) -> bool:
        """Check if a file path is a man page location.

        Args:
            file_path: File path to check

        Returns:
            True if the path contains a '/share/man/' component or starts
            with '/usr/man/'.
        """
        return '/share/man/' in file_path or file_path.startswith('/usr/man/')
|