import concurrent.futures
import gzip
import json
import pprint as pp
import re
import subprocess
import tarfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urljoin

import dnf
import requests
import rpmfile
from jinja2 import Environment, FileSystemLoader
# Global sitemap: package name -> man page name -> metadata dict.
# Populated by ManMaker.update_sitemap() during conversion and written
# out per release by save_json() at the end of main()'s loop.
sitemap = {}
class Package:
    """A single RPM package entry discovered in a repository.

    Holds the repodata fields needed to download the RPM plus the local
    paths filled in once the package has been downloaded and extracted.
    """

    def __init__(self, name: str, repo_type: str, chksum: str, location: str,
                 baseurl: str, license: str,
                 download_path: Optional[Path] = None,
                 extract_dir: Optional[Path] = None):
        self.name = name
        self.repo_type = repo_type
        self.chksum = chksum
        # Repo-relative path of the RPM, e.g. "Packages/b/bash-5.el9.rpm".
        self.location = location
        self.baseurl = baseurl
        # Basename of the RPM file, derived from the location.
        self.filename = location.split("/")[-1]
        self.license = license
        # Set after download / extraction respectively.
        self.download_path = download_path
        self.extract_dir = extract_dir

    def __lt__(self, other: "Package") -> bool:
        # Order packages alphabetically by name (used by sorted()).
        return self.name < other.name

    def __repr__(self) -> str:
        return f"Package(name={self.name!r}, repo_type={self.repo_type!r})"
class ManFile:
    """Metadata and conversion state for one gzipped man page.

    Name/section fields are derived from the file's path; e.g. for
    ``.../man1/ls.1.gz``: context="man1", context_number="1",
    regular_name="ls.1", name="ls".
    """

    def __init__(self, filelocation: Path):
        self.filelocation = filelocation
        # Basename, e.g. "ls.1.gz".
        self.filename = self.filelocation.parts[-1]
        # Parent directory name, e.g. "man1" — the man section context.
        self.context = self.filelocation.parts[-2]
        # Digits of the section context, e.g. "1" (empty string if none).
        self.context_number = ''.join(filter(str.isdigit, self.context))
        # Filename with the ".gz" suffix removed, e.g. "ls.1".
        self.regular_name = self.filename.replace(".gz", "")
        # Page name with the trailing section suffix stripped, e.g. "ls".
        self.name = ".".join(self.regular_name.split(".")[:-1])
        # Filled in as the page moves through the conversion pipeline.
        self.man_text: Optional[str] = None
        self.man_html: Optional[str] = None
        self.generated_html: Optional[str] = None
        self.html_folder_location: Optional[Path] = None
        self._html_file_location: Optional[Path] = None
        self.html_uri_location = ""

    @property
    def html_file_location(self) -> Optional[Path]:
        """Filesystem location of the rendered HTML file (None until saved)."""
        return self._html_file_location

    @html_file_location.setter
    def html_file_location(self, value: Optional[Path]):
        # Keep the site-relative URI in sync with the filesystem location:
        # drop the first two path components (e.g. "html/<releasever>") to
        # get the path the page is served under.
        self._html_file_location = value
        if value:
            self.html_uri_location = "/".join(value.parts[2:])
        else:
            self.html_uri_location = ""
