# Code Samples

Explore comprehensive code examples for building MCP integrations. These samples cover common integration patterns and can be used as starting points for your own custom integrations.

{% hint style="info" %}
**Copy and Customize**: These samples are designed to be copied into MCP Studio and customized for your specific needs. Replace placeholder values with your actual credentials and endpoints.
{% endhint %}

## Sample Overview

| Sample                                      | Category      | Description                              |
| ------------------------------------------- | ------------- | ---------------------------------------- |
| [CRM Integration](#crm-integration)         | Sales         | Connect to CRM systems for customer data |
| [Database Query](#database-query)           | Data          | Query SQL databases securely             |
| [REST API Wrapper](#rest-api-wrapper)       | Integration   | Connect to any REST API                  |
| [Slack Notifications](#slack-notifications) | Communication | Send messages to Slack channels          |
| [File Processing](#file-processing)         | Documents     | Parse and transform files                |
| [Web Scraping](#web-scraping)               | Research      | Extract data from websites               |

***

## CRM Integration

A complete CRM integration example that demonstrates fetching contacts, searching records, and creating new entries.

### Use Cases

* Retrieve customer information during conversations
* Look up account details and history
* Create new leads or contacts from chat
* Update CRM records based on meeting outcomes

### Python Code

```python
from mcp.server import Server
from mcp.types import TextContent
import httpx
import os
import json

# Create the server
app = Server("crm-integration")

# Configuration from environment
CRM_BASE_URL = os.environ.get("CRM_BASE_URL", "https://api.yourcrm.com/v1")
CRM_API_KEY = os.environ.get("CRM_API_KEY")


async def make_crm_request(method: str, endpoint: str, data: dict = None) -> dict:
    """Helper function to make authenticated CRM API requests."""
    headers = {
        "Authorization": f"Bearer {CRM_API_KEY}",
        "Content-Type": "application/json"
    }
    
    async with httpx.AsyncClient() as client:
        response = await client.request(
            method=method,
            url=f"{CRM_BASE_URL}{endpoint}",
            headers=headers,
            json=data,
            timeout=30.0
        )
        response.raise_for_status()
        return response.json()


@app.tool()
async def get_contact(contact_id: str) -> list[TextContent]:
    """
    Retrieve a contact by their unique ID.
    
    Args:
        contact_id: The unique identifier for the contact
    
    Returns:
        Contact details including name, email, phone, and company
    """
    try:
        contact = await make_crm_request("GET", f"/contacts/{contact_id}")
        
        result = f"""
**Contact: {contact.get('name', 'Unknown')}**
- Email: {contact.get('email', 'N/A')}
- Phone: {contact.get('phone', 'N/A')}
- Company: {contact.get('company', 'N/A')}
- Title: {contact.get('title', 'N/A')}
- Last Contact: {contact.get('last_contact_date', 'Never')}
- Status: {contact.get('status', 'Unknown')}
"""
        return [TextContent(type="text", text=result)]
    
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            return [TextContent(type="text", text=f"Contact not found: {contact_id}")]
        return [TextContent(type="text", text=f"Error retrieving contact: {str(e)}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def search_contacts(
    query: str,
    field: str = "name",
    limit: int = 10
) -> list[TextContent]:
    """
    Search for contacts in the CRM.
    
    Args:
        query: The search term
        field: Field to search (name, email, company). Default: name
        limit: Maximum number of results. Default: 10
    
    Returns:
        List of matching contacts with basic details
    """
    try:
        params = {"q": query, "field": field, "limit": min(limit, 50)}
        results = await make_crm_request("GET", f"/contacts/search?{httpx.QueryParams(params)}")
        
        if not results.get("contacts"):
            return [TextContent(type="text", text=f"No contacts found matching '{query}'")]
        
        output = f"**Found {len(results['contacts'])} contacts:**\n\n"
        for contact in results["contacts"]:
            output += f"- **{contact['name']}** ({contact.get('email', 'No email')})\n"
            output += f"  Company: {contact.get('company', 'N/A')} | ID: {contact['id']}\n\n"
        
        return [TextContent(type="text", text=output)]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Search error: {str(e)}")]


@app.tool()
async def create_contact(
    name: str,
    email: str,
    company: str = None,
    phone: str = None,
    title: str = None,
    notes: str = None
) -> list[TextContent]:
    """
    Create a new contact in the CRM.
    
    Args:
        name: Contact's full name (required)
        email: Contact's email address (required)
        company: Company name
        phone: Phone number
        title: Job title
        notes: Additional notes about the contact
    
    Returns:
        Confirmation with the new contact's ID
    """
    try:
        contact_data = {
            "name": name,
            "email": email,
            "company": company,
            "phone": phone,
            "title": title,
            "notes": notes,
            "source": "DarcyIQ MCP Integration"
        }
        # Remove None values
        contact_data = {k: v for k, v in contact_data.items() if v is not None}
        
        result = await make_crm_request("POST", "/contacts", contact_data)
        
        return [TextContent(
            type="text",
            text=f"✓ Contact created successfully!\n\nID: {result['id']}\nName: {name}\nEmail: {email}"
        )]
    
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 409:
            return [TextContent(type="text", text=f"A contact with email {email} already exists")]
        return [TextContent(type="text", text=f"Error creating contact: {str(e)}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def get_recent_activities(
    contact_id: str,
    limit: int = 5
) -> list[TextContent]:
    """
    Get recent activities for a contact.
    
    Args:
        contact_id: The contact's unique ID
        limit: Number of activities to retrieve. Default: 5
    
    Returns:
        List of recent activities (calls, emails, meetings)
    """
    try:
        activities = await make_crm_request(
            "GET", 
            f"/contacts/{contact_id}/activities?limit={limit}"
        )
        
        if not activities.get("items"):
            return [TextContent(type="text", text="No recent activities found for this contact")]
        
        output = f"**Recent Activities ({len(activities['items'])} items):**\n\n"
        for activity in activities["items"]:
            output += f"- **{activity['type']}** on {activity['date']}\n"
            output += f"  {activity.get('description', 'No description')}\n\n"
        
        return [TextContent(type="text", text=output)]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


if __name__ == "__main__":
    app.run()
```

### Required Secrets

| Secret         | Description                |
| -------------- | -------------------------- |
| `CRM_BASE_URL` | Base URL for your CRM API  |
| `CRM_API_KEY`  | API key for authentication |

***

## Database Query

A secure database query integration that allows read-only access to your PostgreSQL or MySQL database.

### Use Cases

* Query customer data during conversations
* Generate reports on demand
* Look up product information
* Analyze historical data

### Python Code

```python
from mcp.server import Server
from mcp.types import TextContent
import asyncpg
import os
import json

app = Server("database-query")

DATABASE_URL = os.environ.get("DATABASE_URL")
MAX_ROWS = 100  # Safety limit


async def get_connection():
    """Create a database connection."""
    return await asyncpg.connect(DATABASE_URL)


def is_safe_query(sql: str) -> tuple[bool, str]:
    """Validate that the query is read-only."""
    sql_upper = sql.strip().upper()
    
    # Only allow SELECT statements
    if not sql_upper.startswith("SELECT"):
        return False, "Only SELECT queries are allowed"
    
    # Block dangerous keywords
    dangerous = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE", "TRUNCATE", "EXEC"]
    for keyword in dangerous:
        if keyword in sql_upper:
            return False, f"Query contains forbidden keyword: {keyword}"
    
    return True, "OK"


@app.tool()
async def query_database(sql: str) -> list[TextContent]:
    """
    Execute a read-only SQL query against the database.
    
    Args:
        sql: A SELECT query to execute. Only read operations are allowed.
    
    Returns:
        Query results as a formatted table
    """
    # Validate query safety
    is_safe, message = is_safe_query(sql)
    if not is_safe:
        return [TextContent(type="text", text=f"Query rejected: {message}")]
    
    try:
        conn = await get_connection()
        try:
            # Add LIMIT if not present
            if "LIMIT" not in sql.upper():
                sql = f"{sql.rstrip(';')} LIMIT {MAX_ROWS}"
            
            rows = await conn.fetch(sql)
            
            if not rows:
                return [TextContent(type="text", text="Query returned no results")]
            
            # Format as table
            columns = list(rows[0].keys())
            output = "| " + " | ".join(columns) + " |\n"
            output += "| " + " | ".join(["---"] * len(columns)) + " |\n"
            
            for row in rows:
                values = [str(row[col]) if row[col] is not None else "NULL" for col in columns]
                output += "| " + " | ".join(values) + " |\n"
            
            output += f"\n*{len(rows)} row(s) returned*"
            
            return [TextContent(type="text", text=output)]
        
        finally:
            await conn.close()
    
    except asyncpg.PostgresError as e:
        return [TextContent(type="text", text=f"Database error: {str(e)}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def list_tables() -> list[TextContent]:
    """
    List all available tables in the database.
    
    Returns:
        List of table names with row counts
    """
    try:
        conn = await get_connection()
        try:
            query = """
                SELECT table_name, 
                       pg_stat_user_tables.n_live_tup as row_count
                FROM information_schema.tables
                LEFT JOIN pg_stat_user_tables 
                    ON table_name = relname
                WHERE table_schema = 'public'
                ORDER BY table_name
            """
            rows = await conn.fetch(query)
            
            output = "**Available Tables:**\n\n"
            output += "| Table | Approximate Rows |\n"
            output += "| --- | --- |\n"
            
            for row in rows:
                output += f"| {row['table_name']} | {row['row_count'] or 'Unknown'} |\n"
            
            return [TextContent(type="text", text=output)]
        
        finally:
            await conn.close()
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def describe_table(table_name: str) -> list[TextContent]:
    """
    Get the schema/structure of a specific table.
    
    Args:
        table_name: Name of the table to describe
    
    Returns:
        Column names, types, and constraints
    """
    # Sanitize table name
    if not table_name.replace("_", "").isalnum():
        return [TextContent(type="text", text="Invalid table name")]
    
    try:
        conn = await get_connection()
        try:
            query = """
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_name = $1 AND table_schema = 'public'
                ORDER BY ordinal_position
            """
            rows = await conn.fetch(query, table_name)
            
            if not rows:
                return [TextContent(type="text", text=f"Table '{table_name}' not found")]
            
            output = f"**Table: {table_name}**\n\n"
            output += "| Column | Type | Nullable | Default |\n"
            output += "| --- | --- | --- | --- |\n"
            
            for row in rows:
                output += f"| {row['column_name']} | {row['data_type']} | "
                output += f"{row['is_nullable']} | {row['column_default'] or 'None'} |\n"
            
            return [TextContent(type="text", text=output)]
        
        finally:
            await conn.close()
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


if __name__ == "__main__":
    app.run()
```

### Required Secrets

| Secret         | Description                  | Example                                   |
| -------------- | ---------------------------- | ----------------------------------------- |
| `DATABASE_URL` | PostgreSQL connection string | `postgresql://user:pass@host:5432/dbname` |

{% hint style="warning" %}
**Security Note**: This integration enforces read-only access, but you should also use a database user with read-only permissions for defense in depth.
{% endhint %}

***

## REST API Wrapper

A flexible REST API wrapper that can connect to any HTTP API.

### Use Cases

* Connect to internal APIs
* Integrate third-party services
* Access webhooks and endpoints
* Retrieve data from any HTTP source

### Python Code

````python
from mcp.server import Server
from mcp.types import TextContent
import httpx
import os
import json
from typing import Optional

app = Server("rest-api-wrapper")

API_BASE_URL = os.environ.get("API_BASE_URL")
API_KEY = os.environ.get("API_KEY")
AUTH_TYPE = os.environ.get("AUTH_TYPE", "bearer")  # bearer, api-key, basic


def get_auth_headers() -> dict:
    """Build authentication headers based on configured auth type."""
    if AUTH_TYPE == "bearer":
        return {"Authorization": f"Bearer {API_KEY}"}
    elif AUTH_TYPE == "api-key":
        return {"X-API-Key": API_KEY}
    elif AUTH_TYPE == "basic":
        import base64
        # API_KEY should be in format "username:password"
        encoded = base64.b64encode(API_KEY.encode()).decode()
        return {"Authorization": f"Basic {encoded}"}
    return {}


@app.tool()
async def api_get(
    endpoint: str,
    params: Optional[str] = None
) -> list[TextContent]:
    """
    Make a GET request to the API.
    
    Args:
        endpoint: API endpoint path (e.g., /users/123)
        params: Optional query parameters as JSON string (e.g., {"page": 1})
    
    Returns:
        API response data
    """
    try:
        headers = get_auth_headers()
        headers["Content-Type"] = "application/json"
        
        query_params = json.loads(params) if params else None
        
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                params=query_params,
                timeout=30.0
            )
            
            # Format response
            status = f"Status: {response.status_code}"
            
            try:
                data = response.json()
                body = json.dumps(data, indent=2)
            except:
                body = response.text
            
            return [TextContent(type="text", text=f"{status}\n\n```json\n{body}\n```")]
    
    except httpx.TimeoutException:
        return [TextContent(type="text", text="Error: Request timed out")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def api_post(
    endpoint: str,
    body: str
) -> list[TextContent]:
    """
    Make a POST request to the API.
    
    Args:
        endpoint: API endpoint path
        body: Request body as JSON string
    
    Returns:
        API response data
    """
    try:
        headers = get_auth_headers()
        headers["Content-Type"] = "application/json"
        
        data = json.loads(body)
        
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                json=data,
                timeout=30.0
            )
            
            status = f"Status: {response.status_code}"
            
            try:
                result = response.json()
                result_body = json.dumps(result, indent=2)
            except:
                result_body = response.text
            
            return [TextContent(type="text", text=f"{status}\n\n```json\n{result_body}\n```")]
    
    except json.JSONDecodeError:
        return [TextContent(type="text", text="Error: Invalid JSON in request body")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def api_put(
    endpoint: str,
    body: str
) -> list[TextContent]:
    """
    Make a PUT request to the API.
    
    Args:
        endpoint: API endpoint path
        body: Request body as JSON string
    
    Returns:
        API response data
    """
    try:
        headers = get_auth_headers()
        headers["Content-Type"] = "application/json"
        
        data = json.loads(body)
        
        async with httpx.AsyncClient() as client:
            response = await client.put(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                json=data,
                timeout=30.0
            )
            
            status = f"Status: {response.status_code}"
            
            try:
                result = response.json()
                result_body = json.dumps(result, indent=2)
            except:
                result_body = response.text
            
            return [TextContent(type="text", text=f"{status}\n\n```json\n{result_body}\n```")]
    
    except json.JSONDecodeError:
        return [TextContent(type="text", text="Error: Invalid JSON in request body")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def api_delete(endpoint: str) -> list[TextContent]:
    """
    Make a DELETE request to the API.
    
    Args:
        endpoint: API endpoint path
    
    Returns:
        API response confirmation
    """
    try:
        headers = get_auth_headers()
        
        async with httpx.AsyncClient() as client:
            response = await client.delete(
                f"{API_BASE_URL}{endpoint}",
                headers=headers,
                timeout=30.0
            )
            
            return [TextContent(
                type="text",
                text=f"Status: {response.status_code}\n\n{response.text or 'Success'}"
            )]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


if __name__ == "__main__":
    app.run()
````

### Required Secrets

| Secret         | Description               | Example                         |
| -------------- | ------------------------- | ------------------------------- |
| `API_BASE_URL` | Base URL for the API      | `https://api.example.com/v1`    |
| `API_KEY`      | Authentication credential | `sk_live_xxx...`                |
| `AUTH_TYPE`    | Authentication type       | `bearer`, `api-key`, or `basic` |

***

## Slack Notifications

Send messages and notifications to Slack channels.

### Use Cases

* Alert teams about important events
* Share meeting summaries
* Post workflow results
* Send reminders and notifications

### Python Code

```python
from mcp.server import Server
from mcp.types import TextContent
import httpx
import os
import json

app = Server("slack-notifications")

SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN")
SLACK_API_URL = "https://slack.com/api"


async def slack_request(method: str, data: dict) -> dict:
    """Make a request to the Slack API."""
    headers = {
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
        "Content-Type": "application/json"
    }
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{SLACK_API_URL}/{method}",
            headers=headers,
            json=data,
            timeout=30.0
        )
        return response.json()


@app.tool()
async def send_message(
    channel: str,
    message: str,
    thread_ts: str = None
) -> list[TextContent]:
    """
    Send a message to a Slack channel.
    
    Args:
        channel: Channel name (e.g., #general) or channel ID
        message: The message text to send (supports Slack markdown)
        thread_ts: Optional thread timestamp to reply in a thread
    
    Returns:
        Confirmation of message sent
    """
    try:
        # Remove # if present
        channel = channel.lstrip("#")
        
        data = {
            "channel": channel,
            "text": message,
            "mrkdwn": True
        }
        
        if thread_ts:
            data["thread_ts"] = thread_ts
        
        result = await slack_request("chat.postMessage", data)
        
        if result.get("ok"):
            return [TextContent(
                type="text",
                text=f"✓ Message sent to #{channel}\nTimestamp: {result.get('ts')}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"Failed to send message: {result.get('error')}"
            )]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def send_rich_message(
    channel: str,
    title: str,
    message: str,
    color: str = "good",
    fields: str = None
) -> list[TextContent]:
    """
    Send a rich formatted message with attachments.
    
    Args:
        channel: Channel name or ID
        title: Bold title for the message
        message: Main message text
        color: Attachment color (good=green, warning=yellow, danger=red, or hex)
        fields: Optional JSON array of fields [{"title": "...", "value": "...", "short": true}]
    
    Returns:
        Confirmation of message sent
    """
    try:
        channel = channel.lstrip("#")
        
        attachment = {
            "color": color,
            "title": title,
            "text": message,
            "mrkdwn_in": ["text", "fields"]
        }
        
        if fields:
            attachment["fields"] = json.loads(fields)
        
        data = {
            "channel": channel,
            "attachments": [attachment]
        }
        
        result = await slack_request("chat.postMessage", data)
        
        if result.get("ok"):
            return [TextContent(type="text", text=f"✓ Rich message sent to #{channel}")]
        else:
            return [TextContent(type="text", text=f"Failed: {result.get('error')}")]
    
    except json.JSONDecodeError:
        return [TextContent(type="text", text="Error: Invalid JSON in fields parameter")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def list_channels() -> list[TextContent]:
    """
    List all public channels the bot has access to.
    
    Returns:
        List of channel names and IDs
    """
    try:
        data = {"types": "public_channel", "limit": 100}
        result = await slack_request("conversations.list", data)
        
        if not result.get("ok"):
            return [TextContent(type="text", text=f"Error: {result.get('error')}")]
        
        channels = result.get("channels", [])
        
        if not channels:
            return [TextContent(type="text", text="No accessible channels found")]
        
        output = "**Available Channels:**\n\n"
        for ch in channels:
            output += f"- #{ch['name']} (ID: {ch['id']})\n"
        
        return [TextContent(type="text", text=output)]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def get_channel_history(
    channel: str,
    limit: int = 10
) -> list[TextContent]:
    """
    Get recent messages from a channel.
    
    Args:
        channel: Channel name or ID
        limit: Number of messages to retrieve (max 100)
    
    Returns:
        Recent messages from the channel
    """
    try:
        channel = channel.lstrip("#")
        
        # First, get channel ID if name was provided
        if not channel.startswith("C"):
            channels_result = await slack_request(
                "conversations.list",
                {"types": "public_channel", "limit": 200}
            )
            channel_map = {c["name"]: c["id"] for c in channels_result.get("channels", [])}
            channel = channel_map.get(channel, channel)
        
        data = {"channel": channel, "limit": min(limit, 100)}
        result = await slack_request("conversations.history", data)
        
        if not result.get("ok"):
            return [TextContent(type="text", text=f"Error: {result.get('error')}")]
        
        messages = result.get("messages", [])
        
        if not messages:
            return [TextContent(type="text", text="No messages found")]
        
        output = f"**Recent Messages ({len(messages)}):**\n\n"
        for msg in reversed(messages):  # Oldest first
            user = msg.get("user", "Unknown")
            text = msg.get("text", "")[:200]  # Truncate long messages
            output += f"**{user}**: {text}\n\n"
        
        return [TextContent(type="text", text=output)]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


if __name__ == "__main__":
    app.run()
```

### Required Secrets

| Secret            | Description                                      |
| ----------------- | ------------------------------------------------ |
| `SLACK_BOT_TOKEN` | Slack Bot User OAuth Token (starts with `xoxb-`) |

{% hint style="info" %}
**Slack App Setup**: Create a Slack App at api.slack.com/apps, add the `chat:write`, `channels:read`, and `channels:history` scopes, and install to your workspace.
{% endhint %}

***

## Web Scraping

Extract data from web pages (use responsibly and respect robots.txt).

### Use Cases

* Research competitor websites
* Extract product information
* Gather public data
* Monitor web content

### Python Code

```python
from mcp.server import Server
from mcp.types import TextContent
import httpx
from bs4 import BeautifulSoup
import json
import re

app = Server("web-scraping")


async def fetch_page(url: str) -> tuple[str, int]:
    """Fetch a web page and return content and status code."""
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; DarcyIQ-MCP/1.0)"
    }
    
    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(url, headers=headers, timeout=30.0)
        return response.text, response.status_code


@app.tool()
async def get_page_text(url: str) -> list[TextContent]:
    """
    Extract the main text content from a web page.
    
    Args:
        url: The URL to fetch
    
    Returns:
        The text content of the page
    """
    try:
        html, status = await fetch_page(url)
        
        if status != 200:
            return [TextContent(type="text", text=f"Error: HTTP {status}")]
        
        soup = BeautifulSoup(html, "html.parser")
        
        # Remove script and style elements
        for element in soup(["script", "style", "nav", "footer", "header"]):
            element.decompose()
        
        # Get text
        text = soup.get_text(separator="\n", strip=True)
        
        # Clean up whitespace
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        cleaned_text = "\n".join(lines)
        
        # Truncate if too long
        if len(cleaned_text) > 10000:
            cleaned_text = cleaned_text[:10000] + "\n\n*[Content truncated]*"
        
        return [TextContent(type="text", text=f"**Content from {url}:**\n\n{cleaned_text}")]
    
    except httpx.TimeoutException:
        return [TextContent(type="text", text="Error: Request timed out")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def get_page_links(url: str) -> list[TextContent]:
    """
    Extract all links from a web page.
    
    Args:
        url: The URL to analyze
    
    Returns:
        List of links found on the page
    """
    try:
        html, status = await fetch_page(url)
        
        if status != 200:
            return [TextContent(type="text", text=f"Error: HTTP {status}")]
        
        soup = BeautifulSoup(html, "html.parser")
        
        links = []
        for link in soup.find_all("a", href=True):
            href = link["href"]
            text = link.get_text(strip=True)[:50]  # Truncate link text
            
            # Make relative URLs absolute
            if href.startswith("/"):
                from urllib.parse import urljoin
                href = urljoin(url, href)
            
            if href.startswith("http"):
                links.append({"text": text or "[No text]", "url": href})
        
        # Remove duplicates
        seen = set()
        unique_links = []
        for link in links:
            if link["url"] not in seen:
                seen.add(link["url"])
                unique_links.append(link)
        
        output = f"**Found {len(unique_links)} links:**\n\n"
        for link in unique_links[:50]:  # Limit output
            output += f"- [{link['text']}]({link['url']})\n"
        
        if len(unique_links) > 50:
            output += f"\n*Showing 50 of {len(unique_links)} links*"
        
        return [TextContent(type="text", text=output)]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


@app.tool()
async def get_page_metadata(url: str) -> list[TextContent]:
    """
    Extract metadata (title, description, etc.) from a web page.
    
    Args:
        url: The URL to analyze
    
    Returns:
        Page metadata including title, description, and Open Graph data
    """
    try:
        html, status = await fetch_page(url)
        
        if status != 200:
            return [TextContent(type="text", text=f"Error: HTTP {status}")]
        
        soup = BeautifulSoup(html, "html.parser")
        
        metadata = {}
        
        # Title
        title_tag = soup.find("title")
        metadata["title"] = title_tag.string if title_tag else "Not found"
        
        # Meta description
        desc_tag = soup.find("meta", attrs={"name": "description"})
        metadata["description"] = desc_tag["content"] if desc_tag else "Not found"
        
        # Open Graph data
        og_tags = soup.find_all("meta", attrs={"property": re.compile(r"^og:")})
        for tag in og_tags:
            prop = tag.get("property", "").replace("og:", "og_")
            metadata[prop] = tag.get("content", "")
        
        # Twitter Card data
        twitter_tags = soup.find_all("meta", attrs={"name": re.compile(r"^twitter:")})
        for tag in twitter_tags:
            prop = tag.get("name", "").replace("twitter:", "twitter_")
            metadata[prop] = tag.get("content", "")
        
        output = f"**Metadata for {url}:**\n\n"
        output += "| Property | Value |\n"
        output += "| --- | --- |\n"
        
        for key, value in metadata.items():
            value_str = str(value)[:100]  # Truncate long values
            output += f"| {key} | {value_str} |\n"
        
        return [TextContent(type="text", text=output)]
    
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]


if __name__ == "__main__":
    app.run()
```

### Required Secrets

This integration does not require secrets, but you may want to add:

| Secret       | Description               | Optional |
| ------------ | ------------------------- | -------- |
| `USER_AGENT` | Custom User-Agent string  | Yes      |
| `PROXY_URL`  | Proxy server for requests | Yes      |

{% hint style="warning" %}
**Responsible Use**: Always respect website terms of service and robots.txt. Do not scrape sites that prohibit it, and implement appropriate rate limiting.
{% endhint %}

***

## Template Structure Reference

All MCP servers follow this basic structure:

### Python Template

```python
from mcp.server import Server
from mcp.types import TextContent
import os

# Create the server with a unique name
app = Server("my-integration-name")

# Read configuration from environment variables
API_KEY = os.environ.get("API_KEY")


@app.tool()
async def my_tool_name(
    required_param: str,
    optional_param: str = "default"
) -> list[TextContent]:
    """
    Clear description of what the tool does.
    
    Args:
        required_param: Description of this parameter
        optional_param: Description with default value
    
    Returns:
        What the tool returns
    """
    # Your implementation here
    result = f"Processed {required_param}"
    
    return [TextContent(type="text", text=result)]


# Entry point
if __name__ == "__main__":
    app.run()
```

### Key Elements

| Element               | Purpose                                 |
| --------------------- | --------------------------------------- |
| `Server()`            | Creates the MCP server instance         |
| `@app.tool()`         | Decorator to register a tool            |
| `async def`           | Tools should be async for performance   |
| Docstring             | Describes the tool for the AI           |
| `TextContent`         | Standard return type for text responses |
| Environment variables | Secure way to access secrets            |

## Next Steps

| Goal                    | Documentation                                                                              |
| ----------------------- | ------------------------------------------------------------------------------------------ |
| Start building          | [Building Custom MCPs](/integrations-and-configuration/mcp-studio/building-custom-mcps.md) |
| Deploy your integration | [Deploying & Managing](/integrations-and-configuration/mcp-studio/deploying-mcps.md)       |
| Troubleshoot issues     | [Troubleshooting](/integrations-and-configuration/mcp-studio/troubleshooting.md)           |


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.darcyiq.com/integrations-and-configuration/mcp-studio/mcp-studio-samples.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
