| | import requests, os, zipfile, subprocess, re, warnings |
| | warnings.filterwarnings("ignore") |
| | os.environ["CURL_CA_BUNDLE"] = "" |
| | from io import BytesIO |
| | from dotenv import load_dotenv |
| | load_dotenv() |
| | from datasets import load_dataset |
| | import fitz |
| | from fastapi import FastAPI, HTTPException |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from pydantic import BaseModel |
| |
|
# FastAPI application setup. Interactive docs are served at the root path and
# the API description is taken from documentation.md.
# Fix: read documentation.md through a context manager so the file handle is
# closed deterministically instead of leaking until garbage collection.
with open('documentation.md') as _doc_file:
    _api_description = _doc_file.read()

app = FastAPI(title="Specification Retriever/Splitter API",
              description=_api_description,
              docs_url="/")

# CORS: every origin is allowed; together with allow_credentials=True this is
# maximally permissive — acceptable for a public read-only API, review otherwise.
origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Pre-indexed specification contents, pulled once at import time from
# Hugging Face datasets. Each record is expected to carry at least "doc_id",
# "section" and "content" keys — TODO confirm against the dataset schema.
spec_contents_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecContent")
spec_contents_3gpp = spec_contents_3gpp["train"].to_list()

spec_contents_etsi = load_dataset("OrganizedProgrammers/ETSISpecContent")
spec_contents_etsi = spec_contents_etsi["train"].to_list()

# Document-id formats:
#   3GPP: "NN.NNN" with an optional "-part" suffix, e.g. "23.501" or "23.501-1".
#   ETSI: up to three digits, a space, up to three digits, optional "-part".
# NOTE(review): "{,3}" also matches ZERO digits, so the ETSI pattern accepts
# strings like " 1" — confirm whether a minimum of one digit was intended.
spec_3gpp_format = re.compile(r'^\d{2}\.\d{3}(?:-\d+)?')
spec_etsi_format = re.compile(r'^\d{,3} \d{,3}(?:-\d+)?')
| |
|
class SpecRequest(BaseModel):
    """Request body shared by all endpoints: a single specification id."""

    # e.g. "23.501" / "23.501-1" (3GPP) or "103 666-1" (ETSI)
    spec_id: str
| |
|
def is_doc_indexed(spec_id: str) -> bool:
    """Return True if *spec_id* appears in either pre-indexed dataset.

    Fix: the original built whole lists of booleans via
    ``any([True if cond else False for ...])``; a bare generator with the
    condition itself short-circuits on the first match and allocates nothing.
    """
    return any(s["doc_id"] == spec_id for s in spec_contents_3gpp) or \
           any(s["doc_id"] == spec_id for s in spec_contents_etsi)
| |
|
def get_doc(spec_id: str):
    """Return the full indexed text of *spec_id* as one string.

    Each matching record contributes a "section\\ncontent" chunk; chunks are
    joined with blank lines, in dataset order (3GPP first, then ETSI).
    """
    chunks = [
        f"{record['section']}\n{record['content']}"
        for record in spec_contents_3gpp + spec_contents_etsi
        if record["doc_id"] == spec_id
    ]
    return "\n\n".join(chunks)
| |
|
def get_structured_doc(spec_id: str):
    """Return the indexed content of *spec_id* as a {section: content} dict.

    A later record with the same section title overwrites an earlier one,
    exactly as the original append order did.
    """
    return {
        record["section"]: record["content"]
        for record in spec_contents_3gpp + spec_contents_etsi
        if record["doc_id"] == spec_id
    }