class ManMaker:
    """Extracts gzipped man pages from RPMs and renders them to HTML.

    One instance serves a single release: ``man_dir`` is the scratch
    directory for extracted RPM contents, ``html_dir`` is the root of the
    generated site. Rendering shells out to ``mandoc``.
    """

    def __init__(self, man_dir: str, html_dir: str):
        self.man_dir = man_dir
        self.html_dir = html_dir

    def zcat(self, file_path: Path) -> str:
        """Return the decompressed contents of a gzip file as UTF-8 text."""
        with gzip.open(file_path, 'rb') as f:
            return f.read().decode('utf-8')

    def extract_man_files(self, package: "Package"):
        """Extract every member under a /man/ path from the package's RPM,
        then convert all of them to HTML.

        Files already present on disk (from a previous run) are not
        re-written but are still converted.
        """
        rpm_file = package.download_path.stem
        extract_dir = Path(f"{self.man_dir}/{rpm_file}")
        extract_dir.mkdir(parents=True, exist_ok=True)
        package.extract_dir = extract_dir
        man_files = []
        with rpmfile.open(package.download_path) as rpm:
            for member in rpm.getmembers():
                if "/man/" not in member.name:
                    continue
                man_file = ManFile(filelocation=extract_dir / member.name)
                if not man_file.filelocation.exists():
                    man_file.filelocation.parent.mkdir(parents=True, exist_ok=True)
                    with open(man_file.filelocation, "wb") as f:
                        f.write(rpm.extractfile(member).read())
                man_files.append(man_file)
        self.get_man_file_contents(package, man_files)

    def get_man_file_contents(self, package: "Package", man_files: List["ManFile"]):
        """Convert all extracted man files concurrently.

        Best-effort: a failure on one page is reported and skipped so it
        cannot abort the rest of the package.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.process_man_file, man_file, package)
                       for man_file in man_files]
            for future in concurrent.futures.as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    # Was silently swallowed before; report but keep going.
                    print(f"Error processing man file for {package.name}: {e}")

    def process_man_file(self, man_file: "ManFile", package: "Package"):
        """Load one man page's roff text and convert it to HTML."""
        try:
            man_file.man_text = self.zcat(man_file.filelocation)
        except gzip.BadGzipFile:
            # Some files under /man/ are not actually gzipped — skip them,
            # matching the previous best-effort behavior.
            return
        self.convert_man_to_html(man_file, package)

    def convert_man_to_html(self, man_file: "ManFile", package: "Package"):
        """Render the page with `mandoc -T html` (fragment + TOC) and clean it."""
        process = subprocess.Popen(
            ['mandoc', '-T', 'html', '-O', 'fragment,toc'],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        man_file.man_html, stderr = process.communicate(input=man_file.man_text)
        if process.returncode != 0:
            print(f"Error converting man to HTML: {stderr}")
        else:
            self.clean_html(man_file, package)

    def clean_html(self, man_file: "ManFile", package: "Package"):
        """Strip mandoc's "() | " header artifacts and surrounding whitespace,
        then hand off to template rendering.

        NOTE: the '|' must be escaped — unescaped it is regex alternation,
        which would also match (and rewrite) every lone space in the page.
        """
        man_file.man_html = re.sub(r'\n\(\) \| ', ' | ', man_file.man_html)
        man_file.man_html = re.sub(r'\(\) \| ', ' | ', man_file.man_html)
        # str.strip() returns a new string; the result must be assigned.
        man_file.man_html = man_file.man_html.strip()
        self.generate_html(man_file, package)

    def clean_name(self, man_file: "ManFile") -> str:
        """Map page names that are unsafe/awkward as filenames to safe ones.

        All other names pass through unchanged.
        """
        # "..1" -> "__1", ":.1" -> "_1", "[.1" -> "(_1" (same results as the
        # previous chained .replace() table, precomputed).
        invalid_filenames = {
            "..1": "__1",
            ":.1": "_1",
            "[.1": "(_1",
        }
        return invalid_filenames.get(man_file.regular_name, man_file.regular_name)

    def generate_html(self, man_file: "ManFile", package: "Package"):
        """Render the man page body into the site's page template."""
        env = setup_jinja()
        template = env.get_template("man_page.j2")
        data = {
            'title': f'{man_file.name} - {package.name} - Rocky Man Page',
            'header_title': f'{man_file.name}',
            'main_content': man_file.man_html
        }
        man_file.generated_html = template.render(data)
        self.save_html(man_file, package)

    def save_html(self, man_file: "ManFile", package: "Package"):
        """Write the rendered page under html_dir and record it in the sitemap."""
        man_file.html_folder_location = html_folder_export(man_file, package, self.html_dir)
        man_file.html_folder_location.mkdir(parents=True, exist_ok=True)
        man_file.html_file_location = man_file.html_folder_location / f"{self.clean_name(man_file)}.html"
        with open(man_file.html_file_location, "w") as f:
            f.write(man_file.generated_html)
        self.update_sitemap(man_file, package)

    def update_sitemap(self, man_file: "ManFile", package: "Package"):
        """Record this page in the module-level sitemap (mutated in place)."""
        sitemap.setdefault(package.name, {})[man_file.name] = {
            "url": str(man_file.html_uri_location),
            "man_type": man_file.context,
            "man_type_number": man_file.context_number,
            "repo_type": package.repo_type,
            "fullname": f"{package.name} - {man_file.name}({man_file.context_number})"
        }
class RepoManager:
    """Wraps a dnf.Base configured against one Rocky release's repositories
    and downloads RPMs from them.

    Repo metadata is fetched eagerly in __init__ (network access happens
    there via fill_sack).
    """

    def __init__(self, base_url: str, contentdir: str, releasever: str,
                 basearch: str, repo_types: List[str], download_dir,
                 enabled: bool = True, gpgcheck: bool = False):
        self.base_url = base_url
        self.contentdir = contentdir
        self.releasever = releasever
        self.basearch = basearch
        # NOTE: attribute keeps its historical singular name; it holds the
        # full list of repo types (e.g. ["BaseOS", "AppStream"]).
        self.repo_type = repo_types
        self.enabled = enabled
        self.gpgcheck = gpgcheck
        self.base = dnf.Base()
        # Silence DNF's own console logging.
        self.base.conf.debuglevel = 0
        self.base.conf.errorlevel = 0
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self._configure_repo()

    def generate_repo_url(self, repo_type: Optional[str] = None) -> str:
        """Build the baseurl for one repo type, e.g.
        http://dl.rockylinux.org/pub/rocky/9.5/BaseOS/aarch64/os/

        NOTE(review): calling this with repo_type=None (as print_repo_url
        does) embeds the literal string "None" in the URL — confirm intended.
        """
        return urljoin(self.base_url,
                       f"{self.contentdir}/{self.releasever}/{repo_type}/{self.basearch}/os/")

    def print_repo_url(self):
        """Print the repo URL (without a repo type — see note above)."""
        repo_url = self.generate_repo_url()
        print(f"Repository URL: {repo_url}")

    def _configure_repo(self):
        """Register one DNF repo per configured type and load their metadata."""
        for repo_type in self.repo_type:
            self.repo_name = f"{repo_type}-{self.releasever}"
            repo = dnf.repo.Repo(self.repo_name, self.base.conf)
            repo.baseurl = [self.generate_repo_url(repo_type)]
            repo.enabled = self.enabled
            repo.gpgcheck = self.gpgcheck
            self.base.repos.add(repo)
        # Fetch metadata for all registered repos at once.
        self.base.fill_sack(load_system_repo=False, load_available_repos=True)

    def print_repo(self):
        """Print the DNF repo collection (debug helper)."""
        print(self.base.repos)

    def list_packages(self) -> List[str]:
        """Return the names of all available packages."""
        return [pkg.name for pkg in self.base.sack.query().available()]

    def list_packages_raw(self):
        """Dump every public attribute of the first available package (debug)."""
        for pkg in self.base.sack.query().available():
            print(f"Package: {pkg.name}")
            for attr in dir(pkg):
                if not attr.startswith("_"):
                    print(f"  {attr}: {getattr(pkg, attr)}")
            print("\n")
            break  # only the first package

    def list_package_object(self, package_name: str) -> List["Package"]:
        """Return Package objects for one package name.

        Raises ValueError if the name is not in the repository.
        """
        pkgs = self.base.sack.query().filter(name=package_name)
        if not pkgs:
            raise ValueError(f"Package {package_name} not found in the repository.")
        return self.generate_package_list(pkgs)

    def list_packages_object(self) -> List["Package"]:
        """Return Package objects for every available package.

        Raises ValueError if the repository is empty.
        """
        pkgs = self.base.sack.query().available()
        if not pkgs:
            raise ValueError("No packages found in the repository.")
        return self.generate_package_list(pkgs)

    def generate_package_list(self, pkgs) -> List["Package"]:
        """Convert DNF package objects into our Package records."""
        package_list = []
        for pkg in pkgs:
            repo = pkg.repo
            package_list.append(Package(
                name=getattr(pkg, "name", None),
                repo_type=self.repo_type,
                chksum=getattr(pkg, "chksum", None),
                location=getattr(pkg, "location", None),
                baseurl=repo.baseurl[0] if repo and repo.baseurl else None,
                license=getattr(pkg, "license", None)
            ))
        return package_list

    def download_file(self, download_url: str, download_path: Path):
        """Download a URL to download_path; no-op if the file already exists.

        Raises requests.HTTPError on a non-2xx response.
        """
        if download_path.exists():
            return
        response = requests.get(download_url)
        response.raise_for_status()
        with open(download_path, "wb") as f:
            f.write(response.content)

    def download_package(self, package_name: str, man_maker: "ManMaker") -> Optional["Package"]:
        """Download one package (all matching entries) and extract its man pages.

        Returns the last processed Package, or None if the name was not found.
        """
        try:
            packages = self.list_package_object(package_name)
        except ValueError as e:
            print(f"Error downloading package: {e}")
            return None
        for package in packages:
            download_path = self.download_dir / f"{package.filename}"
            package.download_path = download_path
            if not download_path.exists():
                self.download_file(urljoin(package.baseurl, package.location), download_path)
            # Process the package immediately after downloading.
            print(f"Extracting files from {package.filename}...")
            man_maker.extract_man_files(package)
        return package

    def download_all_packages(self, man_maker: "ManMaker") -> List["Package"]:
        """Download and process every available package concurrently."""
        packages = self.list_packages_object()
        downloaded_files = []
        with ThreadPoolExecutor() as executor:
            future_to_package = {
                executor.submit(self.download_package, package.name, man_maker): package
                for package in packages
            }
            for future in as_completed(future_to_package):
                package = future_to_package[future]
                try:
                    downloaded_files.append(future.result())
                except Exception as e:
                    print(f"Error downloading package {package.name}: {e}")
        return downloaded_files

    def delete_package(self, rpm_path: Path):
        """Remove a downloaded RPM from disk (raises if it does not exist)."""
        rpm_path.unlink()
def save_json(sitemap: Dict[str, Dict[str, Any]], json_file_location: Path):
    """Write the sitemap, sorted by package name, as JSON and gzipped JSON.

    Produces ``json_file_location`` and ``json_file_location + ".gz"``.
    """
    ordered = dict(sorted(sitemap.items()))
    # Plain JSON for direct serving.
    with open(json_file_location, "w") as plain:
        json.dump(ordered, plain)
    # Pre-compressed copy alongside it.
    with gzip.open(f"{json_file_location}.gz", "wt") as compressed:
        json.dump(ordered, compressed)
def html_folder_export(man_file: "ManFile", package: "Package", html_base_dir: str) -> Path:
    """Return the output directory for a man page:
    <html_base_dir>/<package name>/<man section>, e.g. ./html/9.5/bash/man1.
    """
    # pathlib joins instead of f-string concatenation; forward-ref
    # annotations avoid a def-time dependency on ManFile/Package.
    return Path(html_base_dir) / package.name / man_file.context
def setup_jinja():
    """Build a Jinja2 environment that loads templates from ./templates."""
    return Environment(loader=FileSystemLoader('./templates'))
def generate_index(releasever: str, html_dir: str):
    """Render the search index page for one release into html_dir/index.html."""
    template = setup_jinja().get_template("index.j2")
    page_title = f'Rocky Linux {releasever} - Man Page Search'
    rendered = template.render({
        'title': page_title,
        'header_title': page_title,
    })
    with open(f"{html_dir}/index.html", "w") as out:
        out.write(rendered)
def main():
    """Entry point: build the man-page HTML site for each configured release."""
    BASE_URL = "http://dl.rockylinux.org/"
    CONTENTDIR = "pub/rocky"
    RELEASEVERS = ["8.10", "9.5"]
    BASEARCH = "aarch64"
    REPO_TYPES = ["BaseOS", "AppStream"]
    DOWNLOAD_BASE_DIR = "./tmp/repo"
    MAN_BASE_DIR = "./tmp/export"
    HTML_BASE_DIR = "./html"

    for release in RELEASEVERS:
        # Per-release working directories.
        download_dir = f"{DOWNLOAD_BASE_DIR}/{release}"
        man_dir = f"{MAN_BASE_DIR}/{release}"
        html_dir = f"{HTML_BASE_DIR}/{release}"

        repo_manager = RepoManager(
            base_url=BASE_URL,
            contentdir=CONTENTDIR,
            releasever=release,
            basearch=BASEARCH,
            repo_types=REPO_TYPES,
            download_dir=download_dir,
            enabled=True,
            gpgcheck=False,
        )
        man_maker = ManMaker(man_dir=man_dir, html_dir=html_dir)

        print(f"Downloading packages and generating HTML for {release}...")
        repo_manager.download_all_packages(man_maker)
        # Single-package debug run: repo_manager.download_package("at", man_maker)
        generate_index(release, html_dir)
        save_json(sitemap, Path(f"{html_dir}/list.json"))


if __name__ == "__main__":
    main()