Explore comprehensive code examples for building MCP integrations. These samples cover common integration patterns and can be used as starting points for your own custom integrations.
Copy and Customize: These samples are designed to be copied into MCP Studio and customized for your specific needs. Replace placeholder values with your actual credentials and endpoints.
A complete CRM integration example that demonstrates fetching contacts, searching records, and creating new entries.
Use Cases
Retrieve customer information during conversations
Look up account details and history
Create new leads or contacts from chat
Update CRM records based on meeting outcomes
Python Code
Required Secrets
Secret
Description
CRM_BASE_URL
Base URL for your CRM API
CRM_API_KEY
API key for authentication
Database Query
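A secure database query integration that allows read-only access to your PostgreSQL database. The sample uses the asyncpg driver, so it works with PostgreSQL only; connecting to MySQL requires swapping in a MySQL driver and adjusting the catalog queries.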
Use Cases
Query customer data during conversations
Generate reports on demand
Look up product information
Analyze historical data
Python Code
Required Secrets
Secret
Description
Example
DATABASE_URL
PostgreSQL connection string
postgresql://user:pass@host:5432/dbname
Security Note: This integration enforces read-only access, but you should also use a database user with read-only permissions for defense in depth.
REST API Wrapper
A flexible REST API wrapper that can connect to any HTTP API.
Use Cases
Connect to internal APIs
Integrate third-party services
Access webhooks and endpoints
Retrieve data from any HTTP source
Python Code
Required Secrets
Secret
Description
Example
API_BASE_URL
Base URL for the API
https://api.example.com/v1
API_KEY
Authentication credential
sk_live_xxx...
AUTH_TYPE
Authentication type
bearer, api-key, or basic
Slack Notifications
Send messages and notifications to Slack channels.
Use Cases
Alert teams about important events
Share meeting summaries
Post workflow results
Send reminders and notifications
Python Code
Required Secrets
Secret
Description
SLACK_BOT_TOKEN
Slack Bot User OAuth Token (starts with xoxb-)
Slack App Setup: Create a Slack App at api.slack.com/apps, add the chat:write, channels:read, and channels:history scopes, and install to your workspace.
Web Scraping
Extract data from web pages (use responsibly and respect robots.txt).
Use Cases
Research competitor websites
Extract product information
Gather public data
Monitor web content
Python Code
Required Secrets
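This integration does not require secrets, but you may want to add the following optional ones. Note that the sample code does not read them yet (the User-Agent is hard-coded in fetch_page and no proxy is configured), so wire them in yourself if you need them: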
Secret
Description
Optional
USER_AGENT
Custom User-Agent string
Yes
PROXY_URL
Proxy server for requests
Yes
Responsible Use: Always respect website terms of service and robots.txt. Do not scrape sites that prohibit it, and implement appropriate rate limiting.
from mcp.server import Server
from mcp.types import TextContent
import httpx
import os
import json
# Create the server; the tools below register on it via @app.tool()
app = Server("crm-integration")
# Configuration from environment.
# CRM_BASE_URL falls back to a placeholder host when unset; CRM_API_KEY has no
# default and is None when the variable is missing.
CRM_BASE_URL = os.environ.get("CRM_BASE_URL", "https://api.yourcrm.com/v1")
CRM_API_KEY = os.environ.get("CRM_API_KEY")
async def make_crm_request(method: str, endpoint: str, data: dict = None) -> dict:
    """Helper function to make authenticated CRM API requests.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        endpoint: Path appended verbatim to CRM_BASE_URL (expected to start
            with "/").
        data: Optional JSON-serialisable request body.

    Returns:
        The decoded JSON response, or an empty dict when the response has no
        body (for example 204 No Content).

    Raises:
        RuntimeError: If CRM_API_KEY is not configured.
        httpx.HTTPStatusError: If the CRM responds with a 4xx/5xx status.
    """
    if not CRM_API_KEY:
        # Fail fast instead of sending the literal header "Bearer None".
        raise RuntimeError("CRM_API_KEY is not configured")

    headers = {
        "Authorization": f"Bearer {CRM_API_KEY}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient() as client:
        response = await client.request(
            method=method,
            url=f"{CRM_BASE_URL}{endpoint}",
            headers=headers,
            json=data,
            timeout=30.0,
        )
        response.raise_for_status()
        # An empty body cannot be parsed as JSON; treat it as "no data".
        if not response.content:
            return {}
        return response.json()
@app.tool()
async def get_contact(contact_id: str) -> list[TextContent]:
    """
    Retrieve a contact by their unique ID.

    Args:
        contact_id: The unique identifier for the contact

    Returns:
        Contact details including name, email, phone, and company
    """
    from urllib.parse import quote

    try:
        # Percent-encode the ID so values such as "../deals" or "1?x=y" cannot
        # change the request path or smuggle in query parameters.
        safe_id = quote(str(contact_id), safe="")
        contact = await make_crm_request("GET", f"/contacts/{safe_id}")
        # Leading and trailing empty entries keep the surrounding newlines of
        # the message layout.
        lines = [
            "",
            f"**Contact: {contact.get('name', 'Unknown')}**",
            f"- Email: {contact.get('email', 'N/A')}",
            f"- Phone: {contact.get('phone', 'N/A')}",
            f"- Company: {contact.get('company', 'N/A')}",
            f"- Title: {contact.get('title', 'N/A')}",
            f"- Last Contact: {contact.get('last_contact_date', 'Never')}",
            f"- Status: {contact.get('status', 'Unknown')}",
            "",
        ]
        return [TextContent(type="text", text="\n".join(lines))]
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            return [TextContent(type="text", text=f"Contact not found: {contact_id}")]
        return [TextContent(type="text", text=f"Error retrieving contact: {str(e)}")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def search_contacts(
    query: str,
    field: str = "name",
    limit: int = 10
) -> list[TextContent]:
    """
    Search for contacts in the CRM.

    Args:
        query: The search term
        field: Field to search (name, email, company). Default: name
        limit: Maximum number of results. Default: 10

    Returns:
        List of matching contacts with basic details
    """
    try:
        # Clamp to 1..50: the original only capped the upper bound, so zero or
        # negative values were forwarded to the CRM unchanged.
        page_size = max(1, min(limit, 50))
        params = {"q": query, "field": field, "limit": page_size}
        results = await make_crm_request("GET", f"/contacts/search?{httpx.QueryParams(params)}")

        contacts = results.get("contacts")
        if not contacts:
            return [TextContent(type="text", text=f"No contacts found matching '{query}'")]

        parts = [f"**Found {len(contacts)} contacts:**\n\n"]
        for contact in contacts:
            # .get() keeps one incomplete record from aborting the whole list.
            parts.append(
                f"- **{contact.get('name', 'Unknown')}** ({contact.get('email', 'No email')})\n"
            )
            parts.append(
                f" Company: {contact.get('company', 'N/A')} | ID: {contact.get('id', 'N/A')}\n\n"
            )
        return [TextContent(type="text", text="".join(parts))]
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return [TextContent(type="text", text=f"Search error: {str(e)}")]
@app.tool()
async def create_contact(
    name: str,
    email: str,
    company: str = None,
    phone: str = None,
    title: str = None,
    notes: str = None
) -> list[TextContent]:
    """
    Create a new contact in the CRM.

    Args:
        name: Contact's full name (required)
        email: Contact's email address (required)
        company: Company name
        phone: Phone number
        title: Job title
        notes: Additional notes about the contact

    Returns:
        Confirmation with the new contact's ID
    """
    try:
        contact_data = {
            "name": name,
            "email": email,
            "company": company,
            "phone": phone,
            "title": title,
            "notes": notes,
            "source": "DarcyIQ MCP Integration",
        }
        # Remove None values so optional fields that were not supplied are
        # omitted from the payload rather than sent as null.
        contact_data = {k: v for k, v in contact_data.items() if v is not None}
        result = await make_crm_request("POST", "/contacts", contact_data)
        # .get() avoids reporting a failure after a successful create when the
        # CRM response carries no "id" field.
        new_id = result.get("id", "Unknown")
        # The check mark replaces a mis-encoded character ("β") in the
        # original message.
        return [TextContent(
            type="text",
            text=f"✅ Contact created successfully!\n\nID: {new_id}\nName: {name}\nEmail: {email}"
        )]
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 409:
            return [TextContent(type="text", text=f"A contact with email {email} already exists")]
        return [TextContent(type="text", text=f"Error creating contact: {str(e)}")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def get_recent_activities(
    contact_id: str,
    limit: int = 5
) -> list[TextContent]:
    """
    Get recent activities for a contact.

    Args:
        contact_id: The contact's unique ID
        limit: Number of activities to retrieve. Default: 5

    Returns:
        List of recent activities (calls, emails, meetings)
    """
    from urllib.parse import quote

    try:
        # Percent-encode the ID so it cannot alter the path or query string,
        # and never request fewer than one activity.
        safe_id = quote(str(contact_id), safe="")
        page_size = max(1, int(limit))
        activities = await make_crm_request(
            "GET",
            f"/contacts/{safe_id}/activities?limit={page_size}"
        )

        items = activities.get("items")
        if not items:
            return [TextContent(type="text", text="No recent activities found for this contact")]

        parts = [f"**Recent Activities ({len(items)} items):**\n\n"]
        for activity in items:
            # .get() keeps one incomplete record from aborting the whole list.
            parts.append(
                f"- **{activity.get('type', 'Unknown')}** on {activity.get('date', 'Unknown date')}\n"
            )
            parts.append(f" {activity.get('description', 'No description')}\n\n")
        return [TextContent(type="text", text="".join(parts))]
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
# Start the CRM server only when this file is executed as a script.
if __name__ == "__main__":
    app.run()
from mcp.server import Server
from mcp.types import TextContent
import asyncpg
import os
import json
# Server instance that the database tools below register on
app = Server("database-query")
# Connection string read from the environment; None when DATABASE_URL is unset
DATABASE_URL = os.environ.get("DATABASE_URL")
MAX_ROWS = 100 # Safety limit used by query_database
async def get_connection():
    """Open a new asyncpg connection using DATABASE_URL and return it."""
    connection = await asyncpg.connect(DATABASE_URL)
    return connection
def is_safe_query(sql: str) -> tuple[bool, str]:
    """Validate that the query is a single read-only SELECT statement.

    The check is deliberately conservative: a forbidden keyword or a
    semicolon inside a string literal also causes rejection.

    Args:
        sql: The SQL text to inspect.

    Returns:
        ``(True, "OK")`` when the query is accepted, otherwise ``(False,
        reason)``.
    """
    import re

    # A single trailing semicolon is harmless; drop it so that only embedded
    # semicolons (stacked statements) are caught below.
    statement = sql.strip().rstrip(";").strip()
    sql_upper = statement.upper()

    # Only allow SELECT statements
    if not re.match(r"SELECT\b", sql_upper):
        return False, "Only SELECT queries are allowed"

    # Reject "SELECT 1; <anything else>"
    if ";" in statement:
        return False, "Multiple statements are not allowed"

    # Block dangerous keywords. Matching whole words avoids false positives on
    # identifiers such as created_at, updated_at or deleted_flag. INTO is
    # included because "SELECT ... INTO new_table" creates a table.
    dangerous = [
        "INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE",
        "TRUNCATE", "EXEC", "EXECUTE", "INTO",
    ]
    for keyword in dangerous:
        if re.search(rf"\b{keyword}\b", sql_upper):
            return False, f"Query contains forbidden keyword: {keyword}"
    return True, "OK"
@app.tool()
async def query_database(sql: str) -> list[TextContent]:
    """
    Execute a read-only SQL query against the database.

    Args:
        sql: A SELECT query to execute. Only read operations are allowed.

    Returns:
        Query results as a formatted table
    """
    # Validate query safety
    is_safe, message = is_safe_query(sql)
    if not is_safe:
        return [TextContent(type="text", text=f"Query rejected: {message}")]

    try:
        conn = await get_connection()
        try:
            statement = sql.strip().rstrip(";")
            # A read-only transaction makes the server itself refuse writes.
            # Reading through a cursor caps the result at MAX_ROWS whatever
            # LIMIT the query carries, which the original substring test on
            # "LIMIT" could not guarantee.
            async with conn.transaction(readonly=True):
                cursor = await conn.cursor(statement)
                # Fetch one extra row to detect truncation.
                rows = await cursor.fetch(MAX_ROWS + 1)

            truncated = len(rows) > MAX_ROWS
            rows = rows[:MAX_ROWS]
            if not rows:
                return [TextContent(type="text", text="Query returned no results")]

            # Format as table
            columns = list(rows[0].keys())
            table = [
                "| " + " | ".join(columns) + " |",
                "| " + " | ".join(["---"] * len(columns)) + " |",
            ]
            for row in rows:
                # Iterate positionally so duplicate column names keep their own
                # values; escape characters that would break the table.
                cells = [
                    "NULL" if value is None
                    else str(value).replace("|", "\\|").replace("\n", " ")
                    for value in row.values()
                ]
                table.append("| " + " | ".join(cells) + " |")

            output = "\n".join(table) + "\n"
            output += f"\n*{len(rows)} row(s) returned*"
            if truncated:
                output += f"\n*Results truncated to the first {MAX_ROWS} rows*"
            return [TextContent(type="text", text=output)]
        finally:
            await conn.close()
    except asyncpg.PostgresError as e:
        return [TextContent(type="text", text=f"Database error: {str(e)}")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def list_tables() -> list[TextContent]:
    """
    List all available tables in the database.

    Returns:
        List of table names with row counts
    """
    try:
        conn = await get_connection()
        try:
            # Join on schema as well as name so a same-named table in another
            # schema cannot duplicate rows or supply the wrong count.
            query = """
                SELECT t.table_name,
                       s.n_live_tup AS row_count
                FROM information_schema.tables AS t
                LEFT JOIN pg_stat_user_tables AS s
                       ON s.relname = t.table_name
                      AND s.schemaname = t.table_schema
                WHERE t.table_schema = 'public'
                ORDER BY t.table_name
            """
            rows = await conn.fetch(query)

            lines = [
                "**Available Tables:**\n",
                "| Table | Approximate Rows |",
                "| --- | --- |",
            ]
            for row in rows:
                count = row["row_count"]
                # Only a missing statistic is "Unknown"; a count of 0 is a
                # valid value and must be shown as 0.
                shown = "Unknown" if count is None else count
                lines.append(f"| {row['table_name']} | {shown} |")
            return [TextContent(type="text", text="\n".join(lines) + "\n")]
        finally:
            await conn.close()
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def describe_table(table_name: str) -> list[TextContent]:
    """
    Get the schema/structure of a specific table.

    Args:
        table_name: Name of the table to describe

    Returns:
        Column names, types, and constraints
    """
    # Accept only letters, digits and underscores in the table name.
    stripped_name = table_name.replace("_", "")
    if not stripped_name.isalnum():
        return [TextContent(type="text", text="Invalid table name")]

    columns_sql = (
        "\n"
        "SELECT column_name, data_type, is_nullable, column_default\n"
        "FROM information_schema.columns\n"
        "WHERE table_name = $1 AND table_schema = 'public'\n"
        "ORDER BY ordinal_position\n"
    )

    try:
        connection = await get_connection()
        try:
            column_rows = await connection.fetch(columns_sql, table_name)
            if not column_rows:
                return [TextContent(type="text", text=f"Table '{table_name}' not found")]

            pieces = [
                f"**Table: {table_name}**\n\n",
                "| Column | Type | Nullable | Default |\n",
                "| --- | --- | --- | --- |\n",
            ]
            for column in column_rows:
                default_text = column["column_default"] or "None"
                pieces.append(f"| {column['column_name']} | {column['data_type']} | ")
                pieces.append(f"{column['is_nullable']} | {default_text} |\n")
            return [TextContent(type="text", text="".join(pieces))]
        finally:
            # Always release the connection, including on the early return.
            await connection.close()
    except Exception as error:
        return [TextContent(type="text", text=f"Error: {error!s}")]
# Start the database server only when this file is executed as a script.
if __name__ == "__main__":
    app.run()
from mcp.server import Server
from mcp.types import TextContent
import httpx
import os
import json
from typing import Optional
# Server instance that the REST wrapper tools below register on
app = Server("rest-api-wrapper")
# All three settings come from the environment. API_BASE_URL and API_KEY are
# None when unset; AUTH_TYPE falls back to "bearer".
API_BASE_URL = os.environ.get("API_BASE_URL")
API_KEY = os.environ.get("API_KEY")
AUTH_TYPE = os.environ.get("AUTH_TYPE", "bearer") # bearer, api-key, basic
def get_auth_headers() -> dict:
    """Build authentication headers based on configured auth type.

    Returns:
        A new dict with the header for the configured AUTH_TYPE ("bearer",
        "api-key" or "basic", matched case-insensitively). The dict is empty
        when API_KEY is unset or the auth type is not recognised.
    """
    if not API_KEY:
        # No credential configured: send no auth header rather than the
        # literal "Bearer None", and avoid calling .encode() on None in the
        # basic branch.
        return {}

    auth_type = (AUTH_TYPE or "").strip().lower()
    if auth_type == "bearer":
        return {"Authorization": f"Bearer {API_KEY}"}
    if auth_type == "api-key":
        return {"X-API-Key": API_KEY}
    if auth_type == "basic":
        import base64

        # API_KEY should be in format "username:password"
        encoded = base64.b64encode(API_KEY.encode()).decode()
        return {"Authorization": f"Basic {encoded}"}
    return {}
@app.tool()
async def api_get(
    endpoint: str,
    params: Optional[str] = None
) -> list[TextContent]:
    """
    Make a GET request to the API.

    Args:
        endpoint: API endpoint path (e.g., /users/123)
        params: Optional query parameters as JSON string (e.g., {"page": 1})

    Returns:
        API response data
    """
    if not API_BASE_URL:
        return [TextContent(type="text", text="Error: API_BASE_URL is not configured")]
    # The endpoint is appended verbatim to the base URL. Requiring a leading
    # "/" stops values such as "@other-host/..." from rewriting the host the
    # credentials are sent to.
    if not endpoint.startswith("/"):
        return [TextContent(type="text", text="Error: endpoint must start with '/'")]

    try:
        query_params = json.loads(params) if params else None
    except json.JSONDecodeError:
        return [TextContent(type="text", text="Error: Invalid JSON in params")]

    try:
        headers = get_auth_headers()
        headers["Content-Type"] = "application/json"
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                params=query_params,
                timeout=30.0
            )

        # Format response
        status = f"Status: {response.status_code}"
        try:
            data = response.json()
            body = json.dumps(data, indent=2)
        except ValueError:
            # Body is not JSON; show it as received.
            body = response.text
        return [TextContent(type="text", text=f"{status}\n\n```json\n{body}\n```")]
    except httpx.TimeoutException:
        return [TextContent(type="text", text="Error: Request timed out")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def api_post(
    endpoint: str,
    body: str
) -> list[TextContent]:
    """
    Make a POST request to the API.

    Args:
        endpoint: API endpoint path
        body: Request body as JSON string

    Returns:
        API response data
    """
    if not API_BASE_URL:
        return [TextContent(type="text", text="Error: API_BASE_URL is not configured")]
    # The endpoint is appended verbatim to the base URL. Requiring a leading
    # "/" stops values such as "@other-host/..." from rewriting the host the
    # credentials are sent to.
    if not endpoint.startswith("/"):
        return [TextContent(type="text", text="Error: endpoint must start with '/'")]

    try:
        headers = get_auth_headers()
        headers["Content-Type"] = "application/json"
        data = json.loads(body)
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                json=data,
                timeout=30.0
            )

        status = f"Status: {response.status_code}"
        try:
            result = response.json()
            result_body = json.dumps(result, indent=2)
        except ValueError:
            # Body is not JSON; show it as received.
            result_body = response.text
        return [TextContent(type="text", text=f"{status}\n\n```json\n{result_body}\n```")]
    except json.JSONDecodeError:
        # Only the request body can reach here; response decoding is handled
        # by the inner try above.
        return [TextContent(type="text", text="Error: Invalid JSON in request body")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def api_put(
    endpoint: str,
    body: str
) -> list[TextContent]:
    """
    Make a PUT request to the API.

    Args:
        endpoint: API endpoint path
        body: Request body as JSON string

    Returns:
        API response data
    """
    if not API_BASE_URL:
        return [TextContent(type="text", text="Error: API_BASE_URL is not configured")]
    # The endpoint is appended verbatim to the base URL. Requiring a leading
    # "/" stops values such as "@other-host/..." from rewriting the host the
    # credentials are sent to.
    if not endpoint.startswith("/"):
        return [TextContent(type="text", text="Error: endpoint must start with '/'")]

    try:
        headers = get_auth_headers()
        headers["Content-Type"] = "application/json"
        data = json.loads(body)
        async with httpx.AsyncClient() as client:
            response = await client.put(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                json=data,
                timeout=30.0
            )

        status = f"Status: {response.status_code}"
        try:
            result = response.json()
            result_body = json.dumps(result, indent=2)
        except ValueError:
            # Body is not JSON; show it as received.
            result_body = response.text
        return [TextContent(type="text", text=f"{status}\n\n```json\n{result_body}\n```")]
    except json.JSONDecodeError:
        # Only the request body can reach here; response decoding is handled
        # by the inner try above.
        return [TextContent(type="text", text="Error: Invalid JSON in request body")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def api_delete(endpoint: str) -> list[TextContent]:
    """
    Make a DELETE request to the API.

    Args:
        endpoint: API endpoint path

    Returns:
        API response confirmation
    """
    if not API_BASE_URL:
        return [TextContent(type="text", text="Error: API_BASE_URL is not configured")]
    # The endpoint is appended verbatim to the base URL. Requiring a leading
    # "/" stops values such as "@other-host/..." from rewriting the host the
    # credentials are sent to.
    if not endpoint.startswith("/"):
        return [TextContent(type="text", text="Error: endpoint must start with '/'")]

    try:
        headers = get_auth_headers()
        async with httpx.AsyncClient() as client:
            response = await client.delete(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                timeout=30.0
            )
        # Many APIs answer DELETE with an empty body; show "Success" then.
        return [TextContent(
            type="text",
            text=f"Status: {response.status_code}\n\n{response.text or 'Success'}"
        )]
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
# Start the REST wrapper server only when this file is executed as a script.
if __name__ == "__main__":
    app.run()
from mcp.server import Server
from mcp.types import TextContent
import httpx
import os
import json
# Server instance that the Slack tools below register on
app = Server("slack-notifications")
# Bot token read from the environment; None when SLACK_BOT_TOKEN is unset
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN")
# Base URL that slack_request appends the Web API method name to
SLACK_API_URL = "https://slack.com/api"
async def slack_request(method: str, data: dict) -> dict:
    """Make a request to the Slack API.

    Args:
        method: Web API method name, e.g. "chat.postMessage".
        data: Arguments for the method.

    Returns:
        The decoded JSON response; callers check its "ok" field.
    """
    headers = {"Authorization": f"Bearer {SLACK_BOT_TOKEN}"}
    url = f"{SLACK_API_URL}/{method}"
    async with httpx.AsyncClient() as client:
        if method.startswith("chat."):
            # chat.* write methods accept JSON bodies, which is needed for
            # nested payloads such as attachments.
            headers["Content-Type"] = "application/json; charset=utf-8"
            response = await client.post(url, headers=headers, json=data, timeout=30.0)
        else:
            # Read methods such as conversations.list and conversations.history
            # are documented as taking form-encoded arguments; a JSON body
            # leaves their arguments unread.
            response = await client.post(url, headers=headers, data=data, timeout=30.0)
        return response.json()
@app.tool()
async def send_message(
    channel: str,
    message: str,
    thread_ts: str = None
) -> list[TextContent]:
    """
    Send a message to a Slack channel.

    Args:
        channel: Channel name (e.g., #general) or channel ID
        message: The message text to send (supports Slack markdown)
        thread_ts: Optional thread timestamp to reply in a thread

    Returns:
        Confirmation of message sent
    """
    try:
        # Remove # if present
        channel = channel.lstrip("#")
        data = {
            "channel": channel,
            "text": message,
            "mrkdwn": True,
        }
        if thread_ts:
            data["thread_ts"] = thread_ts

        result = await slack_request("chat.postMessage", data)
        if not result.get("ok"):
            return [TextContent(
                type="text",
                text=f"Failed to send message: {result.get('error')}"
            )]
        # The check mark replaces a mis-encoded character ("β") in the
        # original message.
        return [TextContent(
            type="text",
            text=f"✅ Message sent to #{channel}\nTimestamp: {result.get('ts')}"
        )]
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def send_rich_message(
    channel: str,
    title: str,
    message: str,
    color: str = "good",
    fields: str = None
) -> list[TextContent]:
    """
    Send a rich formatted message with attachments.

    Args:
        channel: Channel name or ID
        title: Bold title for the message
        message: Main message text
        color: Attachment color (good=green, warning=yellow, danger=red, or hex)
        fields: Optional JSON array of fields [{"title": "...", "value": "...", "short": true}]

    Returns:
        Confirmation of message sent
    """
    try:
        channel = channel.lstrip("#")
        attachment = {
            "color": color,
            "title": title,
            "text": message,
            "mrkdwn_in": ["text", "fields"],
        }
        if fields:
            parsed_fields = json.loads(fields)
            # Valid JSON that is not an array (e.g. a single object) would
            # otherwise be forwarded to Slack and fail there.
            if not isinstance(parsed_fields, list):
                return [TextContent(type="text", text="Error: fields must be a JSON array")]
            attachment["fields"] = parsed_fields

        data = {
            "channel": channel,
            "attachments": [attachment],
        }
        result = await slack_request("chat.postMessage", data)
        if not result.get("ok"):
            return [TextContent(type="text", text=f"Failed: {result.get('error')}")]
        # The check mark replaces a mis-encoded character ("β") in the
        # original message.
        return [TextContent(type="text", text=f"✅ Rich message sent to #{channel}")]
    except json.JSONDecodeError:
        return [TextContent(type="text", text="Error: Invalid JSON in fields parameter")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def list_channels() -> list[TextContent]:
    """
    List all public channels the bot has access to.

    Returns:
        List of channel names and IDs
    """
    try:
        reply = await slack_request(
            "conversations.list",
            {"types": "public_channel", "limit": 100},
        )
        if not reply.get("ok"):
            return [TextContent(type="text", text=f"Error: {reply.get('error')}")]

        found = reply.get("channels", [])
        if not found:
            return [TextContent(type="text", text="No accessible channels found")]

        # One bullet per channel, each terminated by a newline.
        bullets = "".join(f"- #{entry['name']} (ID: {entry['id']})\n" for entry in found)
        listing = "**Available Channels:**\n\n" + bullets
        return [TextContent(type="text", text=listing)]
    except Exception as error:
        return [TextContent(type="text", text=f"Error: {error!s}")]
@app.tool()
async def get_channel_history(
    channel: str,
    limit: int = 10
) -> list[TextContent]:
    """
    Get recent messages from a channel.

    Args:
        channel: Channel name or ID
        limit: Number of messages to retrieve (max 100)

    Returns:
        Recent messages from the channel
    """
    try:
        channel = channel.lstrip("#")
        # First, get channel ID if name was provided. Values starting with "C"
        # are treated as IDs already; anything else is looked up by name and
        # passed through unchanged when no public channel matches.
        if not channel.startswith("C"):
            channels_result = await slack_request(
                "conversations.list",
                {"types": "public_channel", "limit": 200}
            )
            # Report a failed lookup (e.g. missing scope) directly instead of
            # letting it surface later as a misleading history error.
            if not channels_result.get("ok"):
                return [TextContent(type="text", text=f"Error: {channels_result.get('error')}")]
            channel_map = {c["name"]: c["id"] for c in channels_result.get("channels", [])}
            channel = channel_map.get(channel, channel)

        # Clamp to 1..100; the original only capped the upper bound.
        data = {"channel": channel, "limit": max(1, min(limit, 100))}
        result = await slack_request("conversations.history", data)
        if not result.get("ok"):
            return [TextContent(type="text", text=f"Error: {result.get('error')}")]

        messages = result.get("messages", [])
        if not messages:
            return [TextContent(type="text", text="No messages found")]

        parts = [f"**Recent Messages ({len(messages)}):**\n\n"]
        for msg in reversed(messages):  # Oldest first
            user = msg.get("user", "Unknown")
            text = msg.get("text", "")[:200]  # Truncate long messages
            parts.append(f"**{user}**: {text}\n\n")
        return [TextContent(type="text", text="".join(parts))]
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
# Start the Slack server only when this file is executed as a script.
if __name__ == "__main__":
    app.run()
from mcp.server import Server
from mcp.types import TextContent
import httpx
from bs4 import BeautifulSoup
import json
import re
# Server instance that the scraping tools below register on
app = Server("web-scraping")
async def fetch_page(url: str) -> tuple[str, int]:
    """Download *url*, following redirects, and return (body text, status code)."""
    request_headers = {"User-Agent": "Mozilla/5.0 (compatible; DarcyIQ-MCP/1.0)"}
    async with httpx.AsyncClient(follow_redirects=True) as http:
        page = await http.get(url, headers=request_headers, timeout=30.0)
        body, code = page.text, page.status_code
        return body, code
@app.tool()
async def get_page_text(url: str) -> list[TextContent]:
    """
    Extract the main text content from a web page.

    Args:
        url: The URL to fetch

    Returns:
        The text content of the page
    """
    try:
        markup, status_code = await fetch_page(url)
        if status_code != 200:
            return [TextContent(type="text", text=f"Error: HTTP {status_code}")]

        document = BeautifulSoup(markup, "html.parser")
        # Drop scripts, styles and page chrome (nav/footer/header) before
        # extracting text.
        for unwanted in document.find_all(["script", "style", "nav", "footer", "header"]):
            unwanted.decompose()

        raw_text = document.get_text(separator="\n", strip=True)
        # Trim each line and discard the ones that end up empty.
        trimmed = (piece.strip() for piece in raw_text.splitlines())
        cleaned_text = "\n".join(piece for piece in trimmed if piece)

        # Keep at most 10000 characters and flag the cut.
        if len(cleaned_text) > 10000:
            cleaned_text = cleaned_text[:10000] + "\n\n*[Content truncated]*"

        message = f"**Content from {url}:**\n\n{cleaned_text}"
        return [TextContent(type="text", text=message)]
    except httpx.TimeoutException:
        return [TextContent(type="text", text="Error: Request timed out")]
    except Exception as error:
        return [TextContent(type="text", text=f"Error: {error!s}")]
@app.tool()
async def get_page_links(url: str) -> list[TextContent]:
    """
    Extract all links from a web page.

    Args:
        url: The URL to analyze

    Returns:
        List of links found on the page
    """
    from urllib.parse import urljoin, urlparse

    try:
        html, status = await fetch_page(url)
        if status != 200:
            return [TextContent(type="text", text=f"Error: HTTP {status}")]

        soup = BeautifulSoup(html, "html.parser")
        seen = set()
        unique_links = []
        for link in soup.find_all("a", href=True):
            href = link["href"].strip()
            # Same-page anchors are not separate links.
            if not href or href.startswith("#"):
                continue
            # Resolve every relative form ("/a", "a.html", "../a", "//host/a")
            # against the page URL; the original handled only "/"-prefixed
            # paths and dropped the rest.
            absolute = urljoin(url, href)
            # Keep web links only (drops mailto:, javascript:, tel:, ...).
            if urlparse(absolute).scheme not in ("http", "https"):
                continue
            # Remove duplicates, keeping the first occurrence.
            if absolute in seen:
                continue
            seen.add(absolute)
            text = link.get_text(strip=True)[:50]  # Truncate link text
            unique_links.append({"text": text or "[No text]", "url": absolute})

        parts = [f"**Found {len(unique_links)} links:**\n\n"]
        for link in unique_links[:50]:  # Limit output
            parts.append(f"- [{link['text']}]({link['url']})\n")
        if len(unique_links) > 50:
            parts.append(f"\n*Showing 50 of {len(unique_links)} links*")
        return [TextContent(type="text", text="".join(parts))]
    except httpx.TimeoutException:
        return [TextContent(type="text", text="Error: Request timed out")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
@app.tool()
async def get_page_metadata(url: str) -> list[TextContent]:
    """
    Extract metadata (title, description, etc.) from a web page.

    Args:
        url: The URL to analyze

    Returns:
        Page metadata including title, description, and Open Graph data
    """
    try:
        html, status = await fetch_page(url)
        if status != 200:
            return [TextContent(type="text", text=f"Error: HTTP {status}")]

        soup = BeautifulSoup(html, "html.parser")
        metadata = {}

        # Title: get_text() also covers a <title> with nested markup or no
        # text, where .string would be None.
        title_tag = soup.find("title")
        title_text = title_tag.get_text(strip=True) if title_tag else ""
        metadata["title"] = title_text or "Not found"

        # Meta description: .get() tolerates a tag without a content
        # attribute instead of raising KeyError.
        desc_tag = soup.find("meta", attrs={"name": "description"})
        description = desc_tag.get("content") if desc_tag else None
        metadata["description"] = description or "Not found"

        # Open Graph data
        og_tags = soup.find_all("meta", attrs={"property": re.compile(r"^og:")})
        for tag in og_tags:
            prop = tag.get("property", "").replace("og:", "og_")
            metadata[prop] = tag.get("content", "")

        # Twitter Card data
        twitter_tags = soup.find_all("meta", attrs={"name": re.compile(r"^twitter:")})
        for tag in twitter_tags:
            prop = tag.get("name", "").replace("twitter:", "twitter_")
            metadata[prop] = tag.get("content", "")

        lines = [
            f"**Metadata for {url}:**\n",
            "| Property | Value |",
            "| --- | --- |",
        ]
        for key, value in metadata.items():
            # Truncate long values, then neutralise characters that would
            # break the table row.
            value_str = str(value)[:100].replace("|", "\\|").replace("\n", " ")
            lines.append(f"| {key} | {value_str} |")
        return [TextContent(type="text", text="\n".join(lines) + "\n")]
    except httpx.TimeoutException:
        return [TextContent(type="text", text="Error: Request timed out")]
    except Exception as e:
        # Tool boundary: report any other failure as text instead of raising.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
# Start the scraping server only when this file is executed as a script.
if __name__ == "__main__":
    app.run()
from mcp.server import Server
from mcp.types import TextContent
import os
# Create the server with a unique name; tools register on it via @app.tool()
app = Server("my-integration-name")
# Read configuration from environment variables (API_KEY is None when unset)
API_KEY = os.environ.get("API_KEY")
@app.tool()
async def my_tool_name(required_param: str, optional_param: str = "default") -> list[TextContent]:
    """
    Clear description of what the tool does.

    Args:
        required_param: Description of this parameter
        optional_param: Description with default value

    Returns:
        What the tool returns
    """
    # Your implementation here
    message = "Processed {}".format(required_param)
    reply = TextContent(type="text", text=message)
    return [reply]
# Entry point: start the server only when this file is executed as a script.
if __name__ == "__main__":
    app.run()