"""Extract man pages from RPM packages."""

import gzip
import logging
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional

import rpmfile

from ..models import Package, ManFile

logger = logging.getLogger(__name__)


class ManPageExtractor:
    """Extracts man pages from RPM packages.

    Handles:
    - Extracting man pages from RPMs
    - Reading gzipped man page content
    - Organizing extracted files by package
    """

    def __init__(
        self,
        extract_dir: Path,
        skip_sections: Optional[List[str]] = None,
        skip_languages: bool = True,
    ):
        """Initialize extractor.

        The extraction directory is created if it does not exist yet.

        Args:
            extract_dir: Base directory for extracting man pages
            skip_sections: List of man sections to skip (e.g., ['3', '3p', '3pm'])
            skip_languages: If True, skip non-English man pages
        """
        self.extract_dir = Path(extract_dir)
        self.extract_dir.mkdir(parents=True, exist_ok=True)
        self.skip_sections = skip_sections or []
        self.skip_languages = skip_languages

    def extract_from_package(self, package: Package) -> List[ManFile]:
        """Extract all man pages from a package.

        Each man page is written below ``<extract_dir>/<package.name>/``
        mirroring its path inside the RPM, and its raw bytes are also stored
        on the returned ManFile's ``content`` attribute.

        Failures are logged rather than raised: a member that cannot be
        extracted is skipped with a warning, and an error while reading the
        RPM itself ends extraction for this package and returns whatever was
        extracted up to that point.

        Args:
            package: Package to extract from

        Returns:
            List of ManFile objects for extracted man pages
        """
        if not package.download_path or not package.download_path.exists():
            logger.warning(f"Package file not found: {package.name}")
            return []

        # Create extraction directory for this package
        pkg_extract_dir = self.extract_dir / package.name
        pkg_extract_dir.mkdir(parents=True, exist_ok=True)

        man_files = []

        try:
            logger.info(f"Extracting man pages from {package.filename}")

            with rpmfile.open(package.download_path) as rpm:
                for member in rpm.getmembers():
                    # Check if this is a man page file
                    if not self._is_manpage(member.name):
                        continue

                    # Member names come from the archive and are untrusted:
                    # refuse anything that could climb out of the package's
                    # extraction directory.
                    if not self._is_safe_member_path(member.name):
                        logger.warning(
                            f"Skipping {member.name}: unsafe path in archive"
                        )
                        continue

                    # Create ManFile object
                    extract_path = pkg_extract_dir / member.name.lstrip('/')
                    man_file = ManFile(
                        file_path=extract_path,
                        package_name=package.name
                    )

                    # Apply section filtering
                    if self.skip_sections and man_file.section in self.skip_sections:
                        logger.debug(f"Skipping {man_file.display_name} (section {man_file.section})")
                        continue

                    # Apply language filtering
                    if self.skip_languages and man_file.language and man_file.language != 'en':
                        logger.debug(f"Skipping {man_file.display_name} (language {man_file.language})")
                        continue

                    # Extract the file. Directory creation is inside the
                    # per-member handler so a single bad entry cannot abort
                    # the rest of the package.
                    try:
                        extract_path.parent.mkdir(parents=True, exist_ok=True)
                        content = rpm.extractfile(member).read()
                        with open(extract_path, 'wb') as f:
                            f.write(content)

                        man_file.content = content
                        man_files.append(man_file)

                    except Exception as e:
                        logger.warning(f"Failed to extract {member.name}: {e}")

            logger.info(f"Extracted {len(man_files)} man pages from {package.name}")

        except Exception as e:
            logger.error(f"Error extracting from {package.filename}: {e}")

        return man_files

    def extract_from_packages(
        self,
        packages: List[Package],
        max_workers: int = 5
    ) -> List[ManFile]:
        """Extract man pages from multiple packages in parallel.

        Results are collected in completion order, so the returned list is
        not guaranteed to follow the order of ``packages``.

        Args:
            packages: List of packages to process
            max_workers: Maximum number of parallel extractions

        Returns:
            List of all extracted ManFile objects
        """
        all_man_files = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all extraction tasks
            future_to_pkg = {
                executor.submit(self.extract_from_package, pkg): pkg
                for pkg in packages
            }

            # Collect results
            for future in as_completed(future_to_pkg):
                pkg = future_to_pkg[future]
                try:
                    all_man_files.extend(future.result())
                except Exception as e:
                    logger.error(f"Error processing {pkg.name}: {e}")

        logger.info(f"Extracted total of {len(all_man_files)} man pages from {len(packages)} packages")
        return all_man_files

    def read_manpage_content(self, man_file: ManFile) -> str:
        """Read and decompress man page content.

        Files with a ``.gz`` suffix are decompressed; if such a file turns
        out not to be gzip data it is read as-is instead. Bytes are decoded
        as UTF-8 with undecodable sequences replaced.

        Args:
            man_file: ManFile to read

        Returns:
            Decompressed man page content as string, or an empty string if
            the file is missing or cannot be read
        """
        path = man_file.file_path

        if not path.exists():
            logger.warning(f"Man page file not found: {man_file.file_path}")
            return ""

        try:
            if path.suffix == '.gz':
                try:
                    with gzip.open(path, 'rb') as f:
                        content = f.read()
                except gzip.BadGzipFile:
                    # Not a gzip file despite the suffix, read it as plain bytes
                    with open(path, 'rb') as f:
                        content = f.read()
            else:
                with open(path, 'rb') as f:
                    content = f.read()

            # Decode with error handling
            return content.decode('utf-8', errors='replace')

        except Exception as e:
            logger.error(f"Error reading {man_file.file_path}: {e}")
            return ""

    @staticmethod
    def _is_safe_member_path(path: str) -> bool:
        """Check that an archive member path has no parent-directory steps.

        Args:
            path: Member path as stored in the archive

        Returns:
            True if no component of the path is '..'
        """
        return '..' not in path.split('/')

    @staticmethod
    def _is_manpage(path: str) -> bool:
        """Check if a file path is a man page.

        Args:
            path: File path to check

        Returns:
            True if this looks like a man page file
        """
        # Must contain /man/ in path
        if '/man/' not in path:
            return False

        # Should be under a share/man/ tree or the legacy /usr/man/ tree.
        # The './' variant is accepted because archive member names may be
        # stored with a leading './'.
        if not (
            '/share/man/' in path
            or path.startswith(('/usr/man/', './usr/man/'))
        ):
            return False

        # Common man page patterns
        # - /usr/share/man/man1/foo.1.gz
        # - /usr/share/man/es/man1/foo.1.gz
        # - /usr/share/man/man3/printf.3.gz
        #
        # Only directory components are inspected: the page must live inside
        # a manN directory. Looking at the final component too would accept
        # the section directory entry itself, or a file that merely has a
        # name such as "man2html.1.gz".
        directories = path.split('/')[:-1]

        return any(
            part.startswith('man') and len(part) > 3 and part[3].isdigit()
            for part in directories
        )

    def cleanup_extracts(self, package: Package) -> None:
        """Clean up extracted files for a package.

        Removes the package's whole extraction directory; does nothing if
        that directory does not exist.

        Args:
            package: Package whose extracts to clean up
        """
        pkg_extract_dir = self.extract_dir / package.name
        if pkg_extract_dir.exists():
            shutil.rmtree(pkg_extract_dir)
            logger.debug(f"Cleaned up extracts for {package.name}")