Learn how to integrate AgentSource MCP using the official Python MCP SDK for direct, programmatic access to Explorium's business and contact data.
Overview
This guide demonstrates how to use AgentSource MCP with the Python MCP SDK. The SDK provides a native Python interface for connecting to MCP servers, making it ideal for building custom applications, data pipelines, and automation tools.
Prerequisites
- Python 3.10 or higher (the minimum version supported by the MCP Python SDK)
- AgentSource API key
- Basic understanding of asynchronous Python programming
Installation
Install the MCP Python SDK:
pip install mcp
Note: You may need to restart your Python kernel after installation:
# For Databricks notebooks (in Jupyter, restart the kernel from the Kernel menu instead)
dbutils.library.restartPython()
Quick Start
1. Import Required Libraries
import os
from mcp import ClientSession
from mcp.client.sse import sse_client
import asyncio
import json
2. Basic Connection Example
async def connect_to_agentsource():
    """Connect to AgentSource MCP over SSE and print the first few tools.

    Reads the API key from the ``EXPLORIUM_API_KEY`` environment variable
    (falling back to a placeholder string), opens an SSE transport to the
    MCP endpoint, initializes a client session, and lists the available
    tools, printing the name and a truncated description of the first five.
    """
    # Configure your API key
    api_key = os.environ.get('EXPLORIUM_API_KEY', 'your-api-key-here')
    headers = {
        "Authorization": f"Bearer {api_key}",
    }

    # Connect to the MCP endpoint
    async with sse_client(
        url="https://mcp.explorium.ai/sse",
        headers=headers
    ) as streams:
        read_stream, write_stream = streams
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            print("✓ Successfully connected to AgentSource MCP!")

            # List available tools
            tools = await session.list_tools()
            print(f"Available tools: {len(tools.tools)}")
            for tool in tools.tools[:5]:  # Show first 5 tools
                # A tool's description is optional, so avoid slicing None
                description = tool.description or ""
                print(f" • {tool.name}: {description[:60]}...")


# Run the async function
asyncio.run(connect_to_agentsource())
Common Use Cases
Matching Businesses
Find business IDs for companies by name or domain:
async def match_businesses_example():
    """Resolve a few companies to business IDs and print each match."""
    token = os.environ.get('EXPLORIUM_API_KEY', 'your-api-key')
    auth_headers = {"Authorization": f"Bearer {token}"}

    # Companies may be identified by name, by domain, or by both.
    companies = [
        {"name": "Google"},
        {"domain": "microsoft.com"},
        {"name": "Amazon", "domain": "amazon.com"},
    ]

    async with sse_client(
        url="https://mcp.explorium.ai/sse",
        headers=auth_headers,
    ) as streams:
        reader, writer = streams
        async with ClientSession(reader, writer) as session:
            await session.initialize()

            # Call the match_businesses tool
            response = await session.call_tool(
                "match_businesses",
                arguments={"businesses_to_match": companies},
            )

            # Nothing to report when the tool returned no content.
            if not (hasattr(response, 'content') and response.content):
                return

            payload = json.loads(response.content[0].text)
            print(f"Matched {payload['total_matches']} businesses:")
            for match in payload['matched_businesses']:
                print(f" • {match['input']} → ID: {match['business_id']}")


asyncio.run(match_businesses_example())
Finding Prospects at Companies
Search for employees with specific roles:
async def find_prospects_example():
    """Match Salesforce, then list VP-level Engineering prospects there.

    Calls ``match_businesses`` to obtain a business ID, uses that ID as a
    filter for ``fetch_prospects``, and prints the name and job title of up
    to five returned prospects. Returns early with a message when the match
    step yields no business.
    """
    api_key = os.environ.get('EXPLORIUM_API_KEY', 'your-api-key')
    headers = {"Authorization": f"Bearer {api_key}"}

    async with sse_client(
        url="https://mcp.explorium.ai/sse",
        headers=headers
    ) as streams:
        read_stream, write_stream = streams
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()

            # First, match the company
            match_result = await session.call_tool(
                "match_businesses",
                arguments={"businesses_to_match": [{"name": "Salesforce"}]}
            )

            # Extract business ID from match result
            match_data = json.loads(match_result.content[0].text)
            matched = match_data['matched_businesses']
            if not matched:
                # Without a business ID there is nothing to filter prospects on
                print("No matching business found for Salesforce")
                return
            business_id = matched[0]['business_id']

            # Search for VPs in Engineering
            prospects_input = {
                "filters": {
                    "business_id": [business_id],
                    "job_level": ["vp"],
                    "job_department": ["Engineering"]
                },
                "size": 20,
                "page_size": 10,
                "page": 1
            }

            # Fetch prospects
            prospects_result = await session.call_tool(
                "fetch_prospects",
                arguments=prospects_input
            )

            # Process and display results
            prospects_data = json.loads(prospects_result.content[0].text)
            print(f"Found {len(prospects_data['data'])} VPs in Engineering:")
            for prospect in prospects_data['data'][:5]:
                print(f" • {prospect['full_name']} - {prospect['job_title']}")


