"""Repository manager for querying and downloading RPM packages.""" import logging from pathlib import Path from typing import List, Set, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import dnf import requests from ..models import Package from .contents import ContentsParser logger = logging.getLogger(__name__) class RepoManager: """Manages Rocky Linux repository operations. Handles: - Repository configuration with DNF - Package discovery and filtering - Package downloads with progress tracking """ def __init__( self, repo_url: str, version: str, repo_type: str, arch: str, cache_dir: Path, download_dir: Path, ): """Initialize repository manager. Args: repo_url: Full repository URL version: Rocky Linux version (e.g., '9.5') repo_type: Repository type ('BaseOS' or 'AppStream') arch: Architecture (e.g., 'x86_64') cache_dir: Directory for caching metadata download_dir: Directory for downloading packages """ self.repo_url = repo_url self.version = version self.repo_type = repo_type self.arch = arch self.cache_dir = Path(cache_dir) self.download_dir = Path(download_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) self.download_dir.mkdir(parents=True, exist_ok=True) # Initialize DNF self.base = dnf.Base() self.base.conf.debuglevel = 0 self.base.conf.errorlevel = 0 self.base.conf.cachedir = str(self.cache_dir / "dnf") self._configure_repo() self.packages_with_manpages: Optional[Set[str]] = None def _configure_repo(self): """Configure DNF repository.""" repo_id = f"rocky-{self.repo_type.lower()}-{self.version}-{self.arch}" repo = dnf.repo.Repo(repo_id, self.base.conf) repo.baseurl = [self.repo_url] repo.enabled = True repo.gpgcheck = False # We verify checksums separately self.base.repos.add(repo) logger.info(f"Configured repository: {repo_id} at {self.repo_url}") # Fill the sack (package database) self.base.fill_sack(load_system_repo=False, load_available_repos=True) logger.info("Repository metadata loaded") def discover_packages_with_manpages(self) -> Set[str]: """Discover which packages contain man pages using filelists. This is the key optimization - we parse repository metadata to identify packages with man pages before downloading anything. Returns: Set of package names that contain man pages """ if self.packages_with_manpages is not None: return self.packages_with_manpages parser = ContentsParser(self.repo_url, self.cache_dir) self.packages_with_manpages = parser.get_packages_with_manpages() return self.packages_with_manpages def list_packages(self, with_manpages_only: bool = True) -> List[Package]: """List all packages in the repository. Args: with_manpages_only: If True, only return packages with man pages Returns: List of Package objects """ logger.info(f"Querying packages from {self.repo_type} ({self.version}/{self.arch})") # Get packages with man pages if filtering manpage_packages = None if with_manpages_only: manpage_packages = self.discover_packages_with_manpages() logger.info(f"Filtering to {len(manpage_packages)} packages with man pages") packages = [] # Query all available packages query = self.base.sack.query().available() # For each package name, get only one arch (prefer noarch, then our target arch) seen_names = set() for pkg in query: pkg_name = pkg.name # Skip if we've already added this package if pkg_name in seen_names: continue # Skip if filtering and package doesn't have man pages if manpage_packages and pkg_name not in manpage_packages: continue # Get repo information repo = pkg.repo baseurl = repo.baseurl[0] if repo and repo.baseurl else self.repo_url # Create Package object package = Package( name=pkg_name, version=pkg.version, release=pkg.release, arch=pkg.arch, repo_type=self.repo_type, location=pkg.location, baseurl=baseurl, checksum=pkg.chksum[1] if pkg.chksum else "", # chksum is (type, value) checksum_type=pkg.chksum[0] if pkg.chksum else "sha256", has_manpages=True if manpage_packages else False, ) packages.append(package) seen_names.add(pkg_name) logger.info(f"Found {len(packages)} packages to process") return sorted(packages) # Sort by name for consistent ordering def download_package(self, package: Package) -> bool: """Download a single package. Args: package: Package to download Returns: True if download successful, False otherwise """ download_path = self.download_dir / package.filename package.download_path = download_path # Skip if already downloaded if download_path.exists(): logger.debug(f"Package already downloaded: {package.filename}") return True try: logger.info(f"Downloading {package.filename}") response = requests.get(package.download_url, timeout=300, stream=True) response.raise_for_status() # Download with progress (optional: could add progress bar here) with open(download_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) logger.debug(f"Downloaded: {package.filename}") return True except Exception as e: logger.error(f"Error downloading {package.filename}: {e}") # Clean up partial download if download_path.exists(): download_path.unlink() return False def download_packages( self, packages: List[Package], max_workers: int = 5 ) -> List[Package]: """Download multiple packages in parallel. Args: packages: List of packages to download max_workers: Maximum number of parallel downloads Returns: List of successfully downloaded packages """ downloaded = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all download tasks future_to_pkg = { executor.submit(self.download_package, pkg): pkg for pkg in packages } # Process completed downloads for future in as_completed(future_to_pkg): pkg = future_to_pkg[future] try: if future.result(): downloaded.append(pkg) except Exception as e: logger.error(f"Error processing {pkg.name}: {e}") logger.info(f"Successfully downloaded {len(downloaded)}/{len(packages)} packages") return downloaded def cleanup_package(self, package: Package): """Delete a downloaded package file. Args: package: Package to clean up """ if package.download_path and package.download_path.exists(): package.download_path.unlink() logger.debug(f"Deleted: {package.filename}")