Init
This commit is contained in:
362
rocky_man.py
Normal file
362
rocky_man.py
Normal file
@@ -0,0 +1,362 @@
|
||||
import requests
|
||||
import dnf
|
||||
import rpmfile
|
||||
import pprint as pp
|
||||
import gzip
|
||||
import subprocess
|
||||
import re
|
||||
import json
|
||||
import tarfile
|
||||
from urllib.parse import urljoin
|
||||
from typing import List, Dict, Any, Callable
|
||||
from pathlib import Path
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
# Module-wide index of generated man pages: package name -> man page name ->
# metadata dict. ManMaker.update_sitemap adds entries and main() hands the
# whole mapping to save_json.
sitemap: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
class Package:
    """Metadata describing one RPM package found in a repository.

    Besides the fields passed in, ``filename`` is derived from the last
    ``/``-separated component of ``location``. ``download_path`` and
    ``extract_dir`` default to ``None`` and are meant to be filled in later,
    once the RPM has been fetched and unpacked.
    """

    def __init__(self, name: str, repo_type: str, chksum: str, location: str, baseurl: str, license: str, download_path: Path = None, extract_dir: Path = None):
        # Identity and provenance of the package.
        self.name, self.repo_type = name, repo_type
        self.chksum, self.license = chksum, license

        # Where the RPM lives remotely; the file name is the tail of the location.
        self.baseurl, self.location = baseurl, location
        self.filename = location.rsplit("/", 1)[-1]

        # Local paths, unknown until the package has been downloaded/extracted.
        self.download_path, self.extract_dir = download_path, extract_dir
|
||||
|
||||
class ManFile:
    """One man page file on disk together with its rendered representations.

    Naming attributes are derived from ``filelocation`` at construction time:

    * ``filename``       - last path component, e.g. ``ls.1.gz``
    * ``context``        - name of the containing directory, e.g. ``man1``
    * ``context_number`` - the digits found in ``context``, e.g. ``"1"``
    * ``regular_name``   - ``filename`` without a trailing ``.gz``
    * ``name``           - ``regular_name`` without its last dot-suffix

    The text/HTML attributes start out empty and are filled in by the code
    that converts the page.
    """

    def __init__(self, filelocation: Path):
        self.filelocation = filelocation
        self.filename = self.filelocation.parts[-1]
        self.context = self.filelocation.parts[-2]
        self.context_number = "".join(filter(str.isdigit, self.context))

        # Drop only a *trailing* ".gz". A blanket str.replace would also eat
        # a ".gz" in the middle of the name ("foo.gzip.8.gz" -> "fooip.8").
        if self.filename.endswith(".gz"):
            self.regular_name = self.filename[:-len(".gz")]
        else:
            self.regular_name = self.filename

        # Everything before the last dot; empty when there is no dot at all.
        self.name = self.regular_name.rpartition(".")[0]

        self.man_text = None
        self.man_html = None
        self.generated_html = None
        self.html_folder_location = None
        self._html_file_location = None
        self.html_uri_location = ""

    @property
    def html_file_location(self):
        """Path of the generated HTML file, or ``None`` if not yet set."""
        return self._html_file_location

    @html_file_location.setter
    def html_file_location(self, value: Path):
        """Store the HTML path and keep ``html_uri_location`` in sync with it."""
        self._html_file_location = value
        if value:
            # The URI is the path minus its first two components.
            # NOTE(review): presumably these are the HTML base directory and
            # the release version ("html/<release>/...") - confirm with caller.
            self.html_uri_location = "/".join(value.parts[2:])
        else:
            self.html_uri_location = ""