| |
|
def get_pdf_data(request: SpecRequest):
    """Fetch the PDF for a specification and return (document, toc).

    Returns the indexed plain text (a str) when the document is already in
    the datasets; otherwise a (fitz.Document, table-of-contents list) tuple.
    NOTE(review): callers unpack the tuple form only — they all check
    is_doc_indexed() before calling, so the str branch looks unreachable in
    practice; confirm before relying on it.

    Raises HTTP 404 when the doc-finder service has no URL for the id.
    """
    specification = request.spec_id
    if is_doc_indexed(specification):
        return get_doc(specification)
    # Resolve the document id to a download URL via the external doc-finder.
    # NOTE(review): verify=False disables TLS certificate checks (consistent
    # with the CURL_CA_BUNDLE override at module top) — security trade-off
    # to confirm.
    url = requests.post(
        "https://organizedprogrammers-docfinder.hf.space/find/single",
        verify=False,
        headers={"Content-Type": "application/json"},
        json={"doc_id": specification}
    )

    if url.status_code != 200:
        raise HTTPException(404, detail="Not found")

    url = url.json()['url']
    # Browser-like User-Agent: presumably some hosts refuse the default
    # requests agent — verify against the target servers.
    response = requests.get(
        url,
        verify=False,
        headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
    )

    # Open the downloaded bytes directly with PyMuPDF; no temp file needed.
    pdf = fitz.open(stream=response.content, filetype="pdf")
    return pdf, pdf.get_toc()
| |
|
@app.post("/extract_text/full")
def extract_full_spec(request: SpecRequest):
    """Return the full text of a specification.

    Indexed documents come straight from the datasets (str). Otherwise:
      * 3GPP ids: download the spec zip, convert the .doc/.docx inside with
        LibreOffice and return the non-empty lines (list[str]).
      * ETSI ids: download the PDF and return its text from the first
        numbered table-of-contents entry onwards (str), or {} when the PDF
        has no text/ToC.

    Raises HTTP 404 when the document cannot be found and HTTP 400 when the
    id matches neither known format.
    NOTE(review): the return type varies by branch (str / list[str] / dict)
    — callers must handle all three.
    """
    specification = request.spec_id
    if is_doc_indexed(specification):
        return get_doc(specification)
    print(f"[WARNING] Document no. {specification} not indexed or is a TDoc, if it's a specification, try to reindex")
    total_file = []
    if spec_3gpp_format.match(specification):
        # Ask the external doc-finder service for the download URL.
        # NOTE(review): verify=False disables TLS verification throughout.
        url = requests.post(
            "https://organizedprogrammers-docfinder.hf.space/find/single",
            verify=False,
            headers={"Content-Type": "application/json"},
            json={"doc_id": specification}
        )

        if url.status_code != 200:
            raise HTTPException(404, detail="Not found")

        url = url.json()['url']
        # Browser-like User-Agent: presumably some hosts refuse the default
        # requests agent.
        response = requests.get(
            url,
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
        )

        zip_bytes = BytesIO(response.content)
        current_zip_file = zipfile.ZipFile(zip_bytes)
        # Some archives wrap the payload in a single nested zip — unwrap one
        # level when the outer archive contains exactly one .zip member.
        for file_info in current_zip_file.infolist():
            if file_info.filename.endswith(".zip") and len(current_zip_file.namelist()) == 1:
                nested_zip_bytes = BytesIO(current_zip_file.read(file_info.filename))
                current_zip_file = zipfile.ZipFile(nested_zip_bytes)
                break

        # Convert every Word document (skipping cover/annex files) to text.
        for file_info in current_zip_file.infolist():
            filename = file_info.filename
            if (filename.endswith('.doc') or filename.endswith('.docx')) and ("cover" not in filename.lower() and "annex" not in filename.lower()):
                doc_bytes = current_zip_file.read(filename)
                ext = filename.split(".")[-1]
                # NOTE(review): fixed /tmp paths keyed only by spec id —
                # concurrent requests for the same id would race on these
                # files; confirm single-worker deployment.
                input_path = f"/tmp/{specification}.{ext}"
                output_path = f"/tmp/{specification}.txt"
                with open(input_path, "wb") as f:
                    f.write(doc_bytes)

                # Requires a headless LibreOffice install on the host.
                subprocess.run([
                    "libreoffice",
                    "--headless",
                    "--convert-to", "txt",
                    "--outdir", "/tmp",
                    input_path
                ], check=True)

                with open(output_path, "r") as f:
                    # Keep only non-empty lines, stripped of whitespace.
                    txt_data = [line.strip() for line in f if line.strip()]

                os.remove(input_path)
                os.remove(output_path)
                total_file.extend(txt_data)
        if total_file == []:
            raise HTTPException(status_code=404, detail="Not found !")
        else:
            return total_file
    elif spec_etsi_format.match(specification):
        print("\n[INFO] Tentative de récupération du texte", flush=True)
        pdf, doc_toc = get_pdf_data(request)
        text = []
        first = 0
        # Skip front matter: start at the page of the first numbered ToC entry.
        for level, title, page in doc_toc:
            if title[0].isnumeric():
                first = page - 1
                break
        for page in pdf[first:]:
            text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
        text = "\n".join(text)

        if not text or not doc_toc:
            print("\n[ERREUR] Pas de texte/table of contents trouvé !")
            return {}
        print(f"\n[INFO] Texte {request.spec_id} récupéré", flush=True)
        return text
    else:
        raise HTTPException(status_code=400, detail="Document ID format invalid !")
