Using the Python client to connect to ShotGrid MCP Server
# Using uv (recommended)
uv pip install mcp
# Using pip
pip install mcp
from mcp.client import Client
# Create a client instance
client = Client("http://localhost:8000")
# For a secure connection
secure_client = Client("https://shotgrid-mcp.example.com")
import asyncio
from mcp.client import Client
async def main():
client = Client("http://localhost:8000")
# List available tools
tools = await client.list_tools()
print(f"Available tools: {[tool.name for tool in tools]}")
if __name__ == "__main__":
asyncio.run(main())
async def list_tools():
client = Client("http://localhost:8000")
tools = await client.list_tools()
print(f"Found {len(tools)} tools:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# Print parameter information
if tool.parameters:
print(" Parameters:")
for param_name, param_info in tool.parameters.get("properties", {}).items():
param_type = param_info.get("type", "any")
required = param_name in tool.parameters.get("required", [])
print(f" - {param_name} ({param_type}){' (required)' if required else ''}")
if __name__ == "__main__":
import asyncio
asyncio.run(list_tools())
async def call_tool_example():
client = Client("http://localhost:8000")
# Call a simple tool
projects = await client.call_tool("find_projects", {})
print(f"Found {len(projects)} projects")
# Call a tool with parameters
shots = await client.call_tool(
"find_shots",
{
"project_id": 123,
"status": "ip"
}
)
print(f"Found {len(shots)} in-progress shots")
# Call a tool with complex parameters
from datetime import date
tasks = await client.call_tool(
"find_tasks",
{
"project_id": 123,
"assigned_to": "Alice",
"due_date_before": date(2023, 12, 31).isoformat(),
"status": "ip"
}
)
print(f"Found {len(tasks)} tasks assigned to Alice")
if __name__ == "__main__":
import asyncio
asyncio.run(call_tool_example())
async def process_tool_response():
client = Client("http://localhost:8000")
# Call a tool that returns entities
shots = await client.call_tool(
"find_shots",
{
"project_id": 123,
"status": "ip"
}
)
# Process the response
if not shots:
print("No shots found")
return
print(f"Found {len(shots)} shots:")
for shot in shots:
print(f"- {shot['code']}: {shot['sg_status_list']}")
# Access related entities
if "sg_sequence.Sequence.code" in shot:
print(f" Sequence: {shot['sg_sequence.Sequence.code']}")
if "project.Project.name" in shot:
print(f" Project: {shot['project.Project.name']}")
if __name__ == "__main__":
import asyncio
asyncio.run(process_tool_response())
async def read_resource_example():
client = Client("http://localhost:8000")
# Get a thumbnail URL
thumbnail_info = await client.call_tool(
"get_thumbnail_url",
{
"entity_type": "Asset",
"entity_id": 456
}
)
if not thumbnail_info or "url" not in thumbnail_info:
print("No thumbnail available")
return
# Read the thumbnail resource
resource = await client.read_resource(thumbnail_info["url"])
# Check the resource type
if resource[0].type == "image":
# Save the image to a file
with open("thumbnail.jpg", "wb") as f:
f.write(resource[0].content)
print(f"Saved thumbnail to thumbnail.jpg")
else:
print(f"Unexpected resource type: {resource[0].type}")
if __name__ == "__main__":
import asyncio
asyncio.run(read_resource_example())
async def list_resources():
client = Client("http://localhost:8000")
# List available resources
resources = await client.list_resources()
print(f"Found {len(resources)} resources:")
for resource in resources:
print(f"- {resource.name}: {resource.description}")
print(f" URI: {resource.uri}")
if __name__ == "__main__":
import asyncio
asyncio.run(list_resources())
from mcp.errors import MCPError, ToolExecutionError
async def handle_tool_errors():
client = Client("http://localhost:8000")
try:
# Call a tool with invalid parameters
result = await client.call_tool(
"update_shot",
{
"shot_id": 999999, # Non-existent shot
"status": "invalid_status"
}
)
except ToolExecutionError as e:
print(f"Tool execution failed: {e.message}")
print(f"Error details: {e.details}")
except MCPError as e:
print(f"MCP protocol error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
if __name__ == "__main__":
import asyncio
asyncio.run(handle_tool_errors())
import aiohttp
from mcp.errors import MCPError
async def handle_connection_errors():
try:
# Try to connect to a non-existent server
client = Client("http://non-existent-server:8000")
# Set a short timeout to fail faster
client._session_kwargs["timeout"] = aiohttp.ClientTimeout(total=2)
# Try to list tools
tools = await client.list_tools()
except aiohttp.ClientConnectorError as e:
print(f"Connection error: {e}")
except aiohttp.ClientResponseError as e:
print(f"HTTP error: {e.status} {e.message}")
except aiohttp.ClientError as e:
print(f"Client error: {e}")
except MCPError as e:
print(f"MCP protocol error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
if __name__ == "__main__":
import asyncio
asyncio.run(handle_connection_errors())
import aiohttp
from mcp.client import Client
# Create a client with custom session configuration
client = Client(
"http://localhost:8000",
session_kwargs={
"timeout": aiohttp.ClientTimeout(total=30),
"headers": {
"User-Agent": "ShotGrid MCP Client/1.0",
"X-Custom-Header": "Value"
},
"connector": aiohttp.TCPConnector(
limit=10, # Maximum number of connections
ssl=False # Disable SSL verification (not recommended for production)
)
}
)
import asyncio
from mcp.client import Client
async def connection_pooling_example():
# Create a client
client = Client("http://localhost:8000")
# Define a task that uses the client
async def fetch_data(entity_type, entity_id):
try:
result = await client.call_tool(
"find_one_entity",
{
"entity_type": entity_type,
"entity_id": entity_id
}
)
return result
except Exception as e:
print(f"Error fetching {entity_type} {entity_id}: {e}")
return None
# Create multiple tasks
tasks = [
fetch_data("Project", 1),
fetch_data("Shot", 2),
fetch_data("Asset", 3),
fetch_data("Task", 4)
]
# Run all tasks concurrently
results = await asyncio.gather(*tasks)
# Process results
for i, result in enumerate(results):
if result:
print(f"Result {i+1}: {result.get('type')} {result.get('id')}")
else:
print(f"Result {i+1}: None")
if __name__ == "__main__":
asyncio.run(connection_pooling_example())
from mcp.client import Client
async def streaming_example():
client = Client("http://localhost:8000")
# Call a tool that might return a large result
async for chunk in client.call_tool_stream(
"search_entities",
{
"entity_type": "Shot",
"filters": [["project.Project.name", "contains", "Film"]],
"fields": ["code", "sg_status_list"],
"limit": 1000
}
):
# Process each chunk as it arrives
print(f"Received chunk: {len(chunk)} bytes")
# In a real application, you would parse and process the chunk
# For example, if the chunks are JSON lines:
import json
try:
data = json.loads(chunk)
print(f"Processed {len(data)} entities")
except json.JSONDecodeError:
print("Received non-JSON chunk")
if __name__ == "__main__":
import asyncio
asyncio.run(streaming_example())
import asyncio
import json
from datetime import datetime
from mcp.client import Client
from mcp.errors import MCPError, ToolExecutionError
async def shotgrid_mcp_example():
# Create a client
client = Client("http://localhost:8000")
try:
# List available tools
print("Listing available tools...")
tools = await client.list_tools()
print(f"Found {len(tools)} tools")
# Find active projects
print("\nFinding active projects...")
projects = await client.call_tool(
"find_projects",
{"status": "Active"}
)
if not projects:
print("No active projects found")
return
# Select the first project
project = projects[0]
print(f"Selected project: {project['name']} (ID: {project['id']})")
# Find shots in the project
print(f"\nFinding shots in project {project['name']}...")
shots = await client.call_tool(
"find_shots",
{
"project_id": project["id"],
"status": "ip"
}
)
print(f"Found {len(shots)} in-progress shots")
if shots:
# Select the first shot
shot = shots[0]
print(f"Selected shot: {shot['code']} (ID: {shot['id']})")
# Get tasks for the shot
print(f"\nFinding tasks for shot {shot['code']}...")
tasks = await client.call_tool(
"find_tasks",
{
"entity_type": "Shot",
"entity_id": shot["id"]
}
)
print(f"Found {len(tasks)} tasks")
if tasks:
# Create a note on the first task
task = tasks[0]
print(f"\nCreating a note on task {task['content']}...")
note = await client.call_tool(
"create_note",
{
"project_id": project["id"],
"content": f"Automated note created at {datetime.now().isoformat()}",
"entity_type": "Task",
"entity_id": task["id"]
}
)
print(f"Created note with ID {note['id']}")
# Demonstrate error handling
print("\nDemonstrating error handling...")
try:
result = await client.call_tool(
"update_shot",
{
"shot_id": 999999, # Non-existent shot
"status": "invalid_status"
}
)
except ToolExecutionError as e:
print(f"Tool execution failed as expected: {e.message}")
except MCPError as e:
print(f"MCP protocol error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
if __name__ == "__main__":
asyncio.run(shotgrid_mcp_example())
Was this page helpful?