|
||||
|
||||
class ManMaker:
    """Turn the man pages inside downloaded RPMs into standalone HTML files.

    The methods form a chain in which each stage hands over to the next:
    ``extract_man_files`` -> ``get_man_file_contents`` ->
    ``convert_man_to_html`` -> ``clean_html`` -> ``generate_html`` ->
    ``save_html`` -> ``update_sitemap``.
    """

    # ``regular_name`` values that are replaced wholesale when naming the
    # HTML output file.
    _INVALID_FILENAMES = {
        "..1": "__1",
        ":.1": "_1",
        "[.1": "(_1",
    }

    # Header cells whose only content is a literal "()".
    _EMPTY_TITLE_CELL = re.compile(r'<td class="head-(ltitle|rtitle)">\(\)</td>')

    def __init__(self, man_dir: str, html_dir: str):
        """Remember where extracted man files and generated HTML should go."""
        self.man_dir = man_dir
        self.html_dir = html_dir

    def zcat(self, file_path: Path):
        """Return the decompressed content of a gzip file decoded as UTF-8.

        Raises ``gzip.BadGzipFile`` for non-gzip input and
        ``UnicodeDecodeError`` when the payload is not valid UTF-8.
        """
        with gzip.open(file_path, 'rb') as f:
            file_content = f.read()
        return file_content.decode('utf-8')

    def extract_man_files(self, package: "Package"):
        """Unpack every RPM member whose path contains ``/man/`` and convert it.

        Files are written below ``<man_dir>/<rpm file stem>/``, which is also
        recorded on ``package.extract_dir``.
        """
        rpm_stem = package.download_path.stem

        extract_dir = Path(f"{self.man_dir}/{rpm_stem}")
        extract_dir.mkdir(parents=True, exist_ok=True)
        package.extract_dir = extract_dir

        man_files = []
        with rpmfile.open(package.download_path) as rpm:
            for member in rpm.getmembers():
                if "/man/" not in member.name:
                    continue
                man_file = ManFile(filelocation=extract_dir / member.name)
                man_file.filelocation.parent.mkdir(parents=True, exist_ok=True)
                with open(man_file.filelocation, "wb") as f:
                    f.write(rpm.extractfile(member).read())
                man_files.append(man_file)

        self.get_man_file_contents(package, man_files)

    def get_man_file_contents(self, package: "Package", man_files: List["ManFile"]):
        """Read each extracted man file and pass it on for HTML conversion.

        Files that are not gzip-compressed are skipped silently. Files whose
        content is not valid UTF-8 are skipped with a message, so a single
        undecodable page no longer aborts the remaining pages of the package.
        """
        for man_file in man_files:
            try:
                man_file.man_text = self.zcat(man_file.filelocation)
            except gzip.BadGzipFile:
                continue
            except UnicodeDecodeError as e:
                print(f"Skipping undecodable man page {man_file.filelocation}: {e}")
                continue
            self.convert_man_to_html(man_file, package)

    def convert_man_to_html(self, man_file: "ManFile", package: "Package"):
        """Pipe the man source through ``mandoc`` and store the HTML output.

        On a non-zero exit status the error output is printed and the page is
        not processed any further.
        """
        result = subprocess.run(
            ['mandoc', '-T', 'html', '-O', 'fragment,toc'],
            input=man_file.man_text,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        man_file.man_html = result.stdout
        if result.returncode != 0:
            print(f"Error converting man to HTML: {result.stderr}")
        else:
            self.clean_html(man_file, package)

    @staticmethod
    def _tidy_markup(html: str) -> str:
        """Blank out "()"-only header title cells and trim outer whitespace."""
        emptied = ManMaker._EMPTY_TITLE_CELL.sub(r'<td class="head-\1"></td>', html)
        return emptied.strip()

    def clean_html(self, man_file: "ManFile", package: "Package"):
        """Tidy the converted HTML in place, then render the full page."""
        # str.strip() returns a new string; the result has to be stored,
        # otherwise the surrounding whitespace is never removed.
        man_file.man_html = self._tidy_markup(man_file.man_html)
        self.generate_html(man_file, package)

    def clean_name(self, man_file: "ManFile"):
        """Return the base name for the HTML file.

        This is ``man_file.regular_name`` unless that name is one of the
        special cases listed in ``_INVALID_FILENAMES``.
        """
        regular_name = man_file.regular_name
        return self._INVALID_FILENAMES.get(regular_name, regular_name)

    def generate_html(self, man_file: "ManFile", package: "Package"):
        """Render the ``man_page.j2`` template around the converted HTML."""
        env = setup_jinja()
        template = env.get_template("man_page.j2")

        data = {
            'title': f'{man_file.name} - {package.name} - Rocky Man Page',
            'header_title': f'{man_file.name}',
            'main_content': man_file.man_html,
        }

        man_file.generated_html = template.render(data)
        self.save_html(man_file, package)

    def save_html(self, man_file: "ManFile", package: "Package"):
        """Write the rendered page to disk and register it in the sitemap."""
        man_file.html_folder_location = html_folder_export(man_file, package, self.html_dir)
        man_file.html_folder_location.mkdir(parents=True, exist_ok=True)

        man_file.html_file_location = man_file.html_folder_location / f"{self.clean_name(man_file)}.html"

        # Fix the encoding explicitly so the published files do not depend on
        # the locale of the machine that generated them.
        with open(man_file.html_file_location, "w", encoding="utf-8") as f:
            f.write(man_file.generated_html)

        self.update_sitemap(man_file, package)

    def update_sitemap(self, man_file: "ManFile", package: "Package"):
        """Add (or overwrite) this page's entry in the module-level ``sitemap``."""
        sitemap.setdefault(package.name, {})[man_file.name] = {
            "url": str(man_file.html_uri_location),
            "man_type": man_file.context,
            "man_type_number": man_file.context_number,
            "repo_type": package.repo_type,
            "fullname": f"{package.name} - {man_file.name}({man_file.context_number})",
        }
|
||||
|
||||
class RepoManager:
    """Query one DNF repository and download packages from it.

    A repository is identified by base URL, content directory, release
    version, repository type (e.g. ``BaseOS``) and architecture. Creating an
    instance configures the repository and loads its metadata.
    """

    # Seconds to wait for the server (connect / between reads) before giving
    # up on a download instead of hanging indefinitely.
    DOWNLOAD_TIMEOUT_SECONDS = 60

    def __init__(self, base_url: str, contentdir: str, releasever: str, basearch: str, repo_type: str, download_dir, enabled: bool = True, gpgcheck: bool = False):
        """Store the repository coordinates, create ``download_dir`` and load metadata."""
        self.base_url = base_url
        self.contentdir = contentdir
        self.releasever = releasever
        self.basearch = basearch
        self.repo_type = repo_type
        self.repo_name = f"{repo_type}-{releasever}"

        self.enabled = enabled
        self.gpgcheck = gpgcheck

        self.base = dnf.Base()
        self.base.conf.debuglevel = 0
        self.base.conf.errorlevel = 0

        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self._configure_repo()

    def generate_repo_url(self):
        """Return ``<base_url><contentdir>/<releasever>/<repo_type>/<basearch>/os/``."""
        # Use the configured repo type; a fixed "BaseOS" segment would make
        # every other repo type silently resolve to the BaseOS repository.
        repo_path = f"{self.contentdir}/{self.releasever}/{self.repo_type}/{self.basearch}/os/"
        return urljoin(self.base_url, repo_path)

    def print_repo_url(self):
        """Print the repository URL."""
        repo_url = self.generate_repo_url()
        print(f"Repository URL: {repo_url}")

    def _configure_repo(self):
        """Register the repository with DNF and fill the sack from it."""
        repo = dnf.repo.Repo(self.repo_name, self.base.conf)
        repo.baseurl = [self.generate_repo_url()]
        repo.enabled = self.enabled
        repo.gpgcheck = self.gpgcheck
        self.base.repos.add(repo)
        self.base.fill_sack(load_system_repo=False)

    def print_repo(self):
        """Print the DNF repository collection."""
        print(self.base.repos)

    def list_packages(self) -> List[str]:
        """Return the names of all available packages."""
        return [pkg.name for pkg in self.base.sack.query().available()]

    def list_packages_raw(self):
        """Debug helper: print every public attribute of the first available package."""
        for pkg in self.base.sack.query().available():
            print(f"Package: {pkg.name}")
            for attr in dir(pkg):
                if not attr.startswith("_"):
                    print(f"  {attr}: {getattr(pkg, attr)}")
            print("\n")
            # Only the first package is dumped.
            break

    def list_package_object(self, package_name: str) -> List["Package"]:
        """Return ``Package`` objects for every package named ``package_name``.

        Raises:
            ValueError: if the query yields no package with that name.
        """
        pkgs = self.base.sack.query().filter(name=package_name)

        if not pkgs:
            raise ValueError(f"Package {package_name} not found in the repository.")

        return self.generate_package_list(pkgs)

    def list_packages_object(self) -> List["Package"]:
        """Return ``Package`` objects for every available package.

        Raises:
            ValueError: if the repository has no available packages.
        """
        pkgs = self.base.sack.query().available()

        if not pkgs:
            raise ValueError("No packages found in the repository.")

        return self.generate_package_list(pkgs)

    def generate_package_list(self, pkgs) -> List["Package"]:
        """Convert DNF package objects into ``Package`` instances."""
        package_list = []
        for pkg in pkgs:
            repo = pkg.repo
            package_list.append(
                Package(
                    name=getattr(pkg, "name", None),
                    repo_type=self.repo_type,
                    chksum=getattr(pkg, "chksum", None),
                    location=getattr(pkg, "location", None),
                    baseurl=repo.baseurl[0] if repo and repo.baseurl else None,
                    license=getattr(pkg, "license", None),
                )
            )
        return package_list

    def download_file(self, download_url: str, download_path: Path):
        """Download ``download_url`` to ``download_path`` unless the file already exists.

        Raises whatever ``requests`` raises for connection problems, timeouts
        and non-success HTTP status codes.
        """
        if download_path.exists():
            return

        response = requests.get(download_url, timeout=self.DOWNLOAD_TIMEOUT_SECONDS)
        response.raise_for_status()
        with open(download_path, "wb") as f:
            f.write(response.content)

    def download_package(self, package_name: str, man_maker: "ManMaker") -> "Package":
        """Download every package named ``package_name`` and extract its man pages.

        Returns the last matching package that was processed.
        """
        packages = self.list_package_object(package_name)

        for package in packages:
            download_url = urljoin(package.baseurl, package.location)
            download_path = self.download_dir / f"{package.filename}"
            package.download_path = download_path
            self.download_file(download_url, download_path)

            # Process the package immediately after downloading
            man_maker.extract_man_files(package)

        return package

    def download_all_packages(self, man_maker: "ManMaker") -> List["Package"]:
        """Download and process every available package.

        A failure for one package is reported and does not stop the run.
        """
        packages = self.list_packages_object()
        downloaded_files = []

        for package in packages:
            try:
                downloaded_files.append(self.download_package(package.name, man_maker))
            except Exception as e:
                print(f"Error downloading package: {e}")

        return downloaded_files

    def delete_package(self, rpm_path: Path):
        """Delete the file at ``rpm_path``."""
        rpm_path.unlink()
|
||||
|
||||
def save_json(sitemap: Dict[str, Dict[str, Any]], json_file_location: Path):
    """Write ``sitemap`` as JSON, once plain and once gzip-compressed.

    Top-level keys are emitted in sorted order. The compressed copy is
    written next to the plain file under the same name plus ``.gz``.
    """
    ordered = dict(sorted(sitemap.items(), key=lambda entry: entry[0]))
    payload = json.dumps(ordered)

    # Plain JSON file.
    with open(json_file_location, "w") as plain_out:
        plain_out.write(payload)

    # Gzipped JSON file with identical content.
    with gzip.open(f"{json_file_location}.gz", "wt") as packed_out:
        packed_out.write(payload)
|
||||
|
||||
def html_folder_export(man_file: ManFile, package: Package, html_base_dir: str) -> Path:
    """Return the folder for a man page's HTML: ``<html_base_dir>/<package name>/<man_file context>``."""
    segments = (html_base_dir, package.name, man_file.context)
    return Path("/".join(f"{segment}" for segment in segments))
|
||||
|
||||
def setup_jinja():
    """Return a Jinja2 environment that loads templates from ``./templates``."""
    return Environment(loader=FileSystemLoader('./templates'))
|
||||
|
||||
def generate_index(releasever: str, html_dir: str):
    """Render ``index.j2`` for ``releasever`` and write it to ``<html_dir>/index.html``."""
    heading = f'Rocky Linux {releasever} - Man Page Search'
    context = {'title': heading, 'header_title': heading}

    page = setup_jinja().get_template("index.j2").render(context)

    with open(f"{html_dir}/index.html", "w") as index_file:
        index_file.write(page)
|
||||
|
||||
def main():
    """Build the HTML man page site for every configured release and repo type.

    For each release/repo combination all packages are downloaded and their
    man pages converted; afterwards the release's ``index.html`` and
    ``list.json`` are (re)written.
    """
    BASE_URL = "http://dl.rockylinux.org/"
    CONTENTDIR = "pub/rocky"
    RELEASEVERS = ["8.10", "9.5"]
    BASEARCH = "aarch64"
    REPO_TYPES = ["BaseOS", "AppStream"]
    DOWNLOAD_BASE_DIR = "./tmp/repo"
    MAN_BASE_DIR = "./tmp/export"
    HTML_BASE_DIR = "./html"

    for releasever in RELEASEVERS:
        # The sitemap is module-global and only ever grows. Empty it (in
        # place, so every reference stays valid) before starting a release;
        # otherwise this release's list.json would also contain the pages
        # collected for the previous release.
        sitemap.clear()
        html_dir = f"{HTML_BASE_DIR}/{releasever}"

        for repo_type in REPO_TYPES:
            download_dir = f"{DOWNLOAD_BASE_DIR}/{releasever}/{repo_type}"
            man_dir = f"{MAN_BASE_DIR}/{releasever}/{repo_type}"

            repo_manager = RepoManager(
                base_url=BASE_URL,
                contentdir=CONTENTDIR,
                releasever=releasever,
                basearch=BASEARCH,
                repo_type=repo_type,
                download_dir=download_dir,
                enabled=True,
                gpgcheck=False,
            )

            man_maker = ManMaker(man_dir=man_dir, html_dir=html_dir)

            print("Downloading packages and generating HTML...")
            repo_manager.download_all_packages(man_maker)
            # repo_manager.download_package("at", man_maker)

            # Written after every repo type so that the output on disk is
            # usable even if a later repo type fails.
            generate_index(releasever, html_dir)
            save_json(sitemap, Path(f"{html_dir}/list.json"))


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user