asyncio.run(find_prospects_example())
Enriching Contact Information
Get email addresses and phone numbers for prospects:
async def enrich_contacts_example():
    """Enrich a list of prospect IDs and print any email addresses returned."""
    token = os.environ.get('EXPLORIUM_API_KEY', 'your-api-key')
    auth_headers = {"Authorization": f"Bearer {token}"}

    # Assume we have prospect IDs from a previous search
    prospect_ids = ["prospect_id_1", "prospect_id_2"]

    async with sse_client(
        url="https://mcp.explorium.ai/sse",
        headers=auth_headers,
    ) as streams:
        reader, writer = streams
        async with ClientSession(reader, writer) as session:
            await session.initialize()

            # Enrich contact information
            response = await session.call_tool(
                "enrich_prospects_contacts_information",
                arguments={"prospect_ids": prospect_ids},
            )

            # Display enriched data, skipping entries that carry no emails
            payload = json.loads(response.content[0].text)
            for entry in payload['data']:
                details = entry['data']
                if 'emails' not in details:
                    continue
                print(f"Contact: {entry['prospect_id']}")
                for address in details['emails']:
                    print(f" • Email ({address['type']}): {address['email']}")


asyncio.run(enrich_contacts_example())
Advanced Patterns
Using Autocomplete for Valid Filter Values
Many filters require valid values from the autocomplete tool:
async def search_with_autocomplete():
    """Fetch valid LinkedIn categories via autocomplete, then search with them.

    Queries the ``autocomplete`` tool for ``linkedin_category`` values
    matching "software", passes the first two returned values as a filter
    to ``fetch_businesses``, and prints the reported total result count.
    """
    # Same variable the Quick Start and Best Practices sections read, so a
    # single exported key works for every example
    api_key = os.environ.get('EXPLORIUM_API_KEY', 'your-api-key')
    headers = {"Authorization": f"Bearer {api_key}"}

    async with sse_client(
        url="https://mcp.explorium.ai/sse",
        headers=headers
    ) as streams:
        read_stream, write_stream = streams
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()

            # Get valid LinkedIn categories for "software"
            autocomplete_result = await session.call_tool(
                "autocomplete",
                arguments={
                    "field": "linkedin_category",
                    "query": "software"
                }
            )

            # Parse autocomplete results
            auto_data = json.loads(autocomplete_result.content[0].text)
            categories = [item['value'] for item in auto_data['data']]

            # Use the validated categories in search
            search_result = await session.call_tool(
                "fetch_businesses",
                arguments={
                    "filters": {
                        "linkedin_category": categories[:2],  # Use first 2 categories
                        "company_size": ["51-200", "201-500"],
                        "country_code": ["US"]
                    },
                    "size": 10
                }
            )

            # Process results
            search_data = json.loads(search_result.content[0].text)
            print(f"Found {search_data['total_results']} companies")


asyncio.run(search_with_autocomplete())
Comprehensive Company Analysis
Combine multiple enrichment tools for detailed insights:
async def comprehensive_company_analysis():
    """Match one company and run several enrichment tools against it.

    Matches the business for ``tesla.com``, then calls each enrichment tool
    in turn with the resulting business ID, parsing every response as JSON
    and printing a confirmation line per tool.
    """
    # Same variable the Quick Start and Best Practices sections read, so a
    # single exported key works for every example
    api_key = os.environ.get('EXPLORIUM_API_KEY', 'your-api-key')
    headers = {"Authorization": f"Bearer {api_key}"}

    async with sse_client(
        url="https://mcp.explorium.ai/sse",
        headers=headers
    ) as streams:
        read_stream, write_stream = streams
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()

            # Match a company
            match_result = await session.call_tool(
                "match_businesses",
                arguments={"businesses_to_match": [{"domain": "tesla.com"}]}
            )
            match_data = json.loads(match_result.content[0].text)
            business_id = match_data['matched_businesses'][0]['business_id']

            # Gather multiple types of enrichment data
            enrichment_tools = [
                ("enrich_businesses_firmographics", "Company Overview"),
                ("enrich_businesses_technographics", "Technology Stack"),
                ("enrich_businesses_workforce_trends", "Workforce Trends"),
                ("enrich_businesses_funding_and_acquisitions", "Funding History")
            ]

            for tool_name, description in enrichment_tools:
                print(f"\n{description}:")
                result = await session.call_tool(
                    tool_name,
                    arguments={"business_ids": [business_id]}
                )

                # Process and display key insights; parsing also surfaces a
                # malformed payload as an exception
                data = json.loads(result.content[0].text)

                # Display relevant information based on the tool
                print(" ✓ Data retrieved successfully")


