Files
rocky-man/rocky_man2.py
Stephen Simpson 2287678798 Init
2025-01-09 15:39:20 -06:00

381 lines
14 KiB
Python

import asyncio
import aiohttp
import aiofiles
import dnf
import rpmfile
import pprint as pp
import gzip
import subprocess
import re
import json
import tarfile
from urllib.parse import urljoin
from typing import List, Dict, Any, Callable
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
# Process-wide registry of generated pages: {package_name: {man_page_name: metadata}}.
# Populated by ManMaker.update_sitemap as pages are written and serialized to
# list.json (and list.json.gz) by save_json.
sitemap = {}
class Package:
    """A single RPM as advertised by a repository's metadata.

    Carries enough information to download the RPM (baseurl + location)
    and to track where it lands on disk once fetched and extracted.
    """

    def __init__(self, name: str, repo_type: str, chksum: str, location: str,
                 baseurl: str, license: str, download_path: Path = None,
                 extract_dir: Path = None):
        self.name = name
        self.repo_type = repo_type
        self.chksum = chksum
        self.location = location
        self.baseurl = baseurl
        # Basename of the repo-relative location,
        # e.g. "Packages/b/bash-5.2.rpm" -> "bash-5.2.rpm".
        self.filename = location.rsplit("/", 1)[-1]
        self.license = license
        self.download_path = download_path  # set once the RPM is downloaded
        self.extract_dir = extract_dir      # set once man files are extracted
class ManFile:
    """One man page extracted from an RPM, plus its rendered-HTML bookkeeping."""

    def __init__(self, filelocation: Path):
        self.filelocation = filelocation
        # e.g. .../man1/bash.1.gz -> filename "bash.1.gz", context "man1".
        self.filename = filelocation.parts[-1]
        self.context = filelocation.parts[-2]
        # Digits of the section directory name ("man1" -> "1").
        self.context_number = "".join(ch for ch in self.context if ch.isdigit())
        # "bash.1.gz" -> "bash.1" -> "bash".
        self.regular_name = self.filename.replace(".gz", "")
        self.name = self.regular_name.rpartition(".")[0]
        self.man_text = None            # decompressed roff source (filled later)
        self.man_html = None            # mandoc HTML fragment (filled later)
        self.generated_html = None      # full page after template render
        self.html_folder_location = None
        self._html_file_location = None
        self.html_uri_location = ""

    @property
    def html_file_location(self):
        """Filesystem path of the rendered HTML page (None until saved)."""
        return self._html_file_location

    @html_file_location.setter
    def html_file_location(self, value: Path):
        # Keep the site-relative URI (the path minus its first two components)
        # in sync with the on-disk location.
        self._html_file_location = value
        self.html_uri_location = "/".join(value.parts[2:]) if value else ""