| | |
@app.post("/extract_text/structured")
def extract_full_spec_by_chapters(request: SpecRequest):
    """Return the specification split into chapters: {section title: content}.

    Served straight from the pre-indexed datasets when possible; otherwise
    the full text is fetched via extract_full_spec() and split with
    format-specific heuristics (tab-separated numbered headings for 3GPP,
    PDF table-of-contents titles for ETSI).

    Raises HTTP 400 when the id matches neither known format and propagates
    any HTTPException raised while fetching the document.
    Fix: dropped the unused ``total_file = []`` local that was carried over
    from extract_full_spec().
    """
    specification = request.spec_id
    if is_doc_indexed(request.spec_id):
        return get_structured_doc(request.spec_id)
    print(f"[WARNING] Document no. {specification} not indexed or is a TDoc, if it's a specification, try to reindex")
    text = extract_full_spec(request)
    if spec_3gpp_format.match(specification):
        # For 3GPP, `text` is a list of stripped lines. Chapter headings look
        # like "5.1\tTitle": section number, a tab, then a capitalised title.
        chapters = []
        chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+$")

        for i, line in enumerate(text):
            if chapter_regex.fullmatch(line):
                chapters.append((i, line))

        # Slice the flat line list between consecutive headings.
        document = {}
        for i in range(len(chapters)):
            start_index, chapter_title = chapters[i]
            end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
            content_lines = text[start_index + 1 : end_index]
            document[chapter_title.replace('\t', " ")] = "\n".join(content_lines)
        return document
    elif spec_etsi_format.match(specification):
        def extract_sections(text, titles):
            # Split `text` at the first occurrence of each title, in document
            # order; each section runs up to the next title (or to the end),
            # with the title removed and whitespace collapsed.
            sections = {}

            sorted_titles = sorted(titles, key=lambda t: text.find(t))
            for i, title in enumerate(sorted_titles):
                start = text.find(title)
                if i + 1 < len(sorted_titles):
                    end = text.find(sorted_titles[i + 1])
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
                else:
                    sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
            return sections
        # NOTE(review): the PDF is downloaded a second time here just for its
        # table of contents — extract_full_spec() already fetched it once.
        pdf, toc = get_pdf_data(request)
        if not text or not toc:
            print("\n[ERREUR] Pas de texte/table of contents trouvé !")
            return {}
        print(f"\n[INFO] Texte {request.spec_id} récupéré", flush=True)
        # Keep only numbered ToC titles that actually occur in the extracted
        # text, rewritten as "number\nrest" to match the PDF line layout.
        titles = []
        for level, title, page in toc:
            if title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
                titles.append('\n'.join(title.strip().split(" ", 1)))

        return extract_sections(text, titles)
    else:
        raise HTTPException(status_code=400, detail="Document ID format invalid !")