| |
|
|
| """ |
| Database Tools Examples for EvoAgentX |
| |
| This file demonstrates how to use various database toolkits: |
| - MongoDBToolkit: Document database operations |
| - PostgreSQLToolkit: Relational database operations |
| - FaissToolkit: Vector database for semantic search |
| |
| Each toolkit provides comprehensive database management capabilities with automatic |
| storage management and support for complex queries. |
| """ |
|
|
| import os |
| import sys |
| import json |
| from pathlib import Path |
|
|
| |
| sys.path.append(str(Path(__file__).parent.parent)) |
|
|
| from evoagentx.tools import ( |
| MongoDBToolkit, |
| PostgreSQLToolkit |
| ) |
| from evoagentx.tools.database_faiss import FaissToolkit |
|
|
|
|
def run_mongodb_examples():
    """Walk through the MongoDBToolkit tools against a demo ``products`` collection.

    Steps, in order: insert five product documents, find the electronics
    (sorted by price, descending), multiply their price by 0.9, aggregate
    average price / total stock per category, delete the furniture documents,
    and print database info.

    Every tool result is read as a dict with ``success``, ``data`` and
    ``error`` keys. A failed insert aborts the demo because each later step
    depends on that data; any other failed step is reported and remembered so
    the closing line does not claim success when something went wrong.

    Returns:
        None. All output goes to stdout; exceptions are caught and printed.
    """
    print("\n===== MONGODB TOOLKIT EXAMPLES =====\n")

    try:
        toolkit = MongoDBToolkit(
            name="DemoMongoDBToolkit",
            database_name="demo_db",
            auto_save=True,
        )
        print("✓ MongoDBToolkit initialized with default storage")

        execute_tool = toolkit.get_tool("mongodb_execute_query")
        find_tool = toolkit.get_tool("mongodb_find")
        update_tool = toolkit.get_tool("mongodb_update")
        delete_tool = toolkit.get_tool("mongodb_delete")
        info_tool = toolkit.get_tool("mongodb_info")

        print(f"✓ Available tools: {[tool.name for tool in toolkit.get_tools()]}")

        # Labels of the steps whose tool call reported a failure.
        failed_steps = []

        # --- 1. Insert -------------------------------------------------
        print("\n1. Inserting product data...")
        products = [
            {"id": "P001", "name": "Laptop", "category": "Electronics", "price": 999.99, "stock": 50, "brand": "TechCorp"},
            {"id": "P002", "name": "Wireless Mouse", "category": "Electronics", "price": 29.99, "stock": 100, "brand": "TechCorp"},
            {"id": "P003", "name": "Desk Chair", "category": "Furniture", "price": 199.99, "stock": 25, "brand": "ComfortCo"},
            {"id": "P004", "name": "Coffee Table", "category": "Furniture", "price": 149.99, "stock": 15, "brand": "ComfortCo"},
            {"id": "P005", "name": "Smartphone", "category": "Electronics", "price": 799.99, "stock": 75, "brand": "MobileTech"},
        ]

        insert_result = execute_tool(
            query=json.dumps(products),
            query_type="insert",
            collection_name="products",
        )

        if not insert_result.get("success"):
            # Nothing to query without data, so stop here.
            print(f"✗ Insert failed: {insert_result.get('error', 'Unknown error')}")
            return

        insert_data = insert_result.get("data") or {}
        print(f"✓ Successfully inserted {len(products)} products")
        print(f" Documents inserted: {insert_data.get('inserted_count', 'Unknown')}")

        # --- 2. Find ---------------------------------------------------
        print("\n2. Finding electronics products...")
        find_result = find_tool(
            collection_name="products",
            filter='{"category": "Electronics"}',
            sort='{"price": -1}',
            limit=5,
        )

        if find_result.get("success"):
            electronics = find_result.get("data") or []
            print(f"✓ Found {len(electronics)} electronics products:")
            for product in electronics:
                name = product.get("name", "Unknown")
                price = product.get("price", 0)
                brand = product.get("brand", "Unknown")
                print(f" - {name}: ${price} ({brand})")
        else:
            failed_steps.append("find")
            print(f"✗ Find failed: {find_result.get('error', 'Unknown error')}")

        # --- 3. Update -------------------------------------------------
        print("\n3. Updating product prices (10% discount on electronics)...")
        update_result = update_tool(
            collection_name="products",
            filter='{"category": "Electronics"}',
            update='{"$mul": {"price": 0.9}}',
            multi=True,
        )

        if update_result.get("success"):
            updated_count = (update_result.get("data") or {}).get("modified_count", 0)
            print(f"✓ Updated {updated_count} electronics products with 10% discount")
        else:
            failed_steps.append("update")
            print(f"✗ Update failed: {update_result.get('error', 'Unknown error')}")

        # --- 4. Aggregate ----------------------------------------------
        print("\n4. Running aggregation query (average price by category)...")
        aggregation_pipeline = [
            {"$group": {"_id": "$category", "avg_price": {"$avg": "$price"}, "total_stock": {"$sum": "$stock"}}},
            {"$sort": {"avg_price": -1}},
        ]

        agg_result = execute_tool(
            query=json.dumps(aggregation_pipeline),
            query_type="aggregate",
            collection_name="products",
        )

        if agg_result.get("success"):
            print("✓ Category analysis:")
            for category in agg_result.get("data") or []:
                cat_name = category.get("_id", "Unknown")
                avg_price = category.get("avg_price", 0)
                total_stock = category.get("total_stock", 0)
                print(f" - {cat_name}: Avg price ${avg_price:.2f}, Total stock: {total_stock}")
        else:
            failed_steps.append("aggregate")
            print(f"✗ Aggregation failed: {agg_result.get('error', 'Unknown error')}")

        # --- 5. Delete -------------------------------------------------
        print("\n5. Testing delete functionality...")
        delete_result = delete_tool(
            collection_name="products",
            filter='{"category": "Furniture"}',
            multi=True,
        )

        if delete_result.get("success"):
            deleted_count = (delete_result.get("data") or {}).get("deleted_count", 0)
            print(f"✓ Deleted {deleted_count} furniture products")
        else:
            failed_steps.append("delete")
            print(f"✗ Delete failed: {delete_result.get('error', 'Unknown error')}")

        # --- 6. Info ---------------------------------------------------
        print("\n6. Getting database information...")
        info_result = info_tool()

        if info_result.get("success"):
            info = info_result.get("data") or {}
            print("✓ Database info:")
            print(f" - Database: {info.get('database_name', 'Unknown')}")

            collections = info.get("collections", [])
            if isinstance(collections, (list, tuple)) and collections:
                # str() each entry so a non-string collection descriptor
                # cannot make join() raise.
                print(f" - Collections: {', '.join(str(c) for c in collections)}")
            elif collections:
                print(f" - Collections: {collections}")
            else:
                print(" - Collections: None")

            print(f" - Total documents: {info.get('total_documents', 'Unknown')}")
        else:
            failed_steps.append("info")
            print(f"✗ Info failed: {info_result.get('error', 'Unknown error')}")

        if failed_steps:
            print(
                f"\n✗ MongoDB examples finished with {len(failed_steps)} "
                f"failed step(s): {', '.join(failed_steps)}"
            )
        else:
            print("\n✓ MongoDB examples completed successfully!")

    except Exception as e:
        # Demo boundary: report and return so the remaining demos still run.
        print(f"✗ Error running MongoDB examples: {e}")