asyncio.run(comprehensive_company_analysis())
Error Handling
Implement robust error handling for production use:
async def safe_mcp_call():
    """Run a single ``match_businesses`` call with layered error reporting.

    Returns:
        The result object returned by ``session.call_tool``.

    Raises:
        ValueError: If the ``EXPLORIUM_API_KEY`` environment variable is
            unset or empty.
        Exception: Any error raised while connecting, initializing, or
            calling the tool is printed and then re-raised unchanged.
    """
    # Same variable the Quick Start and Best Practices sections read
    api_key = os.environ.get('EXPLORIUM_API_KEY')
    if not api_key:
        raise ValueError("EXPLORIUM_API_KEY environment variable not set")

    headers = {"Authorization": f"Bearer {api_key}"}

    try:
        async with sse_client(
            url="https://mcp.explorium.ai/sse",
            headers=headers
        ) as streams:
            read_stream, write_stream = streams
            async with ClientSession(read_stream, write_stream) as session:
                try:
                    await session.initialize()

                    # Your MCP operations here
                    result = await session.call_tool(
                        "match_businesses",
                        arguments={"businesses_to_match": [{"name": "Example Corp"}]}
                    )
                    return result
                except Exception as e:
                    print(f"Error during tool execution: {e}")
                    raise
    except Exception as e:
        # Errors re-raised by the inner handler also pass through here
        print(f"Connection error: {e}")
        raise


# Run with error handling
try:
    result = asyncio.run(safe_mcp_call())
except Exception as e:
    print(f"Failed to execute MCP call: {e}")
Best Practices
1. Environment Variables
Always use environment variables for API keys:
import os
# Fail fast when the key is missing or empty.
api_key = os.getenv('EXPLORIUM_API_KEY')
if not api_key:
    raise ValueError("Please set EXPLORIUM_API_KEY environment variable")
2. Connection Management
Use context managers to ensure proper connection cleanup:
async with sse_client(url, headers) as streams:
async with ClientSession(*streams) as session:
# Your operations here
pass # Connection automatically closed
3. Result Parsing
Always validate and parse results safely:
def parse_tool_result(result):
    """Safely parse the JSON payload carried by a tool result.

    Args:
        result: Object returned by a tool call. It is expected to expose a
            ``content`` sequence whose first item has a ``text`` attribute
            holding a JSON document.

    Returns:
        The decoded JSON value, or ``None`` when ``result`` has no content
        or the first content item cannot be decoded.
    """
    content = getattr(result, 'content', None)
    if not content:
        return None
    try:
        return json.loads(content[0].text)
    except (json.JSONDecodeError, IndexError, AttributeError, TypeError):
        # TypeError covers a ``text`` that is not str/bytes (for example None)
        print("Failed to parse result")
        return None
4. Batch Operations
When processing multiple items, use appropriate page sizes:
# Good: Process in reasonable batches
page_size = 50
for page in range(1, total_pages + 1):
result = await session.call_tool(
"fetch_businesses",
arguments={
"filters": {...},
"page_size": page_size,
"page": page
}
)
Available Tools Reference
The MCP SDK provides access to all AgentSource tools:
- Business Tools: match_businesses, fetch_businesses, enrich_businesses_*
- Prospect Tools: match_prospects, fetch_prospects, enrich_prospects_*
- Utility Tools: autocomplete, fetch_businesses_events, fetch_prospects_events
Use session.list_tools() to discover all available tools and their parameters.
Debugging Tips
Enable verbose logging to troubleshoot issues:
import logging
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Log tool calls; %-style arguments defer string interpolation until the
# record is actually emitted
logger.debug("Calling tool: %s", tool_name)
logger.debug("Arguments: %s", json.dumps(arguments, indent=2))
Next Steps
- Explore the complete AgentSource MCP documentation
- Check out the OpenAI implementation guide for AI agent integration
- Learn about advanced filtering and search capabilities
- Build production-ready data pipelines with MCP
Support
If you encounter any issues or have questions:
- Check the AgentSource MCP reference
- Contact our support team at [email protected]
Note: Remember to replace the placeholder ('your-api-key-here' or 'your-api-key') with your actual AgentSource API key.