CUSP-1256 (#1)

* Complete refactor

Signed-off-by: Stephen Simpson <ssimpson89@users.noreply.github.com>

* Complete refactor

Signed-off-by: Stephen Simpson <ssimpson89@users.noreply.github.com>

---------

Signed-off-by: Stephen Simpson <ssimpson89@users.noreply.github.com>
This commit is contained in:
Stephen Simpson
2025-11-20 12:16:33 -05:00
committed by GitHub
parent 5248edad62
commit ec32c72363
44 changed files with 4083 additions and 1540 deletions
+222
View File
@@ -0,0 +1,222 @@
"""Extract man pages from RPM packages."""
import gzip
import logging
from pathlib import Path
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
import rpmfile
from ..models import Package, ManFile
logger = logging.getLogger(__name__)
class ManPageExtractor:
"""Extracts man pages from RPM packages.
Handles:
- Extracting man pages from RPMs
- Reading gzipped man page content
- Organizing extracted files by package
"""
def __init__(self, extract_dir: Path, skip_sections: List[str] = None, skip_languages: bool = True):
"""Initialize extractor.
Args:
extract_dir: Base directory for extracting man pages
skip_sections: List of man sections to skip (e.g., ['3', '3p', '3pm'])
skip_languages: If True, skip non-English man pages
"""
self.extract_dir = Path(extract_dir)
self.extract_dir.mkdir(parents=True, exist_ok=True)
self.skip_sections = skip_sections or []
self.skip_languages = skip_languages
def extract_from_package(self, package: Package) -> List[ManFile]:
"""Extract all man pages from a package.
Args:
package: Package to extract from
Returns:
List of ManFile objects for extracted man pages
"""
if not package.download_path or not package.download_path.exists():
logger.warning(f"Package file not found: {package.name}")
return []
# Create extraction directory for this package
pkg_extract_dir = self.extract_dir / package.name
pkg_extract_dir.mkdir(parents=True, exist_ok=True)
man_files = []
try:
logger.info(f"Extracting man pages from {package.filename}")
with rpmfile.open(package.download_path) as rpm:
for member in rpm.getmembers():
# Check if this is a man page file
if not self._is_manpage(member.name):
continue
# Create ManFile object
extract_path = pkg_extract_dir / member.name.lstrip('/')
man_file = ManFile(
file_path=extract_path,
package_name=package.name
)
# Apply section filtering
if self.skip_sections and man_file.section in self.skip_sections:
logger.debug(f"Skipping {man_file.display_name} (section {man_file.section})")
continue
# Apply language filtering
if self.skip_languages and man_file.language and man_file.language != 'en':
logger.debug(f"Skipping {man_file.display_name} (language {man_file.language})")
continue
# Extract the file
extract_path.parent.mkdir(parents=True, exist_ok=True)
try:
content = rpm.extractfile(member).read()
with open(extract_path, 'wb') as f:
f.write(content)
man_file.content = content
man_files.append(man_file)
except Exception as e:
logger.warning(f"Failed to extract {member.name}: {e}")
logger.info(f"Extracted {len(man_files)} man pages from {package.name}")
except Exception as e:
logger.error(f"Error extracting from {package.filename}: {e}")
return man_files
def extract_from_packages(
self,
packages: List[Package],
max_workers: int = 5
) -> List[ManFile]:
"""Extract man pages from multiple packages in parallel.
Args:
packages: List of packages to process
max_workers: Maximum number of parallel extractions
Returns:
List of all extracted ManFile objects
"""
all_man_files = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all extraction tasks
future_to_pkg = {
executor.submit(self.extract_from_package, pkg): pkg
for pkg in packages
}
# Collect results
for future in as_completed(future_to_pkg):
pkg = future_to_pkg[future]
try:
man_files = future.result()
all_man_files.extend(man_files)
except Exception as e:
logger.error(f"Error processing {pkg.name}: {e}")
logger.info(f"Extracted total of {len(all_man_files)} man pages from {len(packages)} packages")
return all_man_files
def read_manpage_content(self, man_file: ManFile) -> str:
"""Read and decompress man page content.
Args:
man_file: ManFile to read
Returns:
Decompressed man page content as string
"""
if not man_file.file_path.exists():
logger.warning(f"Man page file not found: {man_file.file_path}")
return ""
try:
# Try reading as gzipped file first
if man_file.file_path.suffix == '.gz':
with gzip.open(man_file.file_path, 'rb') as f:
content = f.read()
else:
# Read as plain text
with open(man_file.file_path, 'rb') as f:
content = f.read()
# Decode with error handling
return content.decode('utf-8', errors='replace')
except gzip.BadGzipFile:
# Not a gzip file, try reading as plain text
try:
with open(man_file.file_path, 'rb') as f:
content = f.read()
return content.decode('utf-8', errors='replace')
except Exception as e:
logger.error(f"Error reading {man_file.file_path}: {e}")
return ""
except Exception as e:
logger.error(f"Error reading {man_file.file_path}: {e}")
return ""
@staticmethod
def _is_manpage(path: str) -> bool:
"""Check if a file path is a man page.
Args:
path: File path to check
Returns:
True if this looks like a man page file
"""
# Must contain /man/ in path
if '/man/' not in path:
return False
# Should be in /usr/share/man/ or /usr/man/
if not ('/share/man/' in path or path.startswith('/usr/man/')):
return False
# Common man page patterns
# - /usr/share/man/man1/foo.1.gz
# - /usr/share/man/es/man1/foo.1.gz
# - /usr/share/man/man3/printf.3.gz
parts = path.split('/')
# Check for man<digit> directory
has_man_section = any(
part.startswith('man') and len(part) > 3 and part[3].isdigit()
for part in parts
)
return has_man_section
def cleanup_extracts(self, package: Package):
"""Clean up extracted files for a package.
Args:
package: Package whose extracts to clean up
"""
pkg_extract_dir = self.extract_dir / package.name
if pkg_extract_dir.exists():
import shutil
shutil.rmtree(pkg_extract_dir)
logger.debug(f"Cleaned up extracts for {package.name}")