class ManMaker:
    """Extracts man pages from downloaded RPMs and renders them as HTML pages.

    Pipeline per package: pull every member under a man/ path out of the RPM
    into man_dir, gunzip it, convert it with mandoc, wrap the fragment in a
    Jinja2 template, write the page under html_dir, and record it in the
    module-level sitemap.
    """

    def __init__(self, man_dir: str, html_dir: str):
        self.man_dir = man_dir    # root for extracted raw man files
        self.html_dir = html_dir  # root for generated HTML pages

    async def zcat(self, file_path: Path):
        """Return the decompressed UTF-8 text of *file_path*, or None if it is not gzip."""
        async with aiofiles.open(file_path, 'rb') as f:
            content = await f.read()
        try:
            return gzip.decompress(content).decode('utf-8')
        except gzip.BadGzipFile:
            # Some files under man/ are not actually gzipped; callers skip None.
            return None

    async def extract_man_files(self, package: Package):
        """Extract every member under a man/ path from *package*'s RPM, then process them."""
        extract_dir = Path(self.man_dir) / package.download_path.stem
        extract_dir.mkdir(parents=True, exist_ok=True)
        package.extract_dir = extract_dir
        man_files = []
        with rpmfile.open(package.download_path) as rpm:
            for member in rpm.getmembers():
                if "/man/" not in member.name:
                    continue
                man_file = ManFile(filelocation=extract_dir / member.name)
                man_file.filelocation.parent.mkdir(parents=True, exist_ok=True)
                async with aiofiles.open(man_file.filelocation, "wb") as f:
                    await f.write(rpm.extractfile(member).read())
                man_files.append(man_file)
        await self.get_man_file_contents(package, man_files)

    async def get_man_file_contents(self, package: Package, man_files: List[ManFile]):
        """Process all extracted man files concurrently."""
        await asyncio.gather(
            *(self.process_man_file(man_file, package) for man_file in man_files)
        )

    async def process_man_file(self, man_file: ManFile, package: Package):
        """Decompress one man file and convert it to HTML; log (don't raise) failures."""
        try:
            man_file.man_text = await self.zcat(man_file.filelocation)
            if man_file.man_text:
                await self.convert_man_to_html(man_file, package)
        except Exception as e:
            # Best-effort: one bad page must not abort the whole package.
            print(f"Error processing {man_file.filelocation}: {e}")

    async def convert_man_to_html(self, man_file: ManFile, package: Package):
        """Run mandoc to turn roff source into an HTML fragment with a TOC."""
        process = await asyncio.create_subprocess_exec(
            'mandoc', '-T', 'html', '-O', 'fragment,toc',
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate(input=man_file.man_text.encode())
        man_file.man_html = stdout.decode()
        if process.returncode == 0:
            await self.clean_html(man_file, package)
        else:
            print(f"Error converting man to HTML: {stderr.decode()}")

    async def clean_html(self, man_file: ManFile, package: Package):
        """Blank the empty "()" title cells mandoc emits, trim whitespace, then render."""
        man_file.man_html = re.sub(r'<td class="head-ltitle">\(\)</td>', '<td class="head-ltitle"></td>', man_file.man_html)
        man_file.man_html = re.sub(r'<td class="head-rtitle">\(\)</td>', '<td class="head-rtitle"></td>', man_file.man_html)
        # BUG FIX: str.strip() returns a new string; the original call discarded
        # its result, leaving the HTML untrimmed.
        man_file.man_html = man_file.man_html.strip()
        await self.generate_html(man_file, package)

    def clean_name(self, man_file: ManFile):
        """Map man-page names that are awkward as filenames to safe replacements."""
        # Values are the precomputed results of the original replace() chains:
        # "..1" -> "__1", ":.1" -> "_1", "[.1" -> "(" + "_1".
        invalid_filenames = {
            "..1": "__1",
            ":.1": "_1",
            "[.1": "(_1",
        }
        return invalid_filenames.get(man_file.regular_name, man_file.regular_name)

    async def generate_html(self, man_file: ManFile, package: Package):
        """Wrap the mandoc fragment in the site's man_page template."""
        env = setup_jinja()
        template = env.get_template("man_page.j2")
        data = {
            'title': f'{man_file.name} - {package.name} - Rocky Man Page',
            'header_title': f'{man_file.name}',
            'main_content': man_file.man_html
        }
        man_file.generated_html = template.render(data)
        await self.save_html(man_file, package)

    async def save_html(self, man_file: ManFile, package: Package):
        """Write the generated page to disk and register it in the sitemap."""
        man_file.html_folder_location = html_folder_export(man_file, package, self.html_dir)
        man_file.html_folder_location.mkdir(parents=True, exist_ok=True)
        man_file.html_file_location = man_file.html_folder_location / f"{self.clean_name(man_file)}.html"
        async with aiofiles.open(man_file.html_file_location, "w") as f:
            await f.write(man_file.generated_html)
        self.update_sitemap(man_file, package)

    def update_sitemap(self, man_file: ManFile, package: Package):
        """Record the rendered page in the module-level sitemap used for list.json."""
        # Item assignment on the global dict needs no `global` declaration.
        sitemap.setdefault(package.name, {})[man_file.name] = {
            "url": str(man_file.html_uri_location),
            "man_type": man_file.context,
            "man_type_number": man_file.context_number,
            "repo_type": package.repo_type,
            "fullname": f"{package.name} - {man_file.name}({man_file.context_number})"
        }
class RepoManager:
    """Async context manager around a single DNF repository.

    Configures dnf against the mirror URL for one (repo_type, releasever,
    basearch) combination, lists its packages, and downloads RPMs over
    aiohttp into download_dir. Use as ``async with RepoManager(...) as rm:``
    so the HTTP session is opened and closed correctly.
    """

    def __init__(self, base_url: str, contentdir: str, releasever: str, basearch: str,
                 repo_type: str, download_dir, enabled: bool = True, gpgcheck: bool = False):
        self.base_url = base_url
        self.contentdir = contentdir
        self.releasever = releasever
        self.basearch = basearch
        self.repo_type = repo_type
        self.repo_name = f"{repo_type}-{releasever}"
        self.enabled = enabled
        self.gpgcheck = gpgcheck
        self.base = dnf.Base()
        # Silence dnf's own console output.
        self.base.conf.debuglevel = 0
        self.base.conf.errorlevel = 0
        # Note: the original assigned download_dir twice; one Path conversion suffices.
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self._configure_repo()
        self.session = None  # aiohttp session, opened in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def generate_repo_url(self):
        """Build this repo's os/ URL, e.g. <base>/rocky/9.5/AppStream/aarch64/os/."""
        # BUG FIX: the path segment was hard-coded to "BaseOS", so any other
        # repo_type (e.g. AppStream) silently pointed at the BaseOS repository.
        repo_url = urljoin(
            self.base_url,
            f"{self.contentdir}/{self.releasever}/{self.repo_type}/{self.basearch}/os/"
        )
        return repo_url

    def print_repo_url(self):
        """Debug helper: print the computed repository URL."""
        repo_url = self.generate_repo_url()
        print(f"Repository URL: {repo_url}")

    def _configure_repo(self):
        """Register the repo with dnf and load its metadata (no system repos)."""
        repo = dnf.repo.Repo(self.repo_name, self.base.conf)
        repo_url = self.generate_repo_url()
        repo.baseurl = [repo_url]
        repo.enabled = self.enabled
        repo.gpgcheck = self.gpgcheck
        self.base.repos.add(repo)
        self.base.fill_sack(load_system_repo=False)

    def print_repo(self):
        """Debug helper: print the dnf repo collection."""
        print(self.base.repos)

    def list_packages(self) -> List[str]:
        """Return the names of all available packages in the repo."""
        return [pkg.name for pkg in self.base.sack.query().available()]

    def list_packages_raw(self):
        """Debug helper: dump every public attribute of the first available package."""
        for pkg in self.base.sack.query().available():
            print(f"Package: {pkg.name}")
            for attr in dir(pkg):
                if not attr.startswith("_"):
                    print(f" {attr}: {getattr(pkg, attr)}")
            print("\n")
            break  # one package is enough for inspection

    def list_package_object(self, package_name: str) -> List[Package]:
        """Return Package records for every match of *package_name*.

        Raises ValueError if the name is unknown in this repository.
        """
        pkgs = self.base.sack.query().filter(name=package_name)
        if not pkgs:
            raise ValueError(f"Package {package_name} not found in the repository.")
        return self.generate_package_list(pkgs)

    def list_packages_object(self) -> List[Package]:
        """Return Package records for everything available; raise if the repo is empty."""
        pkgs = self.base.sack.query().available()
        if not pkgs:
            raise ValueError("No packages found in the repository.")
        return self.generate_package_list(pkgs)

    def generate_package_list(self, pkgs) -> List[Package]:
        """Convert dnf package objects into lightweight Package records."""
        package_list = []
        for pkg in pkgs:
            repo = pkg.repo
            package_list.append(Package(
                name=getattr(pkg, "name", None),
                repo_type=self.repo_type,
                chksum=getattr(pkg, "chksum", None),
                location=getattr(pkg, "location", None),
                baseurl=repo.baseurl[0] if repo and repo.baseurl else None,
                license=getattr(pkg, "license", None)
            ))
        return package_list

    async def download_file(self, download_url: str, download_path: Path):
        """Download one file unless it already exists (cheap resume support)."""
        if download_path.exists():
            return
        async with self.session.get(download_url) as response:
            response.raise_for_status()
            async with aiofiles.open(download_path, "wb") as f:
                await f.write(await response.read())

    async def download_package(self, package_name: str, man_maker: ManMaker) -> Package:
        """Download every RPM matching *package_name* and extract its man pages.

        Returns the last matching Package (normally there is exactly one).
        list_package_object guarantees at least one match or raises.
        """
        packages = self.list_package_object(package_name)
        for package in packages:
            download_url = urljoin(package.baseurl, package.location)
            package.download_path = self.download_dir / package.filename
            await self.download_file(download_url, package.download_path)
            await man_maker.extract_man_files(package)
        return package

    async def download_all_packages(self, man_maker: ManMaker) -> List[Package]:
        """Download every available package concurrently, extracting man pages as they land."""
        packages = self.list_packages_object()
        # Creating the coroutines cannot fail (the original try/except around
        # list.append was dead code); errors surface from gather().
        tasks = [self.download_package(package.name, man_maker) for package in packages]
        return await asyncio.gather(*tasks)

    def delete_package(self, rpm_path: Path):
        """Remove a downloaded RPM from disk."""
        rpm_path.unlink()
async def save_json(sitemap: Dict[str, Dict[str, Any]], json_file_location: Path):
    """Serialize the sitemap (sorted by package name) to JSON and a gzipped copy."""
    ordered = dict(sorted(sitemap.items()))
    payload = json.dumps(ordered)
    async with aiofiles.open(json_file_location, "w") as f:
        await f.write(payload)
    # Pre-compressed sibling file (<name>.gz) with identical JSON content.
    with gzip.open(f"{json_file_location}.gz", "wt") as gz:
        gz.write(payload)
def html_folder_export(man_file: ManFile, package: Package, html_base_dir: str) -> Path:
    """Return the directory for *man_file*'s HTML: <base>/<package name>/<section dir>."""
    return Path(html_base_dir) / package.name / man_file.context
def setup_jinja():
    """Return a Jinja2 environment that loads templates from ./templates."""
    return Environment(loader=FileSystemLoader('./templates'))
async def generate_index(releasever: str, html_dir: str):
    """Render the per-release search index page into <html_dir>/index.html."""
    heading = f'Rocky Linux {releasever} - Man Page Search'
    template = setup_jinja().get_template("index.j2")
    rendered = template.render({'title': heading, 'header_title': heading})
    async with aiofiles.open(f"{html_dir}/index.html", "w") as f:
        await f.write(rendered)
async def process_repo(base_url: str, contentdir: str, releasever: str, basearch: str,
                       repo_type: str, download_dir: str, man_dir: str, html_dir: str):
    """Download every package of one repository and render its man pages to HTML."""
    man_maker = ManMaker(man_dir=man_dir, html_dir=html_dir)
    async with RepoManager(
        base_url=base_url,
        contentdir=contentdir,
        releasever=releasever,
        basearch=basearch,
        repo_type=repo_type,
        download_dir=download_dir,
        enabled=True,
        gpgcheck=False
    ) as repo_manager:
        print(f"Processing {repo_type} for {releasever}...")
        await repo_manager.download_all_packages(man_maker)
async def main():
    """Entry point: mirror man pages for each Rocky release/repo and emit HTML,
    an index page, and a sitemap JSON per release."""
    BASE_URL = "https://ord.mirror.rackspace.com/"
    CONTENTDIR = "rocky"
    RELEASEVERS = ["8.10", "9.5"]
    BASEARCH = "aarch64"
    REPO_TYPES = ["BaseOS", "AppStream"]
    DOWNLOAD_BASE_DIR = "./tmp/repo"
    MAN_BASE_DIR = "./tmp/export"
    HTML_BASE_DIR = "./html"
    for releasever in RELEASEVERS:
        # BUG FIX: the global sitemap is an accumulator that was never reset,
        # so the second release's list.json also contained the first release's
        # entries, whose URIs don't exist under this release's HTML dir.
        sitemap.clear()
        # html_dir depends only on the release; compute it once here instead of
        # relying on the inner loop variable leaking out (as the original did).
        html_dir = f"{HTML_BASE_DIR}/{releasever}"
        tasks = [
            process_repo(
                BASE_URL, CONTENTDIR, releasever, BASEARCH,
                repo_type,
                f"{DOWNLOAD_BASE_DIR}/{releasever}/{repo_type}",
                f"{MAN_BASE_DIR}/{releasever}/{repo_type}",
                html_dir,
            )
            for repo_type in REPO_TYPES
        ]
        await asyncio.gather(*tasks)
        await generate_index(releasever, html_dir)
        await save_json(sitemap, Path(f"{html_dir}/list.json"))
# Script entry point: run the full mirror-and-render pipeline.
if __name__ == "__main__":
    asyncio.run(main())