|
|
def run_postgresql_examples():
    """Walk through the PostgreSQLToolkit tools against a demo ``users`` table.

    Steps, in order: create the table if it does not exist, insert three
    users, list the Engineering users by age, delete the Marketing users,
    and count the rows that remain.

    Tool results are read as dicts with ``success``, ``data`` and ``error``
    keys. For the find tool the rows are read from ``result["data"]["data"]``;
    for the delete tool the affected-row count is read from
    ``result["data"]["rowcount"]``. A failed step is reported and the demo
    continues with the next one.

    Returns:
        None. All output goes to stdout; exceptions are caught and printed.
    """
    print("\n===== POSTGRESQL TOOL EXAMPLE =====\n")

    try:
        toolkit = PostgreSQLToolkit(
            name="DemoPostgreSQLToolkit",
            database_name="demo_db",
            auto_save=True,
        )
        print("✓ PostgreSQLToolkit initialized with default storage")

        execute_tool = toolkit.get_tool("postgresql_execute")
        find_tool = toolkit.get_tool("postgresql_find")
        create_tool = toolkit.get_tool("postgresql_create")
        delete_tool = toolkit.get_tool("postgresql_delete")

        # --- Create table ----------------------------------------------
        create_sql = """
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            age INTEGER,
            department VARCHAR(50)
        );
        """

        result = create_tool(create_sql)
        if result.get("success"):
            print("✓ Created users table")
        else:
            print(f"✗ Create table failed: {result.get('error', 'Unknown error')}")

        # --- Insert rows -----------------------------------------------
        # NOTE(review): on a second run against an existing table this insert
        # presumably violates the UNIQUE(email) constraint; that failure is
        # now printed instead of being skipped silently.
        insert_sql = """
        INSERT INTO users (name, email, age, department) VALUES
        ('Alice Johnson', 'alice@example.com', 28, 'Engineering'),
        ('Bob Smith', 'bob@example.com', 32, 'Marketing'),
        ('Carol Davis', 'carol@example.com', 25, 'Engineering')
        """

        result = execute_tool(insert_sql)
        if result.get("success"):
            print("✓ Inserted users")
        else:
            print(f"✗ Insert failed: {result.get('error', 'Unknown error')}")

        # --- Find ------------------------------------------------------
        find_result = find_tool(
            "users",
            where="department = 'Engineering'",
            columns="name, age",
            sort="age ASC",
        )

        if find_result.get("success"):
            engineers = (find_result.get("data") or {}).get("data") or []
            print(f"✓ Found {len(engineers)} engineers:")
            for user in engineers:
                name = user.get("name", "Unknown")
                age = user.get("age", "N/A")
                print(f" - {name} (age: {age})")
        else:
            print(f"✗ Find failed: {find_result.get('error', 'Unknown error')}")

        # --- Delete ----------------------------------------------------
        print("\n🗑️ Testing delete functionality...")
        delete_result = delete_tool(
            "users",
            "department = 'Marketing'",
        )

        if delete_result.get("success"):
            deleted_count = (delete_result.get("data") or {}).get("rowcount", 0)
            print(f"✓ Deleted {deleted_count} marketing users")

            verify_result = find_tool("users")
            if verify_result.get("success"):
                # Rows live under data["data"], exactly as in the find step
                # above; len() of the outer dict would count its keys, not
                # the remaining users.
                remaining = (verify_result.get("data") or {}).get("data") or []
                print(f"✓ Remaining users after deletion: {len(remaining)}")
            else:
                print(f"✗ Verification failed: {verify_result.get('error', 'Unknown error')}")
        else:
            print(f"✗ Delete failed: {delete_result.get('error', 'Unknown error')}")

        print("\n✓ PostgreSQLToolkit test completed with default storage")

    except Exception as e:
        # Demo boundary: report and return so the remaining demos still run.
        print(f"✗ Error running PostgreSQL examples: {e}")
