| | from neo4j import GraphDatabase, Driver, exceptions |
| | from .config import settings |
| | import logging |
| | from typing import List, Dict, Any, Optional |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | class Neo4jClient: |
| | _driver: Optional[Driver] = None |
| |
|
| | def _get_driver(self) -> Driver: |
| | """Initializes and returns the Neo4j driver instance.""" |
| | if self._driver is None or self._driver.close(): |
| | logger.info(f"Initializing Neo4j Driver for URI: {settings.neo4j_uri}") |
| | try: |
| | self._driver = GraphDatabase.driver( |
| | settings.neo4j_uri, |
| | auth=(settings.neo4j_username, settings.neo4j_password.get_secret_value()) |
| | ) |
| | |
| | self._driver.verify_connectivity() |
| | logger.info("Neo4j Driver initialized and connection verified.") |
| | except exceptions.AuthError as e: |
| | logger.error(f"Neo4j Authentication Error: {e}", exc_info=True) |
| | raise ConnectionError("Neo4j Authentication Failed. Check credentials.") from e |
| | except exceptions.ServiceUnavailable as e: |
| | logger.error(f"Neo4j Service Unavailable: {e}", exc_info=True) |
| | raise ConnectionError(f"Could not connect to Neo4j at {settings.neo4j_uri}. Ensure DB is running and reachable.") from e |
| | except Exception as e: |
| | logger.error(f"Unexpected error initializing Neo4j Driver: {e}", exc_info=True) |
| | raise ConnectionError("An unexpected error occurred connecting to Neo4j.") from e |
| | return self._driver |
| |
|
| | def close(self): |
| | """Closes the Neo4j driver connection.""" |
| | if self._driver and not self._driver.close(): |
| | logger.info("Closing Neo4j Driver.") |
| | self._driver.close() |
| | self._driver = None |
| |
|
| | def query(self, cypher_query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: |
| | """Executes a Cypher query and returns the results.""" |
| | driver = self._get_driver() |
| | logger.debug(f"Executing Cypher: {cypher_query} with params: {params}") |
| | try: |
| | |
| | with driver.session() as session: |
| | result = session.run(cypher_query, params or {}) |
| | |
| | data = [record.data() for record in result] |
| | logger.debug(f"Query returned {len(data)} records.") |
| | return data |
| | except (exceptions.ServiceUnavailable, exceptions.SessionExpired) as e: |
| | logger.error(f"Neo4j connection error during query: {e}", exc_info=True) |
| | |
| | self.close() |
| | raise ConnectionError("Neo4j connection error during query execution.") from e |
| | except exceptions.CypherSyntaxError as e: |
| | logger.error(f"Neo4j Cypher Syntax Error: {e}\nQuery: {cypher_query}", exc_info=True) |
| | raise ValueError("Invalid Cypher query syntax.") from e |
| | except Exception as e: |
| | logger.error(f"Unexpected error during Neo4j query: {e}", exc_info=True) |
| | raise RuntimeError("An unexpected error occurred during the Neo4j query.") from e |
| |
|
| | def get_schema(self, force_refresh: bool = False) -> Dict[str, Any]: |
| | """ Fetches the graph schema. Placeholder - Langchain community graph has better schema fetching.""" |
| | |
| | |
| | logger.warning("Neo4jClient.get_schema() is a placeholder. Implement if schema needed.") |
| | return {} |
| |
|
| | def get_concepts(self) -> List[str]: |
| | """Fetches all Concept names from the graph.""" |
| | cypher = "MATCH (c:Concept) RETURN c.name AS name ORDER BY name" |
| | results = self.query(cypher) |
| | return [record['name'] for record in results if 'name' in record] |
| |
|
| | def get_concept_description(self, concept_name: str) -> Optional[str]: |
| | """Fetches the description for a specific concept.""" |
| | cypher = "MATCH (c:Concept {name: $name}) RETURN c.description AS description LIMIT 1" |
| | params = {"name": concept_name} |
| | results = self.query(cypher, params) |
| | return results[0]['description'] if results and 'description' in results[0] else None |
| |
|
| |
|
| | |
| | neo4j_client = Neo4jClient() |
| |
|
| | |
| | import atexit |
| | atexit.register(neo4j_client.close) |