| import os |
| import time |
| import logging |
| import sys |
| import gradio as gr |
| |
| from pinecone import Pinecone, ServerlessSpec |
| from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext |
| from llama_index.vector_stores.pinecone import PineconeVectorStore |
| from llama_index.readers.file import PDFReader |
| |
| |
| |
| |
| |
| |
| |
# Log to stdout so messages show up in the Hugging Face Space console.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)


# Credentials are injected via environment variables (HF Space secrets).
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

if not PINECONE_API_KEY:
    raise ValueError("Missing PINECONE_API_KEY in Hugging Face Space secrets.")

# LlamaIndex defaults to OpenAI-backed embeddings/LLM; without this key the
# failure would only surface later as an opaque API error, so flag it now.
# Kept non-fatal in case the deployment configures a different model provider.
if not OPENAI_API_KEY:
    logger.warning("OPENAI_API_KEY is not set; OpenAI embedding/LLM calls will fail.")
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Pinecone serverless index configuration.
INDEX_NAME = "quickstart"
# 1536 matches OpenAI's text-embedding-ada-002 output size (LlamaIndex's
# historical default embedding model) — TODO confirm against the embed model
# actually configured for this deployment.
DIMENSION = 1536
# NOTE(review): OpenAI embeddings are typically searched with "cosine";
# "euclidean" works but verify this metric is intentional — changing it
# requires recreating the index.
METRIC = "euclidean"
CLOUD = "aws"
REGION = "us-east-1"


# Module-level Pinecone client shared by the functions below.
pc = Pinecone(api_key=PINECONE_API_KEY)
| |
def get_existing_index_names(pc_client):
    """Return the names of all existing Pinecone indexes.

    Tolerates the response shapes seen across Pinecone SDK versions:
    an ``IndexList`` with a ``names()`` helper, an iterable of dicts
    keyed by ``"name"``, a response object wrapping the list under an
    ``indexes`` attribute, or an iterable of objects exposing ``name``.

    Returns:
        list[str]: index names; an empty list when the response shape is
        unrecognized, so the caller can fall back to creating the index.
    """
    raw = pc_client.list_indexes()

    # Newer SDKs: IndexList exposes a names() convenience method.
    try:
        return list(raw.names())
    except (AttributeError, TypeError):
        pass

    # Iterable of dict-like entries ({"name": ...}).
    try:
        return [idx["name"] for idx in raw]
    except (TypeError, KeyError, IndexError):
        pass

    # Response object wrapping the entries under .indexes.
    try:
        return [idx["name"] for idx in raw.indexes]
    except (AttributeError, TypeError, KeyError, IndexError):
        pass

    # Iterable of model objects exposing a .name attribute.
    try:
        return [idx.name for idx in raw]
    except (AttributeError, TypeError):
        pass

    # Unknown shape: report nothing rather than crash at startup.
    return []
| |
def ensure_index(pc_client, index_name: str, dimension: int, ready_timeout: float = 60.0):
    """Create the Pinecone index if it does not exist and return a handle.

    Args:
        pc_client: Pinecone client instance.
        index_name: Name of the index to create or reuse.
        dimension: Embedding dimension used when creating a new index.
        ready_timeout: Maximum seconds to wait for a new index to report
            ready before proceeding anyway.

    Returns:
        The Pinecone ``Index`` handle for ``index_name``.
    """
    existing_indexes = get_existing_index_names(pc_client)

    if index_name not in existing_indexes:
        logger.info("Creating Pinecone index: %s", index_name)
        pc_client.create_index(
            name=index_name,
            dimension=dimension,
            metric=METRIC,
            spec=ServerlessSpec(cloud=CLOUD, region=REGION),
        )

        # Poll until the index reports ready instead of a blind fixed sleep;
        # serverless index creation time varies, and a fixed 5s can be both
        # too short (races) and too long (wasted startup).
        deadline = time.monotonic() + ready_timeout
        while time.monotonic() < deadline:
            try:
                status = pc_client.describe_index(index_name).status
                ready = status.get("ready") if isinstance(status, dict) else getattr(status, "ready", True)
            except Exception:
                # Status probe unavailable in this SDK version; fall back to
                # the original short settle wait and move on.
                time.sleep(5)
                break
            if ready:
                break
            time.sleep(1)
    else:
        logger.info("Using existing Pinecone index: %s", index_name)

    return pc_client.Index(index_name)
| |
| |
def load_documents():
    """Load every PDF under the local ``data`` directory.

    Returns:
        The list of document objects produced by the reader.

    Raises:
        ValueError: if no PDF documents are found.
    """
    reader = SimpleDirectoryReader(
        input_dir="data",
        required_exts=[".pdf"],
        file_extractor={".pdf": PDFReader()},
    )
    docs = reader.load_data()

    if not docs:
        raise ValueError("No PDF documents were loaded from the 'data' folder.")

    logger.info(f"Loaded {len(docs)} document chunks/items.")
    return docs
| |
| |
def build_query_engine():
    """Index the local PDFs into Pinecone and return a LlamaIndex query engine."""
    # Ensure the index exists first, then read the source documents —
    # same order as startup has always done.
    pinecone_index = ensure_index(pc, INDEX_NAME, DIMENSION)
    docs = load_documents()

    storage_context = StorageContext.from_defaults(
        vector_store=PineconeVectorStore(pinecone_index=pinecone_index)
    )
    return VectorStoreIndex.from_documents(
        docs,
        storage_context=storage_context,
    ).as_query_engine()


# Build once at import time so every Gradio request reuses the warm engine.
query_engine = build_query_engine()
| |
| |
def query_doc(prompt):
    """Answer a user question through the module-level query engine.

    Returns the engine's answer as a string, a prompt-for-input message
    on blank input, or an error message when the query raises.
    """
    if not (prompt and prompt.strip()):
        return "Please enter a question."

    try:
        answer = query_engine.query(prompt)
    except Exception as e:
        # Boundary handler: log the traceback, surface a short message to the UI.
        logger.exception("Query failed")
        return f"Error: {str(e)}"
    return str(answer)
| |
| |
# Build the single-turn Q&A UI from named widgets.
question_box = gr.Textbox(
    label="Ask a question about the document",
    placeholder="What does the policy say about social media conduct?",
)
answer_box = gr.Textbox(label="Answer")

demo = gr.Interface(
    fn=query_doc,
    inputs=question_box,
    outputs=answer_box,
    title="DDS Enterprise Chatbot",
    description="Ask questions based on the indexed Social Media Regulation PDF. Powered by LlamaIndex & Pinecone.",
)

if __name__ == "__main__":
    demo.launch()
|
|