|
|
def run_faiss_examples():
    """Walk through the FaissToolkit tools using a small AI-themed corpus.

    The demo is skipped with setup instructions when the ``OPENAI_API_KEY``
    environment variable is unset or empty. Otherwise the steps are, in
    order: insert ten documents with shared metadata, run five semantic
    queries, run one metadata-filtered query, print statistics, list corpora,
    delete by metadata filter, and re-query to show what remains.

    Tool results are read as dicts with ``success``, ``data`` and ``error``
    keys. A failed insert aborts the demo; any other failed step is reported
    and remembered so the closing line reflects what actually happened. The
    API key itself is never printed.

    Returns:
        None. All output goes to stdout; exceptions are caught and printed.
    """
    print("\n===== FAISS TOOLKIT EXAMPLES =====\n")

    if not os.getenv("OPENAI_API_KEY"):
        print("✗ OPENAI_API_KEY not found in environment variables")
        print("To test FAISS examples, set your OpenAI API key:")
        print("export OPENAI_API_KEY='your-openai-api-key-here'")
        print("Get your key from: https://platform.openai.com/api-keys")
        return

    try:
        toolkit = FaissToolkit(
            name="DemoFaissToolkit",
            default_corpus_id="demo_corpus",
        )

        print("✓ FaissToolkit initialized with default storage")
        # Deliberately no key material here: even a short prefix of a secret
        # should not end up in terminal scrollback or captured logs.
        print("✓ Using OpenAI API key from environment (value not shown)")

        insert_tool = toolkit.get_tool("faiss_insert")
        query_tool = toolkit.get_tool("faiss_query")
        list_tool = toolkit.get_tool("faiss_list")
        stats_tool = toolkit.get_tool("faiss_stats")
        delete_tool = toolkit.get_tool("faiss_delete")

        print(f"✓ Available tools: {[tool.name for tool in toolkit.get_tools()]}")

        # Labels of the steps whose tool call reported a failure.
        failed_steps = []

        # --- 1. Insert -------------------------------------------------
        print("\n1. Inserting AI knowledge documents...")
        ai_documents = [
            "Artificial Intelligence (AI) is a branch of computer science that aims to create intelligent machines capable of performing tasks that typically require human intelligence.",
            "Machine learning is a subset of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed.",
            "Deep learning is a specialized form of machine learning that uses neural networks with multiple layers to analyze and learn from data.",
            "Natural Language Processing (NLP) helps computers understand, interpret, and generate human language in a useful way.",
            "Computer vision enables machines to interpret and understand visual information from the world, including images and videos.",
            "Reinforcement learning is a type of machine learning where an agent learns to make decisions by taking actions in an environment to achieve maximum cumulative reward.",
            "Neural networks are computing systems inspired by biological neural networks, consisting of interconnected nodes that process information.",
            "Transfer learning allows a model trained on one task to be adapted for a related task, improving efficiency and performance.",
            "Generative AI models can create new content, such as text, images, music, and code, based on patterns learned from training data.",
            "Explainable AI focuses on making AI systems' decisions and processes transparent and understandable to humans.",
        ]

        insert_result = insert_tool(
            documents=ai_documents,
            metadata={
                "source": "ai_knowledge_base",
                "topic": "artificial_intelligence",
                "language": "en",
                "difficulty": "intermediate",
            },
        )

        if not insert_result.get("success"):
            # Nothing to search without data, so stop here.
            print(f"✗ Insert failed: {insert_result.get('error', 'Unknown error')}")
            return

        insert_data = insert_result.get("data") or {}
        print(f"✓ Successfully inserted {insert_data.get('documents_inserted', 0)} documents")
        print(f" Chunks created: {insert_data.get('chunks_created', 0)}")

        # --- 2. Semantic search ----------------------------------------
        print("\n2. Performing semantic search queries...")

        search_queries = [
            "How do machines learn?",
            "What is neural network?",
            "Explain deep learning",
            "How does AI generate content?",
            "What is computer vision?",
        ]

        for i, query in enumerate(search_queries, 1):
            print(f"\n Query {i}: '{query}'")
            search_result = query_tool(
                query=query,
                top_k=3,
                similarity_threshold=0.1,
            )

            if search_result.get("success"):
                results = (search_result.get("data") or {}).get("results") or []
                print(f" ✓ Found {len(results)} relevant results:")
                for j, result in enumerate(results, 1):
                    score = result.get("score", 0)
                    # "or ''" guards against an explicit None content value.
                    content = (result.get("content") or "")[:80]
                    print(f" {j}. Score: {score:.3f} - {content}...")
            else:
                failed_steps.append(f"query {i}")
                print(f" ✗ Search failed: {search_result.get('error', 'Unknown error')}")

        # --- 3. Filtered search ----------------------------------------
        print("\n3. Searching with metadata filters...")
        filtered_search_result = query_tool(
            query="machine learning algorithms",
            top_k=5,
            similarity_threshold=0.1,
            metadata_filters={"topic": "artificial_intelligence", "difficulty": "intermediate"},
        )

        if filtered_search_result.get("success"):
            results = (filtered_search_result.get("data") or {}).get("results") or []
            print(f"✓ Found {len(results)} results with metadata filters:")
            for i, result in enumerate(results, 1):
                score = result.get("score", 0)
                content = (result.get("content") or "")[:100]
                metadata = result.get("metadata", {})
                print(f" {i}. Score: {score:.3f} - {content}...")
                print(f" Metadata: {metadata}")
        else:
            failed_steps.append("filtered search")
            print(f"✗ Filtered search failed: {filtered_search_result.get('error', 'Unknown error')}")

        # --- 4. Statistics ---------------------------------------------
        print("\n4. Getting database statistics...")
        stats_result = stats_tool()

        if stats_result.get("success"):
            stats = stats_result.get("data") or {}
            # str() each entry and tolerate None so join() cannot raise.
            corpora_names = ", ".join(str(c) for c in (stats.get("corpora") or []))
            print("✓ Database statistics:")
            print(f" - Total corpora: {stats.get('total_corpora', 'Unknown')}")
            print(f" - Corpora: {corpora_names}")
            print(f" - Embedding model: {stats.get('embedding_model', 'Unknown')}")
            print(f" - Vector store type: {stats.get('vector_store_type', 'Unknown')}")
        else:
            failed_steps.append("stats")
            print(f"✗ Stats failed: {stats_result.get('error', 'Unknown error')}")

        # --- 5. List corpora -------------------------------------------
        print("\n5. Listing all corpora...")
        list_result = list_tool()

        if list_result.get("success"):
            corpora = (list_result.get("data") or {}).get("corpora") or []
            print(f"✓ Found {len(corpora)} corpora:")
            for corpus in corpora:
                corpus_id = corpus.get("corpus_id", "Unknown")
                doc_count = corpus.get("document_count", "Unknown")
                chunk_count = corpus.get("chunk_count", "Unknown")
                print(f" - {corpus_id}: {doc_count} documents, {chunk_count} chunks")
        else:
            failed_steps.append("list")
            print(f"✗ List failed: {list_result.get('error', 'Unknown error')}")

        # --- 6. Delete + verify ----------------------------------------
        print("\n6. Testing delete functionality...")
        delete_result = delete_tool(
            metadata_filters={"source": "ai_knowledge_base"},
        )

        if delete_result.get("success"):
            deleted_count = (delete_result.get("data") or {}).get("deleted_count", 0)
            print(f"✓ Deleted {deleted_count} documents with metadata filter")

            verify_result = query_tool(
                query="artificial intelligence",
                top_k=5,
                similarity_threshold=0.1,
            )

            if verify_result.get("success"):
                remaining = (verify_result.get("data") or {}).get("total_results", 0)
                print(f"✓ Remaining documents after deletion: {remaining}")
            else:
                failed_steps.append("delete verification")
                print(f"✗ Verification query failed: {verify_result.get('error', 'Unknown error')}")
        else:
            failed_steps.append("delete")
            print(f"✗ Delete failed: {delete_result.get('error', 'Unknown error')}")

        if failed_steps:
            print(
                f"\n✗ FAISS examples finished with {len(failed_steps)} "
                f"failed step(s): {', '.join(failed_steps)}"
            )
        else:
            print("\n✓ FAISS examples completed successfully!")

    except Exception as e:
        # Demo boundary: report and return so the caller can carry on.
        print(f"✗ Error running FAISS examples: {e}")
        if "DocumentMetadata" in str(e):
            print("Note: This appears to be a dependency issue with the RAG engine components")
            print("The FAISS toolkit may need additional setup or dependencies")
|
|
|
|
def main():
    """Print the opening banner, run each database demo in turn, then print the closing banner."""
    opening_banner = "===== DATABASE TOOLS EXAMPLES =====\n"
    closing_banner = "\n===== ALL DATABASE EXAMPLES COMPLETED ====="

    print(opening_banner)

    # Demos run in a fixed order: MongoDB, then PostgreSQL, then FAISS.
    demos = (
        run_mongodb_examples,
        run_postgresql_examples,
        run_faiss_examples,
    )
    for run_demo in demos:
        run_demo()

    print(closing_banner)
|
|
|
|
# Run the demos only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|
|