ec32c72363
* Complete refactor Signed-off-by: Stephen Simpson <ssimpson89@users.noreply.github.com> * Complete refactor Signed-off-by: Stephen Simpson <ssimpson89@users.noreply.github.com> --------- Signed-off-by: Stephen Simpson <ssimpson89@users.noreply.github.com>
223 lines
7.4 KiB
Python
223 lines
7.4 KiB
Python
"""Extract man pages from RPM packages."""
|
|
|
|
import gzip
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
import rpmfile
|
|
|
|
from ..models import Package, ManFile
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ManPageExtractor:
|
|
"""Extracts man pages from RPM packages.
|
|
|
|
Handles:
|
|
- Extracting man pages from RPMs
|
|
- Reading gzipped man page content
|
|
- Organizing extracted files by package
|
|
"""
|
|
|
|
def __init__(self, extract_dir: Path, skip_sections: List[str] = None, skip_languages: bool = True):
|
|
"""Initialize extractor.
|
|
|
|
Args:
|
|
extract_dir: Base directory for extracting man pages
|
|
skip_sections: List of man sections to skip (e.g., ['3', '3p', '3pm'])
|
|
skip_languages: If True, skip non-English man pages
|
|
"""
|
|
self.extract_dir = Path(extract_dir)
|
|
self.extract_dir.mkdir(parents=True, exist_ok=True)
|
|
self.skip_sections = skip_sections or []
|
|
self.skip_languages = skip_languages
|
|
|
|
def extract_from_package(self, package: Package) -> List[ManFile]:
|
|
"""Extract all man pages from a package.
|
|
|
|
Args:
|
|
package: Package to extract from
|
|
|
|
Returns:
|
|
List of ManFile objects for extracted man pages
|
|
"""
|
|
if not package.download_path or not package.download_path.exists():
|
|
logger.warning(f"Package file not found: {package.name}")
|
|
return []
|
|
|
|
# Create extraction directory for this package
|
|
pkg_extract_dir = self.extract_dir / package.name
|
|
pkg_extract_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
man_files = []
|
|
|
|
try:
|
|
logger.info(f"Extracting man pages from {package.filename}")
|
|
|
|
with rpmfile.open(package.download_path) as rpm:
|
|
for member in rpm.getmembers():
|
|
# Check if this is a man page file
|
|
if not self._is_manpage(member.name):
|
|
continue
|
|
|
|
# Create ManFile object
|
|
extract_path = pkg_extract_dir / member.name.lstrip('/')
|
|
man_file = ManFile(
|
|
file_path=extract_path,
|
|
package_name=package.name
|
|
)
|
|
|
|
# Apply section filtering
|
|
if self.skip_sections and man_file.section in self.skip_sections:
|
|
logger.debug(f"Skipping {man_file.display_name} (section {man_file.section})")
|
|
continue
|
|
|
|
# Apply language filtering
|
|
if self.skip_languages and man_file.language and man_file.language != 'en':
|
|
logger.debug(f"Skipping {man_file.display_name} (language {man_file.language})")
|
|
continue
|
|
|
|
# Extract the file
|
|
extract_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
content = rpm.extractfile(member).read()
|
|
with open(extract_path, 'wb') as f:
|
|
f.write(content)
|
|
|
|
man_file.content = content
|
|
man_files.append(man_file)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to extract {member.name}: {e}")
|
|
|
|
logger.info(f"Extracted {len(man_files)} man pages from {package.name}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting from {package.filename}: {e}")
|
|
|
|
return man_files
|
|
|
|
def extract_from_packages(
|
|
self,
|
|
packages: List[Package],
|
|
max_workers: int = 5
|
|
) -> List[ManFile]:
|
|
"""Extract man pages from multiple packages in parallel.
|
|
|
|
Args:
|
|
packages: List of packages to process
|
|
max_workers: Maximum number of parallel extractions
|
|
|
|
Returns:
|
|
List of all extracted ManFile objects
|
|
"""
|
|
all_man_files = []
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
# Submit all extraction tasks
|
|
future_to_pkg = {
|
|
executor.submit(self.extract_from_package, pkg): pkg
|
|
for pkg in packages
|
|
}
|
|
|
|
# Collect results
|
|
for future in as_completed(future_to_pkg):
|
|
pkg = future_to_pkg[future]
|
|
try:
|
|
man_files = future.result()
|
|
all_man_files.extend(man_files)
|
|
except Exception as e:
|
|
logger.error(f"Error processing {pkg.name}: {e}")
|
|
|
|
logger.info(f"Extracted total of {len(all_man_files)} man pages from {len(packages)} packages")
|
|
return all_man_files
|
|
|
|
def read_manpage_content(self, man_file: ManFile) -> str:
|
|
"""Read and decompress man page content.
|
|
|
|
Args:
|
|
man_file: ManFile to read
|
|
|
|
Returns:
|
|
Decompressed man page content as string
|
|
"""
|
|
if not man_file.file_path.exists():
|
|
logger.warning(f"Man page file not found: {man_file.file_path}")
|
|
return ""
|
|
|
|
try:
|
|
# Try reading as gzipped file first
|
|
if man_file.file_path.suffix == '.gz':
|
|
with gzip.open(man_file.file_path, 'rb') as f:
|
|
content = f.read()
|
|
else:
|
|
# Read as plain text
|
|
with open(man_file.file_path, 'rb') as f:
|
|
content = f.read()
|
|
|
|
# Decode with error handling
|
|
return content.decode('utf-8', errors='replace')
|
|
|
|
except gzip.BadGzipFile:
|
|
# Not a gzip file, try reading as plain text
|
|
try:
|
|
with open(man_file.file_path, 'rb') as f:
|
|
content = f.read()
|
|
return content.decode('utf-8', errors='replace')
|
|
except Exception as e:
|
|
logger.error(f"Error reading {man_file.file_path}: {e}")
|
|
return ""
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading {man_file.file_path}: {e}")
|
|
return ""
|
|
|
|
@staticmethod
|
|
def _is_manpage(path: str) -> bool:
|
|
"""Check if a file path is a man page.
|
|
|
|
Args:
|
|
path: File path to check
|
|
|
|
Returns:
|
|
True if this looks like a man page file
|
|
"""
|
|
# Must contain /man/ in path
|
|
if '/man/' not in path:
|
|
return False
|
|
|
|
# Should be in /usr/share/man/ or /usr/man/
|
|
if not ('/share/man/' in path or path.startswith('/usr/man/')):
|
|
return False
|
|
|
|
# Common man page patterns
|
|
# - /usr/share/man/man1/foo.1.gz
|
|
# - /usr/share/man/es/man1/foo.1.gz
|
|
# - /usr/share/man/man3/printf.3.gz
|
|
|
|
parts = path.split('/')
|
|
|
|
# Check for man<digit> directory
|
|
has_man_section = any(
|
|
part.startswith('man') and len(part) > 3 and part[3].isdigit()
|
|
for part in parts
|
|
)
|
|
|
|
return has_man_section
|
|
|
|
def cleanup_extracts(self, package: Package):
|
|
"""Clean up extracted files for a package.
|
|
|
|
Args:
|
|
package: Package whose extracts to clean up
|
|
"""
|
|
pkg_extract_dir = self.extract_dir / package.name
|
|
if pkg_extract_dir.exists():
|
|
import shutil
|
|
shutil.rmtree(pkg_extract_dir)
|
|
logger.debug(f"Cleaned up extracts for {package.name}")
|